aaaaaaaaaaaa

This commit is contained in:
Janis 2025-02-08 04:52:18 +01:00
parent f34dc61984
commit 5c7f1345c4
4 changed files with 316 additions and 366 deletions

View file

@ -36,19 +36,43 @@ pub enum JobState {
Inline = 1 << (u8::BITS - 1), Inline = 1 << (u8::BITS - 1),
} }
pub struct Job<T = ()> { pub struct Job<T = (), S = ()> {
state: AtomicU8, state: AtomicU8,
this: SendPtr<()>, this: SendPtr<()>,
harness: unsafe fn(*const (), *const Job<()>), harness: unsafe fn(*const (), *const Job<()>, &mut S),
maybe_boxed_val: UnsafeCell<MaybeUninit<Value<T>>>, maybe_boxed_val: UnsafeCell<MaybeUninit<Value<T>>>,
waiting_thread: UnsafeCell<Option<Thread>>, waiting_thread: UnsafeCell<Option<Thread>>,
} }
impl<T> Job<T> { impl<T, S> Job<T, S> {
pub unsafe fn cast_box<U, V>(self: Box<Self>) -> Box<Job<U, V>>
where
T: Sized,
U: Sized,
{
let ptr = Box::into_raw(self);
Box::from_raw(ptr.cast())
}
pub unsafe fn cast<U, V>(self: &Self) -> &Job<U, V>
where
T: Sized,
U: Sized,
{
// SAFETY: both T and U are sized, so Box<T> and Box<U> should be the
// same size as well.
unsafe { mem::transmute(self) }
}
pub fn id(&self) -> impl Eq {
(self.this, self.harness)
}
pub fn state(&self) -> u8 { pub fn state(&self) -> u8 {
self.state.load(Ordering::Relaxed) & !(JobState::Inline as u8) self.state.load(Ordering::Relaxed) & !(JobState::Inline as u8)
} }
pub fn wait(&self) -> Option<T> { pub fn wait(&self) -> T {
let mut state = self.state.load(Ordering::Relaxed); let mut state = self.state.load(Ordering::Relaxed);
let mask = JobState::Inline as u8; let mask = JobState::Inline as u8;
@ -70,7 +94,7 @@ impl<T> Job<T> {
.store(JobState::Pending as u8 | (state & mask), Ordering::Release); .store(JobState::Pending as u8 | (state & mask), Ordering::Release);
std::thread::park(); std::thread::park();
spin.reset(); spin.reset();
break; continue;
} }
Err(x) => { Err(x) => {
if x & JobState::Finished as u8 != 0 { if x & JobState::Finished as u8 != 0 {
@ -78,14 +102,13 @@ impl<T> Job<T> {
let value = (&*self.maybe_boxed_val.get()).assume_init_read(); let value = (&*self.maybe_boxed_val.get()).assume_init_read();
value.get(state & JobState::Inline as u8 != 0) value.get(state & JobState::Inline as u8 != 0)
}; };
return Some(val); return val;
} else { } else {
spin.spin(); spin.spin();
} }
} }
} }
} }
return None;
} }
/// call this when popping value from local queue /// call this when popping value from local queue
pub fn set_pending(&self) { pub fn set_pending(&self) {
@ -110,9 +133,9 @@ impl<T> Job<T> {
} }
} }
pub fn execute(&self) { pub fn execute(&self, s: &mut S) {
// SAFETY: self is non-null // SAFETY: self is non-null
unsafe { (self.harness)(self.this.as_ptr().cast(), (self as *const Self).cast()) }; unsafe { (self.harness)(self.this.as_ptr().cast(), (self as *const Self).cast(), s) };
} }
fn complete(&self, result: T) { fn complete(&self, result: T) {
@ -166,14 +189,14 @@ impl<F> HeapJob<F> {
pub fn new(f: F) -> Box<Self> { pub fn new(f: F) -> Box<Self> {
Box::new(Self { f }) Box::new(Self { f })
} }
pub fn into_boxed_job<T>(self: Box<Self>) -> Box<Job<()>> pub fn into_boxed_job<T, S>(self: Box<Self>) -> Box<Job<(), S>>
where where
F: FnOnce() -> T + Send, F: FnOnce(&mut S) -> T + Send,
T: Send, T: Send,
{ {
unsafe fn harness<F, T>(this: *const (), job: *const Job<()>) unsafe fn harness<F, T, S>(this: *const (), job: *const Job<()>, s: &mut S)
where where
F: FnOnce() -> T + Send, F: FnOnce(&mut S) -> T + Send,
T: Sized + Send, T: Sized + Send,
{ {
let job = unsafe { &*job.cast::<Job<T>>() }; let job = unsafe { &*job.cast::<Job<T>>() };
@ -181,7 +204,7 @@ impl<F> HeapJob<F> {
let this = unsafe { Box::from_raw(this.cast::<HeapJob<F>>().cast_mut()) }; let this = unsafe { Box::from_raw(this.cast::<HeapJob<F>>().cast_mut()) };
let f = this.f; let f = this.f;
job.complete(f()); job.complete(f(s));
} }
let size = mem::size_of::<T>(); let size = mem::size_of::<T>();
@ -197,12 +220,18 @@ impl<F> HeapJob<F> {
state: AtomicU8::new(new_state), state: AtomicU8::new(new_state),
this: SendPtr::new(Box::into_raw(self)).unwrap().cast(), this: SendPtr::new(Box::into_raw(self)).unwrap().cast(),
waiting_thread: UnsafeCell::new(None), waiting_thread: UnsafeCell::new(None),
harness: harness::<F, T>, harness: harness::<F, T, S>,
maybe_boxed_val: UnsafeCell::new(MaybeUninit::uninit()), maybe_boxed_val: UnsafeCell::new(MaybeUninit::uninit()),
}) })
} }
} }
impl<T, S> crate::latch::Probe for &Job<T, S> {
fn probe(&self) -> bool {
self.state() == JobState::Finished as u8
}
}
pub struct StackJob<F> { pub struct StackJob<F> {
f: UnsafeCell<ManuallyDrop<F>>, f: UnsafeCell<ManuallyDrop<F>>,
} }
@ -218,14 +247,14 @@ impl<F> StackJob<F> {
unsafe { ManuallyDrop::take(&mut *self.f.get()) } unsafe { ManuallyDrop::take(&mut *self.f.get()) }
} }
pub fn as_boxed_job<T>(&self) -> Box<Job<()>> pub fn as_job<T, S>(&self) -> Job<T, S>
where where
F: FnOnce() -> T + Send, F: FnOnce(&mut S) -> T + Send,
T: Send, T: Send,
{ {
unsafe fn harness<F, T>(this: *const (), job: *const Job<()>) unsafe fn harness<F, T, S>(this: *const (), job: *const Job<()>, s: &mut S)
where where
F: FnOnce() -> T + Send, F: FnOnce(&mut S) -> T + Send,
T: Sized + Send, T: Sized + Send,
{ {
let job = unsafe { &*job.cast::<Job<T>>() }; let job = unsafe { &*job.cast::<Job<T>>() };
@ -233,7 +262,7 @@ impl<F> StackJob<F> {
let this = unsafe { &*this.cast::<StackJob<F>>() }; let this = unsafe { &*this.cast::<StackJob<F>>() };
let f = unsafe { this.unwrap() }; let f = unsafe { this.unwrap() };
job.complete(f()); job.complete(f(s));
} }
let size = mem::size_of::<T>(); let size = mem::size_of::<T>();
@ -245,12 +274,12 @@ impl<F> StackJob<F> {
JobState::Inline as u8 JobState::Inline as u8
}; };
Box::new(Job { Job {
state: AtomicU8::new(new_state), state: AtomicU8::new(new_state),
this: SendPtr::new(self).unwrap().cast(), this: SendPtr::new(self).unwrap().cast(),
waiting_thread: UnsafeCell::new(None), waiting_thread: UnsafeCell::new(None),
harness: harness::<F, T>, harness: harness::<F, T, S>,
maybe_boxed_val: UnsafeCell::new(MaybeUninit::uninit()), maybe_boxed_val: UnsafeCell::new(MaybeUninit::uninit()),
}) }
} }
} }

