From 38ce1de3acb11139a79a8675688c30e3fcb3ddfa Mon Sep 17 00:00:00 2001
From: Janis
Date: Tue, 1 Jul 2025 02:04:25 +0200
Subject: [PATCH] replace the latch-based job handoff with parker-backed
 channels; it doesn't appear to deadlock anymore

---
 distaff/Cargo.toml          |   4 +-
 distaff/src/channel.rs      | 208 +++++++++++++++++++++
 distaff/src/context.rs      |  64 +++----
 distaff/src/heartbeat.rs    |  26 +--
 distaff/src/job.rs          | 349 ++++++++++--------------------------
 distaff/src/join.rs         |  22 ++-
 distaff/src/latch.rs        |  76 +-------
 distaff/src/lib.rs          |   3 +
 distaff/src/metrics.rs      |  12 ++
 distaff/src/scope.rs        |  44 ++---
 distaff/src/workerthread.rs | 143 ++++++--------
 11 files changed, 459 insertions(+), 492 deletions(-)
 create mode 100644 distaff/src/channel.rs
 create mode 100644 distaff/src/metrics.rs

diff --git a/distaff/Cargo.toml b/distaff/Cargo.toml
index eb54ac8..08bb343 100644
--- a/distaff/Cargo.toml
+++ b/distaff/Cargo.toml
@@ -4,11 +4,13 @@ version = "0.1.0"
 edition = "2024"
 
 [features]
-default = []
+default = ["metrics"]
 std = []
+metrics = []
 
 [dependencies]
 parking_lot = {version = "0.12.3"}
+atomic-wait = "1.1.0"
 tracing = "0.1.40"
 parking_lot_core = "0.9.10"
 crossbeam-utils = "0.8.21"

diff --git a/distaff/src/channel.rs b/distaff/src/channel.rs
new file mode 100644
index 0000000..a6e74c8
--- /dev/null
+++ b/distaff/src/channel.rs
@@ -0,0 +1,208 @@
+use std::{
+    cell::UnsafeCell,
+    ptr::NonNull,
+    sync::{
+        Arc,
+        atomic::{AtomicU8, AtomicU32, Ordering},
+    },
+    thread,
+};
+
+enum State {
+    Pending,
+    Waiting,
+    Ready,
+    Taken,
+}
+
+#[derive(Debug)]
+#[repr(transparent)]
+pub struct Parker {
+    mutex: AtomicU32,
+}
+
+impl Parker {
+    const PARKED: u32 = u32::MAX;
+    const EMPTY: u32 = 0;
+    const NOTIFIED: u32 = 1;
+
+    pub fn new() -> Self {
+        Self {
+            mutex: AtomicU32::new(Self::EMPTY),
+        }
+    }
+
+    pub fn is_parked(&self) -> bool {
+        self.mutex.load(Ordering::Acquire) == Self::PARKED
+    }
+
+    pub fn park(&self) {
+        if self.mutex.fetch_sub(1, Ordering::Acquire) == Self::NOTIFIED {
+            // The thread was notified, so we can return immediately.
+            return;
+        }
+
+        loop {
+            atomic_wait::wait(&self.mutex, Self::PARKED);
+
+            // We check whether we were notified or woke up spuriously with
+            // acquire ordering in order to make-visible any writes made by the
+            // thread that notified us.
+            if self.mutex.swap(Self::EMPTY, Ordering::Acquire) == Self::NOTIFIED {
+                // The thread was notified, so we can return immediately.
+                return;
+            } else {
+                // spurious wakeup, so we need to re-park.
+                continue;
+            }
+        }
+    }
+
+    pub fn unpark(&self) {
+        // write with Release ordering to ensure that any writes made by this
+        // thread are made-available to the unparked thread.
+        if self.mutex.swap(Self::NOTIFIED, Ordering::Release) == Self::PARKED {
+            // The thread was parked, so we need to notify it.
+            atomic_wait::wake_one(&self.mutex);
+        } else {
+            // The thread was not parked, so we don't need to do anything.
+        }
+    }
+}
+
+#[derive(Debug)]
+#[repr(C)]
+struct Channel<T = ()> {
+    state: AtomicU8,
+    /// Can only be written by the `Receiver` and read by the `Sender` if
+    /// `state` is `State::Waiting`.
+    waiting_thread: NonNull<Parker>,
+    /// Can only be written by the `Sender` and read by the `Receiver` if
+    /// `state` is `State::Ready`.
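+    /// The handoff is `Pending -> (Waiting ->) Ready`, with `poll` moving
+    /// `Ready -> Taken`; `val` is written before the swap to `Ready` and only
+    /// read once `Ready` has been observed, so access is never concurrent.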
+    val: UnsafeCell<Option<Box<thread::Result<T>>>>,
+}
+
+impl<T> Channel<T> {
+    fn new(waiting_thread: NonNull<Parker>) -> Self {
+        Self {
+            state: AtomicU8::new(State::Pending as u8),
+            waiting_thread,
+            val: UnsafeCell::new(None),
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct Receiver<T = ()>(Arc<Channel<T>>);
+
+impl<T> Receiver<T> {
+    pub fn is_empty(&self) -> bool {
+        self.0.state.load(Ordering::Acquire) != State::Ready as u8
+    }
+
+    pub fn wait(&self) {
+        match self.0.state.compare_exchange(
+            State::Pending as u8,
+            State::Waiting as u8,
+            Ordering::AcqRel,
+            Ordering::Acquire,
+        ) {
+            Ok(_) => {
+                // SAFETY:
+                // The `waiting_thread` is set to the current thread's parker
+                // before we park it.
+                unsafe {
+                    let thread = self.0.waiting_thread.as_ref();
+                    thread.park();
+                }
+            }
+            Err(state) if state == State::Ready as u8 => {
+                // The channel is ready, so we can return immediately.
+                return;
+            }
+            _ => {
+                panic!("Receiver is already waiting or consumed.");
+            }
+        }
+    }
+
+    pub fn poll(&self) -> Option<thread::Result<T>> {
+        if self
+            .0
+            .state
+            .compare_exchange(
+                State::Ready as u8,
+                State::Taken as u8,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            )
+            .is_ok()
+        {
+            unsafe { Some(self.take()) }
+        } else {
+            None
+        }
+    }
+
+    pub fn recv(self) -> thread::Result<T> {
+        if self
+            .0
+            .state
+            .compare_exchange(
+                State::Pending as u8,
+                State::Waiting as u8,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            )
+            .is_ok()
+        {
+            unsafe {
+                let thread = self.0.waiting_thread.as_ref();
+                thread.park();
+            }
+        }
+
+        // SAFETY:
+        // To arrive here, either `state` is `State::Ready` or the above
+        // `compare_exchange` succeeded, the thread was parked and then
+        // unparked by the `Sender` *after* the `state` was set to
+        // `State::Ready`.
+        //
+        // In either case, this thread now has unique access to `val`.
+        unsafe { self.take() }
+    }
+
+    unsafe fn take(&self) -> thread::Result<T> {
+        unsafe { (*self.0.val.get()).take().map(|b| *b).unwrap() }
+    }
+}
+
+#[derive(Debug)]
+#[repr(transparent)]
+pub struct Sender<T = ()>(Arc<Channel<T>>);
+
+impl<T> Sender<T> {
+    pub fn send(self, val: thread::Result<T>) {
+        // SAFETY:
+        // Only this thread can write to `val` and none can read it
+        // yet.
+        unsafe {
+            *self.0.val.get() = Some(Box::new(val));
+        }
+
+        if self.0.state.swap(State::Ready as u8, Ordering::AcqRel) == State::Waiting as u8 {
+            // SAFETY:
+            // A `Receiver` already wrote its thread to `waiting_thread`
+            // *before* setting the `state` to `State::Waiting`.
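+            // The `AcqRel` swap above also pairs with the receiver's CAS to
+            // `State::Waiting`, so the parker behind `waiting_thread` is
+            // visible here, and `unpark` (a release swap on the futex word)
+            // publishes the write to `val` before the receiver wakes.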
+            unsafe {
+                let thread = self.0.waiting_thread.as_ref();
+                thread.unpark();
+            }
+        }
+    }
+}
+
+pub fn channel<T>(thread: NonNull<Parker>) -> (Sender<T>, Receiver<T>) {
+    let channel = Arc::new(Channel::new(thread));
+
+    (Sender(channel.clone()), Receiver(channel))
+}
diff --git a/distaff/src/context.rs b/distaff/src/context.rs
index 9e93ce6..28fc791 100644
--- a/distaff/src/context.rs
+++ b/distaff/src/context.rs
@@ -1,5 +1,5 @@
 use std::{
-    ptr::{self, NonNull},
+    ptr::NonNull,
     sync::{
         Arc, OnceLock,
         atomic::{AtomicBool, Ordering},
@@ -13,9 +13,10 @@ use crossbeam_utils::CachePadded;
 use parking_lot::{Condvar, Mutex};
 
 use crate::{
+    channel::{Parker, Sender},
     heartbeat::HeartbeatList,
-    job::{HeapJob, JobSender, QueuedJob as Job, StackJob},
-    latch::{AsCoreLatch, MutexLatch, NopLatch, WorkerLatch},
+    job::{HeapJob, Job2 as Job, SharedJob, StackJob},
+    latch::{AsCoreLatch, MutexLatch, NopLatch},
     util::DropGuard,
     workerthread::{HeartbeatThread, WorkerThread},
 };
@@ -50,14 +51,14 @@ pub struct Context {
 }
 
 pub(crate) struct Shared {
-    pub jobs: BTreeMap<usize, NonNull<Job>>,
-    injected_jobs: Vec<NonNull<Job>>,
+    pub jobs: BTreeMap<usize, SharedJob>,
+    injected_jobs: Vec<SharedJob>,
 }
 
 unsafe impl Send for Shared {}
 
 impl Shared {
-    pub fn pop_job(&mut self) -> Option<NonNull<Job>> {
+    pub fn pop_job(&mut self) -> Option<SharedJob> {
         // this is unlikely, so make the function cold?
         // TODO: profile this
         if !self.injected_jobs.is_empty() {
@@ -69,7 +70,7 @@ impl Shared {
     }
 
     #[cold]
-    unsafe fn pop_injected_job(&mut self) -> NonNull<Job> {
+    unsafe fn pop_injected_job(&mut self) -> SharedJob {
         self.injected_jobs.pop().unwrap()
     }
 }
@@ -146,7 +147,7 @@ impl Context {
         GLOBAL_CONTEXT.get_or_init(|| Self::new())
     }
 
-    pub fn inject_job(&self, job: NonNull<Job>) {
+    pub fn inject_job(&self, job: SharedJob) {
         let mut shared = self.shared.lock();
         shared.injected_jobs.push(job);
@@ -190,11 +191,11 @@ impl Context {
             NopLatch,
         );
 
-        let job = Job::from_stackjob(&job, worker.heartbeat.raw_latch());
+        let job = Job::from_stackjob(&job);
 
-        self.inject_job(Into::into(&job));
+        self.inject_job(job.share(Some(worker.heartbeat.parker())));
 
-        let t = worker.wait_until_queued_job(&job).unwrap();
+        let t = worker.wait_until_shared_job(&job).unwrap();
 
         crate::util::unwrap_or_panic(t)
     }
@@ -206,7 +207,7 @@ impl Context {
         T: Send,
     {
         // current thread isn't a worker thread, create job and inject into context
-        let latch = WorkerLatch::new();
+        let parker = Parker::new();
 
         let job = StackJob::new(
             move || {
@@ -218,12 +219,13 @@ impl Context {
             NopLatch,
         );
 
-        let job = Job::from_stackjob(&job, &raw const latch);
+        let job = Job::from_stackjob(&job);
 
-        self.inject_job(Into::into(&job));
-        let recv = unsafe { job.as_receiver::<T>() };
+        self.inject_job(job.share(Some(&parker)));
 
-        crate::util::unwrap_or_panic(latch.wait_until(|| recv.poll()))
+        let recv = job.take_receiver().unwrap();
+
+        crate::util::unwrap_or_panic(recv.recv())
     }
 
     /// Run closure in this context.
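
The non-worker path above is the general shape of the new handoff: a job is
`share`d together with an optional `Parker`, which splits off a one-shot
`Sender`/`Receiver` pair instead of threading a raw `WorkerLatch` pointer
through the job. A minimal sketch of the flow (editor's illustration, not part
of the diff; `ctx` and `do_work` are placeholders, and these are
crate-internal APIs):

    let parker = Parker::new();
    let stack_job = StackJob::new(|| do_work(), NopLatch);
    let job = Job::from_stackjob(&stack_job);
    ctx.inject_job(job.share(Some(&parker))); // sender half travels with the SharedJob
    let recv = job.take_receiver().unwrap();  // receiver half stays with the caller
    let result = crate::util::unwrap_or_panic(recv.recv()); // parks on `parker` until sent
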
@@ -262,9 +264,9 @@ impl Context {
     where
         F: FnOnce() + Send + 'static,
     {
-        let job = Job::from_heapjob(Box::new(HeapJob::new(f)), ptr::null());
+        let job = Job::from_heapjob(Box::new(HeapJob::new(f)));
         tracing::trace!("Context::spawn: spawning job: {:?}", job);
-        self.inject_job(job);
+        self.inject_job(job.share(None));
     }
 
     pub fn spawn_future<T, F>(self: &Arc<Self>, future: F) -> async_task::Task<T>
@@ -274,24 +276,16 @@ impl Context {
         let schedule = move |runnable: Runnable| {
             #[align(8)]
-            unsafe fn harness<T>(this: *const (), job: *const JobSender, _: *const WorkerLatch) {
+            unsafe fn harness<T>(_: &WorkerThread, this: NonNull<()>, _: Option<Sender>) {
                 unsafe {
-                    let runnable =
-                        Runnable::<()>::from_raw(NonNull::new_unchecked(this.cast_mut()));
+                    let runnable = Runnable::<()>::from_raw(this);
                     runnable.run();
-
-                    // SAFETY: job was turned into raw
-                    drop(Box::from_raw(job.cast::<JobSender<T>>().cast_mut()));
                 }
             }
 
-            let job = Box::into_non_null(Box::new(Job::from_harness(
-                harness::<T>,
-                runnable.into_raw(),
-                ptr::null(),
-            )));
+            let job = Job::<T>::from_harness(harness::<T>, runnable.into_raw());
 
-            self.inject_job(job);
+            self.inject_job(job.share(None));
         };
 
         let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
@@ -386,7 +380,7 @@ mod tests {
         let ctx = Context::new_with_threads(1);
         let counter = Arc::new(AtomicU8::new(0));
 
-        let waker = WorkerLatch::new();
+        let parker = Parker::new();
 
         let job = StackJob::new(
             {
@@ -401,7 +395,7 @@ mod tests {
             NopLatch,
         );
 
-        let job = Job::from_stackjob(&job, &raw const waker);
+        let job = Job::from_stackjob(&job);
 
         // wait for the worker to sleep
         std::thread::sleep(std::time::Duration::from_millis(100));
@@ -414,11 +408,11 @@ mod tests {
             assert!(heartbeat.is_waiting());
         });
 
-        ctx.inject_job(Into::into(&job));
+        ctx.inject_job(job.share(Some(&parker)));
 
         // Wait for the job to be executed
-        let recv = unsafe { job.as_receiver::<i32>() };
-        let result = waker.wait_until(|| recv.poll());
+        let recv = job.take_receiver().unwrap();
+        let result = recv.recv();
         let result = crate::util::unwrap_or_panic(result);
         assert_eq!(result, 42);
         assert_eq!(counter.load(Ordering::SeqCst), 1);
diff --git a/distaff/src/heartbeat.rs b/distaff/src/heartbeat.rs
index 733f0c0..0c83202 100644
--- a/distaff/src/heartbeat.rs
+++ b/distaff/src/heartbeat.rs
@@ -12,7 +12,7 @@ use std::{
 
 use parking_lot::Mutex;
 
-use crate::latch::WorkerLatch;
+use crate::{channel::Parker, latch::WorkerLatch};
 
 #[derive(Debug, Clone)]
 pub struct HeartbeatList {
@@ -125,13 +125,13 @@ impl Drop for OwnedHeartbeatReceiver {
 
 #[derive(Debug)]
 pub struct Heartbeat {
-    ptr: NonNull<(AtomicBool, WorkerLatch)>,
+    ptr: NonNull<(AtomicBool, Parker)>,
     i: u64,
 }
 
 #[derive(Debug)]
 pub struct HeartbeatReceiver {
-    ptr: NonNull<(AtomicBool, WorkerLatch)>,
+    ptr: NonNull<(AtomicBool, Parker)>,
     i: u64,
 }
 
@@ -149,7 +149,7 @@ impl Drop for Heartbeat {
 
 #[derive(Debug)]
 pub struct HeartbeatSender {
-    ptr: NonNull<(AtomicBool, WorkerLatch)>,
+    ptr: NonNull<(AtomicBool, Parker)>,
     pub last_heartbeat: Instant,
 }
 
@@ -161,7 +161,7 @@ impl Heartbeat {
         // `AtomicBool` is `Sync` and `Send`, so it can be safely shared between threads.
         let ptr = NonNull::new(Box::into_raw(Box::new((
             AtomicBool::new(true),
-            WorkerLatch::new(),
+            Parker::new(),
         ))))
         .unwrap();
 
         Self { ptr, i }
@@ -200,15 +200,7 @@ impl HeartbeatReceiver {
         }
     }
 
-    pub fn wait(&self) {
-        unsafe { self.ptr.as_ref().1.wait() };
-    }
-
-    pub fn raw_latch(&self) -> *const WorkerLatch {
-        unsafe { &raw const self.ptr.as_ref().1 }
-    }
-
-    pub fn latch(&self) -> &WorkerLatch {
+    pub fn parker(&self) -> &Parker {
         unsafe { &self.ptr.as_ref().1 }
     }
 
@@ -226,13 +218,13 @@ impl HeartbeatSender {
         // SAFETY:
         // `AtomicBool` is `Sync` and `Send`, so it can be safely shared between threads.
         unsafe { self.ptr.as_ref().0.store(true, Ordering::Relaxed) };
-        self.last_heartbeat = Instant::now();
+        // self.last_heartbeat = Instant::now();
     }
 
     pub fn is_waiting(&self) -> bool {
-        unsafe { self.ptr.as_ref().1.is_waiting() }
+        unsafe { self.ptr.as_ref().1.is_parked() }
     }
 
     pub fn wake(&self) {
-        unsafe { self.ptr.as_ref().1.wake() };
+        unsafe { self.ptr.as_ref().1.unpark() };
     }
 }
diff --git a/distaff/src/job.rs b/distaff/src/job.rs
index 84b0aa5..e9dab83 100644
--- a/distaff/src/job.rs
+++ b/distaff/src/job.rs
@@ -20,6 +20,8 @@ use parking_lot::{Condvar, Mutex};
 use parking_lot_core::SpinWait;
 
 use crate::{
+    WorkerThread,
+    channel::{Parker, Sender},
     latch::{Probe, WorkerLatch},
     util::{DropGuard, SmallBox, TaggedAtomicPtr},
 };
@@ -990,234 +992,107 @@ mod tests {
     }
 }
 
-// A job, whether a `StackJob` or `HeapJob`, is turned into a `QueuedJob` when it is pushed to the job queue.
-#[repr(C)]
-pub struct QueuedJob {
-    /// The job's harness and state.
-    harness: TaggedAtomicPtr<usize, 3>,
-    // This is later invalidated by the Receiver/Sender, so it must be wrapped in a `MaybeUninit`.
-    // I'm not sure if it also must be inside of an `UnsafeCell`..
-    inner: Cell<MaybeUninit<QueueJobInner>>,
-}
-
-impl Debug for QueuedJob {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("QueuedJob")
-            .field("harness", &self.harness)
-            .field("inner", unsafe {
-                (&*self.inner.as_ptr()).assume_init_ref()
-            })
-            .finish()
-    }
-}
+type JobHarness =
+    unsafe fn(&WorkerThread, this: NonNull<()>, sender: Option<Sender>);
 
 #[repr(C)]
-#[derive(Debug, Copy, Clone)]
-struct QueueJobInner {
-    /// The job's value or `this` pointer. This is either a `StackJob` or `HeapJob`.
+pub struct Job2<T = ()> {
+    harness: JobHarness,
     this: NonNull<()>,
-    /// The mutex to wake when the job is finished executing.
-    mutex: *const WorkerLatch,
+    receiver: Cell<Option<crate::channel::Receiver<T>>>,
 }
 
-/// A union that allows us to store either a `T` or a `U` without needing to know which one it is at runtime.
-/// The state must be tracked separately.
-union UnsafeVariant<T, U> {
-    t: ManuallyDrop<T>,
-    u: ManuallyDrop<U>,
-}
-
-// The processed job is the result of executing a job, it contains the result of the job or an error.
-#[repr(C)]
-struct JobChannel<T = ()> {
-    tag: TaggedAtomicPtr<usize, 3>,
-    value: UnsafeCell<MaybeUninit<UnsafeVariant<SmallBox<T>, Box<dyn Any + Send + 'static>>>>,
-}
-
-#[repr(transparent)]
-pub struct JobSender<T = ()> {
-    channel: JobChannel<T>,
-}
-#[repr(transparent)]
-pub struct JobReceiver<T = ()> {
-    channel: JobChannel<T>,
-}
-
-#[repr(C)]
-struct Job2 {}
-
-const EMPTY: usize = 0;
-const SHARED: usize = 1 << 2;
-const FINISHED: usize = 1 << 0;
-const ERROR: usize = 1 << 1;
-
-impl<T> JobSender<T> {
-    #[tracing::instrument(level = "trace", skip_all)]
-    pub fn send(&self, result: std::thread::Result<T>, mutex: *const WorkerLatch) {
-        tracing::trace!("sending job ({:?}) result", &raw const *self);
-        // We want to lock here so that we can be sure that we wake the worker
-        // only if it was waiting, and not immediately after having received the
-        // result and waiting for further work:
-        // | thread 1 | thread 2 |
-        // | | | | |
-        // | send-> | | |
-        // | FINISHED | | |
-        // | | | poll() |
-        // | | | sleep() |
-        // | wake() | |
-        // | | | !woken! | // the worker has already received the result
-        // | | | | | // and is waiting for more work, it shouldn't
-        // | | | | | // be woken up here.
-        // | <-send | | |
-        //
-        // if we lock, it looks like this:
-        // | thread 1 | thread 2 |
-        // | | | | |
-        // | send-> | | |
-        // | lock() | | |
-        // | FINISHED | | |
-        // | | | lock()-> | // thread 2 tries to lock.
-        // | wake() | | // the wake signal is ignored
-        // | | | |
-        // | unlock() | |
-        // | | | l=lock() | // thread2 wakes up and receives the lock
-        // | | | poll() |
-        // | <-send | sleep(l) | // thread 2 is now sleeping
-        //
-        // This concludes my TED talk on why we need to lock here.
-
-        let _guard = unsafe { mutex.as_ref() }.map(|mutex| {
-            let guard = mutex.lock();
-            DropGuard::new(move || {
-                // // SAFETY: we forget the guard here so we no longer borrow the mutex.
-                // mem::forget(guard);
-                _ = guard;
-                mutex.wake();
-                // // SAFETY: we can safely unlock the mutex here, as we are the only ones holding it.
-                // mutex.force_unlock();
-            })
-        });
-
-        assert!(self.channel.tag.tag(Ordering::Acquire) & FINISHED == 0);
-
-        match result {
-            Ok(value) => {
-                let slot = unsafe { &mut *self.channel.value.get() };
-
-                slot.write(UnsafeVariant {
-                    t: ManuallyDrop::new(SmallBox::new(value)),
-                });
-
-                self.channel.tag.fetch_or_tag(FINISHED, Ordering::Release);
-            }
-            Err(payload) => {
-                let slot = unsafe { &mut *self.channel.value.get() };
-
-                slot.write(UnsafeVariant {
-                    u: ManuallyDrop::new(payload),
-                });
-
-                self.channel
-                    .tag
-                    .fetch_or_tag(FINISHED | ERROR, Ordering::Release);
-            }
-        }
-
-        // wake the worker waiting on the mutex and drop the guard
+impl<T> Debug for Job2<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Job2")
+            .field("harness", &self.harness)
+            .field("this", &self.this)
+            .finish_non_exhaustive()
     }
 }
 
-impl<T> JobReceiver<T> {
-    #[tracing::instrument(level = "trace", skip_all)]
-    pub fn is_finished(&self) -> bool {
-        self.channel.tag.tag(Ordering::Acquire) & FINISHED != 0
-    }
-
-    #[tracing::instrument(level = "trace", skip_all)]
-    pub fn poll(&self) -> Option<std::thread::Result<T>> {
-        let tag = self.channel.tag.take_tag(Ordering::Acquire);
-
-        if tag & FINISHED == 0 {
-            return None;
-        }
-
-        tracing::trace!("received job ({:?}) result", &raw const *self);
-
-        // SAFETY: if we received a non-EMPTY tag, the value must be initialized.
-        // because we atomically set the taag to EMPTY, we can be sure that we're the only ones accessing the value.
-        let slot = unsafe { (&mut *self.channel.value.get()).assume_init_mut() };
-
-        if tag & ERROR != 0 {
-            // job failed, return the error
-            let err = unsafe { ManuallyDrop::take(&mut slot.u) };
-            Some(Err(err))
-        } else {
-            // job succeeded, return the value
-            let value = unsafe { ManuallyDrop::take(&mut slot.t) };
-            Some(Ok(value.into_inner()))
-        }
-    }
+#[derive(Debug)]
+pub struct SharedJob {
+    harness: JobHarness,
+    this: NonNull<()>,
+    sender: Option<Sender>,
 }
 
-impl QueuedJob {
-    fn new(
-        harness: TaggedAtomicPtr<usize, 3>,
-        this: NonNull<()>,
-        mutex: *const WorkerLatch,
-    ) -> Self {
+impl<T: Send> Job2<T> {
+    fn new(harness: JobHarness, this: NonNull<()>) -> Self {
         let this = Self {
             harness,
-            inner: Cell::new(MaybeUninit::new(QueueJobInner { this, mutex })),
+            this,
+            receiver: Cell::new(None),
         };
 
-        tracing::trace!("new queued job: {:?}", this);
+        tracing::trace!("new job: {:?}", this);
 
         this
     }
 
-    pub fn from_stackjob<F>(job: &StackJob<F>, mutex: *const WorkerLatch) -> Self
+
+    pub fn share(&self, parker: Option<&Parker>) -> SharedJob {
+        tracing::trace!("sharing job: {:?}", self);
+
+        let (sender, receiver) = parker
+            .map(|parker| crate::channel::channel::<T>(parker.into()))
+            .unzip();
+
+        self.receiver.set(receiver);
+
+        SharedJob {
+            harness: self.harness,
+            this: self.this,
+            sender: unsafe { mem::transmute(sender) },
+        }
+    }
+
+    pub fn take_receiver(&self) -> Option<crate::channel::Receiver<T>> {
+        self.receiver.take()
+    }
+
+    pub fn from_stackjob<F>(job: &StackJob<F>) -> Self
     where
         F: FnOnce() -> T + Send,
-        T: Send,
     {
         #[align(8)]
         #[tracing::instrument(level = "trace", skip_all, name = "stack_job_harness")]
         unsafe fn harness<F, T>(
-            this: *const (),
-            sender: *const JobSender<T>,
-            mutex: *const WorkerLatch,
+            _worker: &WorkerThread,
+            this: NonNull<()>,
+            sender: Option<Sender>,
         ) where
             F: FnOnce() -> T + Send,
             T: Send,
         {
             use std::panic::{AssertUnwindSafe, catch_unwind};
 
-            let f = unsafe { (*this.cast::<StackJob<F>>()).unwrap() };
-            let result = catch_unwind(AssertUnwindSafe(|| f()));
+            let f = unsafe { this.cast::<StackJob<F>>().as_ref().unwrap() };
+            let sender: Sender<T> = unsafe { mem::transmute(sender) };
 
-            unsafe {
-                (&*(sender as *const JobSender<T>)).send(result, mutex);
-            }
+            // #[cfg(feature = "metrics")]
+            // if worker.heartbeat.parker() == mutex {
+            //     worker
+            //         .metrics
+            //         .num_sent_to_self
+            //         .fetch_add(1, Ordering::Relaxed);
+            //     tracing::trace!("job sent to self");
+            // }
+
+            sender.send(catch_unwind(AssertUnwindSafe(|| f())));
         }
 
-        Self::new(
-            TaggedAtomicPtr::new(harness::<F, T> as *mut usize, EMPTY),
-            unsafe { NonNull::new_unchecked(job as *const _ as *mut ()) },
-            mutex,
-        )
+        Self::new(harness::<F, T>, NonNull::from(job).cast())
     }
 
-    pub fn from_heapjob<F>(job: Box<HeapJob<F>>, mutex: *const WorkerLatch) -> NonNull<Self>
+    pub fn from_heapjob<F>(job: Box<HeapJob<F>>) -> Self
     where
         F: FnOnce() -> T + Send,
-        T: Send,
     {
         #[align(8)]
         #[tracing::instrument(level = "trace", skip_all, name = "heap_job_harness")]
-        unsafe fn harness<F, T>(
-            this: *const (),
-            sender: *const JobSender<T>,
-            mutex: *const WorkerLatch,
-        ) where
+        unsafe fn harness<F, T>(_worker: &WorkerThread, this: NonNull<()>, sender: Option<Sender>)
+        where
             F: FnOnce() -> T + Send,
             T: Send,
         {
@@ -1226,98 +1101,62 @@ impl QueuedJob {
             // expect MIRI to complain about this, but it is actually correct.
             // because I am so much smarter than MIRI, naturally, obviously.
             // unbox the job, which was allocated at (2)
-            let f = unsafe { (*Box::from_raw(this.cast::<HeapJob<F>>().cast_mut())).into_inner() };
+            let f = unsafe { (*Box::from_non_null(this.cast::<HeapJob<F>>())).into_inner() };
+
             let result = catch_unwind(AssertUnwindSafe(|| f()));
-
-            unsafe {
-                (&*(sender as *const JobSender<T>)).send(result, mutex);
+            let sender: Option<Sender<T>> = unsafe { mem::transmute(sender) };
+            if let Some(sender) = sender {
+                sender.send(result);
             }
-
-            // drop the job, which was allocated at (1)
-            _ = unsafe { Box::<JobSender<T>>::from_raw(sender as *mut _) };
         }
 
         // (1) allocate box for job
-        Box::into_non_null(Box::new(Self::new(
-            TaggedAtomicPtr::new(harness::<F, T> as *mut usize, EMPTY),
-            // (2) convert job into a pointer
-            unsafe { NonNull::new_unchecked(Box::into_raw(job) as *mut ()) },
-            mutex,
-        )))
-    }
-
-    pub fn from_harness(
-        harness: unsafe fn(*const (), *const JobSender, *const WorkerLatch),
-        this: NonNull<()>,
-        mutex: *const WorkerLatch,
-    ) -> Self {
         Self::new(
-            TaggedAtomicPtr::new(harness as *mut usize, EMPTY),
-            this,
-            mutex,
+            harness::<F, T>,
+            // (2) convert job into a pointer
+            Box::into_non_null(job).cast(),
         )
     }
 
-    pub fn set_shared(&self) {
-        self.harness.fetch_or_tag(SHARED, Ordering::Relaxed);
+    pub fn from_harness(harness: JobHarness, this: NonNull<()>) -> Self {
+        Self::new(harness, this)
     }
 
     pub fn is_shared(&self) -> bool {
-        self.harness.tag(Ordering::Relaxed) & SHARED != 0
+        unsafe { (&*self.receiver.as_ptr()).is_some() }
     }
+}
 
-    pub unsafe fn as_receiver<T>(&self) -> &JobReceiver<T> {
-        unsafe { mem::transmute::<&QueuedJob, &JobReceiver<T>>(self) }
-    }
+impl SharedJob {
+    pub unsafe fn execute(self, worker: &WorkerThread) {
+        tracing::trace!("executing shared job: {:?}", self);
 
-    /// this function will drop `_self` and execute the job.
-    #[tracing::instrument(level = "trace", skip_all)]
-    pub unsafe fn execute(_self: *mut Self) {
-        let (harness, this, sender, mutex) = unsafe {
-            let job = &*_self;
-            tracing::debug!("executing queued job: {:?}", job);
-
-            let harness: unsafe fn(*const (), *const JobSender, *const WorkerLatch) =
-                mem::transmute(job.harness.ptr(Ordering::Relaxed));
-            let sender = mem::transmute::<*const Self, *const JobSender>(_self);
-
-            let QueueJobInner { this, mutex } =
-                job.inner.replace(MaybeUninit::uninit()).assume_init();
-
-            (harness, this, sender, mutex)
-        };
+        let Self {
+            harness,
+            this,
+            sender,
+        } = self;
 
         unsafe {
-            // past this point, `_self` may no longer be a valid pointer to a `QueuedJob`.
-            (harness)(this.as_ptr(), sender, mutex);
+            (harness)(worker, this, sender);
         }
     }
 }
 
-impl Probe for QueuedJob {
-    fn probe(&self) -> bool {
-        self.harness.tag(Ordering::Relaxed) & FINISHED != 0
-    }
-}
-
-impl Probe for JobReceiver {
-    fn probe(&self) -> bool {
-        self.channel.tag.tag(Ordering::Relaxed) & FINISHED != 0
-    }
-}
-
 pub use queuedjobqueue::JobQueue;
 
 mod queuedjobqueue {
     //! Basically `JobVec`, but for `QueuedJob`s.
-    use std::collections::VecDeque;
+    // TODO: use non-null's here and rely on Into/From for &T
 
-    use super::*;
+    use std::{collections::VecDeque, ptr::NonNull};
+
+    use super::Job2 as Job;
 
     #[derive(Debug)]
     pub struct JobQueue {
-        jobs: VecDeque<NonNull<QueuedJob>>,
+        jobs: VecDeque<NonNull<Job>>,
     }
 
     impl JobQueue {
@@ -1327,21 +1166,21 @@ mod queuedjobqueue {
             }
         }
 
-        pub fn push_front(&mut self, job: *const QueuedJob) {
+        pub fn push_front(&mut self, job: *const Job) {
             self.jobs
                 .push_front(unsafe { NonNull::new_unchecked(job as *mut _) });
         }
 
-        pub fn push_back(&mut self, job: *const QueuedJob) {
+        pub fn push_back(&mut self, job: *const Job) {
             self.jobs
                 .push_back(unsafe { NonNull::new_unchecked(job as *mut _) });
         }
 
-        pub fn pop_front(&mut self) -> Option<NonNull<QueuedJob>> {
+        pub fn pop_front(&mut self) -> Option<NonNull<Job>> {
             self.jobs.pop_front()
         }
 
-        pub fn pop_back(&mut self) -> Option<NonNull<QueuedJob>> {
+        pub fn pop_back(&mut self) -> Option<NonNull<Job>> {
             self.jobs.pop_back()
         }
diff --git a/distaff/src/join.rs b/distaff/src/join.rs
index a9a33b2..49edaf5 100644
--- a/distaff/src/join.rs
+++ b/distaff/src/join.rs
@@ -1,8 +1,10 @@
+#[cfg(feature = "metrics")]
+use std::sync::atomic::Ordering;
 use std::{hint::cold_path, sync::Arc};
 
 use crate::{
     context::Context,
-    job::{QueuedJob as Job, StackJob},
+    job::{Job2 as Job, StackJob},
     latch::NopLatch,
     workerthread::WorkerThread,
 };
@@ -71,8 +73,11 @@ impl WorkerThread {
     {
         use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
 
+        #[cfg(feature = "metrics")]
+        self.metrics.num_joins.fetch_add(1, Ordering::Relaxed);
+
         let a = StackJob::new(a, NopLatch);
-        let job = Job::from_stackjob(&a, self.heartbeat.raw_latch());
+        let job = Job::from_stackjob(&a);
 
         self.push_back(&job);
 
@@ -83,8 +88,17 @@ impl WorkerThread {
             Err(payload) => {
                 tracing::debug!("join_heartbeat: b panicked, waiting for a to finish");
                 cold_path();
+
                 // if b panicked, we need to wait for a to finish
-                self.wait_until_latch(&job);
+                let mut receiver = job.take_receiver();
+                self.wait_until_pred(|| match &receiver {
+                    Some(recv) => recv.poll().is_some(),
+                    None => {
+                        receiver = job.take_receiver();
+                        false
+                    }
+                });
+
                 resume_unwind(payload);
             }
         };
@@ -97,7 +111,7 @@ impl WorkerThread {
             // a is allowed to panic here, because we already finished b.
             unsafe { a.unwrap()() }
         } else {
-            match self.wait_until_queued_job(&job) {
+            match self.wait_until_shared_job(&job) {
                 Some(t) => crate::util::unwrap_or_panic(t),
                 None => {
                     tracing::trace!(
diff --git a/distaff/src/latch.rs b/distaff/src/latch.rs
index 80a745c..39b9b7c 100644
--- a/distaff/src/latch.rs
+++ b/distaff/src/latch.rs
@@ -14,7 +14,7 @@ use std::{
 
 use parking_lot::{Condvar, Mutex};
 
-use crate::{WorkerThread, context::Context};
+use crate::{WorkerThread, channel::Parker, context::Context};
 
 pub trait Latch {
     unsafe fn set_raw(this: *const Self);
@@ -199,21 +199,20 @@ impl Probe for NopLatch {
 
 pub struct CountLatch {
     count: AtomicUsize,
-    inner: AtomicPtr<WorkerLatch>,
+    inner: AtomicPtr<Parker>,
 }
 
 impl CountLatch {
     #[inline]
-    pub const fn new(inner: *const WorkerLatch) -> Self {
+    pub const fn new(inner: *const Parker) -> Self {
         Self {
             count: AtomicUsize::new(0),
-            inner: AtomicPtr::new(inner as *mut WorkerLatch),
+            inner: AtomicPtr::new(inner as *mut Parker),
         }
     }
 
-    pub fn set_inner(&self, inner: *const WorkerLatch) {
-        self.inner
-            .store(inner as *mut WorkerLatch, Ordering::Relaxed);
+    pub fn set_inner(&self, inner: *const Parker) {
+        self.inner.store(inner as *mut Parker, Ordering::Relaxed);
     }
 
     pub fn count(&self) -> usize {
@@ -242,7 +241,7 @@ impl Latch for CountLatch {
                 // If the count was 1, we need to set the inner latch.
                 let inner = (*this).inner.load(Ordering::Relaxed);
                 if !inner.is_null() {
-                    (&*inner).wake();
+                    (&*inner).unpark();
                 }
             }
         }
@@ -378,6 +377,7 @@ impl WorkerLatch {
 
     fn wait_internal(condvar: &Condvar, guard: &mut parking_lot::MutexGuard<'_, bool>) {
         **guard = true; // set the mutex to true to indicate that the worker is waiting
+        //condvar.wait_for(guard, std::time::Duration::from_micros(100));
         condvar.wait(guard);
         **guard = false;
     }
@@ -436,66 +436,6 @@ mod tests {
 
     use super::*;
 
-    #[test]
-    #[cfg_attr(not(miri), traced_test)]
-    fn worker_latch() {
-        let latch = Arc::new(WorkerLatch::new());
-        let barrier = Arc::new(Barrier::new(2));
-        let mutex = Arc::new(parking_lot::Mutex::new(false));
-
-        let count = Arc::new(AtomicUsize::new(0));
-
-        let thread = std::thread::spawn({
-            let latch = latch.clone();
-            let mutex = mutex.clone();
-            let barrier = barrier.clone();
-            let count = count.clone();
-
-            move || {
-                tracing::info!("Thread waiting on barrier");
-                let mut guard = mutex.lock();
-                barrier.wait();
-
-                tracing::info!("Thread waiting on latch");
-                latch.wait();
-                count.fetch_add(1, Ordering::SeqCst);
-                tracing::info!("Thread woke up from latch");
-                barrier.wait();
-                barrier.wait();
-                tracing::info!("Thread finished waiting on barrier");
-                count.fetch_add(1, Ordering::SeqCst);
-            }
-        });
-
-        assert!(!latch.is_waiting(), "Latch should not be waiting yet");
-        barrier.wait();
-        tracing::info!("Main thread finished waiting on barrier");
-        // lock mutex and notify the thread that isn't yet waiting.
-        {
-            let guard = mutex.lock();
-            tracing::info!("Main thread acquired mutex, waking up thread");
-            assert!(latch.is_waiting(), "Latch should be waiting now");
-
-            latch.wake();
-            tracing::info!("Main thread woke up thread");
-        }
-        assert_eq!(count.load(Ordering::SeqCst), 0, "Count should still be 0");
-        barrier.wait();
-        assert_eq!(
-            count.load(Ordering::SeqCst),
-            1,
-            "Count should be 1 after waking up"
-        );
-        barrier.wait();
-
-        thread.join().expect("Thread should join successfully");
-        assert_eq!(
-            count.load(Ordering::SeqCst),
-            2,
-            "Count should be 2 after thread has finished"
-        );
-    }
-
     #[test]
     fn test_atomic_latch() {
         let latch = AtomicLatch::new();
diff --git a/distaff/src/lib.rs b/distaff/src/lib.rs
index c2f0afd..283ad79 100644
--- a/distaff/src/lib.rs
+++ b/distaff/src/lib.rs
@@ -14,11 +14,14 @@
 extern crate alloc;
 
+mod channel;
 mod context;
 mod heartbeat;
 mod job;
 mod join;
 mod latch;
+#[cfg(feature = "metrics")]
+mod metrics;
 mod scope;
 mod threadpool;
 pub mod util;
diff --git a/distaff/src/metrics.rs b/distaff/src/metrics.rs
new file mode 100644
index 0000000..a6dd561
--- /dev/null
+++ b/distaff/src/metrics.rs
@@ -0,0 +1,12 @@
+use std::sync::atomic::AtomicU32;
+
+#[derive(Debug, Default)]
+pub(crate) struct WorkerMetrics {
+    pub(crate) num_jobs_shared: AtomicU32,
+    pub(crate) num_heartbeats: AtomicU32,
+    pub(crate) num_joins: AtomicU32,
+    pub(crate) num_jobs_reclaimed: AtomicU32,
+    pub(crate) num_jobs_executed: AtomicU32,
+    pub(crate) num_jobs_stolen: AtomicU32,
+    pub(crate) num_sent_to_self: AtomicU32,
+}
diff --git a/distaff/src/scope.rs b/distaff/src/scope.rs
index f7d7293..f334e0e 100644
--- a/distaff/src/scope.rs
+++ b/distaff/src/scope.rs
@@ -11,9 +11,10 @@ use std::{
 use async_task::Runnable;
 
 use crate::{
+    channel::Sender,
     context::Context,
-    job::{HeapJob, JobSender, QueuedJob as Job},
-    latch::{CountLatch, WorkerLatch},
+    job::{HeapJob, Job2 as Job},
+    latch::{CountLatch, Probe, WorkerLatch},
     util::{DropGuard, SendPtr},
     workerthread::WorkerThread,
 };
@@ -88,7 +89,7 @@ where
 impl<'scope, 'env> Scope<'scope, 'env> {
     #[tracing::instrument(level = "trace", skip_all)]
     fn wait_for_jobs(&self, worker: &WorkerThread) {
-        self.job_counter.set_inner(worker.heartbeat.raw_latch());
+        self.job_counter.set_inner(worker.heartbeat.parker());
         if self.job_counter.count() > 0 {
             tracing::trace!("waiting for {} jobs to finish.", self.job_counter.count());
             tracing::trace!(
@@ -98,7 +99,7 @@ impl<'scope, 'env> Scope<'scope, 'env> {
             );
 
             // set worker index in the job counter
-            worker.wait_until_latch(&self.job_counter);
+            worker.wait_until_pred(|| self.job_counter.probe());
         }
     }
 
@@ -173,18 +174,15 @@ impl<'scope, 'env> Scope<'scope, 'env> {
 
         let this = SendPtr::new_const(self).unwrap();
 
-        let job = Job::from_heapjob(
-            Box::new(HeapJob::new(move || unsafe {
-                use std::panic::{AssertUnwindSafe, catch_unwind};
-                if let Err(payload) = catch_unwind(AssertUnwindSafe(|| f(this.as_ref()))) {
-                    this.as_unchecked_ref().panicked(payload);
-                }
-                this.as_unchecked_ref().job_counter.decrement();
-            })),
-            ptr::null(),
-        );
+        let job = Job::from_heapjob(Box::new(HeapJob::new(move || unsafe {
+            use std::panic::{AssertUnwindSafe, catch_unwind};
+            if let Err(payload) = catch_unwind(AssertUnwindSafe(|| f(this.as_ref()))) {
+                this.as_unchecked_ref().panicked(payload);
+            }
+            this.as_unchecked_ref().job_counter.decrement();
+        })));
 
-        self.context.inject_job(job);
+        self.context.inject_job(job.share(None));
         // WorkerThread::current_ref()
         //     .expect("spawn is run in workerthread.")
        //     .push_front(job.as_ptr());
@@ -233,25 +231,17 @@ impl<'scope, 'env> Scope<'scope, 'env> {
 
         let schedule = move |runnable: Runnable| {
             #[align(8)]
-            unsafe fn harness(this: *const (), job: *const JobSender, _: *const WorkerLatch) {
+            unsafe fn harness(_: &WorkerThread, this: NonNull<()>, _: Option<Sender>) {
                 unsafe {
-                    let runnable =
-                        Runnable::<()>::from_raw(NonNull::new_unchecked(this.cast_mut()));
+                    let runnable = Runnable::<()>::from_raw(this.cast());
                     runnable.run();
-
-                    // SAFETY: job was turned into raw
-                    drop(Box::from_raw(job.cast_mut()));
                 }
             }
 
-            let job = Box::into_non_null(Box::new(Job::from_harness(
-                harness,
-                runnable.into_raw(),
-                ptr::null(),
-            )));
+            let job = Job::<()>::from_harness(harness, runnable.into_raw());
 
-            self.context.inject_job(job);
+            self.context.inject_job(job.share(None));
             // WorkerThread::current_ref()
             //     .expect("spawn_async_internal is run in workerthread.")
             //     .push_front(job);
diff --git a/distaff/src/workerthread.rs b/distaff/src/workerthread.rs
index 1cef6f6..c3b12c3 100644
--- a/distaff/src/workerthread.rs
+++ b/distaff/src/workerthread.rs
@@ -1,6 +1,7 @@
+#[cfg(feature = "metrics")]
+use std::sync::atomic::Ordering;
 use std::{
     cell::{Cell, UnsafeCell},
-    hint::cold_path,
     ptr::NonNull,
     sync::{Arc, Barrier},
     time::Duration,
@@ -9,10 +10,10 @@ use std::{
 use crossbeam_utils::CachePadded;
 
 use crate::{
-    context::{Context, Heartbeat},
+    context::Context,
     heartbeat::OwnedHeartbeatReceiver,
-    job::{JobQueue as JobList, JobResult, QueuedJob as Job, QueuedJob, StackJob},
-    latch::{AsCoreLatch, CoreLatch, Probe, WorkerLatch},
+    job::{Job2 as Job, JobQueue as JobList, SharedJob},
+    latch::Probe,
     util::DropGuard,
 };
 
@@ -21,6 +22,8 @@ pub struct WorkerThread {
     pub(crate) queue: UnsafeCell<JobList>,
     pub(crate) heartbeat: OwnedHeartbeatReceiver,
     pub(crate) join_count: Cell<usize>,
+    #[cfg(feature = "metrics")]
+    pub(crate) metrics: CachePadded<crate::metrics::WorkerMetrics>,
 }
 
 thread_local!
 {
@@ -36,6 +39,8 @@ impl WorkerThread {
             queue: UnsafeCell::new(JobList::new()),
             heartbeat,
             join_count: Cell::new(0),
+            #[cfg(feature = "metrics")]
+            metrics: CachePadded::new(crate::metrics::WorkerMetrics::default()),
         }
     }
 }
@@ -63,6 +68,11 @@ impl WorkerThread {
             (&*this).run_inner();
         }
 
+        #[cfg(feature = "metrics")]
+        unsafe {
+            eprintln!("{:?}", (&*this).metrics);
+        }
+
         tracing::trace!("WorkerThread::run: worker thread finished");
     }
 
@@ -89,33 +99,29 @@ impl WorkerThread {
 }
 
 impl WorkerThread {
-    pub(crate) fn find_work(&self) -> Option<NonNull<Job>> {
-        self.find_work_inner()
-    }
-
     /// Looks for work in the local queue, then in the shared context, and if no
     /// work is found, waits for the thread to be notified of a new job, after
     /// which it returns `None`.
     /// The caller should then check for `should_exit` to determine if the
     /// thread should exit, or look for work again.
     #[tracing::instrument(level = "trace", skip_all)]
-    pub(crate) fn find_work_or_wait(&self) -> Option<NonNull<Job>> {
-        if let Some(job) = self.find_work_inner() {
+    pub(crate) fn find_work_or_wait(&self) -> Option<SharedJob> {
+        if let Some(job) = self.find_work() {
             return Some(job);
         }
 
         tracing::trace!("waiting for new job");
-        self.heartbeat.latch().wait();
+        self.heartbeat.parker().park();
         tracing::trace!("woken up from wait");
 
         None
     }
 
     #[tracing::instrument(level = "trace", skip_all)]
-    pub(crate) fn find_work_or_wait_unless<F>(&self, pred: F) -> Option<NonNull<Job>>
+    pub(crate) fn find_work_or_wait_unless<F>(&self, mut pred: F) -> Option<SharedJob>
     where
         F: FnMut() -> bool,
     {
-        if let Some(job) = self.find_work_inner() {
+        if let Some(job) = self.find_work() {
             return Some(job);
         }
 
         // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@@ -125,24 +131,21 @@ impl WorkerThread {
         // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
         // no jobs found, wait for a heartbeat or a new job
         tracing::trace!(worker = self.heartbeat.index(), "waiting for new job");
-        self.heartbeat.latch().wait_unless(pred);
+        if !pred() {
+            self.heartbeat.parker().park();
+        }
         tracing::trace!(worker = self.heartbeat.index(), "woken up from wait");
 
         None
     }
 
     #[inline]
-    fn find_work_inner(&self) -> Option<NonNull<Job>> {
-        // first check the local queue for jobs
-        if let Some(job) = self.pop_front() {
-            tracing::trace!("WorkerThread::find_work_inner: found local job: {:?}", job);
-            return Some(job);
-        }
-
-        // then check the shared context for jobs
+    fn find_work(&self) -> Option<SharedJob> {
         let mut guard = self.context.shared();
         if let Some(job) = guard.pop_job() {
+            #[cfg(feature = "metrics")]
+            self.metrics.num_jobs_stolen.fetch_add(1, Ordering::Relaxed);
             tracing::trace!("WorkerThread::find_work_inner: found shared job: {:?}", job);
             return Some(job);
         }
@@ -153,6 +156,8 @@ impl WorkerThread {
     #[inline(always)]
     pub(crate) fn tick(&self) {
         if self.heartbeat.take() {
+            #[cfg(feature = "metrics")]
+            self.metrics.num_heartbeats.fetch_add(1, Ordering::Relaxed);
             tracing::trace!(
                 "received heartbeat, thread id: {:?}",
                 self.heartbeat.index()
@@ -163,8 +168,8 @@ impl WorkerThread {
     #[inline]
     #[tracing::instrument(level = "trace", skip(self))]
-    fn execute(&self, job: NonNull<Job>) {
-        unsafe { Job::execute(job.as_ptr()) };
+    fn execute(&self, job: SharedJob) {
+        unsafe { SharedJob::execute(job, self) };
         self.tick();
     }
 
@@ -174,10 +179,16 @@ impl WorkerThread {
 
         if !guard.jobs.contains_key(&self.heartbeat.id()) {
             if let Some(job) = self.pop_back() {
-                Job::set_shared(unsafe { job.as_ref() });
                 tracing::trace!("heartbeat: sharing job: {:?}", job);
-                guard.jobs.insert(self.heartbeat.id(), job);
+
+                #[cfg(feature = "metrics")]
+                self.metrics.num_jobs_shared.fetch_add(1, Ordering::Relaxed);
+                unsafe {
+                    guard.jobs.insert(
+                        self.heartbeat.id(),
+                        job.as_ref().share(Some(self.heartbeat.parker())),
+                    );
                     // SAFETY: we are holding the lock on the shared context.
                     self.context.notify_job_shared();
                 }
@@ -193,19 +204,18 @@ impl WorkerThread {
     }
 
     #[inline]
-    pub fn push_back(&self, job: *const Job) {
-        unsafe { self.queue.as_mut_unchecked().push_back(job) }
+    pub fn push_back<T>(&self, job: *const Job<T>) {
+        unsafe { self.queue.as_mut_unchecked().push_back(job.cast()) }
+    }
+
+    #[inline]
+    pub fn push_front<T>(&self, job: *const Job<T>) {
+        unsafe { self.queue.as_mut_unchecked().push_front(job.cast()) }
     }
 
     #[inline]
     pub fn pop_front(&self) -> Option<NonNull<Job>> {
         unsafe { self.queue.as_mut_unchecked().pop_front() }
     }
-
-    #[inline]
-    pub fn push_front(&self, job: *const Job) {
-        unsafe { self.queue.as_mut_unchecked().push_front(job) }
-    }
 }
 
 impl WorkerThread {
@@ -293,51 +303,15 @@ impl HeartbeatThread {
 
 impl WorkerThread {
     #[tracing::instrument(level = "trace", skip(self))]
-    pub fn wait_until_queued_job<T>(
-        &self,
-        job: *const QueuedJob,
-    ) -> Option<std::thread::Result<T>> {
-        let recv = unsafe { (*job).as_receiver::<T>() };
-        // we've already checked that the job was popped from the queue
-        // check if shared job is our job
-
-        // skip checking if the job hasn't yet been claimed, because the
-        // overhead of waking a thread is so much bigger that it might never get
-        // the chance to actually claim it.
-
-        // if let Some(shared_job) = self.context.shared().jobs.remove(&self.heartbeat.id()) {
-        //     if core::ptr::eq(shared_job.as_ptr(), job as *const Job as _) {
-        //         // this is the job we are looking for, so we want to
-        //         // short-circuit and call it inline
-        //         tracing::trace!(
-        //             thread = self.heartbeat.index(),
-        //             "reclaiming shared job: {:?}",
-        //             shared_job
-        //         );
 
-        //         return None;
-        //     } else {
-        //         // this isn't the job we are looking for, but we still need to
-        //         // execute it
-        //         tracing::trace!(
-        //             thread = self.heartbeat.index(),
-        //             "executing reclaimed shared job: {:?}",
-        //             shared_job
-        //         );
 
-        //         unsafe { Job::execute(shared_job.as_ptr()) };
-        //     }
-        // }
+    pub fn wait_until_shared_job<T: Send>(&self, job: &Job<T>) -> Option<std::thread::Result<T>> {
+        let recv = (*job).take_receiver()?;
 
         let mut out = recv.poll();
 
         while std::hint::unlikely(out.is_none()) {
-            if let Some(job) = self.find_work_or_wait_unless(|| {
-                out = recv.poll();
-                out.is_some()
-            }) {
+            if let Some(job) = self.find_work() {
                 unsafe {
-                    Job::execute(job.as_ptr());
+                    SharedJob::execute(job, self);
                 }
             }
@@ -348,20 +322,20 @@ impl WorkerThread {
     }
 
     #[tracing::instrument(level = "trace", skip_all)]
-    pub fn wait_until_latch<L>(&self, latch: &L)
+    pub fn wait_until_pred<F>(&self, mut pred: F)
     where
-        L: Probe,
+        F: FnMut() -> bool,
     {
-        if !latch.probe() {
-            tracing::trace!("thread {:?} waiting on latch", self.heartbeat.index());
-            self.wait_until_latch_cold(latch);
+        if !pred() {
+            tracing::trace!("thread {:?} waiting on predicate", self.heartbeat.index());
+            self.wait_until_latch_cold(pred);
         }
     }
 
     #[cold]
-    fn wait_until_latch_cold<L>(&self, latch: &L)
+    fn wait_until_latch_cold<F>(&self, mut pred: F)
     where
-        L: Probe,
+        F: FnMut() -> bool,
     {
         if let Some(shared_job) = self.context.shared().jobs.remove(&self.heartbeat.id()) {
             tracing::trace!(
                 "thread {:?} reclaiming shared job: {:?}",
                 self.heartbeat.index(),
                 shared_job
             );
-            unsafe { Job::execute(shared_job.as_ptr()) };
+            unsafe { SharedJob::execute(shared_job, self) };
         }
 
         // do the usual thing and wait for the job's latch
        // do the usual thing??? chatgipity really said this..
- while !latch.probe() { + while !pred() { // check local jobs before locking shared context - if let Some(job) = self.find_work_or_wait_unless(|| latch.probe()) { + if let Some(job) = self.find_work() { unsafe { - Job::execute(job.as_ptr()); + SharedJob::execute(job, self); } - continue; } } }