executor/distaff/src/job.rs
2025-06-30 16:48:49 +02:00

1357 lines
41 KiB
Rust

use core::{
any::Any,
cell::UnsafeCell,
fmt::Debug,
hint::cold_path,
mem::{self, ManuallyDrop},
ptr::{self, NonNull},
sync::atomic::Ordering,
};
use std::{
cell::Cell,
marker::PhantomData,
mem::MaybeUninit,
ops::DerefMut,
sync::atomic::{AtomicU8, AtomicU32, AtomicUsize},
};
use alloc::boxed::Box;
use parking_lot::{Condvar, Mutex};
use parking_lot_core::SpinWait;
use crate::{
latch::{Probe, WorkerLatch},
util::{DropGuard, SmallBox, TaggedAtomicPtr},
};
/// Lifecycle state of a [`Job`], kept in the tag bits of its harness pointer.
///
/// The discriminants are contiguous (`0..=3`); no flag bits are currently
/// defined on top of them.
#[repr(u8)]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum JobState {
    Empty = 0,
    Locked = 1,
    Pending = 2,
    Finished = 3,
}

impl JobState {
    /// Mask of flag bits layered over the state; zero because none exist yet.
    #[allow(dead_code)]
    const MASK: u8 = 0;

    /// Decodes a raw tag value, returning `None` for anything that is not a
    /// known discriminant.
    fn from_u8(v: u8) -> Option<Self> {
        // Indexed by discriminant, so the lookup is just a bounds-checked read.
        const BY_DISCRIMINANT: [JobState; 4] = [
            JobState::Empty,
            JobState::Locked,
            JobState::Pending,
            JobState::Finished,
        ];
        BY_DISCRIMINANT.get(usize::from(v)).copied()
    }
}
pub use joblist::JobList;
pub use jobvec::JobVec;
// replacement for `JobList` that uses a VecDeque instead of a linked list.
mod jobvec {
    use std::ptr::NonNull;

    use super::Job;
    use alloc::collections::VecDeque;

    /// Double-ended queue of type-erased job pointers.
    ///
    /// The queue only stores the pointers; it never dereferences or frees the
    /// jobs they point at.
    #[derive(Debug)]
    pub struct JobVec {
        jobs: VecDeque<NonNull<Job>>,
    }

    impl JobVec {
        /// Creates an empty queue.
        pub fn new() -> Self {
            Self {
                jobs: VecDeque::new(),
            }
        }

        /// Removes every entry that points at `job`.
        pub fn remove(&mut self, job: &Job) {
            // A reference is never null, so no unchecked construction is needed.
            let job_ptr = NonNull::from(job);
            self.jobs.retain(|j| *j != job_ptr);
        }

        /// Pushes `job` at the front of the queue.
        ///
        /// # Panics
        ///
        /// Panics if `job` is null.
        pub fn push_front<T>(&mut self, job: *const Job<T>) {
            self.jobs.push_front(Self::erase(job));
        }

        /// Pushes `job` at the back of the queue.
        ///
        /// # Panics
        ///
        /// Panics if `job` is null.
        pub fn push_back<T>(&mut self, job: *const Job<T>) {
            self.jobs.push_back(Self::erase(job));
        }

        /// Pops the job at the front of the queue, if any.
        pub fn pop_front(&mut self) -> Option<NonNull<Job>> {
            self.jobs.pop_front()
        }

        /// Pops the job at the back of the queue, if any.
        pub fn pop_back(&mut self) -> Option<NonNull<Job>> {
            self.jobs.pop_back()
        }

        /// Returns `true` if no jobs are queued.
        pub fn is_empty(&self) -> bool {
            self.jobs.is_empty()
        }

        /// Number of queued jobs.
        pub fn len(&self) -> usize {
            self.jobs.len()
        }

        /// Type-erases a job pointer to `Job<()>`.
        ///
        /// The push functions are safe, so a null pointer must be rejected here
        /// rather than passed to `NonNull::new_unchecked`, which would be
        /// undefined behaviour reachable from safe code.
        fn erase<T>(job: *const Job<T>) -> NonNull<Job> {
            NonNull::new(job.cast_mut().cast::<Job>()).expect("job pointer must not be null")
        }
    }

