new job impl

This commit is contained in:
Janis 2025-06-27 12:48:50 +02:00
parent 3b07565118
commit c4b4f9248a

View file

@ -7,11 +7,16 @@ use core::{
ptr::{self, NonNull},
sync::atomic::Ordering,
};
use std::{
marker::PhantomData,
sync::atomic::{AtomicU8, AtomicU32, AtomicUsize},
};
use alloc::boxed::Box;
use parking_lot::{Condvar, Mutex};
use parking_lot_core::SpinWait;
use crate::util::{SmallBox, TaggedAtomicPtr};
use crate::util::{DropGuard, SmallBox, TaggedAtomicPtr};
#[repr(u8)]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
@ -977,3 +982,254 @@ mod tests {
assert!(vec.is_empty());
}
}
// The worker waits on this latch whenever it has nothing to do.
pub struct WorkerLatch {
mutex: Mutex<()>,
condvar: Condvar,
}
impl WorkerLatch {
pub fn lock(&self) {
mem::forget(self.mutex.lock());
}
pub fn unlock(&self) {
unsafe {
self.mutex.force_unlock();
}
}
pub fn wait(&self) {
let mut guard = self.mutex.lock();
self.condvar.wait(&mut guard);
}
pub fn wake(&self) {
self.condvar.notify_one();
}
}
// A job, whether a `StackJob` or `HeapJob`, is turned into a `QueuedJob` when it is pushed to the job queue.
#[repr(C)]
pub struct QueuedJob {
/// The job's harness and state.
harness: TaggedAtomicPtr<usize, 3>,
/// The job's value or `this` pointer. This is either a `StackJob` or `HeapJob`.
this: NonNull<()>,
/// The mutex to wake when the job is finished executing.
mutex: *const WorkerLatch,
}
/// A union that allows us to store either a `T` or a `U` without needing to know which one it is at runtime.
/// The state must be tracked separately.
union UnsafeVariant<T, U> {
t: ManuallyDrop<T>,
u: ManuallyDrop<U>,
}
// The processed job is the result of executing a job, it contains the result of the job or an error.
#[repr(C)]
struct JobChannel<T = ()> {
tag: AtomicUsize,
value: UnsafeCell<UnsafeVariant<SmallBox<T>, Box<dyn Any + Send + 'static>>>,
}
#[repr(transparent)]
pub struct JobSender<T = ()> {
channel: JobChannel<T>,
}
#[repr(transparent)]
pub struct JobReceiver<T = ()> {
channel: JobChannel<T>,
}
#[repr(C)]
struct Job2 {}
const EMPTY: usize = 0;
const FINISHED: usize = 1 << 0;
const ERROR: usize = 1 << 1;
impl<T> JobSender<T> {
pub fn send(&self, result: std::thread::Result<T>, mutex: *const WorkerLatch) {
// We want to lock here so that we can be sure that we wake the worker
// only if it was waiting, and not immediately after having received the
// result and waiting for further work:
// | thread 1 | thread 2 |
// | | | | |
// | send-> | | |
// | FINISHED | | |
// | | | poll() |
// | | | sleep() |
// | wake() | |
// | | | !woken! | // the worker has already received the result
// | | | | | // and is waiting for more work, it shouldn't
// | | | | | // be woken up here.
// | <-send | | |
//
// if we lock, it looks like this:
// | thread 1 | thread 2 |
// | | | | |
// | send-> | | |
// | lock() | | |
// | FINISHED | | |
// | | | poll() |
// | | | lock()-> | // thread 2 tries to lock.
// | wake() | | // the wake signal is ignored
// | | | |
// | unlock() | |
// | | | l=lock() | // thread2 wakes up and receives the lock
// | <-send | sleep(l) | // thread 2 is now sleeping
//
// This concludes my TED talk on why we need to lock here.
unsafe {
(&*mutex).lock();
}
let _guard = DropGuard::new(|| unsafe { (&*mutex).unlock() });
match result {
Ok(value) => {
let value = SmallBox::new(value);
let slot = unsafe { &mut *self.channel.value.get() };
slot.t = ManuallyDrop::new(value);
self.channel.tag.store(FINISHED, Ordering::Release)
}
Err(payload) => {
let slot = unsafe { &mut *self.channel.value.get() };
slot.u = ManuallyDrop::new(payload);
self.channel.tag.store(FINISHED | ERROR, Ordering::Release)
}
}
// wake the worker waiting on the mutex
unsafe {
(&*mutex).wake();
}
}
}
impl<T> JobReceiver<T> {
pub fn poll(&self) -> Option<std::thread::Result<T>> {
let tag = self.channel.tag.swap(EMPTY, Ordering::Acquire);
if tag == EMPTY {
return None;
}
// SAFETY: if we received a non-EMPTY tag, the value must be initialized.
// because we atomically set the taag to EMPTY, we can be sure that we're the only ones accessing the value.
let slot = unsafe { &mut *self.channel.value.get() };
if tag & ERROR != 0 {
// job failed, return the error
let err = unsafe { ManuallyDrop::take(&mut slot.u) };
Some(Err(err))
} else {
// job succeeded, return the value
let value = unsafe { ManuallyDrop::take(&mut slot.t) };
Some(Ok(value.into_inner()))
}
}
}
impl QueuedJob {
pub fn from_stackjob<F, T, L>(job: &StackJob<F, L>, mutex: *const WorkerLatch) -> Self
where
F: FnOnce() -> T + Send,
T: Send,
{
#[align(8)]
unsafe fn harness<F, T, L>(
this: *const (),
sender: *const JobSender,
mutex: *const WorkerLatch,
) where
F: FnOnce() -> T + Send,
T: Send,
{
use std::panic::{AssertUnwindSafe, catch_unwind};
let f = unsafe { (*this.cast::<StackJob<F, L>>()).unwrap() };
let result = catch_unwind(AssertUnwindSafe(|| f()));
unsafe {
(&*(sender as *const JobSender<T>)).send(result, mutex);
}
}
Self {
harness: TaggedAtomicPtr::new(harness::<F, T, L> as *mut usize, EMPTY),
this: unsafe { NonNull::new_unchecked(job as *const _ as *mut ()) },
mutex,
}
}
pub unsafe fn as_receiver(&self) -> &JobReceiver {
unsafe { &*(self as *const Self as *const JobReceiver) }
}
/// this function will drop `_self` and execute the job.
pub unsafe fn execute(_self: *mut Self) {
let (harness, this, sender, mutex) = unsafe {
let job = &*_self;
let harness: unsafe fn(*const (), *const JobSender, *const WorkerLatch) =
mem::transmute(job.harness.ptr(Ordering::Relaxed));
let sender = mem::transmute::<*const Self, *const JobSender>(_self);
let this = job.this;
let mutex = job.mutex;
(harness, this, sender, mutex)
};
unsafe {
// past this point, `_self` may no longer be a valid pointer to a `QueuedJob`.
(harness)(this.as_ptr(), sender, mutex);
}
}
}
mod queuedjobqueue {
//! Basically `JobVec`, but for `QueuedJob`s.
use std::collections::VecDeque;
use super::*;
pub struct JobQueue {
jobs: VecDeque<NonNull<QueuedJob>>,
}
impl JobQueue {
pub fn new() -> Self {
Self {
jobs: VecDeque::new(),
}
}
pub fn push_front(&mut self, job: *const QueuedJob) {
self.jobs
.push_front(unsafe { NonNull::new_unchecked(job as *mut _) });
}
pub fn push_back(&mut self, job: *const QueuedJob) {
self.jobs
.push_back(unsafe { NonNull::new_unchecked(job as *mut _) });
}
pub fn pop_front(&mut self) -> Option<NonNull<QueuedJob>> {
self.jobs.pop_front()
}
pub fn pop_back(&mut self) -> Option<NonNull<QueuedJob>> {
self.jobs.pop_back()
}
pub fn is_empty(&self) -> bool {
self.jobs.is_empty()
}
pub fn len(&self) -> usize {
self.jobs.len()
}
}
}