From 228aa4d5449877ad52b3a29c1125cbddcc254506 Mon Sep 17 00:00:00 2001 From: Janis Date: Mon, 30 Jun 2025 15:00:57 +0200 Subject: [PATCH] fix deadlock? --- distaff/src/context.rs | 2 +- distaff/src/job.rs | 2 +- distaff/src/latch.rs | 13 +++++++ distaff/src/workerthread.rs | 71 ++++++++++++++++--------------------- 4 files changed, 45 insertions(+), 43 deletions(-) diff --git a/distaff/src/context.rs b/distaff/src/context.rs index 2cc55a0..9e93ce6 100644 --- a/distaff/src/context.rs +++ b/distaff/src/context.rs @@ -156,7 +156,7 @@ impl Context { } } - // caller should hold the shared lock while calling this + /// caller should hold the shared lock while calling this pub unsafe fn notify_job_shared(&self) { if let Some((i, sender)) = self .heartbeats diff --git a/distaff/src/job.rs b/distaff/src/job.rs index 7576587..db82b2d 100644 --- a/distaff/src/job.rs +++ b/distaff/src/job.rs @@ -1076,12 +1076,12 @@ impl JobSender { // | send-> | | | // | lock() | | | // | FINISHED | | | - // | | | poll() | // | | | lock()-> | // thread 2 tries to lock. // | wake() | | // the wake signal is ignored // | | | | // | unlock() | | // | | | l=lock() | // thread2 wakes up and receives the lock + // | | | poll() | // | <-send | sleep(l) | // thread 2 is now sleeping // // This concludes my TED talk on why we need to lock here. 
diff --git a/distaff/src/latch.rs b/distaff/src/latch.rs index 5af58cd..0d56a6e 100644 --- a/distaff/src/latch.rs +++ b/distaff/src/latch.rs @@ -502,6 +502,19 @@ impl WorkerLatch { } } + #[tracing::instrument(level = "trace", skip_all, fields( + this = self as *const Self as usize, + ))] + pub fn wait_unless<F>(&self, mut f: F) + where + F: FnMut() -> bool, + { + let mut guard = self.mutex.lock(); + if !f() { + Self::wait_internal(&self.condvar, &mut guard); + } + } + #[tracing::instrument(level = "trace", skip_all, fields( this = self as *const Self as usize, ))] diff --git a/distaff/src/workerthread.rs b/distaff/src/workerthread.rs index a56fdf4..758aeb4 100644 --- a/distaff/src/workerthread.rs +++ b/distaff/src/workerthread.rs @@ -90,7 +90,7 @@ impl WorkerThread { impl WorkerThread { pub(crate) fn find_work(&self) -> Option<NonNull<Job>> { - self.find_work_inner().left() + self.find_work_inner() } /// Looks for work in the local queue, then in the shared context, and if no @@ -100,56 +100,43 @@ impl WorkerThread { /// thread should exit, or look for work again. 
#[tracing::instrument(level = "trace", skip_all)] pub(crate) fn find_work_or_wait(&self) -> Option<NonNull<Job>> { - match self.find_work_inner() { - either::Either::Left(job) => { - return Some(job); - } - either::Either::Right(mut guard) => { - // no jobs found, wait for a heartbeat or a new job - tracing::trace!("WorkerThread::find_work_or_wait: waiting for new job"); - self.heartbeat.latch().wait_with_lock(&mut guard); - tracing::trace!("WorkerThread::find_work_or_wait: woken up from wait"); - - None - } + if let Some(job) = self.find_work_inner() { + return Some(job); } + + tracing::trace!("waiting for new job"); + self.heartbeat.latch().wait(); + tracing::trace!("woken up from wait"); + None } #[tracing::instrument(level = "trace", skip_all)] pub(crate) fn find_work_or_wait_unless<F>(&self, pred: F) -> Option<NonNull<Job>> where - F: FnMut(&mut crate::context::Shared) -> bool, + F: FnMut() -> bool, { - match self.find_work_inner() { - either::Either::Left(job) => { - return Some(job); - } - either::Either::Right(mut guard) => { - // check the predicate while holding the lock - // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! - // this is very important, because the lock must be held when - // notifying us of the result of a job we scheduled. - // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! - // no jobs found, wait for a heartbeat or a new job - // tracing::trace!(worker = self.heartbeat.index(), "waiting for new job"); - self.heartbeat - .latch() - .wait_with_lock_unless(&mut guard, pred); - // tracing::trace!(worker = self.heartbeat.index(), "woken up from wait"); - - None - } + if let Some(job) = self.find_work_inner() { + return Some(job); } + // check the predicate while holding the lock + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + // this is very important, because the lock must be held when + // notifying us of the result of a job we scheduled. + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 
+ // no jobs found, wait for a heartbeat or a new job + // tracing::trace!(worker = self.heartbeat.index(), "waiting for new job"); + self.heartbeat.latch().wait_unless(pred); + // tracing::trace!(worker = self.heartbeat.index(), "woken up from wait"); + + None } #[inline] - fn find_work_inner( - &self, - ) -> either::Either<NonNull<Job>, parking_lot::MutexGuard<'_, crate::context::Shared>> { + fn find_work_inner(&self) -> Option<NonNull<Job>> { // first check the local queue for jobs if let Some(job) = self.pop_front() { tracing::trace!("WorkerThread::find_work_inner: found local job: {:?}", job); - return either::Either::Left(job); + return Some(job); } // then check the shared context for jobs @@ -157,10 +144,10 @@ impl WorkerThread { if let Some(job) = guard.pop_job() { tracing::trace!("WorkerThread::find_work_inner: found shared job: {:?}", job); - return either::Either::Left(job); + return Some(job); } - either::Either::Right(guard) + None } #[inline(always)] @@ -345,7 +332,7 @@ impl WorkerThread { let mut out = recv.poll(); while std::hint::unlikely(out.is_none()) { - if let Some(job) = self.find_work_or_wait_unless(|_| { + if let Some(job) = self.find_work_or_wait_unless(|| { out = recv.poll(); out.is_some() }) { @@ -354,7 +341,9 @@ impl WorkerThread { } } + self.heartbeat.latch().lock(); out = recv.poll(); + self.heartbeat.latch().unlock(); } out @@ -389,7 +378,7 @@ impl WorkerThread { // do the usual thing??? chatgipity really said this.. while !latch.probe() { // check local jobs before locking shared context - if let Some(job) = self.find_work_or_wait_unless(|_| latch.probe()) { + if let Some(job) = self.find_work_or_wait_unless(|| latch.probe()) { unsafe { Job::execute(job.as_ptr()); }