View file

@ -1,3 +1,5 @@
#![feature(vec_deque_pop_if)]
use std::{ use std::{
cell::{Cell, UnsafeCell}, cell::{Cell, UnsafeCell},
future::Future, future::Future,

View file

@ -17,295 +17,299 @@ use parking_lot::{Condvar, Mutex};
use crate::{latch::*, ThreadControl}; use crate::{latch::*, ThreadControl};
mod job { mod job {
use std::{ use core::{
cell::{Cell, UnsafeCell}, cell::UnsafeCell,
collections::VecDeque, mem::{self, ManuallyDrop, MaybeUninit},
mem::ManuallyDrop,
panic::{self, AssertUnwindSafe},
ptr::NonNull,
sync::atomic::{AtomicU8, Ordering}, sync::atomic::{AtomicU8, Ordering},
thread::{self, Thread},
}; };
use std::thread::Thread;
use super::WorkerThread as Scope; use parking_lot_core::SpinWait;
enum Poll { use crate::util::SendPtr;
use super::WorkerThread;
#[allow(dead_code)]
#[cfg_attr(target_pointer_width = "64", repr(align(16)))]
#[cfg_attr(target_pointer_width = "32", repr(align(8)))]
#[derive(Debug, Default, Clone, Copy)]
struct Size2([usize; 2]);
struct Value<T>(pub MaybeUninit<Box<MaybeUninit<T>>>);
impl<T> Value<T> {
unsafe fn get(self, inline: bool) -> T {
if inline {
unsafe { mem::transmute_copy(&self.0) }
} else {
unsafe { (*self.0.assume_init()).assume_init() }
}
}
}
#[repr(u8)]
pub enum JobState {
Empty,
Locked = 1,
Pending, Pending,
Ready, Finished,
Locked, Inline = 1 << (u8::BITS - 1),
} }
#[derive(Debug, Default)] pub struct Job<T = ()> {
pub struct Future<T = ()> {
state: AtomicU8, state: AtomicU8,
/// Can only be accessed if `state` is `Poll::Locked`. this: SendPtr<()>,
harness: unsafe fn(*const (), *const Job<()>, &mut WorkerThread),
maybe_boxed_val: UnsafeCell<MaybeUninit<Value<T>>>,
waiting_thread: UnsafeCell<Option<Thread>>, waiting_thread: UnsafeCell<Option<Thread>>,
/// Can only be written if `state` is `Poll::Locked` and read if `state` is
/// `Poll::Ready`.
val: UnsafeCell<Option<Box<thread::Result<T>>>>,
} }
impl<T> Future<T> { impl<T> Job<T> {
pub fn poll(&self) -> bool { pub unsafe fn cast_box<U>(self: Box<Self>) -> Box<Job<U>>
self.state.load(Ordering::Acquire) == Poll::Ready as u8 where
T: Sized,
U: Sized,
{
let ptr = Box::into_raw(self);
Box::from_raw(ptr.cast())
} }
pub fn wait(&self) -> Option<thread::Result<T>> { pub unsafe fn cast<U>(self: &Self) -> &Job<U>
where
T: Sized,
U: Sized,
{
// SAFETY: both T and U are sized, so Box<T> and Box<U> should be the
// same size as well.
unsafe { mem::transmute(self) }
}
pub fn state(&self) -> u8 {
self.state.load(Ordering::Relaxed) & !(JobState::Inline as u8)
}
pub fn wait(&self) -> T {
let mut state = self.state.load(Ordering::Relaxed);
let mask = JobState::Inline as u8;
let mut spin = SpinWait::new();
loop { loop {
let result = self.state.compare_exchange( match self.state.compare_exchange(
Poll::Pending as u8, JobState::Pending as u8 | (state & mask),
Poll::Locked as u8, JobState::Locked as u8 | (state & mask),
Ordering::AcqRel,
Ordering::Acquire, Ordering::Acquire,
); Ordering::Relaxed,
) {
Ok(x) => {
state = x;
unsafe {
*self.waiting_thread.get() = Some(std::thread::current());
}
match result { self.state
Ok(_) => { .store(JobState::Pending as u8 | (state & mask), Ordering::Release);
// SAFETY: std::thread::park();
// Lock is acquired, only we are accessing `self.waiting_thread`. spin.reset();
unsafe { *self.waiting_thread.get() = Some(thread::current()) };
self.state.store(Poll::Pending as u8, Ordering::Release);
thread::park();
// Skip yielding after being woken up.
continue; continue;
} }
Err(state) if state == Poll::Ready as u8 => { Err(x) => {
// SAFETY: if x & JobState::Finished as u8 != 0 {
// `state` is `Poll::Ready` only after `Self::complete` let val = unsafe {
// releases the lock. let value = (&*self.maybe_boxed_val.get()).assume_init_read();
// value.get(state & JobState::Inline as u8 != 0)
// Calling `Self::complete` when `state` is `Poll::Ready` };
// cannot mutate `self.val`. return val;
break unsafe { (*self.val.get()).take().map(|b| *b) }; } else {
spin.spin();
} }
_ => (),
} }
}
}
}
/// call this when popping value from local queue
pub fn set_pending(&self) {
let state = self.state.load(Ordering::Relaxed);
let mask = JobState::Inline as u8;
thread::yield_now(); let mut spin = SpinWait::new();
}
}
pub fn complete(&self, val: thread::Result<T>) {
let val = Box::new(val);
loop { loop {
let result = self.state.compare_exchange( match self.state.compare_exchange(
Poll::Pending as u8, JobState::Empty as u8 | (state & mask),
Poll::Locked as u8, JobState::Pending as u8 | (state & mask),
Ordering::AcqRel,
Ordering::Acquire, Ordering::Acquire,
); Ordering::Relaxed,
) {
match result { Ok(_) => {
Ok(_) => break, return;
Err(_) => thread::yield_now(), }
Err(_) => {
spin.spin();
}
}
}
}
pub fn execute(&self, s: &mut WorkerThread) {
// SAFETY: self is non-null
unsafe { (self.harness)(self.this.as_ptr().cast(), (self as *const Self).cast(), s) };
}
fn complete(&self, result: T) {
let mut state = self.state.load(Ordering::Relaxed);
let mask = JobState::Inline as u8;
let mut spin = SpinWait::new();
loop {
match self.state.compare_exchange(
JobState::Pending as u8 | (state & mask),
JobState::Locked as u8 | (state & mask),
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(x) => {
state = x;
break;
}
Err(_) => {
spin.spin();
}
} }
} }
// SAFETY:
// Lock is acquired, only we are accessing `self.val`.
unsafe { unsafe {
*self.val.get() = Some(val); let value = (&mut *self.maybe_boxed_val.get()).assume_init_mut();
// SAFETY: we know the box is allocated if state was `Pending`.
if state & JobState::Inline as u8 == 0 {
value.0 = MaybeUninit::new(Box::new(MaybeUninit::new(result)));
} else {
*mem::transmute::<_, &mut T>(&mut value.0) = result;
}
} }
// SAFETY: if let Some(thread) = unsafe { &mut *self.waiting_thread.get() }.take() {
// Lock is acquired, only we are accessing `self.waiting_thread`.
if let Some(thread) = unsafe { (*self.waiting_thread.get()).take() } {
thread.unpark(); thread.unpark();
} }
self.state.store(Poll::Ready as u8, Ordering::Release); self.state
.store(JobState::Finished as u8 | (state & mask), Ordering::Release);
} }
} }
pub struct JobStack<F = ()> { impl Job {}
/// All code paths should call either `Job::execute` or `Self::unwrap` to
/// avoid a potential memory leak. pub struct HeapJob<F> {
f: F,
}
impl<F> HeapJob<F> {
pub fn new(f: F) -> Box<Self> {
Box::new(Self { f })
}
pub fn into_boxed_job<T>(self: Box<Self>) -> Box<Job<T>>
where
F: FnOnce(&mut WorkerThread) -> T + Send,
T: Send,
{
unsafe fn harness<F, T>(this: *const (), job: *const Job<()>, s: &mut WorkerThread)
where
F: FnOnce(&mut WorkerThread) -> T + Send,
T: Sized + Send,
{
let job = unsafe { &*job.cast::<Job<T>>() };
let this = unsafe { Box::from_raw(this.cast::<HeapJob<F>>().cast_mut()) };
let f = this.f;
job.complete(f(s));
}
let size = mem::size_of::<T>();
let align = mem::align_of::<T>();
let new_state = if size > mem::size_of::<Box<T>>() || align > mem::align_of::<Box<T>>()
{
JobState::Empty as u8
} else {
JobState::Inline as u8
};
Box::new(Job {
state: AtomicU8::new(new_state),
this: SendPtr::new(Box::into_raw(self)).unwrap().cast(),
waiting_thread: UnsafeCell::new(None),
harness: harness::<F, T>,
maybe_boxed_val: UnsafeCell::new(MaybeUninit::uninit()),
})
}
}
impl<T> crate::latch::Probe for &Job<T> {
fn probe(&self) -> bool {
self.state() == JobState::Finished as u8
}
}
pub struct StackJob<F> {
f: UnsafeCell<ManuallyDrop<F>>, f: UnsafeCell<ManuallyDrop<F>>,
} }
impl<F> JobStack<F> { impl<F> StackJob<F> {
pub fn new(f: F) -> Self { pub fn new(f: F) -> Self {
Self { Self {
f: UnsafeCell::new(ManuallyDrop::new(f)), f: UnsafeCell::new(ManuallyDrop::new(f)),
} }
} }
/// SAFETY: pub unsafe fn unwrap(&self) -> F {
/// It should only be called once.
pub unsafe fn take_once(&self) -> F {
// SAFETY:
// No `Job` has has been executed, therefore `self.f` has not yet been
// `take`n.
unsafe { ManuallyDrop::take(&mut *self.f.get()) } unsafe { ManuallyDrop::take(&mut *self.f.get()) }
} }
}
/// `Job` is only sent, not shared between threads. pub fn as_job<T>(&self) -> Job<T>
///
/// When popped from the `JobQueue`, it gets copied before sending across
/// thread boundaries.
#[derive(Clone, Debug)]
pub struct Job<T = ()> {
stack: NonNull<JobStack>,
harness: unsafe fn(&mut Scope, NonNull<JobStack>, NonNull<Future>),
fut: Cell<Option<NonNull<Future<T>>>>,
}
impl<T> Job<T> {
pub fn new<F>(stack: &JobStack<F>) -> Self
where where
F: FnOnce(&mut Scope) -> T + Send, F: FnOnce(&mut WorkerThread) -> T + Send,
T: Send, T: Send,
{ {
/// SAFETY: unsafe fn harness<F, T>(this: *const (), job: *const Job<()>, s: &mut WorkerThread)
/// It should only be called while the `stack` is still alive. where
unsafe fn harness<F, T>( F: FnOnce(&mut WorkerThread) -> T + Send,
scope: &mut Scope,
stack: NonNull<JobStack>,
fut: NonNull<Future>,
) where
F: FnOnce(&mut Scope) -> T + Send,
T: Send, T: Send,
{ {
// SAFETY: let job = unsafe { &*job.cast::<Job<T>>() };
// The `stack` is still alive.
let stack: &JobStack<F> = unsafe { stack.cast().as_ref() };
// SAFETY:
// This is the first call to `take_once` since `Job::execute`
// (the only place where this harness is called) is called only
// after the job has been popped.
let f = unsafe { stack.take_once() };
// SAFETY:
// Before being popped, the `JobQueue` allocates and stores a
// `Future` in `self.fur_or_next` that should get passed here.
let fut: &Future<T> = unsafe { fut.cast().as_ref() };
fut.complete(panic::catch_unwind(AssertUnwindSafe(|| f(scope)))); let this = unsafe { &*this.cast::<StackJob<F>>() };
let f = unsafe { this.unwrap() };
job.complete(f(s));
} }
Self { let size = mem::size_of::<T>();
stack: NonNull::from(stack).cast(), let align = mem::align_of::<T>();
let new_state = if size > mem::size_of::<Box<T>>() || align > mem::align_of::<Box<T>>()
{
JobState::Empty as u8
} else {
JobState::Inline as u8
};
Job {
state: AtomicU8::new(new_state),
this: SendPtr::new(self).unwrap().cast(),
waiting_thread: UnsafeCell::new(None),
harness: harness::<F, T>, harness: harness::<F, T>,
fut: Cell::new(None), maybe_boxed_val: UnsafeCell::new(MaybeUninit::uninit()),
} }
} }
pub fn is_waiting(&self) -> bool {
self.fut.get().is_none()
}
pub fn eq(&self, other: &Job) -> bool {
self.stack == other.stack
}
/// SAFETY:
/// It should only be called after being popped from a `JobQueue`.
pub unsafe fn poll(&self) -> bool {
self.fut
.get()
.map(|fut| {
// SAFETY:
// Before being popped, the `JobQueue` allocates and stores a
// `Future` in `self.fur_or_next` that should get passed here.
let fut = unsafe { fut.as_ref() };
fut.poll()
})
.unwrap_or_default()
}
/// SAFETY:
/// It should only be called after being popped from a `JobQueue`.
pub unsafe fn wait(&self) -> Option<thread::Result<T>> {
self.fut.get().and_then(|fut| {
// SAFETY:
// Before being popped, the `JobQueue` allocates and stores a
// `Future` in `self.fur_or_next` that should get passed here.
let result = unsafe { fut.as_ref().wait() };
// SAFETY:
// We only can drop the `Box` *after* waiting on the `Future`
// in order to ensure unique access.
unsafe {
drop(Box::from_raw(fut.as_ptr()));
}
result
})
}
/// SAFETY:
/// It should only be called in the case where the job has been popped
/// from the front and will not be `Job::Wait`ed.
pub unsafe fn drop(&self) {
if let Some(fut) = self.fut.get() {
// SAFETY:
// Before being popped, the `JobQueue` allocates and store a
// `Future` in `self.fur_or_next` that should get passed here.
unsafe {
drop(Box::from_raw(fut.as_ptr()));
}
}
}
}
impl Job {
/// SAFETY:
/// It should only be called while the `JobStack` it was created with is
/// still alive and after being popped from a `JobQueue`.
pub unsafe fn execute(&self, scope: &mut Scope) {
// SAFETY:
// Before being popped, the `JobQueue` allocates and store a
// `Future` in `self.fur_or_next` that should get passed here.
unsafe {
(self.harness)(scope, self.stack, self.fut.get().unwrap());
}
}
}
// SAFETY:
// The job's `stack` will only be accessed after acquiring a lock (in
// `Future`), while `prev` and `fut_or_next` are never accessed after being
// sent across threads.
unsafe impl Send for Job {}
#[derive(Debug, Default)]
pub struct JobQueue(VecDeque<NonNull<Job>>);
impl JobQueue {
pub fn len(&self) -> usize {
self.0.len()
}
/// SAFETY:
/// Any `Job` pushed onto the queue should alive at least until it gets
/// popped.
pub unsafe fn push_back<T>(&mut self, job: &Job<T>) {
self.0.push_back(NonNull::from(job).cast());
}
pub fn pop_back(&mut self) {
self.0.pop_back();
}
pub fn pop_front(&mut self) -> Option<Job> {
// SAFETY:
// `Job` is still alive as per contract in `push_back`.
let job = unsafe { self.0.pop_front()?.as_ref() };
job.fut
.set(Some(Box::leak(Box::new(Future::default())).into()));
Some(job.clone())
}
} }
} }
//use job::{Future, Job, JobQueue, JobStack}; //use job::{Future, Job, JobQueue, JobStack};
use crate::job::v2::{Job, JobState, StackJob}; use crate::job::v2::{Job as JobArchetype, JobState, StackJob};
// use crate::job::{Job, JobRef, StackJob}; // use crate::job::{Job, JobRef, StackJob};
type Job<T = ()> = JobArchetype<T, WorkerThread>;
struct ThreadState { struct ThreadState {
control: ThreadControl, control: ThreadControl,
} }
@ -320,6 +324,9 @@ pub struct SharedContext {
rng: crate::rng::XorShift64Star, rng: crate::rng::XorShift64Star,
} }
// SAFETY: Job is Send
unsafe impl Send for SharedContext {}
pub struct Context { pub struct Context {
shared: Mutex<SharedContext>, shared: Mutex<SharedContext>,
threads: Box<[CachePadded<ThreadState>]>, threads: Box<[CachePadded<ThreadState>]>,
@ -360,17 +367,13 @@ impl SharedContext {
.next() .next()
} }
fn pop_random_task(&mut self) -> Option<JobRef> { fn pop_random_task(&mut self) -> Option<NonNull<Job>> {
let i = self.rng.next_usize(self.shared_tasks.len()); let i = self.rng.next_usize(self.shared_tasks.len());
let (a, b) = self.shared_tasks.split_at_mut(i); let (a, b) = self.shared_tasks.split_at_mut(i);
a.into_iter().chain(b).filter_map(|task| task.take()).next() a.into_iter().chain(b).filter_map(|task| task.take()).next()
} }
} }
std::thread_local! {
static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null())};
}
pub struct WorkerThread { pub struct WorkerThread {
context: Arc<Context>, context: Arc<Context>,
index: usize, index: usize,
@ -381,6 +384,9 @@ pub struct WorkerThread {
_marker: PhantomPinned, _marker: PhantomPinned,
} }
// SAFETY: Job is Send
unsafe impl Send for WorkerThread {}
impl WorkerThread { impl WorkerThread {
fn new(context: Arc<Context>, heartbeat: Arc<AtomicBool>, index: usize) -> WorkerThread { fn new(context: Arc<Context>, heartbeat: Arc<AtomicBool>, index: usize) -> WorkerThread {
WorkerThread { WorkerThread {
@ -393,23 +399,6 @@ impl WorkerThread {
_marker: PhantomPinned, _marker: PhantomPinned,
} }
} }
unsafe fn set_current(this: *const Self) {
WORKER_THREAD_STATE.with(|ptr| {
assert!(ptr.get().is_null());
ptr.set(this);
});
}
unsafe fn unset_current() {
WORKER_THREAD_STATE.with(|ptr| {
assert!(!ptr.get().is_null());
ptr.set(ptr::null());
});
}
unsafe fn current() -> *const WorkerThread {
let ptr = WORKER_THREAD_STATE.with(|ptr| ptr.get());
ptr
}
fn state(&self) -> &CachePadded<ThreadState> { fn state(&self) -> &CachePadded<ThreadState> {
&self.context.threads[self.index] &self.context.threads[self.index]
} }
@ -422,89 +411,33 @@ impl WorkerThread {
fn ctx(&self) -> &Arc<Context> { fn ctx(&self) -> &Arc<Context> {
&self.context &self.context
} }
fn with<T, F: FnOnce(Option<&WorkerThread>) -> T>(f: F) -> T {
WORKER_THREAD_STATE.with(|worker| {
f(unsafe { NonNull::new(worker.get().cast_mut()).map(|ptr| ptr.as_ref()) })
})
}
fn with_mut<T, F: FnOnce(Option<&mut WorkerThread>) -> T>(f: F) -> T {
WORKER_THREAD_STATE.with(|worker| {
f(unsafe { NonNull::new(worker.get().cast_mut()).map(|mut ptr| ptr.as_mut()) })
})
}
} }
struct CurrentWorker;
impl Deref for CurrentWorker {
type Target = WorkerThread;
fn deref(&self) -> &Self::Target {
unsafe {
NonNull::new(WorkerThread::current().cast_mut())
.unwrap()
.as_ref()
}
}
}
impl DerefMut for CurrentWorker {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe {
NonNull::new(WorkerThread::current().cast_mut())
.unwrap()
.as_mut()
}
}
}
// impl Drop for WorkerThread {
// fn drop(&mut self) {
// WORKER_THREAD_STATE.with(|ptr| {
// assert!(!ptr.get().is_null());
// ptr.set(ptr::null());
// });
// }
// }
impl WorkerThread { impl WorkerThread {
fn worker(self) { fn worker(mut self) {
{ self.control().notify_running();
let worker = Box::leak(Box::new(self));
unsafe {
WorkerThread::set_current(worker);
}
}
CurrentWorker.control().notify_running();
loop { loop {
let task = { CurrentWorker.shared().lock().pop_first_task() }; let task = { self.shared().lock().pop_first_task() };
if let Some(task) = task { if let Some(task) = task {
CurrentWorker.execute_job(task); self.execute_job(task);
} }
if CurrentWorker.control().should_terminate.probe() { if self.control().should_terminate.probe() {
break; break;
} }
let mut guard = CurrentWorker.shared().lock(); let mut guard = self.shared().lock();
CurrentWorker.ctx().task_shared.wait(&mut guard); self.ctx().task_shared.wait(&mut guard);
} }
CurrentWorker.control().notify_termination(); self.control().notify_termination();
unsafe {
let worker = Box::from_raw(WorkerThread::current().cast_mut());
WorkerThread::unset_current();
}
} }
fn execute_job(&mut self, job: NonNull<Job>) { fn execute_job(&mut self, job: NonNull<Job>) {
unsafe { unsafe {
job.as_ref().execute(); job.as_ref().execute(self);
} }
} }
@ -568,44 +501,30 @@ impl WorkerThread {
RA: Send, RA: Send,
RB: Send, RB: Send,
{ {
let mut ra = None; let b = StackJob::new(b);
let latch = AtomicLatch::new();
let a = |scope: &mut WorkerThread| {
if scope.heartbeat.load(Ordering::Relaxed) {
scope.heartbeat_cold();
}
ra = Some(a(scope)); let job = Box::new(b.as_job());
unsafe { self.queue
Latch::set_raw(&latch); .push_back(unsafe { NonNull::new_unchecked(&job as *const _ as *mut _) });
} let job = unsafe { job.cast_box::<RB, Self>() };
let ra = a(self);
let rb = if job.state() == JobState::Empty as u8 {
self.pop_job_id(unsafe { job.as_ref().cast() });
unsafe { b.unwrap()(self) }
} else {
self.run_until(&job.as_ref());
job.wait()
}; };
let stack = StackJob::new(a); (ra, rb)
let task: JobRef =
unsafe { core::mem::transmute::<JobRef<_>, JobRef>(stack.as_task_ref()) };
let id = task.id();
self.queue.push_back(task);
let rb = b(self);
if !latch.probe() {
if let Some(job) = self.queue.pop_back() {
if job.id() == id {
unsafe {
(stack.take_once())(self);
}
return (ra.unwrap(), rb);
} else {
self.queue.push_back(job);
}
}
} }
self.run_until(&latch); fn pop_job_id(&mut self, id: &Job) -> Option<&Job<()>> {
self.queue
(ra.unwrap(), rb) .pop_back_if(|job| unsafe { (&*job).as_ref().id() == id.id() })
.map(|job| unsafe { job.as_ref() })
} }
fn run_until<L: Probe>(&mut self, latch: &L) { fn run_until<L: Probe>(&mut self, latch: &L) {

View file

@ -4,7 +4,7 @@ use core::{
ptr::NonNull, ptr::NonNull,
}; };
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] #[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, Copy)]
#[repr(transparent)] #[repr(transparent)]
pub struct SendPtr<T>(NonNull<T>); pub struct SendPtr<T>(NonNull<T>);