From eb8fd314f5eff55003b8c18bfe6b88da0f98acb8 Mon Sep 17 00:00:00 2001 From: Janis Date: Thu, 26 Jun 2025 17:26:55 +0200 Subject: [PATCH] renamed heartbeatlatch to mutexlatch --- distaff/src/context.rs | 10 +++--- distaff/src/latch.rs | 67 ++++++++++++++++--------------------- distaff/src/scope.rs | 6 ++-- distaff/src/workerthread.rs | 14 -------- 4 files changed, 37 insertions(+), 60 deletions(-) diff --git a/distaff/src/context.rs b/distaff/src/context.rs index 5902a7e..e09e22e 100644 --- a/distaff/src/context.rs +++ b/distaff/src/context.rs @@ -14,18 +14,18 @@ use parking_lot::{Condvar, Mutex}; use crate::{ job::{HeapJob, Job, StackJob}, - latch::{AsCoreLatch, HeartbeatLatch, LatchRef, UnsafeWakeLatch}, + latch::{AsCoreLatch, MutexLatch, LatchRef, UnsafeWakeLatch}, workerthread::{HeartbeatThread, WorkerThread}, }; pub struct Heartbeat { - pub latch: HeartbeatLatch, + pub latch: MutexLatch, } impl Heartbeat { pub fn new() -> NonNull> { let ptr = Box::new(CachePadded::new(Self { - latch: HeartbeatLatch::new(), + latch: MutexLatch::new(), })); Box::into_non_null(ptr) @@ -225,10 +225,10 @@ impl Context { F: FnOnce(&WorkerThread) -> T + Send, T: Send, { - use crate::latch::HeartbeatLatch; + use crate::latch::MutexLatch; // current thread isn't a worker thread, create job and inject into global context - let latch = HeartbeatLatch::new(); + let latch = MutexLatch::new(); let job = StackJob::new( move || { diff --git a/distaff/src/latch.rs b/distaff/src/latch.rs index 989fed0..b3628f7 100644 --- a/distaff/src/latch.rs +++ b/distaff/src/latch.rs @@ -254,14 +254,14 @@ impl AsCoreLatch for CountLatch { } } -pub struct HeartbeatLatch { - inner: UnsafeCell, +pub struct MutexLatch { + inner: AtomicLatch, lock: Mutex<()>, condvar: Condvar, } -unsafe impl Send for HeartbeatLatch {} -unsafe impl Sync for HeartbeatLatch {} +unsafe impl Send for MutexLatch {} +unsafe impl Sync for MutexLatch {} #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum WakeResult { @@ -270,11 +270,11 @@ pub(crate) enum WakeResult { Set, } -impl HeartbeatLatch { +impl MutexLatch { #[inline] pub const fn new() -> Self { Self { - inner: UnsafeCell::new(AtomicLatch::new()), + inner: AtomicLatch::new(), lock: Mutex::new(()), condvar: Condvar::new(), } @@ -284,20 +284,19 @@ impl HeartbeatLatch { pub fn reset(&self) { let _guard = self.lock.lock(); // SAFETY: inner is atomic, so we can safely access it. - unsafe { self.inner.as_mut_unchecked().unset() }; + self.inner.reset(); } pub fn wait_and_reset(&self) -> WakeResult { // SAFETY: inner is locked by the mutex, so we can safely access it. - let value = unsafe { + let value = { let mut guard = self.lock.lock(); - let inner = self.inner.as_ref_unchecked(); - inner.set_sleeping(); - while inner.get() & !AtomicLatch::SLEEPING == AtomicLatch::UNSET { + self.inner.set_sleeping(); + while self.inner.get() & !AtomicLatch::SLEEPING == AtomicLatch::UNSET { self.condvar.wait(&mut guard); } - inner.reset() + self.inner.reset() }; if value & AtomicLatch::SET == AtomicLatch::SET { @@ -319,42 +318,34 @@ impl HeartbeatLatch { pub fn signal_heartbeat(&self) { let mut _guard = self.lock.lock(); - // SAFETY: inner is locked by the mutex, so we can safely access it. - unsafe { - let inner = self.inner.as_ref_unchecked(); - inner.set_heartbeat(); + self.inner.set_heartbeat(); - // If the latch was sleeping, notify the waiting thread. - if inner.is_sleeping() { - self.condvar.notify_all(); - } + // If the latch was sleeping, notify the waiting thread. + if self.inner.is_sleeping() { + self.condvar.notify_all(); } } pub fn signal_job_shared(&self) { let mut _guard = self.lock.lock(); - // SAFETY: inner is locked by the mutex, so we can safely access it. - unsafe { - self.inner.as_ref_unchecked().set_wakeup(); - if self.inner.as_ref_unchecked().is_sleeping() { - self.condvar.notify_all(); - } + self.inner.set_wakeup(); + if self.inner.is_sleeping() { + self.condvar.notify_all(); } } pub fn signal_job_finished(&self) { let mut _guard = self.lock.lock(); - // SAFETY: inner is locked by the mutex, so we can safely access it. unsafe { - CoreLatch::set(self.inner.get()); - if self.inner.as_ref_unchecked().is_sleeping() { + CoreLatch::set(&self.inner); + if self.inner.is_sleeping() { self.condvar.notify_all(); } } } } -impl Latch for HeartbeatLatch { +impl Latch for MutexLatch { #[inline] unsafe fn set_raw(this: *const Self) { // SAFETY: `this` is valid until the guard is dropped. @@ -362,27 +353,27 @@ impl Latch for HeartbeatLatch { let this = &*this; let _guard = this.lock.lock(); Latch::set_raw(this.inner.get() as *const AtomicLatch); - if this.inner.as_ref_unchecked().is_sleeping() { + if this.inner.is_sleeping() { this.condvar.notify_all(); } } } } -impl Probe for HeartbeatLatch { +impl Probe for MutexLatch { #[inline] fn probe(&self) -> bool { let _guard = self.lock.lock(); // SAFETY: inner is atomic, so we can safely access it. - unsafe { self.inner.as_ref_unchecked().probe() } + self.inner.probe() } } -impl AsCoreLatch for HeartbeatLatch { +impl AsCoreLatch for MutexLatch { #[inline] fn as_core_latch(&self) -> &CoreLatch { // SAFETY: inner is atomic, so we can safely access it. - unsafe { self.inner.as_ref_unchecked() } + self.inner.as_core_latch() } } @@ -439,13 +430,13 @@ impl AsCoreLatch for WakeLatch { /// A latch that can be set from any thread, but must be created with a valid waker. pub struct UnsafeWakeLatch { - waker: *const HeartbeatLatch, + waker: *const MutexLatch, } impl UnsafeWakeLatch { /// # Safety /// The `waker` must be valid until the latch is set. - pub unsafe fn new(waker: *const HeartbeatLatch) -> Self { + pub unsafe fn new(waker: *const MutexLatch) -> Self { Self { waker } } } @@ -556,7 +547,7 @@ mod tests { #[test] #[traced_test] fn mutex_latch() { - let latch = Arc::new(HeartbeatLatch::new()); + let latch = Arc::new(MutexLatch::new()); assert!(!latch.probe()); latch.set(); assert!(latch.probe()); diff --git a/distaff/src/scope.rs b/distaff/src/scope.rs index c474ff4..c198c36 100644 --- a/distaff/src/scope.rs +++ b/distaff/src/scope.rs @@ -13,14 +13,14 @@ use async_task::Runnable; use crate::{ context::Context, job::{HeapJob, Job}, - latch::{AsCoreLatch, CountLatch, HeartbeatLatch, WakeLatch}, + latch::{AsCoreLatch, CountLatch, MutexLatch, WakeLatch}, util::{DropGuard, SendPtr}, workerthread::WorkerThread, }; pub struct Scope<'scope, 'env: 'scope> { // latch to wait on before the scope finishes - job_counter: CountLatch, + job_counter: CountLatch, // local threadpool context: Arc, // panic error @@ -258,7 +258,7 @@ impl<'scope, 'env> Scope<'scope, 'env> { unsafe fn from_context(context: Arc) -> Self { Self { context, - job_counter: CountLatch::new(HeartbeatLatch::new()), + job_counter: CountLatch::new(MutexLatch::new()), panic: AtomicPtr::new(ptr::null_mut()), _scope: PhantomData, _env: PhantomData, diff --git a/distaff/src/workerthread.rs b/distaff/src/workerthread.rs index 6533c14..4e10118 100644 --- a/distaff/src/workerthread.rs +++ b/distaff/src/workerthread.rs @@ -305,20 +305,6 @@ impl WorkerThread { continue 'outer; } None => { - // TODO: wait on latch? if we have something that can - // signal being done, e.g. can be waited on instead of - // shared jobs, we should wait on it instead, but we - // would also want to receive shared jobs still? - // Spin? probably just wastes CPU time. - // self.context.shared_job.wait(&mut guard); - // if spin.spin() { - // // wait for more shared jobs. - // // self.context.shared_job.wait(&mut guard); - // return; - // } - // Yield? same as spinning, really, so just exit and let the upstream use wait - // std::thread::yield_now(); - tracing::trace!("thread {:?} is sleeping", self.index); match self.heartbeat().latch.wait_and_reset() {