use core::{ any::Any, cell::UnsafeCell, fmt::Debug, hint::cold_path, mem::{self, ManuallyDrop}, ptr::{self, NonNull}, sync::atomic::Ordering, }; use alloc::boxed::Box; use parking_lot_core::SpinWait; use crate::util::{SmallBox, TaggedAtomicPtr}; #[repr(u8)] #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum JobState { Empty, Locked = 1, Pending, Finished, // Inline = 1 << (u8::BITS - 1), // IsError = 1 << (u8::BITS - 2), } impl JobState { #[allow(dead_code)] const MASK: u8 = 0; // Self::Inline as u8 | Self::IsError as u8; fn from_u8(v: u8) -> Option { match v { 0 => Some(Self::Empty), 1 => Some(Self::Locked), 2 => Some(Self::Pending), 3 => Some(Self::Finished), _ => None, } } } pub use joblist::JobList; mod joblist { use core::{fmt::Debug, ptr::NonNull}; use alloc::boxed::Box; use super::Job; // the list looks like this: // head <-> job1 <-> job2 <-> ... <-> jobN <-> tail pub struct JobList { // these cannot be boxes because boxes are noalias. head: NonNull, tail: NonNull, // the number of jobs in the list. // this is used to judge whether or not to join sync or async. job_count: usize, } impl JobList { pub fn new() -> Self { let head = Box::into_raw(Box::new(Job::empty())); let tail = Box::into_raw(Box::new(Job::empty())); // head and tail point at themselves unsafe { (&*head).link_mut().prev = None; (&*head).link_mut().next = Some(NonNull::new_unchecked(tail)); (&*tail).link_mut().prev = Some(NonNull::new_unchecked(head)); (&*tail).link_mut().next = None; Self { head: NonNull::new_unchecked(head), tail: NonNull::new_unchecked(tail), job_count: 0, } } } fn head(&self) -> NonNull { self.head } fn tail(&self) -> NonNull { self.tail } /// `job` must be valid until it is removed from the list. pub unsafe fn push_front(&mut self, job: *const Job) { self.job_count += 1; let headlink = unsafe { self.head.as_ref().link_mut() }; let next = headlink.next.unwrap(); let next_link = unsafe { next.as_ref().link_mut() }; let job_ptr = unsafe { NonNull::new_unchecked(job as _) }; headlink.next = Some(job_ptr); next_link.prev = Some(job_ptr); let job_link = unsafe { job_ptr.as_ref().link_mut() }; job_link.next = Some(next); job_link.prev = Some(self.head); } /// `job` must be valid until it is removed from the list. pub unsafe fn push_back(&mut self, job: *const Job) { self.job_count += 1; let taillink = unsafe { self.tail.as_ref().link_mut() }; let prev = taillink.prev.unwrap(); let prev_link = unsafe { prev.as_ref().link_mut() }; let job_ptr = unsafe { NonNull::new_unchecked(job as _) }; taillink.prev = Some(job_ptr); prev_link.next = Some(job_ptr); let job_link = unsafe { job_ptr.as_ref().link_mut() }; job_link.prev = Some(prev); job_link.next = Some(self.tail); } pub fn pop_front(&mut self) -> Option> { self.job_count -= 1; let headlink = unsafe { self.head.as_ref().link_mut() }; // SAFETY: headlink.next is guaranteed to be Some. let job = headlink.next.unwrap(); let job_link = unsafe { job.as_ref().link_mut() }; // short-circuit here if the job is the tail let next = job_link.next?; let next_link = unsafe { next.as_ref().link_mut() }; headlink.next = Some(next); next_link.prev = Some(self.head); Some(job) } pub fn pop_back(&mut self) -> Option> { self.job_count -= 1; let taillink = unsafe { self.tail.as_ref().link_mut() }; // SAFETY: taillink.prev is guaranteed to be Some. let job = taillink.prev.unwrap(); let job_link = unsafe { job.as_ref().link_mut() }; // short-circuit here if the job is the head let prev = job_link.prev?; let prev_link = unsafe { prev.as_ref().link_mut() }; taillink.prev = Some(prev); prev_link.next = Some(self.tail); Some(job) } pub fn is_empty(&self) -> bool { self.job_count == 0 } pub fn len(&self) -> usize { self.job_count } } impl Drop for JobList { fn drop(&mut self) { // Need to drop the head and tail, which were allocated on the heap. // elements of the list are managed externally. unsafe { drop((Box::from_non_null(self.head), Box::from_non_null(self.tail))); } } } impl Debug for JobList { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("JobList") .field("head", &self.head) .field("tail", &self.tail) .field("job_count", &self.job_count) .field_with("jobs", |f| { let mut jobs = f.debug_list(); // SAFETY: head.next is guaranteed to be non-null and valid let mut job = unsafe { self.head.as_ref().link_mut().next.unwrap() }; while job != self.tail { let job_ref = unsafe { job.as_ref() }; jobs.entry(job_ref); // SAFETY: job is guaranteed to be non-null and valid // only the tail has a next of None job = unsafe { job_ref.link_mut().next.unwrap() }; } jobs.finish() }) .finish() } } } #[repr(transparent)] pub struct JobResult { inner: std::thread::Result, } impl JobResult { pub fn new(result: std::thread::Result) -> Self { Self { inner: result } } /// convert JobResult into a thread result. #[allow(dead_code)] pub fn into_inner(self) -> std::thread::Result { self.inner } // unwraps the result, propagating panics pub fn into_result(self) -> T { match self.inner { Ok(val) => val, Err(payload) => { cold_path(); std::panic::resume_unwind(payload); // #[cfg(feature = "std")] // { // std::panic::resume_unwind(err); // } // #[cfg(not(feature = "std"))] // { // // in no-std, we just panic with the error // // TODO: figure out how to propagate the error // panic!("Job failed: {:?}", payload); // } } } } } #[derive(Debug, PartialEq, Eq)] struct Link { prev: Option>, next: Option>, } // `Link` is invariant over `T` impl Clone for Link { fn clone(&self) -> Self { Self { prev: self.prev.clone(), next: self.next.clone(), } } } // `Link` is invariant over `T` impl Copy for Link {} struct Thread; union ValueOrThis { uninit: (), value: ManuallyDrop>, this: NonNull<()>, } union LinkOrError { link: Link, waker: ManuallyDrop>, error: ManuallyDrop>>, } #[repr(C)] pub struct Job { /// stores the job's harness as a *const usize harness_and_state: TaggedAtomicPtr, /// `this` before `execute()` is called, or `value` after `execute()` value_or_this: UnsafeCell>, /// `link` before `execute()` is called, or `error` after `execute()` error_or_link: UnsafeCell>, } unsafe impl Send for Job {} impl Debug for Job { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let state = JobState::from_u8(self.harness_and_state.tag(Ordering::Relaxed) as u8).unwrap(); let mut debug = f.debug_struct("Job"); debug.field("state", &state).field_with("harness", |f| { write!(f, "{:?}", self.harness_and_state.ptr(Ordering::Relaxed)) }); match state { JobState::Empty => { debug .field_with("this", |f| { write!(f, "{:?}", unsafe { &(&*self.value_or_this.get()).this }) }) .field_with("link", |f| { write!(f, "{:?}", unsafe { &(&*self.error_or_link.get()).link }) }); } JobState::Locked => { #[derive(Debug)] struct Locked; debug.field("locked", &Locked); } JobState::Pending => { debug .field_with("this", |f| { write!(f, "{:?}", unsafe { &(&*self.value_or_this.get()).this }) }) .field_with("waker", |f| { write!(f, "{:?}", unsafe { &(&*self.error_or_link.get()).waker }) }); } JobState::Finished => { let err = unsafe { &(&*self.error_or_link.get()).error }; let result = match err.as_ref() { Some(err) => Err(err), None => Ok(unsafe { (&*self.value_or_this.get()).value.0.as_ptr() }), }; debug.field("result", &result); } } debug.finish() } } impl Job { pub fn empty() -> Job { Self { harness_and_state: TaggedAtomicPtr::new(ptr::dangling_mut(), JobState::Empty as usize), value_or_this: UnsafeCell::new(ValueOrThis { this: NonNull::dangling(), }), error_or_link: UnsafeCell::new(LinkOrError { link: Link { prev: None, next: None, }, }), // _phantom: PhantomPinned, } } pub fn new(harness: unsafe fn(*const (), *const Job), this: NonNull<()>) -> Job { Self { harness_and_state: TaggedAtomicPtr::new( unsafe { mem::transmute(harness) }, JobState::Empty as usize, ), value_or_this: UnsafeCell::new(ValueOrThis { this }), error_or_link: UnsafeCell::new(LinkOrError { link: Link { prev: None, next: None, }, }), // _phantom: PhantomPinned, } } // Job is passed around type-erased as `Job<()>`, to complete the job we // need to cast it back to the original type. pub unsafe fn transmute_ref(&self) -> &Job { unsafe { mem::transmute::<&Job, &Job>(self) } } #[inline] unsafe fn link_mut(&self) -> &mut Link { unsafe { &mut (&mut *self.error_or_link.get()).link } } /// assumes job is in a `JobList` pub unsafe fn unlink(&self) { unsafe { let mut dummy = None; let Link { prev, next } = *self.link_mut(); *prev .map(|ptr| &mut ptr.as_ref().link_mut().next) .unwrap_or(&mut dummy) = next; *next .map(|ptr| &mut ptr.as_ref().link_mut().prev) .unwrap_or(&mut dummy) = prev; } } pub fn state(&self) -> u8 { self.harness_and_state.tag(Ordering::Relaxed) as u8 } pub fn wait(&self) -> JobResult { let mut spin = SpinWait::new(); loop { match self.harness_and_state.compare_exchange_weak_tag( JobState::Pending as usize, JobState::Locked as usize, Ordering::Acquire, Ordering::Relaxed, ) { // if still pending, sleep until completed Ok(state) => { debug_assert_eq!(state, JobState::Pending as usize); unsafe { *(&mut *self.error_or_link.get()).waker = Some(std::thread::current()); } self.harness_and_state.set_tag( JobState::Pending as usize, Ordering::Release, Ordering::Relaxed, ); std::thread::park(); spin.reset(); // after sleeping, state should be `Finished` } Err(state) => { // job finished under us, check if it was successful if state == JobState::Finished as usize { let err = unsafe { (&mut *self.error_or_link.get()).error.take() }; let result: std::thread::Result = if let Some(err) = err { cold_path(); Err(err) } else { let val = unsafe { ManuallyDrop::take(&mut (&mut *self.value_or_this.get()).value) }; Ok(val.into_inner()) }; return JobResult::new(result); } else { // spin until lock is released. tracing::trace!("spin-waiting for job: {:?}", self); spin.spin(); } } } } } /// call this when popping value from local queue pub fn set_pending(&self) { let mut spin = SpinWait::new(); loop { match self.harness_and_state.compare_exchange_weak_tag( JobState::Empty as usize, JobState::Pending as usize, Ordering::Acquire, Ordering::Relaxed, ) { Ok(state) => { debug_assert_eq!(state, JobState::Empty as usize); // set waker to None unsafe { (&mut *self.error_or_link.get()).waker = ManuallyDrop::new(None); } return; } Err(_) => { // debug_assert_ne!(state, JobState::Empty as usize); tracing::error!("######## what the sigma?"); spin.spin(); } } } } pub fn execute(job: NonNull) { tracing::trace!("executing job: {:?}", job); // SAFETY: self is non-null unsafe { let this = job.as_ref(); let (ptr, state) = this.harness_and_state.ptr_and_tag(Ordering::Relaxed); debug_assert_eq!(state, JobState::Pending as usize); let harness: unsafe fn(*const (), *const Self) = mem::transmute(ptr.as_ptr()); let this = (*this.value_or_this.get()).this; harness(this.as_ptr().cast(), job.as_ptr()); } } pub(crate) fn complete(&self, result: std::thread::Result) { let mut spin = SpinWait::new(); loop { match self.harness_and_state.compare_exchange_weak_tag( JobState::Pending as usize, JobState::Locked as usize, Ordering::Acquire, Ordering::Relaxed, ) { Ok(state) => { debug_assert_eq!(state, JobState::Pending as usize); break; } Err(_) => { // debug_assert_ne!(state, JobState::Pending as usize); spin.spin(); } } } let waker = unsafe { (&mut *self.error_or_link.get()).waker.take() }; match result { Ok(val) => unsafe { (&mut *self.value_or_this.get()).value = ManuallyDrop::new(SmallBox::new(val)); (&mut *self.error_or_link.get()).error = ManuallyDrop::new(None); }, Err(err) => unsafe { (&mut *self.value_or_this.get()).uninit = (); (&mut *self.error_or_link.get()).error = ManuallyDrop::new(Some(err)); }, } if let Some(thread) = waker { thread.unpark(); } self.harness_and_state.set_tag( JobState::Finished as usize, Ordering::Release, Ordering::Relaxed, ); } } mod stackjob { use crate::latch::Latch; use super::*; pub struct StackJob { latch: L, f: UnsafeCell>, } impl StackJob { pub fn new(f: F, latch: L) -> Self { Self { latch, f: UnsafeCell::new(ManuallyDrop::new(f)), } } pub unsafe fn unwrap(&self) -> F { unsafe { ManuallyDrop::take(&mut *self.f.get()) } } } impl StackJob where L: Latch, { pub fn as_job(&self) -> Job<()> where F: FnOnce() -> T + Send, T: Send, { #[align(8)] unsafe fn harness(this: *const (), job: *const Job<()>) where F: FnOnce() -> T + Send, T: Sized + Send, { let this = unsafe { &*this.cast::>() }; let f = unsafe { this.unwrap() }; let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f())); let job = unsafe { &*job.cast::>() }; job.complete(result); unsafe { Latch::set_raw(&this.latch); } } Job::new(harness::, unsafe { NonNull::new_unchecked(self as *const _ as *mut ()) }) } } } mod heapjob { use super::*; pub struct HeapJob { f: F, } impl HeapJob { pub fn new(f: F) -> Self { Self { f } } pub fn into_inner(self) -> F { self.f } pub fn into_boxed_job(self: Box) -> *mut Job<()> where F: FnOnce() -> T + Send, T: Send, { #[align(8)] unsafe fn harness(this: *const (), job: *const Job<()>) where F: FnOnce() -> T + Send, T: Send, { let job = job.cast_mut(); // turn `this`, which was allocated at (2), into box. // miri complains this is a use-after-free, but it isn't? silly miri... // Turns out this is actually correct on miri's end, but because // we ensure that the scope lives as long as any jobs, this is // actually fine, as far as I can tell. let this = unsafe { Box::from_raw(this.cast::>().cast_mut()) }; let f = this.into_inner(); _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f())); // drop job (this is fine because the job of a HeapJob is pure POD). unsafe { ptr::drop_in_place(job); } // free box that was allocated at (1) _ = unsafe { Box::>>::from_raw(job.cast()) }; } // (1) allocate box for job Box::into_raw(Box::new(Job::new(harness::, { // (2) convert self into a pointer Box::into_non_null(self).cast() }))) } } } pub use heapjob::HeapJob; pub use stackjob::StackJob;