renamed heartbeatlatch to mutexlatch

This commit is contained in:
Janis 2025-06-26 17:26:55 +02:00
parent c3eb71dbb1
commit eb8fd314f5
4 changed files with 37 additions and 60 deletions

View file

@ -14,18 +14,18 @@ use parking_lot::{Condvar, Mutex};
use crate::{ use crate::{
job::{HeapJob, Job, StackJob}, job::{HeapJob, Job, StackJob},
latch::{AsCoreLatch, HeartbeatLatch, LatchRef, UnsafeWakeLatch}, latch::{AsCoreLatch, MutexLatch, LatchRef, UnsafeWakeLatch},
workerthread::{HeartbeatThread, WorkerThread}, workerthread::{HeartbeatThread, WorkerThread},
}; };
pub struct Heartbeat { pub struct Heartbeat {
pub latch: HeartbeatLatch, pub latch: MutexLatch,
} }
impl Heartbeat { impl Heartbeat {
pub fn new() -> NonNull<CachePadded<Self>> { pub fn new() -> NonNull<CachePadded<Self>> {
let ptr = Box::new(CachePadded::new(Self { let ptr = Box::new(CachePadded::new(Self {
latch: HeartbeatLatch::new(), latch: MutexLatch::new(),
})); }));
Box::into_non_null(ptr) Box::into_non_null(ptr)
@ -225,10 +225,10 @@ impl Context {
F: FnOnce(&WorkerThread) -> T + Send, F: FnOnce(&WorkerThread) -> T + Send,
T: Send, T: Send,
{ {
use crate::latch::HeartbeatLatch; use crate::latch::MutexLatch;
// current thread isn't a worker thread, create job and inject into global context // current thread isn't a worker thread, create job and inject into global context
let latch = HeartbeatLatch::new(); let latch = MutexLatch::new();
let job = StackJob::new( let job = StackJob::new(
move || { move || {

View file

@ -254,14 +254,14 @@ impl<L: Latch + AsCoreLatch> AsCoreLatch for CountLatch<L> {
} }
} }
pub struct HeartbeatLatch { pub struct MutexLatch {
inner: UnsafeCell<AtomicLatch>, inner: AtomicLatch,
lock: Mutex<()>, lock: Mutex<()>,
condvar: Condvar, condvar: Condvar,
} }
unsafe impl Send for HeartbeatLatch {} unsafe impl Send for MutexLatch {}
unsafe impl Sync for HeartbeatLatch {} unsafe impl Sync for MutexLatch {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum WakeResult { pub(crate) enum WakeResult {
@ -270,11 +270,11 @@ pub(crate) enum WakeResult {
Set, Set,
} }
impl HeartbeatLatch { impl MutexLatch {
#[inline] #[inline]
pub const fn new() -> Self { pub const fn new() -> Self {
Self { Self {
inner: UnsafeCell::new(AtomicLatch::new()), inner: AtomicLatch::new(),
lock: Mutex::new(()), lock: Mutex::new(()),
condvar: Condvar::new(), condvar: Condvar::new(),
} }
@ -284,20 +284,19 @@ impl HeartbeatLatch {
pub fn reset(&self) { pub fn reset(&self) {
let _guard = self.lock.lock(); let _guard = self.lock.lock();
// SAFETY: inner is atomic, so we can safely access it. // SAFETY: inner is atomic, so we can safely access it.
unsafe { self.inner.as_mut_unchecked().unset() }; self.inner.reset();
} }
pub fn wait_and_reset(&self) -> WakeResult { pub fn wait_and_reset(&self) -> WakeResult {
// SAFETY: inner is locked by the mutex, so we can safely access it. // SAFETY: inner is locked by the mutex, so we can safely access it.
let value = unsafe { let value = {
let mut guard = self.lock.lock(); let mut guard = self.lock.lock();
let inner = self.inner.as_ref_unchecked(); self.inner.set_sleeping();
inner.set_sleeping(); while self.inner.get() & !AtomicLatch::SLEEPING == AtomicLatch::UNSET {
while inner.get() & !AtomicLatch::SLEEPING == AtomicLatch::UNSET {
self.condvar.wait(&mut guard); self.condvar.wait(&mut guard);
} }
inner.reset() self.inner.reset()
}; };
if value & AtomicLatch::SET == AtomicLatch::SET { if value & AtomicLatch::SET == AtomicLatch::SET {
@ -319,42 +318,34 @@ impl HeartbeatLatch {
pub fn signal_heartbeat(&self) { pub fn signal_heartbeat(&self) {
let mut _guard = self.lock.lock(); let mut _guard = self.lock.lock();
// SAFETY: inner is locked by the mutex, so we can safely access it. self.inner.set_heartbeat();
unsafe {
let inner = self.inner.as_ref_unchecked();
inner.set_heartbeat();
// If the latch was sleeping, notify the waiting thread. // If the latch was sleeping, notify the waiting thread.
if inner.is_sleeping() { if self.inner.is_sleeping() {
self.condvar.notify_all(); self.condvar.notify_all();
}
} }
} }
pub fn signal_job_shared(&self) { pub fn signal_job_shared(&self) {
let mut _guard = self.lock.lock(); let mut _guard = self.lock.lock();
// SAFETY: inner is locked by the mutex, so we can safely access it. self.inner.set_wakeup();
unsafe { if self.inner.is_sleeping() {
self.inner.as_ref_unchecked().set_wakeup(); self.condvar.notify_all();
if self.inner.as_ref_unchecked().is_sleeping() {
self.condvar.notify_all();
}
} }
} }
pub fn signal_job_finished(&self) { pub fn signal_job_finished(&self) {
let mut _guard = self.lock.lock(); let mut _guard = self.lock.lock();
// SAFETY: inner is locked by the mutex, so we can safely access it.
unsafe { unsafe {
CoreLatch::set(self.inner.get()); CoreLatch::set(&self.inner);
if self.inner.as_ref_unchecked().is_sleeping() { if self.inner.is_sleeping() {
self.condvar.notify_all(); self.condvar.notify_all();
} }
} }
} }
} }
impl Latch for HeartbeatLatch { impl Latch for MutexLatch {
#[inline] #[inline]
unsafe fn set_raw(this: *const Self) { unsafe fn set_raw(this: *const Self) {
// SAFETY: `this` is valid until the guard is dropped. // SAFETY: `this` is valid until the guard is dropped.
@ -362,27 +353,27 @@ impl Latch for HeartbeatLatch {
let this = &*this; let this = &*this;
let _guard = this.lock.lock(); let _guard = this.lock.lock();
Latch::set_raw(this.inner.get() as *const AtomicLatch); Latch::set_raw(this.inner.get() as *const AtomicLatch);
if this.inner.as_ref_unchecked().is_sleeping() { if this.inner.is_sleeping() {
this.condvar.notify_all(); this.condvar.notify_all();
} }
} }
} }
} }
impl Probe for HeartbeatLatch { impl Probe for MutexLatch {
#[inline] #[inline]
fn probe(&self) -> bool { fn probe(&self) -> bool {
let _guard = self.lock.lock(); let _guard = self.lock.lock();
// SAFETY: inner is atomic, so we can safely access it. // SAFETY: inner is atomic, so we can safely access it.
unsafe { self.inner.as_ref_unchecked().probe() } self.inner.probe()
} }
} }
impl AsCoreLatch for HeartbeatLatch { impl AsCoreLatch for MutexLatch {
#[inline] #[inline]
fn as_core_latch(&self) -> &CoreLatch { fn as_core_latch(&self) -> &CoreLatch {
// SAFETY: inner is atomic, so we can safely access it. // SAFETY: inner is atomic, so we can safely access it.
unsafe { self.inner.as_ref_unchecked() } self.inner.as_core_latch()
} }
} }
@ -439,13 +430,13 @@ impl AsCoreLatch for WakeLatch {
/// A latch that can be set from any thread, but must be created with a valid waker. /// A latch that can be set from any thread, but must be created with a valid waker.
pub struct UnsafeWakeLatch { pub struct UnsafeWakeLatch {
waker: *const HeartbeatLatch, waker: *const MutexLatch,
} }
impl UnsafeWakeLatch { impl UnsafeWakeLatch {
/// # Safety /// # Safety
/// The `waker` must be valid until the latch is set. /// The `waker` must be valid until the latch is set.
pub unsafe fn new(waker: *const HeartbeatLatch) -> Self { pub unsafe fn new(waker: *const MutexLatch) -> Self {
Self { waker } Self { waker }
} }
} }
@ -556,7 +547,7 @@ mod tests {
#[test] #[test]
#[traced_test] #[traced_test]
fn mutex_latch() { fn mutex_latch() {
let latch = Arc::new(HeartbeatLatch::new()); let latch = Arc::new(MutexLatch::new());
assert!(!latch.probe()); assert!(!latch.probe());
latch.set(); latch.set();
assert!(latch.probe()); assert!(latch.probe());

View file

@ -13,14 +13,14 @@ use async_task::Runnable;
use crate::{ use crate::{
context::Context, context::Context,
job::{HeapJob, Job}, job::{HeapJob, Job},
latch::{AsCoreLatch, CountLatch, HeartbeatLatch, WakeLatch}, latch::{AsCoreLatch, CountLatch, MutexLatch, WakeLatch},
util::{DropGuard, SendPtr}, util::{DropGuard, SendPtr},
workerthread::WorkerThread, workerthread::WorkerThread,
}; };
pub struct Scope<'scope, 'env: 'scope> { pub struct Scope<'scope, 'env: 'scope> {
// latch to wait on before the scope finishes // latch to wait on before the scope finishes
job_counter: CountLatch<HeartbeatLatch>, job_counter: CountLatch<MutexLatch>,
// local threadpool // local threadpool
context: Arc<Context>, context: Arc<Context>,
// panic error // panic error
@ -258,7 +258,7 @@ impl<'scope, 'env> Scope<'scope, 'env> {
unsafe fn from_context(context: Arc<Context>) -> Self { unsafe fn from_context(context: Arc<Context>) -> Self {
Self { Self {
context, context,
job_counter: CountLatch::new(HeartbeatLatch::new()), job_counter: CountLatch::new(MutexLatch::new()),
panic: AtomicPtr::new(ptr::null_mut()), panic: AtomicPtr::new(ptr::null_mut()),
_scope: PhantomData, _scope: PhantomData,
_env: PhantomData, _env: PhantomData,

View file

@ -305,20 +305,6 @@ impl WorkerThread {
continue 'outer; continue 'outer;
} }
None => { None => {
// TODO: wait on latch? if we have something that can
// signal being done, e.g. can be waited on instead of
// shared jobs, we should wait on it instead, but we
// would also want to receive shared jobs still?
// Spin? probably just wastes CPU time.
// self.context.shared_job.wait(&mut guard);
// if spin.spin() {
// // wait for more shared jobs.
// // self.context.shared_job.wait(&mut guard);
// return;
// }
// Yield? same as spinning, really, so just exit and let the upstream use wait
// std::thread::yield_now();
tracing::trace!("thread {:?} is sleeping", self.index); tracing::trace!("thread {:?} is sleeping", self.index);
match self.heartbeat().latch.wait_and_reset() { match self.heartbeat().latch.wait_and_reset() {