fix deadlock?

This commit is contained in:
Janis 2025-06-30 15:00:57 +02:00
parent 6fe5351e59
commit 228aa4d544
4 changed files with 45 additions and 43 deletions

View file

@ -156,7 +156,7 @@ impl Context {
} }
} }
// caller should hold the shared lock while calling this /// caller should hold the shared lock while calling this
pub unsafe fn notify_job_shared(&self) { pub unsafe fn notify_job_shared(&self) {
if let Some((i, sender)) = self if let Some((i, sender)) = self
.heartbeats .heartbeats

View file

@ -1076,12 +1076,12 @@ impl<T> JobSender<T> {
// | send-> | | | // | send-> | | |
// | lock() | | | // | lock() | | |
// | FINISHED | | | // | FINISHED | | |
// | | | poll() |
// | | | lock()-> | // thread 2 tries to lock. // | | | lock()-> | // thread 2 tries to lock.
// | wake() | | // the wake signal is ignored // | wake() | | // the wake signal is ignored
// | | | | // | | | |
// | unlock() | | // | unlock() | |
// | | | l=lock() | // thread2 wakes up and receives the lock // | | | l=lock() | // thread2 wakes up and receives the lock
// | | | poll() |
// | <-send | sleep(l) | // thread 2 is now sleeping // | <-send | sleep(l) | // thread 2 is now sleeping
// //
// This concludes my TED talk on why we need to lock here. // This concludes my TED talk on why we need to lock here.

View file

@ -502,6 +502,19 @@ impl WorkerLatch {
} }
} }
#[tracing::instrument(level = "trace", skip_all, fields(
this = self as *const Self as usize,
))]
pub fn wait_unless<F>(&self, mut f: F)
where
F: FnMut() -> bool,
{
let mut guard = self.mutex.lock();
if !f() {
Self::wait_internal(&self.condvar, &mut guard);
}
}
#[tracing::instrument(level = "trace", skip_all, fields( #[tracing::instrument(level = "trace", skip_all, fields(
this = self as *const Self as usize, this = self as *const Self as usize,
))] ))]

View file

@ -90,7 +90,7 @@ impl WorkerThread {
impl WorkerThread { impl WorkerThread {
pub(crate) fn find_work(&self) -> Option<NonNull<Job>> { pub(crate) fn find_work(&self) -> Option<NonNull<Job>> {
self.find_work_inner().left() self.find_work_inner()
} }
/// Looks for work in the local queue, then in the shared context, and if no /// Looks for work in the local queue, then in the shared context, and if no
@ -100,31 +100,24 @@ impl WorkerThread {
/// thread should exit, or look for work again. /// thread should exit, or look for work again.
#[tracing::instrument(level = "trace", skip_all)] #[tracing::instrument(level = "trace", skip_all)]
pub(crate) fn find_work_or_wait(&self) -> Option<NonNull<Job>> { pub(crate) fn find_work_or_wait(&self) -> Option<NonNull<Job>> {
match self.find_work_inner() { if let Some(job) = self.find_work_inner() {
either::Either::Left(job) => {
return Some(job); return Some(job);
} }
either::Either::Right(mut guard) => {
// no jobs found, wait for a heartbeat or a new job
tracing::trace!("WorkerThread::find_work_or_wait: waiting for new job");
self.heartbeat.latch().wait_with_lock(&mut guard);
tracing::trace!("WorkerThread::find_work_or_wait: woken up from wait");
tracing::trace!("waiting for new job");
self.heartbeat.latch().wait();
tracing::trace!("woken up from wait");
None None
} }
}
}
#[tracing::instrument(level = "trace", skip_all)] #[tracing::instrument(level = "trace", skip_all)]
pub(crate) fn find_work_or_wait_unless<F>(&self, pred: F) -> Option<NonNull<Job>> pub(crate) fn find_work_or_wait_unless<F>(&self, pred: F) -> Option<NonNull<Job>>
where where
F: FnMut(&mut crate::context::Shared) -> bool, F: FnMut() -> bool,
{ {
match self.find_work_inner() { if let Some(job) = self.find_work_inner() {
either::Either::Left(job) => {
return Some(job); return Some(job);
} }
either::Either::Right(mut guard) => {
// check the predicate while holding the lock // check the predicate while holding the lock
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// this is very important, because the lock must be held when // this is very important, because the lock must be held when
@ -132,24 +125,18 @@ impl WorkerThread {
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// no jobs found, wait for a heartbeat or a new job // no jobs found, wait for a heartbeat or a new job
// tracing::trace!(worker = self.heartbeat.index(), "waiting for new job"); // tracing::trace!(worker = self.heartbeat.index(), "waiting for new job");
self.heartbeat self.heartbeat.latch().wait_unless(pred);
.latch()
.wait_with_lock_unless(&mut guard, pred);
// tracing::trace!(worker = self.heartbeat.index(), "woken up from wait"); // tracing::trace!(worker = self.heartbeat.index(), "woken up from wait");
None None
} }
}
}
#[inline] #[inline]
fn find_work_inner( fn find_work_inner(&self) -> Option<NonNull<Job>> {
&self,
) -> either::Either<NonNull<Job>, parking_lot::MutexGuard<'_, crate::context::Shared>> {
// first check the local queue for jobs // first check the local queue for jobs
if let Some(job) = self.pop_front() { if let Some(job) = self.pop_front() {
tracing::trace!("WorkerThread::find_work_inner: found local job: {:?}", job); tracing::trace!("WorkerThread::find_work_inner: found local job: {:?}", job);
return either::Either::Left(job); return Some(job);
} }
// then check the shared context for jobs // then check the shared context for jobs
@ -157,10 +144,10 @@ impl WorkerThread {
if let Some(job) = guard.pop_job() { if let Some(job) = guard.pop_job() {
tracing::trace!("WorkerThread::find_work_inner: found shared job: {:?}", job); tracing::trace!("WorkerThread::find_work_inner: found shared job: {:?}", job);
return either::Either::Left(job); return Some(job);
} }
either::Either::Right(guard) None
} }
#[inline(always)] #[inline(always)]
@ -345,7 +332,7 @@ impl WorkerThread {
let mut out = recv.poll(); let mut out = recv.poll();
while std::hint::unlikely(out.is_none()) { while std::hint::unlikely(out.is_none()) {
if let Some(job) = self.find_work_or_wait_unless(|_| { if let Some(job) = self.find_work_or_wait_unless(|| {
out = recv.poll(); out = recv.poll();
out.is_some() out.is_some()
}) { }) {
@ -354,7 +341,9 @@ impl WorkerThread {
} }
} }
self.heartbeat.latch().lock();
out = recv.poll(); out = recv.poll();
self.heartbeat.latch().unlock();
} }
out out
@ -389,7 +378,7 @@ impl WorkerThread {
// do the usual thing??? chatgipity really said this.. // do the usual thing??? chatgipity really said this..
while !latch.probe() { while !latch.probe() {
// check local jobs before locking shared context // check local jobs before locking shared context
if let Some(job) = self.find_work_or_wait_unless(|_| latch.probe()) { if let Some(job) = self.find_work_or_wait_unless(|| latch.probe()) {
unsafe { unsafe {
Job::execute(job.as_ptr()); Job::execute(job.as_ptr());
} }