667 lines
20 KiB
Rust
667 lines
20 KiB
Rust
use core::{
|
|
any::Any,
|
|
cell::UnsafeCell,
|
|
fmt::Debug,
|
|
hint::cold_path,
|
|
mem::{self, ManuallyDrop},
|
|
ptr::{self, NonNull},
|
|
sync::atomic::Ordering,
|
|
};
|
|
|
|
use alloc::boxed::Box;
|
|
use parking_lot_core::SpinWait;
|
|
|
|
use crate::util::{SmallBox, TaggedAtomicPtr};
|
|
|
|
/// Lifecycle state of a job, kept in the tag bits next to the harness pointer.
///
/// * `Empty`    - initial state of a freshly constructed job.
/// * `Locked`   - transient state held while one side touches the job's
///   shared storage.
/// * `Pending`  - the job has been claimed for execution.
/// * `Finished` - the result (or panic payload) has been stored.
#[repr(u8)]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum JobState {
    Empty,
    Locked = 1,
    Pending,
    Finished,
    // Inline = 1 << (u8::BITS - 1),
    // IsError = 1 << (u8::BITS - 2),
}

impl JobState {
    #[allow(dead_code)]
    const MASK: u8 = 0; // Self::Inline as u8 | Self::IsError as u8;

    /// Decodes a raw tag value; returns `None` for any value that is not the
    /// discriminant of a variant.
    fn from_u8(v: u8) -> Option<Self> {
        const ALL: [JobState; 4] = [
            JobState::Empty,
            JobState::Locked,
            JobState::Pending,
            JobState::Finished,
        ];

        ALL.iter().copied().find(|state| *state as u8 == v)
    }
}
|
|
|
|
pub use joblist::JobList;
|
|
|
|
mod joblist {
|
|
use core::{fmt::Debug, ptr::NonNull};
|
|
|
|
use alloc::boxed::Box;
|
|
|
|
use super::Job;
|
|
|
|
// the list looks like this:
|
|
// head <-> job1 <-> job2 <-> ... <-> jobN <-> tail
|
|
pub struct JobList {
|
|
// these cannot be boxes because boxes are noalias.
|
|
head: NonNull<Job>,
|
|
tail: NonNull<Job>,
|
|
// the number of jobs in the list.
|
|
// this is used to judge whether or not to join sync or async.
|
|
job_count: usize,
|
|
}
|
|
|
|
impl JobList {
|
|
pub fn new() -> Self {
|
|
let head = Box::into_raw(Box::new(Job::empty()));
|
|
let tail = Box::into_raw(Box::new(Job::empty()));
|
|
|
|
// head and tail point at themselves
|
|
unsafe {
|
|
(&*head).link_mut().prev = None;
|
|
(&*head).link_mut().next = Some(NonNull::new_unchecked(tail));
|
|
|
|
(&*tail).link_mut().prev = Some(NonNull::new_unchecked(head));
|
|
(&*tail).link_mut().next = None;
|
|
|
|
Self {
|
|
head: NonNull::new_unchecked(head),
|
|
tail: NonNull::new_unchecked(tail),
|
|
job_count: 0,
|
|
}
|
|
}
|
|
}
|
|
|
|
fn head(&self) -> NonNull<Job> {
|
|
self.head
|
|
}
|
|
fn tail(&self) -> NonNull<Job> {
|
|
self.tail
|
|
}
|
|
|
|
/// `job` must be valid until it is removed from the list.
|
|
pub unsafe fn push_front<T>(&mut self, job: *const Job<T>) {
|
|
self.job_count += 1;
|
|
let headlink = unsafe { self.head.as_ref().link_mut() };
|
|
|
|
let next = headlink.next.unwrap();
|
|
let next_link = unsafe { next.as_ref().link_mut() };
|
|
|
|
let job_ptr = unsafe { NonNull::new_unchecked(job as _) };
|
|
|
|
headlink.next = Some(job_ptr);
|
|
next_link.prev = Some(job_ptr);
|
|
|
|
let job_link = unsafe { job_ptr.as_ref().link_mut() };
|
|
job_link.next = Some(next);
|
|
job_link.prev = Some(self.head);
|
|
}
|
|
|
|
/// `job` must be valid until it is removed from the list.
|
|
pub unsafe fn push_back<T>(&mut self, job: *const Job<T>) {
|
|
self.job_count += 1;
|
|
let taillink = unsafe { self.tail.as_ref().link_mut() };
|
|
|
|
let prev = taillink.prev.unwrap();
|
|
let prev_link = unsafe { prev.as_ref().link_mut() };
|
|
|
|
let job_ptr = unsafe { NonNull::new_unchecked(job as _) };
|
|
|
|
taillink.prev = Some(job_ptr);
|
|
prev_link.next = Some(job_ptr);
|
|
|
|
let job_link = unsafe { job_ptr.as_ref().link_mut() };
|
|
job_link.prev = Some(prev);
|
|
job_link.next = Some(self.tail);
|
|
}
|
|
|
|
pub fn pop_front(&mut self) -> Option<NonNull<Job>> {
|
|
self.job_count -= 1;
|
|
|
|
let headlink = unsafe { self.head.as_ref().link_mut() };
|
|
|
|
// SAFETY: headlink.next is guaranteed to be Some.
|
|
let job = headlink.next.unwrap();
|
|
let job_link = unsafe { job.as_ref().link_mut() };
|
|
|
|
// short-circuit here if the job is the tail
|
|
let next = job_link.next?;
|
|
let next_link = unsafe { next.as_ref().link_mut() };
|
|
|
|
headlink.next = Some(next);
|
|
next_link.prev = Some(self.head);
|
|
|
|
Some(job)
|
|
}
|
|
|
|
pub fn pop_back(&mut self) -> Option<NonNull<Job>> {
|
|
self.job_count -= 1;
|
|
|
|
let taillink = unsafe { self.tail.as_ref().link_mut() };
|
|
|
|
// SAFETY: taillink.prev is guaranteed to be Some.
|
|
let job = taillink.prev.unwrap();
|
|
let job_link = unsafe { job.as_ref().link_mut() };
|
|
|
|
// short-circuit here if the job is the head
|
|
let prev = job_link.prev?;
|
|
let prev_link = unsafe { prev.as_ref().link_mut() };
|
|
|
|
taillink.prev = Some(prev);
|
|
prev_link.next = Some(self.tail);
|
|
|
|
Some(job)
|
|
}
|
|
|
|
pub fn is_empty(&self) -> bool {
|
|
self.job_count == 0
|
|
}
|
|
|
|
pub fn len(&self) -> usize {
|
|
self.job_count
|
|
}
|
|
}
|
|
|
|
impl Drop for JobList {
|
|
fn drop(&mut self) {
|
|
// Need to drop the head and tail, which were allocated on the heap.
|
|
// elements of the list are managed externally.
|
|
unsafe {
|
|
drop((Box::from_non_null(self.head), Box::from_non_null(self.tail)));
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Debug for JobList {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
f.debug_struct("JobList")
|
|
.field("head", &self.head)
|
|
.field("tail", &self.tail)
|
|
.field("job_count", &self.job_count)
|
|
.field_with("jobs", |f| {
|
|
let mut jobs = f.debug_list();
|
|
|
|
// SAFETY: head.next is guaranteed to be non-null and valid
|
|
let mut job = unsafe { self.head.as_ref().link_mut().next.unwrap() };
|
|
|
|
while job != self.tail {
|
|
let job_ref = unsafe { job.as_ref() };
|
|
jobs.entry(job_ref);
|
|
|
|
// SAFETY: job is guaranteed to be non-null and valid
|
|
// only the tail has a next of None
|
|
job = unsafe { job_ref.link_mut().next.unwrap() };
|
|
}
|
|
|
|
jobs.finish()
|
|
})
|
|
.finish()
|
|
}
|
|
}
|
|
}
|
|
|
|
#[repr(transparent)]
|
|
pub struct JobResult<T> {
|
|
inner: std::thread::Result<T>,
|
|
}
|
|
|
|
impl<T> JobResult<T> {
|
|
pub fn new(result: std::thread::Result<T>) -> Self {
|
|
Self { inner: result }
|
|
}
|
|
|
|
/// convert JobResult into a thread result.
|
|
#[allow(dead_code)]
|
|
pub fn into_inner(self) -> std::thread::Result<T> {
|
|
self.inner
|
|
}
|
|
|
|
// unwraps the result, propagating panics
|
|
pub fn into_result(self) -> T {
|
|
match self.inner {
|
|
Ok(val) => val,
|
|
Err(payload) => {
|
|
cold_path();
|
|
|
|
std::panic::resume_unwind(payload);
|
|
// #[cfg(feature = "std")]
|
|
// {
|
|
// std::panic::resume_unwind(err);
|
|
// }
|
|
// #[cfg(not(feature = "std"))]
|
|
// {
|
|
// // in no-std, we just panic with the error
|
|
// // TODO: figure out how to propagate the error
|
|
// panic!("Job failed: {:?}", payload);
|
|
// }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Pair of optional neighbour pointers used to chain nodes into a
/// doubly-linked list.
#[derive(Debug, PartialEq, Eq)]
struct Link<T> {
    prev: Option<NonNull<T>>,
    next: Option<NonNull<T>>,
}

// `Clone` and `Copy` are implemented by hand rather than derived: a derive
// would add `T: Clone` / `T: Copy` bounds, but a `Link<T>` only stores
// pointers to `T` and is copyable for every `T`.
impl<T> Clone for Link<T> {
    #[inline]
    fn clone(&self) -> Self {
        // canonical `Clone` for a `Copy` type: a plain bitwise copy.
        *self
    }
}

impl<T> Copy for Link<T> {}
|
|
|
|
// Unit marker type. NOTE(review): nothing visible in this file refers to it
// (the waker uses `std::thread::Thread` by full path) - confirm it is still needed.
struct Thread;
|
|
|
|
union ValueOrThis<T> {
|
|
uninit: (),
|
|
value: ManuallyDrop<SmallBox<T>>,
|
|
this: NonNull<()>,
|
|
}
|
|
|
|
union LinkOrError<T> {
|
|
link: Link<T>,
|
|
waker: ManuallyDrop<Option<std::thread::Thread>>,
|
|
error: ManuallyDrop<Option<Box<dyn Any + Send + 'static>>>,
|
|
}
|
|
|
|
#[repr(C)]
|
|
pub struct Job<T = ()> {
|
|
/// stores the job's harness as a *const usize
|
|
harness_and_state: TaggedAtomicPtr<usize, 3>,
|
|
/// `this` before `execute()` is called, or `value` after `execute()`
|
|
value_or_this: UnsafeCell<ValueOrThis<T>>,
|
|
/// `link` before `execute()` is called, or `error` after `execute()`
|
|
error_or_link: UnsafeCell<LinkOrError<Job>>,
|
|
}
|
|
|
|
// NOTE(review): this impl carries no `T: Send` bound even though a finished
// job stores a `T`. The constructors visible in this file (`StackJob::as_job`,
// `HeapJob::into_boxed_job`) require `T: Send` - confirm every other
// construction site does as well.
unsafe impl<T> Send for Job<T> {}
|
|
|
|
impl<T> Debug for Job<T> {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
let state = JobState::from_u8(self.harness_and_state.tag(Ordering::Relaxed) as u8).unwrap();
|
|
let mut debug = f.debug_struct("Job");
|
|
debug.field("state", &state).field_with("harness", |f| {
|
|
write!(f, "{:?}", self.harness_and_state.ptr(Ordering::Relaxed))
|
|
});
|
|
|
|
match state {
|
|
JobState::Empty => {
|
|
debug
|
|
.field_with("this", |f| {
|
|
write!(f, "{:?}", unsafe { &(&*self.value_or_this.get()).this })
|
|
})
|
|
.field_with("link", |f| {
|
|
write!(f, "{:?}", unsafe { &(&*self.error_or_link.get()).link })
|
|
});
|
|
}
|
|
JobState::Locked => {
|
|
#[derive(Debug)]
|
|
struct Locked;
|
|
debug.field("locked", &Locked);
|
|
}
|
|
JobState::Pending => {
|
|
debug
|
|
.field_with("this", |f| {
|
|
write!(f, "{:?}", unsafe { &(&*self.value_or_this.get()).this })
|
|
})
|
|
.field_with("waker", |f| {
|
|
write!(f, "{:?}", unsafe { &(&*self.error_or_link.get()).waker })
|
|
});
|
|
}
|
|
JobState::Finished => {
|
|
let err = unsafe { &(&*self.error_or_link.get()).error };
|
|
|
|
let result = match err.as_ref() {
|
|
Some(err) => Err(err),
|
|
None => Ok(unsafe { (&*self.value_or_this.get()).value.0.as_ptr() }),
|
|
};
|
|
|
|
debug.field("result", &result);
|
|
}
|
|
}
|
|
|
|
debug.finish()
|
|
}
|
|
}
|
|
|
|
impl<T> Job<T> {
|
|
pub fn empty() -> Job<T> {
|
|
Self {
|
|
harness_and_state: TaggedAtomicPtr::new(ptr::dangling_mut(), JobState::Empty as usize),
|
|
value_or_this: UnsafeCell::new(ValueOrThis {
|
|
this: NonNull::dangling(),
|
|
}),
|
|
error_or_link: UnsafeCell::new(LinkOrError {
|
|
link: Link {
|
|
prev: None,
|
|
next: None,
|
|
},
|
|
}),
|
|
// _phantom: PhantomPinned,
|
|
}
|
|
}
|
|
pub fn new(harness: unsafe fn(*const (), *const Job<T>), this: NonNull<()>) -> Job<T> {
|
|
Self {
|
|
harness_and_state: TaggedAtomicPtr::new(
|
|
unsafe { mem::transmute(harness) },
|
|
JobState::Empty as usize,
|
|
),
|
|
value_or_this: UnsafeCell::new(ValueOrThis { this }),
|
|
error_or_link: UnsafeCell::new(LinkOrError {
|
|
link: Link {
|
|
prev: None,
|
|
next: None,
|
|
},
|
|
}),
|
|
// _phantom: PhantomPinned,
|
|
}
|
|
}
|
|
|
|
// Job is passed around type-erased as `Job<()>`, to complete the job we
|
|
// need to cast it back to the original type.
|
|
pub unsafe fn transmute_ref<U>(&self) -> &Job<U> {
|
|
unsafe { mem::transmute::<&Job<T>, &Job<U>>(self) }
|
|
}
|
|
|
|
#[inline]
|
|
unsafe fn link_mut(&self) -> &mut Link<Job> {
|
|
unsafe { &mut (&mut *self.error_or_link.get()).link }
|
|
}
|
|
|
|
/// assumes job is in a `JobList`
|
|
pub unsafe fn unlink(&self) {
|
|
unsafe {
|
|
let mut dummy = None;
|
|
let Link { prev, next } = *self.link_mut();
|
|
|
|
*prev
|
|
.map(|ptr| &mut ptr.as_ref().link_mut().next)
|
|
.unwrap_or(&mut dummy) = next;
|
|
*next
|
|
.map(|ptr| &mut ptr.as_ref().link_mut().prev)
|
|
.unwrap_or(&mut dummy) = prev;
|
|
}
|
|
}
|
|
|
|
pub fn state(&self) -> u8 {
|
|
self.harness_and_state.tag(Ordering::Relaxed) as u8
|
|
}
|
|
|
|
pub fn wait(&self) -> JobResult<T> {
|
|
let mut spin = SpinWait::new();
|
|
loop {
|
|
match self.harness_and_state.compare_exchange_weak_tag(
|
|
JobState::Pending as usize,
|
|
JobState::Locked as usize,
|
|
Ordering::Acquire,
|
|
Ordering::Relaxed,
|
|
) {
|
|
// if still pending, sleep until completed
|
|
Ok(state) => {
|
|
debug_assert_eq!(state, JobState::Pending as usize);
|
|
unsafe {
|
|
*(&mut *self.error_or_link.get()).waker = Some(std::thread::current());
|
|
}
|
|
|
|
self.harness_and_state.set_tag(
|
|
JobState::Pending as usize,
|
|
Ordering::Release,
|
|
Ordering::Relaxed,
|
|
);
|
|
|
|
std::thread::park();
|
|
spin.reset();
|
|
|
|
// after sleeping, state should be `Finished`
|
|
}
|
|
Err(state) => {
|
|
// job finished under us, check if it was successful
|
|
if state == JobState::Finished as usize {
|
|
let err = unsafe { (&mut *self.error_or_link.get()).error.take() };
|
|
|
|
let result: std::thread::Result<T> = if let Some(err) = err {
|
|
cold_path();
|
|
Err(err)
|
|
} else {
|
|
let val = unsafe {
|
|
ManuallyDrop::take(&mut (&mut *self.value_or_this.get()).value)
|
|
};
|
|
|
|
Ok(val.into_inner())
|
|
};
|
|
|
|
return JobResult::new(result);
|
|
} else {
|
|
// spin until lock is released.
|
|
tracing::trace!("spin-waiting for job: {:?}", self);
|
|
spin.spin();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// call this when popping value from local queue
|
|
pub fn set_pending(&self) {
|
|
let mut spin = SpinWait::new();
|
|
loop {
|
|
match self.harness_and_state.compare_exchange_weak_tag(
|
|
JobState::Empty as usize,
|
|
JobState::Pending as usize,
|
|
Ordering::Acquire,
|
|
Ordering::Relaxed,
|
|
) {
|
|
Ok(state) => {
|
|
debug_assert_eq!(state, JobState::Empty as usize);
|
|
// set waker to None
|
|
unsafe {
|
|
(&mut *self.error_or_link.get()).waker = ManuallyDrop::new(None);
|
|
}
|
|
return;
|
|
}
|
|
Err(_) => {
|
|
// debug_assert_ne!(state, JobState::Empty as usize);
|
|
|
|
tracing::error!("######## what the sigma?");
|
|
spin.spin();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn execute(job: NonNull<Self>) {
|
|
tracing::trace!("executing job: {:?}", job);
|
|
|
|
// SAFETY: self is non-null
|
|
unsafe {
|
|
let this = job.as_ref();
|
|
let (ptr, state) = this.harness_and_state.ptr_and_tag(Ordering::Relaxed);
|
|
|
|
debug_assert_eq!(state, JobState::Pending as usize);
|
|
let harness: unsafe fn(*const (), *const Self) = mem::transmute(ptr.as_ptr());
|
|
|
|
let this = (*this.value_or_this.get()).this;
|
|
|
|
harness(this.as_ptr().cast(), job.as_ptr());
|
|
}
|
|
}
|
|
|
|
pub(crate) fn complete(&self, result: std::thread::Result<T>) {
|
|
let mut spin = SpinWait::new();
|
|
loop {
|
|
match self.harness_and_state.compare_exchange_weak_tag(
|
|
JobState::Pending as usize,
|
|
JobState::Locked as usize,
|
|
Ordering::Acquire,
|
|
Ordering::Relaxed,
|
|
) {
|
|
Ok(state) => {
|
|
debug_assert_eq!(state, JobState::Pending as usize);
|
|
break;
|
|
}
|
|
Err(_) => {
|
|
// debug_assert_ne!(state, JobState::Pending as usize);
|
|
spin.spin();
|
|
}
|
|
}
|
|
}
|
|
|
|
let waker = unsafe { (&mut *self.error_or_link.get()).waker.take() };
|
|
|
|
match result {
|
|
Ok(val) => unsafe {
|
|
(&mut *self.value_or_this.get()).value = ManuallyDrop::new(SmallBox::new(val));
|
|
(&mut *self.error_or_link.get()).error = ManuallyDrop::new(None);
|
|
},
|
|
Err(err) => unsafe {
|
|
(&mut *self.value_or_this.get()).uninit = ();
|
|
(&mut *self.error_or_link.get()).error = ManuallyDrop::new(Some(err));
|
|
},
|
|
}
|
|
|
|
if let Some(thread) = waker {
|
|
thread.unpark();
|
|
}
|
|
|
|
self.harness_and_state.set_tag(
|
|
JobState::Finished as usize,
|
|
Ordering::Release,
|
|
Ordering::Relaxed,
|
|
);
|
|
}
|
|
}
|
|
|
|
mod stackjob {
|
|
use crate::latch::Latch;
|
|
|
|
use super::*;
|
|
|
|
/// A closure bundled with a latch, meant to be turned into a `Job` that
/// borrows it.
pub struct StackJob<F, L> {
    latch: L,
    // taken out exactly once when the job runs, hence `ManuallyDrop` in a cell.
    f: UnsafeCell<ManuallyDrop<F>>,
}

impl<F, L> StackJob<F, L> {
    /// Bundles the closure `f` with `latch`.
    pub fn new(f: F, latch: L) -> Self {
        let f = UnsafeCell::new(ManuallyDrop::new(f));
        Self { latch, f }
    }

    /// Moves the closure out of the job.
    ///
    /// # Safety
    ///
    /// Must be called at most once, and not concurrently: the closure is
    /// moved out of its slot without marking the slot as empty.
    pub unsafe fn unwrap(&self) -> F {
        let slot = self.f.get();
        unsafe { ManuallyDrop::take(&mut *slot) }
    }
}
|
|
|
|
impl<F, L> StackJob<F, L>
|
|
where
|
|
L: Latch,
|
|
{
|
|
pub fn as_job<T>(&self) -> Job<()>
|
|
where
|
|
F: FnOnce() -> T + Send,
|
|
T: Send,
|
|
{
|
|
#[align(8)]
|
|
unsafe fn harness<F, T, L: Latch>(this: *const (), job: *const Job<()>)
|
|
where
|
|
F: FnOnce() -> T + Send,
|
|
T: Sized + Send,
|
|
{
|
|
let this = unsafe { &*this.cast::<StackJob<F, L>>() };
|
|
let f = unsafe { this.unwrap() };
|
|
|
|
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f()));
|
|
|
|
let job = unsafe { &*job.cast::<Job<T>>() };
|
|
job.complete(result);
|
|
|
|
unsafe {
|
|
Latch::set_raw(&this.latch);
|
|
}
|
|
}
|
|
|
|
Job::new(harness::<F, T, L>, unsafe {
|
|
NonNull::new_unchecked(self as *const _ as *mut ())
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
mod heapjob {
|
|
use super::*;
|
|
|
|
/// A closure destined to run as a heap-allocated job.
pub struct HeapJob<F> {
    f: F,
}
|
|
|
|
impl<F> HeapJob<F> {
|
|
pub fn new(f: F) -> Self {
|
|
Self { f }
|
|
}
|
|
|
|
pub fn into_inner(self) -> F {
|
|
self.f
|
|
}
|
|
|
|
pub fn into_boxed_job<T>(self: Box<Self>) -> *mut Job<()>
|
|
where
|
|
F: FnOnce() -> T + Send,
|
|
T: Send,
|
|
{
|
|
#[align(8)]
|
|
unsafe fn harness<F, T>(this: *const (), job: *const Job<()>)
|
|
where
|
|
F: FnOnce() -> T + Send,
|
|
T: Send,
|
|
{
|
|
let job = job.cast_mut();
|
|
|
|
// turn `this`, which was allocated at (2), into box.
|
|
// miri complains this is a use-after-free, but it isn't? silly miri...
|
|
// Turns out this is actually correct on miri's end, but because
|
|
// we ensure that the scope lives as long as any jobs, this is
|
|
// actually fine, as far as I can tell.
|
|
let this = unsafe { Box::from_raw(this.cast::<HeapJob<F>>().cast_mut()) };
|
|
let f = this.into_inner();
|
|
|
|
_ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f()));
|
|
|
|
// drop job (this is fine because the job of a HeapJob is pure POD).
|
|
unsafe {
|
|
ptr::drop_in_place(job);
|
|
}
|
|
|
|
// free box that was allocated at (1)
|
|
_ = unsafe { Box::<ManuallyDrop<Job<T>>>::from_raw(job.cast()) };
|
|
}
|
|
|
|
// (1) allocate box for job
|
|
Box::into_raw(Box::new(Job::new(harness::<F, T>, {
|
|
// (2) convert self into a pointer
|
|
Box::into_non_null(self).cast()
|
|
})))
|
|
}
|
|
}
|
|
}
|
|
|
|
pub use heapjob::HeapJob;
|
|
pub use stackjob::StackJob;
|