    impl Default for JobVec {
        fn default() -> Self {
            Self::new()
        }
    }
}
mod joblist {
use core::{fmt::Debug, ptr::NonNull};
use alloc::boxed::Box;
use super::Job;
// the list looks like this:
// head <-> job1 <-> job2 <-> ... <-> jobN <-> tail
/// Intrusive doubly-linked list of jobs, bracketed by two heap-allocated
/// sentinel jobs (`head` and `tail`) that are owned by the list.
///
/// The jobs between the sentinels are linked through their own `link` field
/// and are not owned by the list.
pub struct JobList {
// these cannot be boxes because boxes are noalias.
head: NonNull<Job>,
tail: NonNull<Job>,
// the number of jobs in the list.
// this is used to judge whether or not to join sync or async.
job_count: usize,
}
impl JobList {
/// Allocates the two sentinels and links them to each other.
pub fn new() -> Self {
let head = Box::into_raw(Box::new(Job::empty()));
let tail = Box::into_raw(Box::new(Job::empty()));
// head and tail point at each other
// SAFETY: both pointers come straight from `Box::into_raw`, so they are
// non-null and valid.
unsafe {
(&*head).link_mut().prev = None;
(&*head).link_mut().next = Some(NonNull::new_unchecked(tail));
(&*tail).link_mut().prev = Some(NonNull::new_unchecked(head));
(&*tail).link_mut().next = None;
Self {
head: NonNull::new_unchecked(head),
tail: NonNull::new_unchecked(tail),
job_count: 0,
}
}
}
/// The head sentinel.
fn head(&self) -> NonNull<Job> {
self.head
}
/// The tail sentinel.
fn tail(&self) -> NonNull<Job> {
self.tail
}
/// Unlinks `job` from its neighbours and decrements the job count.
///
/// NOTE(review): nothing checks that `job` is actually linked into this
/// list; if it is not, the count no longer matches the list contents.
pub fn remove(&mut self, job: &Job) {
job.unlink();
self.job_count -= 1;
}
/// Inserts `job` directly after the head sentinel.
///
/// `job` must be valid until it is removed from the list.
pub unsafe fn push_front<T>(&mut self, job: *const Job<T>) {
self.job_count += 1;
let headlink = unsafe { self.head.as_ref().link_mut() };
// head.next is always `Some`: it is either the first job or the tail.
let next = headlink.next.unwrap();
let next_link = unsafe { next.as_ref().link_mut() };
let job_ptr = unsafe { NonNull::new_unchecked(job as _) };
headlink.next = Some(job_ptr);
next_link.prev = Some(job_ptr);
let job_link = unsafe { job_ptr.as_ref().link_mut() };
job_link.next = Some(next);
job_link.prev = Some(self.head);
}
/// Inserts `job` directly before the tail sentinel.
///
/// `job` must be valid until it is removed from the list.
pub unsafe fn push_back<T>(&mut self, job: *const Job<T>) {
self.job_count += 1;
let taillink = unsafe { self.tail.as_ref().link_mut() };
// tail.prev is always `Some`: it is either the last job or the head.
let prev = taillink.prev.unwrap();
let prev_link = unsafe { prev.as_ref().link_mut() };
let job_ptr = unsafe { NonNull::new_unchecked(job as _) };
taillink.prev = Some(job_ptr);
prev_link.next = Some(job_ptr);
let job_link = unsafe { job_ptr.as_ref().link_mut() };
job_link.prev = Some(prev);
job_link.next = Some(self.tail);
}
/// Detaches and returns the first job, or `None` if the list is empty.
///
/// The popped job's own `link` is left untouched.
pub fn pop_front(&mut self) -> Option<NonNull<Job>> {
let headlink = unsafe { self.head.as_ref().link_mut() };
// SAFETY: headlink.next is guaranteed to be Some.
let job = headlink.next.unwrap();
let job_link = unsafe { job.as_ref().link_mut() };
// short-circuit here if the job is the tail
let next = job_link.next?;
let next_link = unsafe { next.as_ref().link_mut() };
headlink.next = Some(next);
next_link.prev = Some(self.head);
// decrement job count after having potentially short-circuited
self.job_count -= 1;
Some(job)
}
/// Detaches and returns the last job, or `None` if the list is empty.
///
/// The popped job's own `link` is left untouched.
pub fn pop_back(&mut self) -> Option<NonNull<Job>> {
let taillink = unsafe { self.tail.as_ref().link_mut() };
// SAFETY: taillink.prev is guaranteed to be Some.
let job = taillink.prev.unwrap();
let job_link = unsafe { job.as_ref().link_mut() };
// short-circuit here if the job is the head
let prev = job_link.prev?;
let prev_link = unsafe { prev.as_ref().link_mut() };
taillink.prev = Some(prev);
prev_link.next = Some(self.tail);
// decrement job count after having potentially short-circuited
self.job_count -= 1;
Some(job)
}
/// Returns `true` if the tracked job count is zero.
pub fn is_empty(&self) -> bool {
self.job_count == 0
}
/// The tracked job count.
pub fn len(&self) -> usize {
self.job_count
}
}
impl Drop for JobList {
fn drop(&mut self) {
// Need to drop the head and tail, which were allocated on the heap.
// elements of the list are managed externally.
unsafe {
drop((Box::from_non_null(self.head), Box::from_non_null(self.tail)));
}
}
}
impl Debug for JobList {
/// Prints the sentinels, the job count and every job between head and tail.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("JobList")
.field("head", &self.head)
.field("tail", &self.tail)
.field("job_count", &self.job_count)
.field_with("jobs", |f| {
let mut jobs = f.debug_list();
// SAFETY: head.next is guaranteed to be non-null and valid
let mut job = unsafe { self.head.as_ref().link_mut().next.unwrap() };
// walk forward until the tail sentinel is reached
while job != self.tail {
let job_ref = unsafe { job.as_ref() };
jobs.entry(job_ref);
// SAFETY: job is guaranteed to be non-null and valid
// only the tail has a next of None
job = unsafe { job_ref.link_mut().next.unwrap() };
}
jobs.finish()
})
.finish()
}
}
}
/// Thin wrapper around the outcome of a job: either the produced value or the
/// payload of the panic that aborted it.
#[repr(transparent)]
pub struct JobResult<T> {
    inner: std::thread::Result<T>,
}

impl<T> JobResult<T> {
    /// Wraps a thread result.
    pub fn new(result: std::thread::Result<T>) -> Self {
        JobResult { inner: result }
    }

    /// Unwraps back into the underlying thread result without touching it.
    #[allow(dead_code)]
    pub fn into_inner(self) -> std::thread::Result<T> {
        let JobResult { inner } = self;
        inner
    }

    /// Returns the job's value, or resumes the captured panic on this thread.
    ///
    /// TODO: a no-std build has no `resume_unwind`; how to propagate the
    /// payload there is still undecided.
    pub fn into_result(self) -> T {
        self.inner.unwrap_or_else(|payload| {
            // panics are the unlikely path
            cold_path();
            std::panic::resume_unwind(payload)
        })
    }
}
/// Previous/next pointers of a node in an intrusive doubly-linked list.
///
/// `None` marks the end of the chain in that direction.
#[derive(Debug, PartialEq, Eq)]
struct Link<T> {
    prev: Option<NonNull<T>>,
    next: Option<NonNull<T>>,
}

// `Clone` and `Copy` are written by hand because `#[derive]` would add
// `T: Clone` / `T: Copy` bounds, while a pair of pointers is copyable for any
// `T`.
impl<T> Clone for Link<T> {
    fn clone(&self) -> Self {
        // Canonical `Clone` for a `Copy` type: a bitwise copy.
        *self
    }
}

