diff --git a/Cargo.toml b/Cargo.toml index 4fe3ac4..c7de9d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ parking_lot = {version = "0.12.3"} thread_local = "1.1.8" crossbeam = "0.8.4" st3 = "0.4" -chili = "0.2.0" +chili = "0.2.1" async-task = "4.7.1" diff --git a/distaff/src/context.rs b/distaff/src/context.rs index b2350bb..d17ce6d 100644 --- a/distaff/src/context.rs +++ b/distaff/src/context.rs @@ -14,13 +14,13 @@ use parking_lot::{Condvar, Mutex}; use crate::{ job::{HeapJob, Job, StackJob}, - latch::{LatchRef, MutexLatch, UnsafeWakeLatch}, + latch::{AsCoreLatch, HeartbeatLatch, LatchRef, UnsafeWakeLatch}, workerthread::{HeartbeatThread, WorkerThread}, }; pub struct Heartbeat { heartbeat: AtomicU8, - pub latch: MutexLatch, + pub latch: HeartbeatLatch, } impl Heartbeat { @@ -31,25 +31,15 @@ impl Heartbeat { pub fn new() -> (Arc>, Weak>) { let strong = Arc::new(CachePadded::new(Self { heartbeat: AtomicU8::new(Self::CLEAR), - latch: MutexLatch::new(), + latch: HeartbeatLatch::new(), })); let weak = Arc::downgrade(&strong); (strong, weak) } - /// returns true if the heartbeat was previously sleeping. - pub fn set_pending(&self) -> bool { - let old = self.heartbeat.swap(Self::PENDING, Ordering::Relaxed); - old == Self::SLEEPING - } - - pub fn clear(&self) { - self.heartbeat.store(Self::CLEAR, Ordering::Relaxed); - } - pub fn is_pending(&self) -> bool { - self.heartbeat.load(Ordering::Relaxed) == Self::PENDING + self.latch.as_core_latch().poll_heartbeat() } } @@ -80,6 +70,10 @@ impl Shared { (strong, index) } + pub(crate) fn remove_heartbeat(&mut self, index: usize) { + self.heartbeats.remove(&index); + } + pub fn pop_job(&mut self) -> Option> { // this is unlikely, so make the function cold? // TODO: profile this @@ -96,6 +90,17 @@ impl Shared { self.injected_jobs.pop().unwrap() } + pub fn notify_job_shared(&self) { + _ = self.heartbeats.iter().find(|(_, heartbeat)| { + if let Some(heartbeat) = heartbeat.upgrade() { + heartbeat.latch.signal_job_shared(); + true + } else { + false + } + }); + } + pub fn should_exit(&self) -> bool { self.should_exit } @@ -162,7 +167,7 @@ impl Context { shared.should_exit = true; for (_, heartbeat) in shared.heartbeats.iter() { if let Some(heartbeat) = heartbeat.upgrade() { - heartbeat.latch.set(); + heartbeat.latch.signal_job_shared(); } } self.shared_job.notify_all(); @@ -181,11 +186,8 @@ impl Context { pub fn inject_job(&self, job: NonNull) { let mut shared = self.shared.lock(); shared.injected_jobs.push(job); - self.notify_shared_job(); - } - pub fn notify_shared_job(&self) { - self.shared_job.notify_one(); + shared.notify_job_shared(); } /// Runs closure in this context, processing the other context's worker's jobs while waiting for the result. @@ -227,10 +229,10 @@ impl Context { F: FnOnce(&WorkerThread) -> T + Send, T: Send, { - use crate::latch::MutexLatch; + use crate::latch::HeartbeatLatch; // current thread isn't a worker thread, create job and inject into global context - let latch = MutexLatch::new(); + let latch = HeartbeatLatch::new(); let job = StackJob::new( move || { @@ -246,7 +248,7 @@ impl Context { job.set_pending(); self.inject_job(Into::into(&job)); - latch.wait(); + latch.wait_and_reset(); let t = unsafe { job.transmute_ref::().wait().into_result() }; diff --git a/distaff/src/job.rs b/distaff/src/job.rs index 85e5bdd..d0a66d3 100644 --- a/distaff/src/job.rs +++ b/distaff/src/job.rs @@ -650,7 +650,7 @@ mod stackjob { let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f())); - tracing::trace!("job completed: {:?}", job); + tracing::trace!("stack job completed: {:?}", job); let job = unsafe { &*job.cast::>() }; job.complete(result); @@ -703,13 +703,20 @@ mod heapjob { let this = unsafe { Box::from_raw(this.cast::>().cast_mut()) }; let f = this.into_inner(); - _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f())); + { + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f())); + + let job = unsafe { &*job.cast::>() }; + job.complete(result); + } // drop job (this is fine because the job of a HeapJob is pure POD). unsafe { ptr::drop_in_place(job); } + tracing::trace!("heap job completed: {:?}", job); + // free box that was allocated at (1) _ = unsafe { Box::>>::from_raw(job.cast()) }; } diff --git a/distaff/src/join.rs b/distaff/src/join.rs index 5f13b3f..ce2f2d0 100644 --- a/distaff/src/join.rs +++ b/distaff/src/join.rs @@ -67,20 +67,12 @@ impl WorkerThread { // because we will be waiting on it. let latch = unsafe { UnsafeWakeLatch::new(&raw const (*self.heartbeat).latch) }; - let a = StackJob::new( - move || { - // TODO: bench whether tick'ing here is good. - // turns out this actually costs a lot of time, likely because of the thread local check. - // WorkerThread::current_ref() - // .expect("stackjob is run in workerthread.") - // .tick(); - a() - }, - LatchRef::new(&latch), - ); + let a = StackJob::new(a, LatchRef::new(&latch)); let job = a.as_job(); - self.push_front(&job); + self.push_back(&job); + + self.tick(); let rb = match catch_unwind(AssertUnwindSafe(|| b())) { Ok(val) => val, @@ -97,9 +89,12 @@ impl WorkerThread { // remove job from the queue, so it doesn't get run again. // job.unlink(); //SAFETY: we are in a worker thread, so we can safely access the queue. - unsafe { - self.queue.as_mut_unchecked().remove(&job); - } + // unsafe { + // self.queue.as_mut_unchecked().remove(&job); + // } + + // we pushed the job to the back of the queue, any `join`s called by `b` on this worker thread will have already popped their job, or seen it be executed. + self.pop_back(); // a is allowed to panic here, because we already finished b. unsafe { a.unwrap()() } diff --git a/distaff/src/latch.rs b/distaff/src/latch.rs index def9d72..a3b6740 100644 --- a/distaff/src/latch.rs +++ b/distaff/src/latch.rs @@ -2,7 +2,10 @@ use core::{ marker::PhantomData, sync::atomic::{AtomicUsize, Ordering}, }; -use std::sync::{Arc, atomic::AtomicU8}; +use std::{ + cell::UnsafeCell, + sync::{Arc, atomic::AtomicU8}, +}; use parking_lot::{Condvar, Mutex}; @@ -30,6 +33,8 @@ impl AtomicLatch { pub const UNSET: u8 = 0; pub const SET: u8 = 1; pub const SLEEPING: u8 = 2; + pub const WAKEUP: u8 = 4; + pub const HEARTBEAT: u8 = 8; #[inline] pub const fn new() -> Self { @@ -45,24 +50,58 @@ impl AtomicLatch { } #[inline] - pub fn reset(&self) { - self.inner.store(Self::UNSET, Ordering::Release); + pub fn unset(&self) { + self.inner.fetch_and(!Self::SET, Ordering::Release); + } + + pub fn reset(&self) -> u8 { + self.inner.swap(Self::UNSET, Ordering::Release) } pub fn get(&self) -> u8 { self.inner.load(Ordering::Acquire) } - pub fn set_sleeping(&self) { - self.inner.store(Self::SLEEPING, Ordering::Release); + pub fn poll_heartbeat(&self) -> bool { + self.inner.fetch_and(!Self::HEARTBEAT, Ordering::Relaxed) & Self::HEARTBEAT + == Self::HEARTBEAT + } + + /// returns true if the latch was already set. + pub fn set_sleeping(&self) -> bool { + self.inner.fetch_or(Self::SLEEPING, Ordering::Relaxed) & Self::SET == Self::SET + } + + pub fn is_sleeping(&self) -> bool { + self.inner.load(Ordering::Relaxed) & Self::SLEEPING == Self::SLEEPING + } + + pub fn is_heartbeat(&self) -> bool { + self.inner.load(Ordering::Relaxed) & Self::HEARTBEAT == Self::HEARTBEAT + } + + pub fn is_wakeup(&self) -> bool { + self.inner.load(Ordering::Relaxed) & Self::WAKEUP == Self::WAKEUP + } + + pub fn is_set(&self) -> bool { + self.inner.load(Ordering::Relaxed) & Self::SET == Self::SET + } + + pub fn set_wakeup(&self) { + self.inner.fetch_or(Self::WAKEUP, Ordering::Relaxed); + } + + pub fn set_heartbeat(&self) { + self.inner.fetch_or(Self::HEARTBEAT, Ordering::Relaxed); } /// returns true if the latch was previously sleeping. #[inline] pub unsafe fn set(this: *const Self) -> bool { unsafe { - let old = (*this).inner.swap(Self::SET, Ordering::Release); - old == Self::SLEEPING + let old = (*this).inner.fetch_or(Self::SET, Ordering::Relaxed); + old & Self::SLEEPING == Self::SLEEPING } } } @@ -79,7 +118,7 @@ impl Latch for AtomicLatch { impl Probe for AtomicLatch { #[inline] fn probe(&self) -> bool { - self.inner.load(Ordering::Acquire) == Self::SET + self.inner.load(Ordering::Relaxed) & Self::SET == Self::SET } } impl AsCoreLatch for AtomicLatch { @@ -153,58 +192,6 @@ impl Probe for NopLatch { } } -pub struct ThreadWakeLatch { - waker: Mutex>, -} - -impl ThreadWakeLatch { - #[inline] - pub const fn new() -> Self { - Self { - waker: Mutex::new(None), - } - } - - #[inline] - pub fn reset(&self) { - let mut waker = self.waker.lock(); - *waker = None; - } - - #[inline] - pub fn set_waker(&self, thread: std::thread::Thread) { - let mut waker = self.waker.lock(); - *waker = Some(thread); - } - - pub unsafe fn wait(&self) { - assert!( - self.waker.lock().replace(std::thread::current()).is_none(), - "ThreadWakeLatch can only be waited once per thread" - ); - - std::thread::park(); - } -} - -impl Latch for ThreadWakeLatch { - #[inline] - unsafe fn set_raw(this: *const Self) { - unsafe { - if let Some(thread) = (&*this).waker.lock().take() { - thread.unpark(); - } - } - } -} - -impl Probe for ThreadWakeLatch { - #[inline] - fn probe(&self) -> bool { - self.waker.lock().is_some() - } -} - pub struct CountLatch { count: AtomicUsize, inner: L, @@ -234,10 +221,8 @@ impl CountLatch { #[inline] pub fn decrement(&self) { - if self.count.fetch_sub(1, Ordering::Release) == 1 { - unsafe { - Latch::set_raw(&self.inner); - } + unsafe { + Latch::set_raw(self); } } } @@ -246,8 +231,11 @@ impl Latch for CountLatch { #[inline] unsafe fn set_raw(this: *const Self) { unsafe { - let this = &*this; - this.decrement(); + if (&*this).count.fetch_sub(1, Ordering::Relaxed) == 1 { + tracing::trace!("CountLatch set_raw: count was 1, setting inner latch"); + // If the count was 1, we need to set the inner latch. + Latch::set_raw(&(*this).inner); + } } } } @@ -266,30 +254,60 @@ impl AsCoreLatch for CountLatch { } } -pub struct MutexLatch { - inner: Mutex, +pub struct HeartbeatLatch { + inner: UnsafeCell, + lock: Mutex<()>, condvar: Condvar, } -impl MutexLatch { +unsafe impl Send for HeartbeatLatch {} +unsafe impl Sync for HeartbeatLatch {} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum WakeResult { + Wake, + Heartbeat, + Set, +} + +impl HeartbeatLatch { #[inline] pub const fn new() -> Self { Self { - inner: Mutex::new(false), + inner: UnsafeCell::new(AtomicLatch::new()), + lock: Mutex::new(()), condvar: Condvar::new(), } } #[inline] pub fn reset(&self) { - let mut guard = self.inner.lock(); - *guard = false; + let _guard = self.lock.lock(); + // SAFETY: inner is atomic, so we can safely access it. + unsafe { self.inner.as_mut_unchecked().unset() }; } - pub fn wait(&self) { - let mut guard = self.inner.lock(); - while !*guard { - self.condvar.wait(&mut guard); + pub fn wait_and_reset(&self) -> WakeResult { + // SAFETY: inner is locked by the mutex, so we can safely access it. + let value = unsafe { + let mut guard = self.lock.lock(); + let inner = self.inner.as_ref_unchecked(); + inner.set_sleeping(); + while inner.get() & !AtomicLatch::SLEEPING == AtomicLatch::UNSET { + self.condvar.wait(&mut guard); + } + + inner.reset() + }; + + if value & AtomicLatch::SET == AtomicLatch::SET { + WakeResult::Set + } else if value & AtomicLatch::WAKEUP == AtomicLatch::WAKEUP { + WakeResult::Wake + } else if value & AtomicLatch::HEARTBEAT == AtomicLatch::HEARTBEAT { + WakeResult::Heartbeat + } else { + panic!("MutexLatch was not set correctly"); } } @@ -299,29 +317,72 @@ impl MutexLatch { } } - pub fn wait_and_reset(&self) { - let mut guard = self.inner.lock(); - while !*guard { - self.condvar.wait(&mut guard); + pub fn signal_heartbeat(&self) { + let mut _guard = self.lock.lock(); + // SAFETY: inner is locked by the mutex, so we can safely access it. + unsafe { + let inner = self.inner.as_ref_unchecked(); + inner.set_heartbeat(); + + // If the latch was sleeping, notify the waiting thread. + if inner.is_sleeping() { + self.condvar.notify_all(); + } + } + } + + pub fn signal_job_shared(&self) { + let mut _guard = self.lock.lock(); + // SAFETY: inner is locked by the mutex, so we can safely access it. + unsafe { + self.inner.as_ref_unchecked().set_wakeup(); + if self.inner.as_ref_unchecked().is_sleeping() { + self.condvar.notify_all(); + } + } + } + + pub fn signal_job_finished(&self) { + let mut _guard = self.lock.lock(); + // SAFETY: inner is locked by the mutex, so we can safely access it. + unsafe { + CoreLatch::set(self.inner.get()); + if self.inner.as_ref_unchecked().is_sleeping() { + self.condvar.notify_all(); + } } - *guard = false; } } -impl Latch for MutexLatch { +impl Latch for HeartbeatLatch { #[inline] unsafe fn set_raw(this: *const Self) { + // SAFETY: `this` is valid until the guard is dropped. unsafe { - *(&*this).inner.lock() = true; - (&*this).condvar.notify_all(); + let this = &*this; + let _guard = this.lock.lock(); + Latch::set_raw(this.inner.get() as *const AtomicLatch); + if this.inner.as_ref_unchecked().is_sleeping() { + this.condvar.notify_all(); + } } } } -impl Probe for MutexLatch { +impl Probe for HeartbeatLatch { #[inline] fn probe(&self) -> bool { - *self.inner.lock() + let _guard = self.lock.lock(); + // SAFETY: inner is atomic, so we can safely access it. + unsafe { self.inner.as_ref_unchecked().probe() } + } +} + +impl AsCoreLatch for HeartbeatLatch { + #[inline] + fn as_core_latch(&self) -> &CoreLatch { + // SAFETY: inner is atomic, so we can safely access it. + unsafe { self.inner.as_ref_unchecked() } } } @@ -354,8 +415,10 @@ impl Latch for WakeLatch { let ctx = WorkerThread::current_ref().unwrap().context.clone(); // If the latch was sleeping, wake the worker thread ctx.shared().heartbeats.get(&worker_index).and_then(|weak| { - weak.upgrade() - .map(|heartbeat| Latch::set_raw(&heartbeat.latch)) + weak.upgrade().map(|heartbeat| { + // we set the latch to wake the worker so it knows to check the heartbeat + heartbeat.latch.signal_job_finished() + }) }); } } @@ -376,19 +439,16 @@ impl AsCoreLatch for WakeLatch { } } +/// A latch that can be set from any thread, but must be created with a valid waker. pub struct UnsafeWakeLatch { - inner: AtomicLatch, - waker: *const MutexLatch, + waker: *const HeartbeatLatch, } impl UnsafeWakeLatch { /// # Safety /// The `waker` must be valid until the latch is set. - pub unsafe fn new(waker: *const MutexLatch) -> Self { - Self { - inner: AtomicLatch::new(), - waker, - } + pub unsafe fn new(waker: *const HeartbeatLatch) -> Self { + Self { waker } } } @@ -397,9 +457,7 @@ impl Latch for UnsafeWakeLatch { unsafe fn set_raw(this: *const Self) { unsafe { let waker = (*this).waker; - if CoreLatch::set(&(&*this).inner) { - Latch::set_raw(waker); - } + Latch::set_raw(waker); } } } @@ -407,14 +465,22 @@ impl Latch for UnsafeWakeLatch { impl Probe for UnsafeWakeLatch { #[inline] fn probe(&self) -> bool { - self.inner.probe() + // SAFETY: waker is valid as per the constructor contract. + unsafe { + let waker = &*self.waker; + waker.probe() + } } } impl AsCoreLatch for UnsafeWakeLatch { #[inline] fn as_core_latch(&self) -> &CoreLatch { - &self.inner + // SAFETY: waker is valid as per the constructor contract. + unsafe { + let waker = &*self.waker; + waker.as_core_latch() + } } } @@ -437,7 +503,7 @@ mod tests { } assert_eq!(latch.get(), AtomicLatch::SET); assert!(latch.probe()); - latch.reset(); + latch.unset(); assert_eq!(latch.get(), AtomicLatch::UNSET); } @@ -451,7 +517,7 @@ mod tests { assert!(!latch.probe()); assert!(AtomicLatch::set(&latch)); } - assert_eq!(latch.get(), AtomicLatch::SET); + assert_eq!(latch.get(), AtomicLatch::SET | AtomicLatch::SLEEPING); assert!(latch.probe()); latch.reset(); assert_eq!(latch.get(), AtomicLatch::UNSET); @@ -465,32 +531,6 @@ mod tests { ); } - #[test] - fn thread_wake_latch() { - let latch = Arc::new(ThreadWakeLatch::new()); - let main = Arc::new(ThreadWakeLatch::new()); - - let handle = std::thread::spawn({ - let latch = latch.clone(); - let main = main.clone(); - move || unsafe { - Latch::set_raw(&*main); - latch.wait(); - } - }); - - unsafe { - main.wait(); - Latch::set_raw(&*latch); - } - - handle.join().expect("Thread should join successfully"); - assert!( - !latch.probe() && !main.probe(), - "Latch should be set after waiting thread wakes up" - ); - } - #[test] fn count_latch() { let latch = CountLatch::new(AtomicLatch::new()); @@ -516,8 +556,9 @@ mod tests { } #[test] + #[traced_test] fn mutex_latch() { - let latch = Arc::new(MutexLatch::new()); + let latch = Arc::new(HeartbeatLatch::new()); assert!(!latch.probe()); latch.set(); assert!(latch.probe()); @@ -527,7 +568,7 @@ mod tests { // Test wait functionality let latch_clone = latch.clone(); let handle = std::thread::spawn(move || { - latch_clone.wait(); + assert_eq!(latch_clone.wait_and_reset(), WakeResult::Set); }); // Give the thread time to block @@ -535,11 +576,11 @@ mod tests { assert!(!latch.probe()); latch.set(); - assert!(latch.probe()); handle.join().expect("Thread should join successfully"); } #[test] + #[traced_test] fn wake_latch() { let context = Context::new_with_threads(1); let count = Arc::new(AtomicUsize::new(0)); diff --git a/distaff/src/scope.rs b/distaff/src/scope.rs index 06625ca..d5e5d3b 100644 --- a/distaff/src/scope.rs +++ b/distaff/src/scope.rs @@ -13,14 +13,14 @@ use async_task::Runnable; use crate::{ context::Context, job::{HeapJob, Job}, - latch::{AsCoreLatch, CountLatch, WakeLatch}, + latch::{AsCoreLatch, CountLatch, HeartbeatLatch, WakeLatch}, util::{DropGuard, SendPtr}, workerthread::WorkerThread, }; pub struct Scope<'scope, 'env: 'scope> { // latch to wait on before the scope finishes - job_counter: CountLatch, + job_counter: CountLatch, // local threadpool context: Arc, // panic error @@ -61,7 +61,6 @@ impl<'scope, 'env> Scope<'scope, 'env> { }); // set worker index in the job counter - self.job_counter.inner().set_worker_index(worker.index); worker.wait_until_latch(self.job_counter.as_core_latch()); } } @@ -146,23 +145,23 @@ impl<'scope, 'env> Scope<'scope, 'env> { where F: FnOnce(&'scope Self) + Send, { - self.context.run_in_worker(|worker| { - self.job_counter.increment(); + self.job_counter.increment(); - let this = SendPtr::new_const(self).unwrap(); + let this = SendPtr::new_const(self).unwrap(); - let job = Box::new(HeapJob::new(move || unsafe { - _ = f(this.as_ref()); - this.as_ref().job_counter.decrement(); - })) - .into_boxed_job(); + let job = Box::new(HeapJob::new(move || unsafe { + _ = f(this.as_ref()); + this.as_unchecked_ref().job_counter.decrement(); + })) + .into_boxed_job(); - tracing::trace!("allocated heapjob"); + tracing::trace!("allocated heapjob"); - worker.push_front(job); + WorkerThread::current_ref() + .expect("spawn is run in workerthread.") + .push_front(job as _); - tracing::trace!("leaked heapjob"); - }); + tracing::trace!("leaked heapjob"); } pub fn spawn_future(&'scope self, future: F) -> async_task::Task @@ -259,7 +258,7 @@ impl<'scope, 'env> Scope<'scope, 'env> { unsafe fn from_context(context: Arc) -> Self { Self { context, - job_counter: CountLatch::new(WakeLatch::new(0)), + job_counter: CountLatch::new(HeartbeatLatch::new()), panic: AtomicPtr::new(ptr::null_mut()), _scope: PhantomData, _env: PhantomData, @@ -291,7 +290,6 @@ mod tests { } #[test] - #[traced_test] fn join() { let pool = ThreadPool::new_with_threads(1); diff --git a/distaff/src/threadpool.rs b/distaff/src/threadpool.rs index 89c5223..86d2af0 100644 --- a/distaff/src/threadpool.rs +++ b/distaff/src/threadpool.rs @@ -53,14 +53,18 @@ impl ThreadPool { #[cfg(test)] mod tests { + use tracing_test::traced_test; + use super::*; #[test] + #[traced_test] fn spawn_borrow() { let pool = ThreadPool::new_with_threads(1); let mut x = 0; pool.scope(|scope| { scope.spawn(|_| { + tracing::info!("Incrementing x"); x += 1; }); }); diff --git a/distaff/src/workerthread.rs b/distaff/src/workerthread.rs index edec7eb..b497ac3 100644 --- a/distaff/src/workerthread.rs +++ b/distaff/src/workerthread.rs @@ -22,6 +22,13 @@ pub struct WorkerThread { pub(crate) join_count: Cell, } +impl Drop for WorkerThread { + fn drop(&mut self) { + // remove the current worker thread from the heartbeat list + self.context.shared().remove_heartbeat(self.index); + } +} + thread_local! { static WORKER: UnsafeCell>> = const { UnsafeCell::new(None) }; } @@ -65,49 +72,38 @@ impl WorkerThread { fn run_inner(&self) { let mut job = self.context.shared().pop_job(); 'outer: loop { - let mut guard = loop { - if let Some(job) = job.take() { - self.execute(job); + while let Some(j) = job { + self.execute(j); + + let mut guard = self.context.shared(); + if guard.should_exit() { + // if the context is stopped, break out of the outer loop which + // will exit the thread. + break 'outer; } // we executed the shared job, now we want to check for any // local jobs which this job might have spawned. - let next = self - .pop_front() - .map(|job| (Some(job), None)) - .unwrap_or_else(|| { - let mut guard = self.context.shared(); - (guard.pop_job(), Some(guard)) - }); + job = self.pop_front().or_else(|| guard.pop_job()); + } - match next { - // no job, but guard => check if we should exit - (None, Some(guard)) => { - tracing::trace!("worker: no local job, waiting for shared job"); - - if guard.should_exit() { - // if the context is stopped, break out of the outer loop which - // will exit the thread. - break 'outer; - } - - // no local jobs, wait for shared job - break guard; + // no more jobs, wait to be notified of a new job or a heartbeat. + match self.heartbeat.latch.wait_and_reset() { + crate::latch::WakeResult::Wake => { + let mut guard = self.context.shared(); + if guard.should_exit() { + break 'outer; } - // some job => drop guard, continue inner loop - (Some(next), _) => { - tracing::trace!("worker: executing job: {:?}", next); - job = Some(next); - continue; - } - // no job, no guard ought to be unreachable. - _ => unreachable!(), + + job = guard.pop_job(); } - }; - - self.context.shared_job.wait(&mut guard); - // a job was shared and we were notified, so we want to execute that job before any possible local jobs. - job = guard.pop_job(); + crate::latch::WakeResult::Heartbeat => { + self.tick(); + } + crate::latch::WakeResult::Set => { + panic!("this thread shouldn't be woken by a finished job") + } + } } } } @@ -138,11 +134,9 @@ impl WorkerThread { job.as_ref().set_pending(); } guard.jobs.insert(self.index, job); - self.context.notify_shared_job(); + guard.notify_job_shared(); } } - - self.heartbeat.clear(); } } @@ -236,9 +230,7 @@ impl HeartbeatThread { b.upgrade() .inspect(|heartbeat| { if n == i { - if heartbeat.set_pending() { - heartbeat.latch.set(); - } + heartbeat.latch.signal_heartbeat(); } n += 1; }) @@ -267,60 +259,97 @@ impl HeartbeatThread { impl WorkerThread { #[cold] fn wait_until_latch_cold(&self, latch: &CoreLatch) { - // does this optimise? - assert!(!latch.probe()); - 'outer: while !latch.probe() { // process local jobs before locking shared context while let Some(job) = self.pop_front() { + tracing::trace!("thread {:?} executing local job: {:?}", self.index, job); unsafe { job.as_ref().set_pending(); } - self.execute(job); + Job::execute(job); + tracing::trace!("thread {:?} finished local job: {:?}", self.index, job); } // take a shared job, if it exists - if let Some(shared_job) = self.context.shared().jobs.remove(&self.index) { - self.execute(shared_job); - } + 'inner: loop { + if let Some(shared_job) = self.context.shared().jobs.remove(&self.index) { + tracing::trace!( + "thread {:?} executing shared job: {:?}", + self.index, + shared_job + ); + Job::execute(shared_job); + } - while !latch.probe() { - let job = { - let mut guard = self.context.shared(); - guard.jobs.remove(&self.index).or_else(|| guard.pop_job()) - }; + while !latch.probe() { + tracing::trace!("thread {:?} looking for shared jobs", self.index); - match job { - Some(job) => { - self.execute(job); + let job = { + let mut guard = self.context.shared(); + guard.jobs.remove(&self.index).or_else(|| guard.pop_job()) + }; - continue 'outer; - } - None => { - // TODO: wait on latch? if we have something that can - // signal being done, e.g. can be waited on instead of - // shared jobs, we should wait on it instead, but we - // would also want to receive shared jobs still? - // Spin? probably just wastes CPU time. - // self.context.shared_job.wait(&mut guard); - // if spin.spin() { - // // wait for more shared jobs. - // // self.context.shared_job.wait(&mut guard); - // return; - // } - // Yield? same as spinning, really, so just exit and let the upstream use wait - // std::thread::yield_now(); + match job { + Some(job) => { + tracing::trace!("thread {:?} found job: {:?}", self.index, job); + Job::execute(job); - tracing::trace!("thread {:?} is sleeping", self.index); + continue 'outer; + } + None => { + // TODO: wait on latch? if we have something that can + // signal being done, e.g. can be waited on instead of + // shared jobs, we should wait on it instead, but we + // would also want to receive shared jobs still? + // Spin? probably just wastes CPU time. + // self.context.shared_job.wait(&mut guard); + // if spin.spin() { + // // wait for more shared jobs. + // // self.context.shared_job.wait(&mut guard); + // return; + // } + // Yield? same as spinning, really, so just exit and let the upstream use wait + // std::thread::yield_now(); - latch.set_sleeping(); - self.heartbeat.latch.wait_and_reset(); - // since we were sleeping, the shared job can't be populated, - // so resuming the inner loop is fine. + tracing::trace!("thread {:?} is sleeping", self.index); + + match self.heartbeat.latch.wait_and_reset() { + // why were we woken up? + // 1. the heartbeat thread ticked and set the + // latch, so we should see if we have any work + // to share. + // 2. a job was shared and we were notified, so + // we should execute it. + // 3. the job we were waiting on was completed, + // so we should return it. + crate::latch::WakeResult::Set => { + break 'outer; // we were woken up by a job being set, so we should exit the loop. + } + crate::latch::WakeResult::Wake => { + // skip checking for local jobs, since we + // were woken up to check for shared jobs. + continue 'inner; + } + crate::latch::WakeResult::Heartbeat => { + self.tick(); + continue 'outer; + } + } + // since we were sleeping, the shared job can't be populated, + // so resuming the inner loop is fine. + } } } + + break; } } + + tracing::trace!( + "thread {:?} finished waiting on latch {:?}", + self.index, + latch + ); return; } @@ -335,7 +364,7 @@ impl WorkerThread { } else { // this isn't the job we are looking for, but we still need to // execute it - self.execute(shared_job); + Job::execute(shared_job); } } @@ -353,6 +382,7 @@ impl WorkerThread { { let latch = latch.as_core_latch(); if !latch.probe() { + tracing::trace!("thread {:?} waiting on latch {:?}", self.index, latch); self.wait_until_latch_cold(latch) } }