diff --git a/distaff/src/job.rs b/distaff/src/job.rs index 5d55c4b..7576587 100644 --- a/distaff/src/job.rs +++ b/distaff/src/job.rs @@ -1140,6 +1140,8 @@ impl JobReceiver { return None; } + tracing::trace!("received job ({:?}) result", &raw const *self); + // SAFETY: if we received a non-EMPTY tag, the value must be initialized. // because we atomically set the taag to EMPTY, we can be sure that we're the only ones accessing the value. let slot = unsafe { (&mut *self.channel.value.get()).assume_init_mut() }; diff --git a/distaff/src/latch.rs b/distaff/src/latch.rs index cdcb605..5af58cd 100644 --- a/distaff/src/latch.rs +++ b/distaff/src/latch.rs @@ -412,16 +412,17 @@ impl WorkerLatch { **guard = false; // reset the mutex to false after waking up } - fn wait_with_lock_internal(&self, other: &mut parking_lot::MutexGuard<'_, T>) { - let key = &self.condvar as *const _ as usize; - let lock_addr = &self.mutex as *const _ as usize; + fn wait_with_lock_internal( + condvar: &AtomicUsize, + mutex: &mut parking_lot::MutexGuard<'_, bool>, + other: &mut parking_lot::MutexGuard<'_, T>, + ) { + **mutex = true; + let key = condvar as *const _ as usize; + let lock_addr = parking_lot::MutexGuard::mutex(mutex) as *const _ as usize; let mut requeued = false; - let mut guard = self.mutex.lock(); - - let state = unsafe { AtomicUsize::from_ptr(&self.condvar as *const _ as *mut usize) }; - - *guard = true; // set the mutex to true to indicate that the worker is waiting + let state = condvar; unsafe { let token = parking_lot_core::park( @@ -437,7 +438,7 @@ impl WorkerLatch { true }, || { - drop(guard); // drop the guard to release the lock + parking_lot::MutexGuard::mutex(&mutex).force_unlock(); parking_lot::MutexGuard::mutex(&other).force_unlock(); }, |k, was_last_thread| { @@ -455,35 +456,55 @@ impl WorkerLatch { token ); } - // relock - let mut other2 = parking_lot::MutexGuard::mutex(&other).lock(); - tracing::trace!("WorkerLatch wait_with_lock_internal: relocked other"); - // because `other` is logically unlocked, we swap it with `other2` and then forget `other2` + let mut other2 = parking_lot::MutexGuard::mutex(&other).lock(); core::mem::swap(&mut other2, other); core::mem::forget(other2); - let mut guard = self.mutex.lock(); - tracing::trace!("WorkerLatch wait_with_lock_internal: relocked self"); + // because `other` is logically unlocked, we swap it with `other2` and then forget `other2` + let mut mutex2 = parking_lot::MutexGuard::mutex(&mutex).lock(); + core::mem::swap(&mut mutex2, mutex); + core::mem::forget(mutex2); - *guard = false; // reset the mutex to false after waking up + **mutex = false; } - #[tracing::instrument(level = "trace", skip(other))] + #[tracing::instrument(level = "trace", skip_all, fields( + this = self as *const Self as usize, + ))] pub fn wait_with_lock(&self, other: &mut parking_lot::MutexGuard<'_, T>) { - self.wait_with_lock_internal(other); + Self::wait_with_lock_internal(&self.condvar, &mut self.mutex.lock(), other); + } + + #[tracing::instrument(level = "trace", skip_all, fields( + this = self as *const Self as usize, + ))] + pub fn wait_with_lock_unless( + &self, + other: &mut parking_lot::MutexGuard<'_, T>, + mut pred: F, + ) where + F: FnMut(&mut T) -> bool, + { + let mut guard = self.mutex.lock(); + if !pred(other.deref_mut()) { + Self::wait_with_lock_internal(&self.condvar, &mut guard, other); + } } pub fn wait_with_lock_while(&self, other: &mut parking_lot::MutexGuard<'_, T>, mut f: F) where F: FnMut(&mut T) -> bool, { + let mut guard = self.mutex.lock(); while f(other.deref_mut()) { - self.wait_with_lock_internal(other); + Self::wait_with_lock_internal(&self.condvar, &mut guard, other); } } - #[tracing::instrument(level = "trace", skip(f))] + #[tracing::instrument(level = "trace", skip_all, fields( + this = self as *const Self as usize, + ))] pub fn wait_until(&self, mut f: F) -> T where F: FnMut() -> Option, @@ -501,13 +522,32 @@ impl WorkerLatch { *self.mutex.lock() } - #[tracing::instrument(level = "trace")] + #[tracing::instrument(level = "trace", skip_all, fields( + this = self as *const Self as usize, + ))] fn notify(&self) { - let key = &self.condvar as *const _ as usize; + let from = &self.condvar as *const _ as usize; + let to = &self.mutex as *const _ as usize; + + let validate = || { + if self.condvar.load(Ordering::Relaxed) != to { + return parking_lot_core::RequeueOp::Abort; + } + + self.condvar.store(0, Ordering::Relaxed); + + parking_lot_core::RequeueOp::UnparkOneRequeueRest + }; + + let callback = |_op: parking_lot_core::RequeueOp, + _result: parking_lot_core::UnparkResult| { + parking_lot_core::DEFAULT_UNPARK_TOKEN + }; unsafe { - let n = parking_lot_core::unpark_all(key, parking_lot_core::DEFAULT_UNPARK_TOKEN); - tracing::trace!("WorkerLatch notify_one: unparked {} threads", n); + //let n = parking_lot_core::unpark_requeue(from, to, validate, callback); + let n = parking_lot_core::unpark_all(from, parking_lot_core::DEFAULT_UNPARK_TOKEN); + tracing::trace!("WorkerLatch notify_one: unparked {:?}", n); } } diff --git a/distaff/src/workerthread.rs b/distaff/src/workerthread.rs index a3b525a..a56fdf4 100644 --- a/distaff/src/workerthread.rs +++ b/distaff/src/workerthread.rs @@ -41,7 +41,9 @@ impl WorkerThread { } impl WorkerThread { - #[tracing::instrument(level = "trace", skip_all)] + #[tracing::instrument(level = "trace", skip_all, fields( + worker = self.heartbeat.index(), + ))] pub fn run(self: Box, barrier: Arc) { let this = Box::into_raw(self); unsafe { @@ -114,7 +116,7 @@ impl WorkerThread { } #[tracing::instrument(level = "trace", skip_all)] - pub(crate) fn find_work_or_wait_unless(&self, mut pred: F) -> Option> + pub(crate) fn find_work_or_wait_unless(&self, pred: F) -> Option> where F: FnMut(&mut crate::context::Shared) -> bool, { @@ -128,12 +130,12 @@ impl WorkerThread { // this is very important, because the lock must be held when // notifying us of the result of a job we scheduled. // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! - if !pred(std::ops::DerefMut::deref_mut(&mut guard)) { - // no jobs found, wait for a heartbeat or a new job - tracing::trace!("WorkerThread::find_work_or_wait_unless: waiting for new job"); - self.heartbeat.latch().wait_with_lock(&mut guard); - tracing::trace!("WorkerThread::find_work_or_wait_unless: woken up from wait"); - } + // no jobs found, wait for a heartbeat or a new job + // tracing::trace!(worker = self.heartbeat.index(), "waiting for new job"); + self.heartbeat + .latch() + .wait_with_lock_unless(&mut guard, pred); + // tracing::trace!(worker = self.heartbeat.index(), "woken up from wait"); None }