diff --git a/distaff/src/job.rs b/distaff/src/job.rs index d0a66d3..c2b33d8 100644 --- a/distaff/src/job.rs +++ b/distaff/src/job.rs @@ -7,11 +7,16 @@ use core::{ ptr::{self, NonNull}, sync::atomic::Ordering, }; +use std::{ + marker::PhantomData, + sync::atomic::{AtomicU8, AtomicU32, AtomicUsize}, +}; use alloc::boxed::Box; +use parking_lot::{Condvar, Mutex}; use parking_lot_core::SpinWait; -use crate::util::{SmallBox, TaggedAtomicPtr}; +use crate::util::{DropGuard, SmallBox, TaggedAtomicPtr}; #[repr(u8)] #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -977,3 +982,254 @@ mod tests { assert!(vec.is_empty()); } } + +// The worker waits on this latch whenever it has nothing to do. +pub struct WorkerLatch { + mutex: Mutex<()>, + condvar: Condvar, +} + +impl WorkerLatch { + pub fn lock(&self) { + mem::forget(self.mutex.lock()); + } + pub fn unlock(&self) { + unsafe { + self.mutex.force_unlock(); + } + } + pub fn wait(&self) { + let mut guard = self.mutex.lock(); + self.condvar.wait(&mut guard); + } + pub fn wake(&self) { + self.condvar.notify_one(); + } +} + +// A job, whether a `StackJob` or `HeapJob`, is turned into a `QueuedJob` when it is pushed to the job queue. +#[repr(C)] +pub struct QueuedJob { + /// The job's harness and state. + harness: TaggedAtomicPtr, + /// The job's value or `this` pointer. This is either a `StackJob` or `HeapJob`. + this: NonNull<()>, + /// The mutex to wake when the job is finished executing. + mutex: *const WorkerLatch, +} + +/// A union that allows us to store either a `T` or a `U` without needing to know which one it is at runtime. +/// The state must be tracked separately. +union UnsafeVariant { + t: ManuallyDrop, + u: ManuallyDrop, +} + +// The processed job is the result of executing a job, it contains the result of the job or an error. +#[repr(C)] +struct JobChannel { + tag: AtomicUsize, + value: UnsafeCell, Box>>, +} + +#[repr(transparent)] +pub struct JobSender { + channel: JobChannel, +} +#[repr(transparent)] +pub struct JobReceiver { + channel: JobChannel, +} + +#[repr(C)] +struct Job2 {} + +const EMPTY: usize = 0; +const FINISHED: usize = 1 << 0; +const ERROR: usize = 1 << 1; + +impl JobSender { + pub fn send(&self, result: std::thread::Result, mutex: *const WorkerLatch) { + // We want to lock here so that we can be sure that we wake the worker + // only if it was waiting, and not immediately after having received the + // result and waiting for further work: + // | thread 1 | thread 2 | + // | | | | | + // | send-> | | | + // | FINISHED | | | + // | | | poll() | + // | | | sleep() | + // | wake() | | + // | | | !woken! | // the worker has already received the result + // | | | | | // and is waiting for more work, it shouldn't + // | | | | | // be woken up here. + // | <-send | | | + // + // if we lock, it looks like this: + // | thread 1 | thread 2 | + // | | | | | + // | send-> | | | + // | lock() | | | + // | FINISHED | | | + // | | | poll() | + // | | | lock()-> | // thread 2 tries to lock. + // | wake() | | // the wake signal is ignored + // | | | | + // | unlock() | | + // | | | l=lock() | // thread2 wakes up and receives the lock + // | <-send | sleep(l) | // thread 2 is now sleeping + // + // This concludes my TED talk on why we need to lock here. + + unsafe { + (&*mutex).lock(); + } + let _guard = DropGuard::new(|| unsafe { (&*mutex).unlock() }); + + match result { + Ok(value) => { + let value = SmallBox::new(value); + let slot = unsafe { &mut *self.channel.value.get() }; + + slot.t = ManuallyDrop::new(value); + self.channel.tag.store(FINISHED, Ordering::Release) + } + Err(payload) => { + let slot = unsafe { &mut *self.channel.value.get() }; + + slot.u = ManuallyDrop::new(payload); + self.channel.tag.store(FINISHED | ERROR, Ordering::Release) + } + } + + // wake the worker waiting on the mutex + unsafe { + (&*mutex).wake(); + } + } +} + +impl JobReceiver { + pub fn poll(&self) -> Option> { + let tag = self.channel.tag.swap(EMPTY, Ordering::Acquire); + + if tag == EMPTY { + return None; + } + + // SAFETY: if we received a non-EMPTY tag, the value must be initialized. + // because we atomically set the taag to EMPTY, we can be sure that we're the only ones accessing the value. + let slot = unsafe { &mut *self.channel.value.get() }; + + if tag & ERROR != 0 { + // job failed, return the error + let err = unsafe { ManuallyDrop::take(&mut slot.u) }; + Some(Err(err)) + } else { + // job succeeded, return the value + let value = unsafe { ManuallyDrop::take(&mut slot.t) }; + Some(Ok(value.into_inner())) + } + } +} + +impl QueuedJob { + pub fn from_stackjob(job: &StackJob, mutex: *const WorkerLatch) -> Self + where + F: FnOnce() -> T + Send, + T: Send, + { + #[align(8)] + unsafe fn harness( + this: *const (), + sender: *const JobSender, + mutex: *const WorkerLatch, + ) where + F: FnOnce() -> T + Send, + T: Send, + { + use std::panic::{AssertUnwindSafe, catch_unwind}; + + let f = unsafe { (*this.cast::>()).unwrap() }; + let result = catch_unwind(AssertUnwindSafe(|| f())); + + unsafe { + (&*(sender as *const JobSender)).send(result, mutex); + } + } + + Self { + harness: TaggedAtomicPtr::new(harness:: as *mut usize, EMPTY), + this: unsafe { NonNull::new_unchecked(job as *const _ as *mut ()) }, + mutex, + } + } + + pub unsafe fn as_receiver(&self) -> &JobReceiver { + unsafe { &*(self as *const Self as *const JobReceiver) } + } + + /// this function will drop `_self` and execute the job. + pub unsafe fn execute(_self: *mut Self) { + let (harness, this, sender, mutex) = unsafe { + let job = &*_self; + let harness: unsafe fn(*const (), *const JobSender, *const WorkerLatch) = + mem::transmute(job.harness.ptr(Ordering::Relaxed)); + let sender = mem::transmute::<*const Self, *const JobSender>(_self); + let this = job.this; + let mutex = job.mutex; + (harness, this, sender, mutex) + }; + + unsafe { + // past this point, `_self` may no longer be a valid pointer to a `QueuedJob`. + (harness)(this.as_ptr(), sender, mutex); + } + } +} + +mod queuedjobqueue { + //! Basically `JobVec`, but for `QueuedJob`s. + + use std::collections::VecDeque; + + use super::*; + + pub struct JobQueue { + jobs: VecDeque>, + } + + impl JobQueue { + pub fn new() -> Self { + Self { + jobs: VecDeque::new(), + } + } + + pub fn push_front(&mut self, job: *const QueuedJob) { + self.jobs + .push_front(unsafe { NonNull::new_unchecked(job as *mut _) }); + } + + pub fn push_back(&mut self, job: *const QueuedJob) { + self.jobs + .push_back(unsafe { NonNull::new_unchecked(job as *mut _) }); + } + + pub fn pop_front(&mut self) -> Option> { + self.jobs.pop_front() + } + + pub fn pop_back(&mut self) -> Option> { + self.jobs.pop_back() + } + + pub fn is_empty(&self) -> bool { + self.jobs.is_empty() + } + + pub fn len(&self) -> usize { + self.jobs.len() + } + } +}