impl<T> Copy for Link<T> {}
/// First payload slot of a `Job`; which field is live depends on the job's
/// state, which is tracked separately in the job's tag.
union ValueOrThis<T> {
// written by `Job::complete` when the job failed and there is no value
uninit: (),
// the job's return value, written by `Job::complete` on success
value: ManuallyDrop<SmallBox<T>>,
// the type-erased pointer handed to the harness by `Job::execute`
this: NonNull<()>,
}
/// Second payload slot of a `Job`; which field is live depends on the job's
/// state, which is tracked separately in the job's tag.
union LinkOrError<T> {
// list pointers, accessed through `Job::link_mut`
link: Link<T>,
// thread to unpark on completion; reset by `set_pending`, stored by `wait`,
// taken by `complete`
waker: ManuallyDrop<Option<std::thread::Thread>>,
// panic payload (or `None` on success), written by `complete`
error: ManuallyDrop<Option<Box<dyn Any + Send + 'static>>>,
}
/// A unit of work plus the storage for its result.
///
/// The harness pointer and the `JobState` share one tagged atomic word; the
/// two union fields are reinterpreted as the job moves through its states.
#[repr(C)]
pub struct Job<T = ()> {
/// stores the job's harness as a *const usize
/// (the tag bits carry the `JobState`)
harness_and_state: TaggedAtomicPtr<usize, 3>,
/// `this` before `execute()` is called, or `value` after `execute()`
value_or_this: UnsafeCell<ValueOrThis<T>>,
/// `link` while queued, `waker` while pending, or `error` once completed
error_or_link: UnsafeCell<LinkOrError<Job>>,
}
// Jobs are handed between threads through the queues.
// NOTE(review): there is no `T: Send` bound here; the bound is only enforced
// where jobs are created (`as_job` / `into_boxed_job` require `T: Send`) —
// confirm no other constructor path can smuggle a non-`Send` result across.
unsafe impl<T> Send for Job<T> {}
impl<T> Debug for Job<T> {
/// Formats the job according to its current state, reading whichever union
/// fields that state implies are live.
///
/// The state is loaded with `Relaxed` ordering and the fields are read
/// afterwards without locking, so the output is a best-effort snapshot.
/// Panics if the tag is not a valid `JobState`.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state = JobState::from_u8(self.harness_and_state.tag(Ordering::Relaxed) as u8).unwrap();
let mut debug = f.debug_struct("Job");
debug.field("state", &state).field_with("harness", |f| {
write!(f, "{:?}", self.harness_and_state.ptr(Ordering::Relaxed))
});
match state {
// not yet pending: `this` and `link` are the live fields
JobState::Empty => {
debug
.field_with("this", |f| {
write!(f, "{:?}", unsafe { &(&*self.value_or_this.get()).this })
})
.field_with("link", |f| {
write!(f, "{:?}", unsafe { &(&*self.error_or_link.get()).link })
});
}
// another thread is mutating the payload; do not touch it
JobState::Locked => {
#[derive(Debug)]
struct Locked;
debug.field("locked", &Locked);
}
// pending: `this` and `waker` are the live fields
JobState::Pending => {
debug
.field_with("this", |f| {
write!(f, "{:?}", unsafe { &(&*self.value_or_this.get()).this })
})
.field_with("waker", |f| {
write!(f, "{:?}", unsafe { &(&*self.error_or_link.get()).waker })
});
}
// finished: print the panic payload if there is one, otherwise a
// pointer to the stored value
JobState::Finished => {
let err = unsafe { &(&*self.error_or_link.get()).error };
let result = match err.as_ref() {
Some(err) => Err(err),
None => Ok(unsafe { (&*self.value_or_this.get()).value.0.as_ptr() }),
};
debug.field("result", &result);
}
}
debug.finish()
}
}
impl<T> Job<T> {
/// Builds a placeholder job in the `Empty` state, with dangling harness and
/// `this` pointers and no list neighbours.
pub fn empty() -> Job<T> {
    let unlinked = Link {
        prev: None,
        next: None,
    };
    let payload = ValueOrThis {
        this: NonNull::dangling(),
    };
    Self {
        harness_and_state: TaggedAtomicPtr::new(ptr::dangling_mut(), JobState::Empty as usize),
        value_or_this: UnsafeCell::new(payload),
        error_or_link: UnsafeCell::new(LinkOrError { link: unlinked }),
    }
}
/// Builds an unlinked job in the `Empty` state that will call `harness` with
/// `this` when executed.
pub fn new(harness: unsafe fn(*const (), *const Job<T>), this: NonNull<()>) -> Job<T> {
    let unlinked = Link {
        prev: None,
        next: None,
    };
    Self {
        // the function pointer is stored type-erased in the tagged pointer
        harness_and_state: TaggedAtomicPtr::new(
            unsafe { mem::transmute(harness) },
            JobState::Empty as usize,
        ),
        value_or_this: UnsafeCell::new(ValueOrThis { this }),
        error_or_link: UnsafeCell::new(LinkOrError { link: unlinked }),
    }
}
// Job is passed around type-erased as `Job<()>`, to complete the job we
// need to cast it back to the original type.
pub unsafe fn transmute_ref<U>(&self) -> &Job<U> {
unsafe { mem::transmute::<&Job<T>, &Job<U>>(self) }
}
/// Views the second payload slot as list pointers.
///
/// # Safety
///
/// `link` must be the live field and no other reference to it may exist.
#[inline]
unsafe fn link_mut(&self) -> &mut Link<Job> {
    let slot = self.error_or_link.get();
    unsafe { &mut (*slot).link }
}
/// Splices this job out of the chain by pointing its neighbours at each
/// other. Assumes the job is in a `JobList`.
///
/// A missing neighbour is simply skipped; the job's own link is left as is.
pub fn unlink(&self) {
    // SAFETY: relies on `link` being the live field and on both neighbours
    // (when present) still being valid jobs.
    unsafe {
        let Link { prev, next } = *self.link_mut();
        if let Some(before) = prev {
            before.as_ref().link_mut().next = next;
        }
        if let Some(after) = next {
            after.as_ref().link_mut().prev = prev;
        }
    }
}
/// Raw state tag of the job, loaded with `Relaxed` ordering.
pub fn state(&self) -> u8 {
    let tag = self.harness_and_state.tag(Ordering::Relaxed);
    tag as u8
}
/// Blocks the calling thread until the job is `Finished` and returns its
/// result.
///
/// While the job is `Pending` the caller registers itself as the waker and
/// parks; in any other unfinished state it spin-waits.
pub fn wait(&self) -> JobResult<T> {
let mut spin = SpinWait::new();
loop {
// try to take the lock on a pending job so the waker slot can be written
match self.harness_and_state.compare_exchange_weak_tag(
JobState::Pending as usize,
JobState::Locked as usize,
Ordering::Acquire,
Ordering::Relaxed,
) {
// if still pending, sleep until completed
Ok(state) => {
debug_assert_eq!(state, JobState::Pending as usize);
// we hold the lock: register this thread as the one to unpark
unsafe {
*(&mut *self.error_or_link.get()).waker = Some(std::thread::current());
}
// release the lock by going back to `Pending`
self.harness_and_state.set_tag(
JobState::Pending as usize,
Ordering::Release,
Ordering::Relaxed,
);
std::thread::park();
spin.reset();
// after sleeping, state should be `Finished`
// (the loop re-checks, so a spurious wake-up just re-registers)
}
Err(state) => {
// job finished under us, check if it was successful
if state == JobState::Finished as usize {
let err = unsafe { (&mut *self.error_or_link.get()).error.take() };
let result: std::thread::Result<T> = if let Some(err) = err {
cold_path();
Err(err)
} else {
// no error was stored, so `value` is the live field
let val = unsafe {
ManuallyDrop::take(&mut (&mut *self.value_or_this.get()).value)
};
Ok(val.into_inner())
};
return JobResult::new(result);
} else {
// spin until lock is released.
tracing::trace!("spin-waiting for job: {:?}", self);
spin.spin();
}
}
}
}
}
/// call this when popping value from local queue
///
/// Moves the job from `Empty` to `Pending` and resets the waker slot to
/// `None`, spinning until the job is observed in the `Empty` state.
///
/// The transition goes through `Locked`: the waker slot shares storage with
/// the list link, so it must be initialised *before* `Pending` becomes
/// visible. Otherwise a concurrent `wait()` or `complete()` could lock the job
/// and read (or drop) the stale link bytes as an `Option<Thread>`.
pub fn set_pending(&self) {
    let mut spin = SpinWait::new();
    loop {
        match self.harness_and_state.compare_exchange_weak_tag(
            JobState::Empty as usize,
            JobState::Locked as usize,
            Ordering::Acquire,
            Ordering::Relaxed,
        ) {
            Ok(state) => {
                debug_assert_eq!(state, JobState::Empty as usize);
                // set waker to None
                // SAFETY: the job is `Locked` by us, so nobody else touches
                // the slot; the previous contents (the link) need no drop.
                unsafe {
                    (&mut *self.error_or_link.get()).waker = ManuallyDrop::new(None);
                }
                // publish the initialised slot together with the new state
                self.harness_and_state.set_tag(
                    JobState::Pending as usize,
                    Ordering::Release,
                    Ordering::Relaxed,
                );
                return;
            }
            Err(_) => {
                // debug_assert_ne!(state, JobState::Empty as usize);
                tracing::error!("######## what the sigma?");
                spin.spin();
            }
        }
    }
}
/// Runs the job by calling its stored harness with the stored `this` pointer
/// and the job pointer itself.
///
/// The job is expected to be `Pending` (checked in debug builds only).
pub fn execute(job: NonNull<Self>) {
tracing::trace!("executing job: {:?}", job);
// SAFETY: self is non-null
// NOTE(review): also relies on `job` pointing at a live job whose harness
// and `this` fields are still the ones set by `Job::new` — the signature
// does not enforce that.
unsafe {
let this = job.as_ref();
let (ptr, state) = this.harness_and_state.ptr_and_tag(Ordering::Relaxed);
debug_assert_eq!(state, JobState::Pending as usize);
// recover the function pointer that `Job::new` stored type-erased
let harness: unsafe fn(*const (), *const Self) = mem::transmute(ptr.as_ptr());
let this = (*this.value_or_this.get()).this;
harness(this.as_ptr().cast(), job.as_ptr());
}
}
/// Stores `result` in the job, unparks the registered waiter (if any) and
/// marks the job `Finished`.
///
/// Spins until the job can be taken from `Pending` to `Locked`.
pub(crate) fn complete(&self, result: std::thread::Result<T>) {
let mut spin = SpinWait::new();
// acquire the lock: Pending -> Locked
loop {
match self.harness_and_state.compare_exchange_weak_tag(
JobState::Pending as usize,
JobState::Locked as usize,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(state) => {
debug_assert_eq!(state, JobState::Pending as usize);
break;
}
Err(_) => {
// debug_assert_ne!(state, JobState::Pending as usize);
spin.spin();
}
}
}
// take the waker out before its storage is reused for the error
let waker = unsafe { (&mut *self.error_or_link.get()).waker.take() };
match result {
Ok(val) => unsafe {
(&mut *self.value_or_this.get()).value = ManuallyDrop::new(SmallBox::new(val));
(&mut *self.error_or_link.get()).error = ManuallyDrop::new(None);
},
Err(err) => unsafe {
(&mut *self.value_or_this.get()).uninit = ();
(&mut *self.error_or_link.get()).error = ManuallyDrop::new(Some(err));
},
}
// the waiter is unparked while the job is still `Locked`; `wait()` then
// spins until the `Finished` tag below becomes visible
if let Some(thread) = waker {
thread.unpark();
}
// publish the result
self.harness_and_state.set_tag(
JobState::Finished as usize,
Ordering::Release,
Ordering::Relaxed,
);
}
}
mod stackjob {
use crate::latch::Latch;
use super::*;
/// A closure plus a latch, meant to live in the frame of the code that
/// spawned it; `as_job` produces a `Job` that points back at it.
pub struct StackJob<F, L> {
latch: L,
// taken out exactly once by `unwrap`
f: UnsafeCell<ManuallyDrop<F>>,
}
impl<F, L> StackJob<F, L> {
/// Wraps `f` and `latch`.
pub fn new(f: F, latch: L) -> Self {
Self {
latch,
f: UnsafeCell::new(ManuallyDrop::new(f)),
}
}
/// Moves the closure out of the job.
///
/// # Safety
///
/// Must be called at most once, and not concurrently; the closure slot is
/// left logically uninitialised afterwards.
pub unsafe fn unwrap(&self) -> F {
unsafe { ManuallyDrop::take(&mut *self.f.get()) }
}
}
impl<F, L> StackJob<F, L>
where
L: Latch,
{
/// Creates a type-erased `Job` whose harness runs the closure, stores the
/// outcome in the job and then sets the latch.
///
/// The returned job holds a raw pointer to `self`, so `self` must outlive
/// the job's execution.
pub fn as_job<T>(&self) -> Job<()>
where
F: FnOnce() -> T + Send,
T: Send,
{
// presumably aligned so the low 3 bits of the function pointer are free
// for the state tag (`TaggedAtomicPtr<usize, 3>`) — TODO confirm
#[align(8)]
unsafe fn harness<F, T, L: Latch>(this: *const (), job: *const Job<()>)
where
F: FnOnce() -> T + Send,
T: Sized + Send,
{
let this = unsafe { &*this.cast::<StackJob<F, L>>() };
let f = unsafe { this.unwrap() };
// a panic in `f` is captured and delivered as the job's error
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f()));
tracing::trace!("stack job completed: {:?}", job);
// view the erased job as its real `Job<T>` to store the result
let job = unsafe { &*job.cast::<Job<T>>() };
job.complete(result);
// NOTE(review): `complete` has already published `Finished`, so a
// waiter may have returned by now; `this.latch` must still be alive
// here — confirm against the callers.
unsafe {
Latch::set_raw(&this.latch);
}
}
Job::new(harness::<F, T, L>, unsafe {
NonNull::new_unchecked(self as *const _ as *mut ())
})
}
}
}
mod heapjob {
use super::*;
/// A heap-allocated closure that can be turned into a self-freeing `Job`.
pub struct HeapJob<F> {
f: F,
}
impl<F> HeapJob<F> {
/// Wraps `f`.
pub fn new(f: F) -> Self {
Self { f }
}
/// Returns the wrapped closure.
pub fn into_inner(self) -> F {
self.f
}
/// Leaks `self` and a freshly boxed `Job` that points at it; the job's
/// harness reclaims and frees both allocations when it runs.
///
/// If the returned job is never executed, both allocations are leaked.
pub fn into_boxed_job<T>(self: Box<Self>) -> *mut Job<()>
where
F: FnOnce() -> T + Send,
T: Send,
{
// presumably aligned so the low 3 bits of the function pointer are free
// for the state tag (`TaggedAtomicPtr<usize, 3>`) — TODO confirm
#[align(8)]
unsafe fn harness<F, T>(this: *const (), job: *const Job<()>)
where
F: FnOnce() -> T + Send,
T: Send,
{
let job = job.cast_mut();
// turn `this`, which was allocated at (2), into box.
// miri complains this is a use-after-free, but it isn't? silly miri...
// Turns out this is actually correct on miri's end, but because
// we ensure that the scope lives as long as any jobs, this is
// actually fine, as far as I can tell.
let this = unsafe { Box::from_raw(this.cast::<HeapJob<F>>().cast_mut()) };
let f = this.into_inner();
{
// a panic in `f` is captured and stored as the job's error
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f()));
let job = unsafe { &*job.cast::<Job<T>>() };
job.complete(result);
}
// drop job (this is fine because the job of a HeapJob is pure POD).
// NOTE(review): the result stored by `complete` above sits behind
// `ManuallyDrop`, so it is not dropped here or below; a value or panic
// payload that owns heap memory would be leaked — confirm intent.
unsafe {
ptr::drop_in_place(job);
}
// `job` is a raw pointer, so this only prints its address
tracing::trace!("heap job completed: {:?}", job);
// free box that was allocated at (1)
_ = unsafe { Box::<ManuallyDrop<Job<T>>>::from_raw(job.cast()) };
}
// (1) allocate box for job
Box::into_raw(Box::new(Job::new(harness::<F, T>, {
// (2) convert self into a pointer
Box::into_non_null(self).cast()
})))
}
}
}
pub use heapjob::HeapJob;
pub use stackjob::StackJob;
#[cfg(test)]
mod tests {
use crate::latch::{AtomicLatch, LatchRef};
use super::*;
// Drives a stack job through Empty -> Pending -> Finished on one thread and
// checks the returned value.
#[test]
fn job_lifecycle() {
let latch = AtomicLatch::new();
let stack = StackJob::new(|| 3 + 4, LatchRef::new(&latch));
let job = stack.as_job::<i32>();
assert_eq!(job.state(), JobState::Empty as u8);
job.set_pending();
assert_eq!(job.state(), JobState::Pending as u8);
// execute the job
Job::<()>::execute(unsafe { NonNull::new_unchecked(&job as *const Job as _) });
// wait for the job to finish
let result = unsafe { job.transmute_ref::<i32>().wait() };
assert_eq!(result.into_result(), 7);
}
// NOTE(review): the `#[test]` attribute below is commented out, so this
// function is never run by the test harness and `#[should_panic]` has no
// effect on its own.
// #[test]
#[should_panic]
fn job_lifecycle_panic() {
let latch = AtomicLatch::new();
let stack = StackJob::new(|| panic!("test panic"), LatchRef::new(&latch));
let job = stack.as_job::<i32>();
assert_eq!(job.state(), JobState::Empty as u8);
job.set_pending();
assert_eq!(job.state(), JobState::Pending as u8);
// execute the job
Job::<()>::execute(unsafe { NonNull::new_unchecked(&job as *const Job as _) });
// wait for the job to finish
let result = unsafe { job.transmute_ref::<i32>().wait() };
std::panic::resume_unwind(result.into_inner().unwrap_err());
}
// The jobs in the list tests below are leaked via `Box::into_raw` and never
// freed; the lists only ever see the raw pointers.
// pop_back returns jobs in reverse push_back order.
#[test]
fn joblist_popback() {
let mut list = JobList::new();
let job1 = Box::into_raw(Box::new(Job::<i32>::empty()));
let job2 = Box::into_raw(Box::new(Job::<i32>::empty()));
unsafe {
list.push_back(job1);
list.push_back(job2);
}
assert_eq!(list.len(), 2);
let popped_job = list.pop_back().unwrap();
assert_eq!(popped_job.as_ptr(), job2 as _);
let popped_job = list.pop_back().unwrap();
assert_eq!(popped_job.as_ptr(), job1 as _);
assert!(list.is_empty());
}
// pop_front returns jobs in reverse push_front order.
#[test]
fn joblist_popfront() {
let mut list = JobList::new();
let job1 = Box::into_raw(Box::new(Job::<i32>::empty()));
let job2 = Box::into_raw(Box::new(Job::<i32>::empty()));
unsafe {
list.push_front(job1);
list.push_front(job2);
}
assert_eq!(list.len(), 2);
let popped_job = list.pop_front().unwrap();
assert_eq!(popped_job.as_ptr(), job2 as _);
let popped_job = list.pop_front().unwrap();
assert_eq!(popped_job.as_ptr(), job1 as _);
assert!(list.is_empty());
}
// The unlink tests call `Job::unlink` directly, which bypasses
// `JobList::remove`; the list's job count is therefore not updated and is
// not asserted on afterwards.
#[test]
fn joblist_unlink_middle() {
let mut list = JobList::new();
let job1 = Box::into_raw(Box::new(Job::<i32>::empty()));
let job2 = Box::into_raw(Box::new(Job::<i32>::empty()));
let job3 = Box::into_raw(Box::new(Job::<i32>::empty()));
unsafe {
list.push_back(job1);
list.push_back(job2);
list.push_back(job3);
}
assert_eq!(list.len(), 3);
// Unlink the middle job (job2)
unsafe {
(&*job2).unlink();
}
// Check that job1 and job3 are still in the list
let popped_job1 = list.pop_front().unwrap();
assert_eq!(popped_job1.as_ptr(), job1 as _);
let popped_job3 = list.pop_front().unwrap();
assert_eq!(popped_job3.as_ptr(), job3 as _);
}
#[test]
fn joblist_unlink_head() {
let mut list = JobList::new();
let job1 = Box::into_raw(Box::new(Job::<i32>::empty()));
let job2 = Box::into_raw(Box::new(Job::<i32>::empty()));
unsafe {
list.push_back(job1);
list.push_back(job2);
}
assert_eq!(list.len(), 2);
// Unlink the first job (job1)
unsafe {
(&*job1).unlink();
}
// Check that job2 is still in the list
let popped_job2 = list.pop_front().unwrap();
assert_eq!(popped_job2.as_ptr(), job2 as _);
}
#[test]
fn joblist_unlink_tail() {
let mut list = JobList::new();
let job1 = Box::into_raw(Box::new(Job::<i32>::empty()));
let job2 = Box::into_raw(Box::new(Job::<i32>::empty()));
unsafe {
list.push_back(job1);
list.push_back(job2);
}
assert_eq!(list.len(), 2);
// Unlink the last job (job2)
unsafe {
(&*job2).unlink();
}
// Check that job1 is still in the list
let popped_job1 = list.pop_front().unwrap();
assert_eq!(popped_job1.as_ptr(), job1 as _);
}
#[test]
fn joblist_unlink_single() {
let mut list = JobList::new();
let job1 = Box::into_raw(Box::new(Job::<i32>::empty()));
unsafe {
list.push_back(job1);
}
assert_eq!(list.len(), 1);
// Unlink the only job
unsafe {
(&*job1).unlink();
}
// Check that popping from an empty list returns None
assert!(list.pop_front().is_none());
}
#[test]
fn joblist_pop_empty() {
let mut list = JobList::new();
// Popping from an empty list should return None
assert!(list.pop_front().is_none());
assert!(list.pop_back().is_none());
}
// push_front + pop_front behaves as a stack.
#[test]
fn jobvec_push_front() {
let mut vec = JobVec::new();
let job1 = Box::into_raw(Box::new(Job::<i32>::empty()));
let job2 = Box::into_raw(Box::new(Job::<i32>::empty()));
vec.push_front(job1);
vec.push_front(job2);
assert_eq!(vec.len(), 2);
let popped_job = vec.pop_front().unwrap();
assert_eq!(popped_job.as_ptr(), job2 as _);
let popped_job = vec.pop_front().unwrap();
assert_eq!(popped_job.as_ptr(), job1 as _);
assert!(vec.is_empty());
}
// push_back + pop_back behaves as a stack.
#[test]
fn jobvec_push_back() {
let mut vec = JobVec::new();
let job1 = Box::into_raw(Box::new(Job::<i32>::empty()));
let job2 = Box::into_raw(Box::new(Job::<i32>::empty()));
vec.push_back(job1);
vec.push_back(job2);
assert_eq!(vec.len(), 2);
let popped_job = vec.pop_back().unwrap();
assert_eq!(popped_job.as_ptr(), job2 as _);
let popped_job = vec.pop_back().unwrap();
assert_eq!(popped_job.as_ptr(), job1 as _);
assert!(vec.is_empty());
}
// push_front + pop_back behaves as a FIFO queue.
#[test]
fn jobvec_push_front_pop_back() {
let mut vec = JobVec::new();
let job1 = Box::into_raw(Box::new(Job::<i32>::empty()));
let job2 = Box::into_raw(Box::new(Job::<i32>::empty()));
vec.push_front(job1);
vec.push_front(job2);
assert_eq!(vec.len(), 2);
let popped_job = vec.pop_back().unwrap();
assert_eq!(popped_job.as_ptr(), job1 as _);
let popped_job = vec.pop_back().unwrap();
assert_eq!(popped_job.as_ptr(), job2 as _);
assert!(vec.is_empty());
}
}
// A job, whether a `StackJob` or `HeapJob`, is turned into a `QueuedJob` when it is pushed to the job queue.
/// Queue entry holding a harness function pointer and the arguments to call
/// it with.
///
/// `execute` hands the entry's own address to the harness as a `JobSender`,
/// and `as_receiver` views the same memory as a `JobReceiver`, so the storage
/// is reused for the result once the job runs.
#[repr(C)]
pub struct QueuedJob {
/// The job's harness and state.
harness: TaggedAtomicPtr<usize, 3>,
// This is later invalidated by the Receiver/Sender, so it must be wrapped in a `MaybeUninit`.
// I'm not sure if it also must be inside of an `UnsafeCell`..
inner: Cell<MaybeUninit<QueueJobInner>>,
}
impl Debug for QueuedJob {
/// Prints the tagged harness pointer and the `this`/`mutex` pair.
///
/// NOTE(review): `inner` is assumed to be initialised here, but `execute`
/// replaces it with `MaybeUninit::uninit()`; formatting a job after it has
/// started executing would read uninitialised memory — confirm no caller
/// does that.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QueuedJob")
.field("harness", &self.harness)
.field("inner", unsafe {
(&*self.inner.as_ptr()).assume_init_ref()
})
.finish()
}
}
/// The arguments a `QueuedJob` passes to its harness, read out once by
/// `QueuedJob::execute`.
#[repr(C)]
#[derive(Debug, Copy, Clone)]
struct QueueJobInner {
/// The job's value or `this` pointer. This is either a `StackJob` or `HeapJob`.
this: NonNull<()>,
/// The mutex to wake when the job is finished executing.
/// May be null, in which case `JobSender::send` neither locks nor wakes.
mutex: *const WorkerLatch,
}
/// A union that allows us to store either a `T` or a `U` without needing to know which one it is at runtime.
/// The state must be tracked separately.
///
/// Both fields are `ManuallyDrop`, so whoever knows which one is live is also
/// responsible for taking or dropping it.
union UnsafeVariant<T, U> {
t: ManuallyDrop<T>,
u: ManuallyDrop<U>,
}
// The processed job is the result of executing a job, it contains the result of the job or an error.
/// One-shot slot shared by a `JobSender` and a `JobReceiver`.
///
/// `tag` carries the `FINISHED` / `ERROR` bits that say whether `value` is
/// initialised and which variant it holds.
#[repr(C)]
struct JobChannel<T = ()> {
tag: TaggedAtomicPtr<usize, 3>,
// `t` (the boxed value) on success, `u` (the panic payload) on error
value: UnsafeCell<MaybeUninit<UnsafeVariant<SmallBox<T>, Box<dyn Any + Send + 'static>>>>,
}
/// Writing end of a `JobChannel`; the harness functions obtain it by casting
/// the `QueuedJob` pointer they are given.
#[repr(transparent)]
pub struct JobSender<T = ()> {
channel: JobChannel<T>,
}
/// Reading end of a `JobChannel`; obtained from a `QueuedJob` through
/// `QueuedJob::as_receiver`.
#[repr(transparent)]
pub struct JobReceiver<T = ()> {
channel: JobChannel<T>,
}
// Empty placeholder type; nothing in this part of the file uses it.
#[repr(C)]
struct Job2 {}
// Tag bits stored alongside the harness pointer of a `QueuedJob` / the tag of
// a `JobChannel` (three tag bits are available).
// No bits set: the result has not been sent yet.
const EMPTY: usize = 0;
// Set through `QueuedJob::set_shared`.
const SHARED: usize = 1 << 2;
// The result slot has been written by `JobSender::send`.
const FINISHED: usize = 1 << 0;
// Set together with `FINISHED` when the slot holds a panic payload.
const ERROR: usize = 1 << 1;
impl<T> JobSender<T> {
    /// Stores `result` in the channel, marks it `FINISHED` (plus `ERROR` for a
    /// panic payload) and wakes the worker waiting on `mutex`.
    ///
    /// `mutex` may be null, in which case nothing is locked or woken.
    ///
    /// # Panics
    ///
    /// Panics if the channel is already marked `FINISHED`.
    #[tracing::instrument(level = "trace", skip_all)]
    pub fn send(&self, result: std::thread::Result<T>, mutex: *const WorkerLatch) {
        tracing::trace!("sending job ({:?}) result", &raw const *self);
        // We want to lock here so that we can be sure that we wake the worker
        // only if it was waiting, and not immediately after having received the
        // result and waiting for further work:
        // | thread 1 | thread 2 |
        // | | | | |
        // | send-> | | |
        // | FINISHED | | |
        // | | | poll() |
        // | | | sleep() |
        // | wake() | |
        // | | | !woken! | // the worker has already received the result
        // | | | | | // and is waiting for more work, it shouldn't
        // | | | | | // be woken up here.
        // | <-send | | |
        //
        // if we lock, it looks like this:
        // | thread 1 | thread 2 |
        // | | | | |
        // | send-> | | |
        // | lock() | | |
        // | FINISHED | | |
        // | | | lock()-> | // thread 2 tries to lock.
        // | wake() | | // the wake signal is ignored
        // | | | |
        // | unlock() | |
        // | | | l=lock() | // thread2 wakes up and receives the lock
        // | | | poll() |
        // | <-send | sleep(l) | // thread 2 is now sleeping
        //
        // This concludes my TED talk on why we need to lock here.
        let _guard = unsafe { mutex.as_ref() }.map(|mutex| {
            let guard = mutex.lock();
            DropGuard::new(move || {
                // Wake while the lock is still held, then unlock, exactly as in
                // the diagram above.
                mutex.wake();
                // `drop(guard)` (rather than `_ = guard`) is what moves the
                // lock guard into this closure: a wildcard binding does not
                // capture it, which would release the lock as soon as the
                // surrounding `map` closure returns, before `FINISHED` is
                // even published.
                drop(guard);
            })
        });
        assert!(self.channel.tag.tag(Ordering::Acquire) & FINISHED == 0);
        match result {
            Ok(value) => {
                // SAFETY: the channel is not finished, so the receiver does
                // not touch the slot yet.
                let slot = unsafe { &mut *self.channel.value.get() };
                slot.write(UnsafeVariant {
                    t: ManuallyDrop::new(SmallBox::new(value)),
                });
                // publish the initialised slot
                self.channel.tag.fetch_or_tag(FINISHED, Ordering::Release);
            }
            Err(payload) => {
                // SAFETY: as above.
                let slot = unsafe { &mut *self.channel.value.get() };
                slot.write(UnsafeVariant {
                    u: ManuallyDrop::new(payload),
                });
                self.channel
                    .tag
                    .fetch_or_tag(FINISHED | ERROR, Ordering::Release);
            }
        }
        // wake the worker waiting on the mutex and drop the guard
        // (both happen when `_guard` goes out of scope here)
    }
}
impl<T> JobReceiver<T> {
/// Returns `true` once the sender has marked the channel `FINISHED`.
#[tracing::instrument(level = "trace", skip_all)]
pub fn is_finished(&self) -> bool {
self.channel.tag.tag(Ordering::Acquire) & FINISHED != 0
}
/// Takes the result out of the channel if it has been sent.
///
/// Returns `None` while the job is unfinished, `Some(Ok(value))` on success
/// and `Some(Err(payload))` if the job panicked.
///
/// NOTE(review): `take_tag` is called before `FINISHED` is checked; if it
/// resets the whole tag (as the comment below suggests), polling an
/// unfinished job would also clear a `SHARED` bit — confirm against
/// `TaggedAtomicPtr::take_tag`.
#[tracing::instrument(level = "trace", skip_all)]
pub fn poll(&self) -> Option<std::thread::Result<T>> {
let tag = self.channel.tag.take_tag(Ordering::Acquire);
if tag & FINISHED == 0 {
return None;
}
tracing::trace!("received job ({:?}) result", &raw const *self);
// SAFETY: if we received a non-EMPTY tag, the value must be initialized.
// because we atomically set the tag to EMPTY, we can be sure that we're the only ones accessing the value.
let slot = unsafe { (&mut *self.channel.value.get()).assume_init_mut() };
if tag & ERROR != 0 {
// job failed, return the error
let err = unsafe { ManuallyDrop::take(&mut slot.u) };
Some(Err(err))
} else {
// job succeeded, return the value
let value = unsafe { ManuallyDrop::take(&mut slot.t) };
Some(Ok(value.into_inner()))
}
}
}
impl QueuedJob {
/// Assembles a queued job from an already tagged harness pointer and the
/// arguments it will be called with, tracing the new entry.
fn new(
    harness: TaggedAtomicPtr<usize, 3>,
    this: NonNull<()>,
    mutex: *const WorkerLatch,
) -> Self {
    let inner = Cell::new(MaybeUninit::new(QueueJobInner { this, mutex }));
    let queued = Self { harness, inner };
    tracing::trace!("new queued job: {:?}", queued);
    queued
}
/// Builds a queued job that runs the closure of `job` and sends its outcome
/// through the queued job's own storage.
///
/// Only a raw pointer to `job` is stored, so `job` must stay alive until the
/// queued job has executed.
pub fn from_stackjob<F, T, L>(job: &StackJob<F, L>, mutex: *const WorkerLatch) -> Self
where
F: FnOnce() -> T + Send,
T: Send,
{
// presumably aligned so the low 3 bits of the function pointer are free for
// the tag (`TaggedAtomicPtr<usize, 3>`) — TODO confirm
#[align(8)]
#[tracing::instrument(level = "trace", skip_all, name = "stack_job_harness")]
unsafe fn harness<F, T, L>(
this: *const (),
sender: *const JobSender,
mutex: *const WorkerLatch,
) where
F: FnOnce() -> T + Send,
T: Send,
{
use std::panic::{AssertUnwindSafe, catch_unwind};
// move the closure out of the stack job
let f = unsafe { (*this.cast::<StackJob<F, L>>()).unwrap() };
// a panic in `f` is captured and sent as the error result
let result = catch_unwind(AssertUnwindSafe(|| f()));
// the sender arrives type-erased; view it with the real result type
unsafe {
(&*(sender as *const JobSender<T>)).send(result, mutex);
}
}
Self::new(
TaggedAtomicPtr::new(harness::<F, T, L> as *mut usize, EMPTY),
unsafe { NonNull::new_unchecked(job as *const _ as *mut ()) },
mutex,
)
}
/// Leaks `job` and a freshly boxed queued job that points at it; the harness
/// reclaims and frees both allocations when the queued job executes.
///
/// If the returned job is never executed, both allocations are leaked.
pub fn from_heapjob<F, T>(job: Box<HeapJob<F>>, mutex: *const WorkerLatch) -> NonNull<Self>
where
F: FnOnce() -> T + Send,
T: Send,
{
// presumably aligned so the low 3 bits of the function pointer are free for
// the tag (`TaggedAtomicPtr<usize, 3>`) — TODO confirm
#[align(8)]
#[tracing::instrument(level = "trace", skip_all, name = "heap_job_harness")]
unsafe fn harness<F, T>(
this: *const (),
sender: *const JobSender,
mutex: *const WorkerLatch,
) where
F: FnOnce() -> T + Send,
T: Send,
{
use std::panic::{AssertUnwindSafe, catch_unwind};
// expect MIRI to complain about this, but it is actually correct.
// because I am so much smarter than MIRI, naturally, obviously.
// unbox the job, which was allocated at (2)
let f = unsafe { (*Box::from_raw(this.cast::<HeapJob<F>>().cast_mut())).into_inner() };
// a panic in `f` is captured and sent as the error result
let result = catch_unwind(AssertUnwindSafe(|| f()));
unsafe {
(&*(sender as *const JobSender<T>)).send(result, mutex);
}
// drop the job, which was allocated at (1)
// NOTE(review): the box is freed as `ManuallyDrop`, so the result that
// `send` just stored is not dropped; a value or panic payload that owns
// heap memory would be leaked — confirm intent.
_ = unsafe { Box::<ManuallyDrop<JobSender>>::from_raw(sender as *mut _) };
}
// (1) allocate box for job
Box::into_non_null(Box::new(Self::new(
TaggedAtomicPtr::new(harness::<F, T> as *mut usize, EMPTY),
// (2) convert job into a pointer
unsafe { NonNull::new_unchecked(Box::into_raw(job) as *mut ()) },
mutex,
)))
}
/// Builds a queued job from a caller-supplied harness function; `this` and
/// `mutex` are handed to the harness unchanged when the job executes.
pub fn from_harness(
    harness: unsafe fn(*const (), *const JobSender, *const WorkerLatch),
    this: NonNull<()>,
    mutex: *const WorkerLatch,
) -> Self {
    // store the function pointer type-erased, with an empty tag
    let tagged = TaggedAtomicPtr::new(harness as *mut usize, EMPTY);
    Self::new(tagged, this, mutex)
}
/// Sets the `SHARED` bit in the job's tag.
pub fn set_shared(&self) {
    // the previous tag value is of no interest here
    let _ = self.harness.fetch_or_tag(SHARED, Ordering::Relaxed);
}
/// Returns `true` if the `SHARED` bit is set in the job's tag.
pub fn is_shared(&self) -> bool {
    let tag = self.harness.tag(Ordering::Relaxed);
    (tag & SHARED) != 0
}
pub unsafe fn as_receiver<T>(&self) -> &JobReceiver<T> {
unsafe { mem::transmute::<&QueuedJob, &JobReceiver<T>>(self) }
}
/// this function will drop `_self` and execute the job.
///
/// Reads the harness and its arguments out of the job, then calls the
/// harness with the job's own address as the `JobSender`.
///
/// # Safety
///
/// `_self` must point at a live `QueuedJob` that has not been executed yet:
/// `inner` is assumed to be initialised and is left uninitialised afterwards.
#[tracing::instrument(level = "trace", skip_all)]
pub unsafe fn execute(_self: *mut Self) {
let (harness, this, sender, mutex) = unsafe {
let job = &*_self;
tracing::debug!("executing queued job: {:?}", job);
// recover the function pointer that was stored type-erased
let harness: unsafe fn(*const (), *const JobSender, *const WorkerLatch) =
mem::transmute(job.harness.ptr(Ordering::Relaxed));
// the job's storage doubles as the channel the result is sent through
let sender = mem::transmute::<*const Self, *const JobSender>(_self);
// move the arguments out; the slot is about to be reused for the result
let QueueJobInner { this, mutex } =
job.inner.replace(MaybeUninit::uninit()).assume_init();
(harness, this, sender, mutex)
};
unsafe {
// past this point, `_self` may no longer be a valid pointer to a `QueuedJob`.
(harness)(this.as_ptr(), sender, mutex);
}
}
}
impl Probe for QueuedJob {
    /// Reports whether the `FINISHED` bit is set, using a `Relaxed` load.
    fn probe(&self) -> bool {
        let tag = self.harness.tag(Ordering::Relaxed);
        (tag & FINISHED) != 0
    }
}
impl Probe for JobReceiver {
    /// Reports whether the channel's `FINISHED` bit is set, using a `Relaxed`
    /// load.
    fn probe(&self) -> bool {
        let tag = self.channel.tag.tag(Ordering::Relaxed);
        (tag & FINISHED) != 0
    }
}
pub use queuedjobqueue::JobQueue;
mod queuedjobqueue {
    //! Basically `JobVec`, but for `QueuedJob`s.
    use std::collections::VecDeque;

