executor/distaff/src/job.rs
2025-07-01 21:24:51 +02:00

322 lines
8 KiB
Rust

use core::{
cell::UnsafeCell,
fmt::Debug,
mem::{self, ManuallyDrop},
ptr::NonNull,
};
use std::cell::Cell;
use alloc::boxed::Box;
use crate::{
WorkerThread,
channel::{Parker, Sender},
};
/// A closure stored in place (typically on the caller's stack) until it is
/// taken out exactly once for execution.
///
/// The closure lives inside `UnsafeCell<ManuallyDrop<F>>`: the `UnsafeCell`
/// allows it to be moved out through a shared reference, and `ManuallyDrop`
/// means `StackJob` itself never runs `F`'s destructor. The struct is
/// `#[repr(transparent)]`, so it has the same layout as that inner cell.
#[repr(transparent)]
pub struct StackJob<F> {
    f: UnsafeCell<ManuallyDrop<F>>,
}

impl<F> StackJob<F> {
    /// Wraps `f` so that it can later be extracted with [`StackJob::unwrap`].
    pub fn new(f: F) -> Self {
        let slot = UnsafeCell::new(ManuallyDrop::new(f));
        Self { f: slot }
    }

    /// Moves the stored closure out of the job.
    ///
    /// # Safety
    ///
    /// The value is read out through a shared reference and the slot is left
    /// logically uninitialised afterwards. The caller must ensure that this
    /// is called at most once per job and that nothing else accesses the
    /// slot while the value is being taken.
    pub unsafe fn unwrap(&self) -> F {
        let slot: *mut ManuallyDrop<F> = self.f.get();
        // SAFETY: the caller guarantees exclusive, one-time access to the slot.
        unsafe { ManuallyDrop::take(&mut *slot) }
    }
}
/// A closure that is boxed so it can outlive the scope that created it.
pub struct HeapJob<F> {
    f: F,
}

impl<F> HeapJob<F> {
    /// Wraps `f` and places the resulting job on the heap.
    pub fn new(f: F) -> Box<Self> {
        let job = HeapJob { f };
        Box::new(job)
    }

    /// Consumes the job and hands back the wrapped closure.
    pub fn into_inner(self) -> F {
        let HeapJob { f } = self;
        f
    }
}
/// Signature of the type-erased entry point stored in [`Job2`] and
/// [`SharedJob`].
///
/// A harness is invoked with the worker that executes the job, the job's
/// type-erased data pointer (`this`), and an optional sender. The sender is
/// passed with the channel's default type parameter; the harnesses in this
/// file transmute it back to the job's concrete result type before use.
type JobHarness =
unsafe fn(&WorkerThread, this: NonNull<()>, sender: Option<crate::channel::Sender>);
/// A type-erased job producing a `T`: a harness function plus the opaque data
/// pointer that harness operates on.
///
/// The struct is `#[repr(C)]`, so the fields are laid out in declaration
/// order.
#[repr(C)]
pub struct Job2<T = ()> {
/// Entry point that runs the job; receives `this` as its data pointer.
harness: JobHarness,
/// Type-erased pointer to the job's payload (e.g. a `StackJob<F>` or a boxed
/// `HeapJob<F>`), interpreted only by `harness`.
this: NonNull<()>,
/// Receiving half of the result channel. Starts out as `None`, is written
/// by `share` and moved out by `take_receiver`.
receiver: Cell<Option<crate::channel::Receiver<T>>>,
}
/// Debug output shows the harness and data pointer; the receiver is omitted
/// and the struct is rendered as non-exhaustive.
impl<T> Debug for Job2<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Job2");
        repr.field("harness", &self.harness);
        repr.field("this", &self.this);
        repr.finish_non_exhaustive()
    }
}
/// The handle produced by `Job2::share`: everything needed to run the job
/// and, optionally, report its result back.
#[derive(Debug)]
pub struct SharedJob {
/// Entry point copied from the originating `Job2`.
harness: JobHarness,
/// Type-erased payload pointer copied from the originating `Job2`.
this: NonNull<()>,
/// Sending half of the result channel with its result type erased; `None`
/// when the job was shared without a parker.
sender: Option<crate::channel::Sender>,
}
impl<T: Send> Job2<T> {
fn new(harness: JobHarness, this: NonNull<()>) -> Self {
let this = Self {
harness,
this,
receiver: Cell::new(None),
};
#[cfg(feature = "tracing")]
tracing::trace!("new job: {:?}", this);
this
}
pub fn share(&self, parker: Option<&Parker>) -> SharedJob {
#[cfg(feature = "tracing")]
tracing::trace!("sharing job: {:?}", self);
let (sender, receiver) = parker
.map(|parker| crate::channel::channel::<T>(parker.into()))
.unzip();
self.receiver.set(receiver);
SharedJob {
harness: self.harness,
this: self.this,
sender: unsafe { mem::transmute(sender) },
}
}
pub fn take_receiver(&self) -> Option<crate::channel::Receiver<T>> {
self.receiver.take()
}
pub fn from_stackjob<F>(job: &StackJob<F>) -> Self
where
F: FnOnce(&WorkerThread) -> T + Send,
{
#[align(8)]
#[cfg_attr(
feature = "tracing",
tracing::instrument(level = "trace", skip_all, name = "stack_job_harness")
)]
unsafe fn harness<F, T>(worker: &WorkerThread, this: NonNull<()>, sender: Option<Sender>)
where
F: FnOnce(&WorkerThread) -> T + Send,
T: Send,
{
use std::panic::{AssertUnwindSafe, catch_unwind};
let f = unsafe { this.cast::<StackJob<F>>().as_ref().unwrap() };
let sender: Sender<T> = unsafe { mem::transmute(sender) };
// #[cfg(feature = "metrics")]
// if worker.heartbeat.parker() == mutex {
// worker
// .metrics
// .num_sent_to_self
// .fetch_add(1, Ordering::Relaxed);
// tracing::trace!("job sent to self");
// }
sender.send(catch_unwind(AssertUnwindSafe(|| f(worker))));
}
Self::new(harness::<F, T>, NonNull::from(job).cast())
}
pub fn from_heapjob<F>(job: Box<HeapJob<F>>) -> Self
where
F: FnOnce(&WorkerThread) -> T + Send,
{
#[align(8)]
#[cfg_attr(
feature = "tracing",
tracing::instrument(level = "trace", skip_all, name = "heap_job_harness")
)]
unsafe fn harness<F, T>(worker: &WorkerThread, this: NonNull<()>, sender: Option<Sender>)
where
F: FnOnce(&WorkerThread) -> T + Send,
T: Send,
{
use std::panic::{AssertUnwindSafe, catch_unwind};
// expect MIRI to complain about this, but it is actually correct.
// because I am so much smarter than MIRI, naturally, obviously.
// unbox the job, which was allocated at (2)
let f = unsafe { (*Box::from_non_null(this.cast::<HeapJob<F>>())).into_inner() };
let result = catch_unwind(AssertUnwindSafe(|| f(worker)));
let sender: Option<Sender<T>> = unsafe { mem::transmute(sender) };
if let Some(sender) = sender {
sender.send(result);
}
}
// (1) allocate box for job
Self::new(
harness::<F, T>,
// (2) convert job into a pointer
Box::into_non_null(job).cast(),
)
}
pub fn from_harness(harness: JobHarness, this: NonNull<()>) -> Self {
Self::new(harness, this)
}
pub fn is_shared(&self) -> bool {
unsafe { (&*self.receiver.as_ptr()).is_some() }
}
}
impl SharedJob {
pub unsafe fn execute(self, worker: &WorkerThread) {
#[cfg(feature = "tracing")]
tracing::trace!("executing shared job: {:?}", self);
let Self {
harness,
this,
sender,
} = self;
unsafe {
(harness)(worker, this, sender);
}
#[cfg(feature = "tracing")]
tracing::trace!("finished executing shared job: {:?}", this);
}
}
pub use queuedjobqueue::JobQueue;
mod queuedjobqueue {
    //! Double-ended queue of raw [`Job`] pointers.
    //!
    //! Basically `JobVec`, but for `QueuedJob`s.
    // TODO: use non-null's here and rely on Into/From for &T

    use std::{collections::VecDeque, ptr::NonNull};

    use super::Job2 as Job;

    /// A deque of non-null job pointers.
    ///
    /// The queue stores pointers only; it neither owns the jobs nor keeps
    /// them alive.
    #[derive(Debug)]
    pub struct JobQueue {
        jobs: VecDeque<NonNull<Job>>,
    }

    impl JobQueue {
        /// Creates an empty queue.
        pub fn new() -> Self {
            Self {
                jobs: VecDeque::new(),
            }
        }