    use super::*;

    /// Double-ended queue of `QueuedJob` pointers.
    ///
    /// The queue only stores the pointers; it never dereferences or frees the
    /// jobs they point at.
    #[derive(Debug)]
    pub struct JobQueue {
        jobs: VecDeque<NonNull<QueuedJob>>,
    }

    impl JobQueue {
        /// Creates an empty queue.
        pub fn new() -> Self {
            Self {
                jobs: VecDeque::new(),
            }
        }

        /// Pushes `job` at the front of the queue.
        ///
        /// # Panics
        ///
        /// Panics if `job` is null.
        pub fn push_front(&mut self, job: *const QueuedJob) {
            self.jobs.push_front(Self::non_null(job));
        }

        /// Pushes `job` at the back of the queue.
        ///
        /// # Panics
        ///
        /// Panics if `job` is null.
        pub fn push_back(&mut self, job: *const QueuedJob) {
            self.jobs.push_back(Self::non_null(job));
        }

        /// Pops the job at the front of the queue, if any.
        pub fn pop_front(&mut self) -> Option<NonNull<QueuedJob>> {
            self.jobs.pop_front()
        }

        /// Pops the job at the back of the queue, if any.
        pub fn pop_back(&mut self) -> Option<NonNull<QueuedJob>> {
            self.jobs.pop_back()
        }

        /// Returns `true` if no jobs are queued.
        pub fn is_empty(&self) -> bool {
            self.jobs.is_empty()
        }

        /// Number of queued jobs.
        pub fn len(&self) -> usize {
            self.jobs.len()
        }

        /// Converts a raw job pointer to `NonNull`.
        ///
        /// The push functions are safe, so a null pointer must be rejected
        /// here rather than passed to `NonNull::new_unchecked`, which would be
        /// undefined behaviour reachable from safe code.
        fn non_null(job: *const QueuedJob) -> NonNull<QueuedJob> {
            NonNull::new(job.cast_mut()).expect("queued job pointer must not be null")
        }
    }

    impl Default for JobQueue {
        fn default() -> Self {
            Self::new()
        }
    }
}