        /// Converts a caller-supplied raw pointer into a `NonNull`.
        ///
        /// The push methods are safe functions, so a null argument must not
        /// be allowed to reach `NonNull::new_unchecked` (undefined
        /// behaviour); it is rejected with a panic instead.
        fn non_null(job: *const Job) -> NonNull<Job> {
            NonNull::new(job.cast_mut()).expect("JobQueue: job pointer must not be null")
        }

        /// Inserts `job` at the front of the queue.
        ///
        /// # Panics
        ///
        /// Panics if `job` is null.
        pub fn push_front(&mut self, job: *const Job) {
            self.jobs.push_front(Self::non_null(job));
        }

        /// Appends `job` at the back of the queue.
        ///
        /// # Panics
        ///
        /// Panics if `job` is null.
        pub fn push_back(&mut self, job: *const Job) {
            self.jobs.push_back(Self::non_null(job));
        }

        /// Removes and returns the job at the front, if any.
        pub fn pop_front(&mut self) -> Option<NonNull<Job>> {
            self.jobs.pop_front()
        }

        /// Removes and returns the job at the back, if any.
        pub fn pop_back(&mut self) -> Option<NonNull<Job>> {
            self.jobs.pop_back()
        }

        /// Returns `true` when no jobs are queued.
        pub fn is_empty(&self) -> bool {
            self.jobs.is_empty()
        }

        /// Returns the number of queued jobs.
        pub fn len(&self) -> usize {
            self.jobs.len()
        }
    }
}
pub mod traits {
    //! Conversions from closures and stack-job slots into [`Job2`], plus
    //! direct inline execution for the stack-allocated variants.

    use std::{cell::UnsafeCell, mem::ManuallyDrop};

    use crate::WorkerThread;

    use super::{HeapJob, Job2, StackJob};

    /// Something that can be turned into a [`Job2`] yielding a `T`.
    pub trait IntoJob<T> {
        /// Converts `self` into a job.
        fn into_job(self) -> Job2<T>;
    }

    /// A job source that can alternatively be run right away on a worker,
    /// without going through a [`Job2`].
    pub trait InlineJob<T>: IntoJob<T> {
        /// Executes the underlying closure on `worker` and returns its result.
        fn run_inline(self, worker: &WorkerThread) -> T;
    }

    /// Any suitable closure becomes a heap-allocated job.
    impl<F, T> IntoJob<T> for F
    where
        F: FnOnce(&WorkerThread) -> T + Send,
        T: Send,
    {
        fn into_job(self) -> Job2<T> {
            let boxed = HeapJob::new(self);
            Job2::from_heapjob(boxed)
        }
    }

    /// A raw closure slot is treated as the [`StackJob`] it is layout-compatible
    /// with.
    impl<F, T> IntoJob<T> for &UnsafeCell<ManuallyDrop<F>>
    where
        F: FnOnce(&WorkerThread) -> T + Send,
        T: Send,
    {
        fn into_job(self) -> Job2<T> {
            let cell: *const UnsafeCell<ManuallyDrop<F>> = self;
            // SAFETY: `StackJob<F>` is `#[repr(transparent)]` over
            // `UnsafeCell<ManuallyDrop<F>>`, so a reference to the cell can be
            // reinterpreted as a reference to a `StackJob<F>`.
            let stack_job = unsafe { &*cell.cast::<StackJob<F>>() };
            Job2::from_stackjob(stack_job)
        }
    }

    impl<F, T> InlineJob<T> for &UnsafeCell<ManuallyDrop<F>>
    where
        F: FnOnce(&WorkerThread) -> T + Send,
        T: Send,
    {
        fn run_inline(self, worker: &WorkerThread) -> T {
            // SAFETY: moves the closure out of the slot through a shared
            // reference; sound only if the slot is taken at most once and is
            // not accessed elsewhere at the same time, which the caller must
            // arrange.
            let job = unsafe { ManuallyDrop::take(&mut *self.get()) };
            job(worker)
        }
    }

    impl<F, T> IntoJob<T> for &StackJob<F>
    where
        F: FnOnce(&WorkerThread) -> T + Send,
        T: Send,
    {
        fn into_job(self) -> Job2<T> {
            Job2::from_stackjob(self)
        }
    }

    impl<F, T> InlineJob<T> for &StackJob<F>
    where
        F: FnOnce(&WorkerThread) -> T + Send,
        T: Send,
    {
        fn run_inline(self, worker: &WorkerThread) -> T {
            // SAFETY: same contract as `StackJob::unwrap` — the closure must
            // be taken at most once.
            let job = unsafe { self.unwrap() };
            job(worker)
        }
    }
}