mod util {
    use std::{
        cell::{Cell, UnsafeCell},
        marker::PhantomData,
        mem::ManuallyDrop,
        num::NonZero,
        ops::{Deref, DerefMut},
        ptr::NonNull,
        sync::atomic::{AtomicPtr, Ordering},
    };

    /// Runs the wrapped closure when dropped, including during unwinding.
    pub struct DropGuard<F: FnOnce()>(UnsafeCell<ManuallyDrop<F>>);

    impl<F> DropGuard<F>
    where
        F: FnOnce(),
    {
        pub fn new(f: F) -> DropGuard<F> {
            Self(UnsafeCell::new(ManuallyDrop::new(f)))
        }
    }

    impl<F> Drop for DropGuard<F>
    where
        F: FnOnce(),
    {
        fn drop(&mut self) {
            // SAFETY: `drop` runs at most once, so the closure is taken out of
            // the `ManuallyDrop` exactly once and invoked here.
            unsafe {
                ManuallyDrop::take(&mut *self.0.get())();
            }
        }
    }
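
    // A minimal usage sketch of `DropGuard`: the closure runs exactly once
    // when the guard goes out of scope, which is what `Scope::with_in` below
    // relies on to restore the thread-local scope.
    #[cfg(test)]
    mod drop_guard_sketch {
        use super::DropGuard;

        #[test]
        fn runs_on_drop() {
            let mut ran = false;
            {
                let _guard = DropGuard::new(|| ran = true);
            }
            assert!(ran);
        }
    }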

    /// An `AtomicPtr` that packs a tag into the low `BITS` bits of the
    /// address, which must be free due to `T`'s alignment.
    #[repr(transparent)]
    pub struct TaggedAtomicPtr<T, const BITS: usize>(AtomicPtr<()>, PhantomData<T>);

    impl<T, const BITS: usize> TaggedAtomicPtr<T, BITS> {
        const fn mask() -> usize {
            !(!0usize << BITS)
        }

        pub fn new(ptr: *mut T, tag: usize) -> TaggedAtomicPtr<T, BITS> {
            assert!(core::mem::align_of::<T>().ilog2() as usize >= BITS);
            let mask = Self::mask();
            Self(
                AtomicPtr::new(ptr.with_addr((ptr.addr() & !mask) | (tag & mask)).cast()),
                PhantomData,
            )
        }

        pub fn ptr(&self, order: Ordering) -> NonNull<T> {
            unsafe {
                NonNull::new_unchecked(self.0.load(order) as _)
                    .map_addr(|addr| NonZero::new_unchecked(addr.get() & !Self::mask()))
            }
        }

        pub fn tag(&self, order: Ordering) -> usize {
            self.0.load(order).addr() & Self::mask()
        }

        /// Attempts to exchange only the tag, leaving the pointer bits
        /// untouched; returns the previous tag on success and the current
        /// tag on failure.
        #[inline]
        pub fn compare_exchange_weak_tag(
            &self,
            old: usize,
            new: usize,
            success: Ordering,
            failure: Ordering,
        ) -> Result<usize, usize> {
            let mask = Self::mask();
            let old_ptr = self.0.load(failure);

            let old = old_ptr.with_addr((old_ptr.addr() & !mask) | (old & mask));
            let new = old_ptr.with_addr((old_ptr.addr() & !mask) | (new & mask));

            let result = self.0.compare_exchange_weak(old, new, success, failure);

            result
                .map(|ptr| ptr.addr() & mask)
                .map_err(|ptr| ptr.addr() & mask)
        }

        pub fn set_ptr(&self, ptr: *mut T, success: Ordering, failure: Ordering) {
            let mask = Self::mask();
            let ptr = ptr.cast::<()>();
            loop {
                let old = self.0.load(failure);
                let new = ptr.with_addr((ptr.addr() & !mask) | (old.addr() & mask));
                if self
                    .0
                    .compare_exchange_weak(old, new, success, failure)
                    .is_ok()
                {
                    break;
                }
            }
        }

        pub fn set_tag(&self, tag: usize, success: Ordering, failure: Ordering) {
            let mask = Self::mask();
            loop {
                let ptr = self.0.load(failure);
                let new = ptr.with_addr((ptr.addr() & !mask) | (tag & mask));
                if self
                    .0
                    .compare_exchange_weak(ptr, new, success, failure)
                    .is_ok()
                {
                    break;
                }
            }
        }

        pub fn ptr_and_tag(&self, order: Ordering) -> (NonNull<T>, usize) {
            let mask = Self::mask();
            let ptr = self.0.load(order);
            let tag = ptr.addr() & mask;
            let addr = ptr.addr() & !mask;
            let ptr = unsafe { NonNull::new_unchecked(ptr.with_addr(addr).cast()) };
            (ptr, tag)
        }
    }
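
    // Sketch of the tagging invariant: with `BITS = 2` and a 4-byte-aligned
    // pointee, the low two bits of the stored address carry the tag, and
    // `ptr()` masks them back off.
    #[cfg(test)]
    mod tagged_ptr_sketch {
        use super::TaggedAtomicPtr;
        use std::sync::atomic::Ordering;

        #[test]
        fn tag_roundtrip() {
            let mut value = 0u32; // align_of::<u32>() == 4, so two tag bits fit
            let tagged = TaggedAtomicPtr::<u32, 2>::new(&mut value, 0b11);

            assert_eq!(tagged.tag(Ordering::Relaxed), 0b11);
            assert_eq!(tagged.ptr(Ordering::Relaxed).as_ptr(), &raw mut value);

            tagged.set_tag(0b01, Ordering::Relaxed, Ordering::Relaxed);
            assert_eq!(tagged.tag(Ordering::Relaxed), 0b01);
        }
    }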

    /// Wrapper around `NonNull<T>` used to pass raw pointers to jobs.
    #[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
    #[repr(transparent)]
    pub struct SendPtr<T>(NonNull<T>);

    impl<T> SendPtr<T> {
        pub fn as_ptr(&self) -> *mut T {
            self.0.as_ptr()
        }

        pub unsafe fn new_unchecked(t: *const T) -> Self {
            unsafe { Self(NonNull::new_unchecked(t.cast_mut())) }
        }

        pub fn new(t: *const T) -> Option<Self> {
            NonNull::new(t.cast_mut()).map(Self)
        }

        pub fn cast<U>(self) -> SendPtr<U> {
            SendPtr(self.0.cast::<U>())
        }
    }

    impl<T> Deref for SendPtr<T> {
        type Target = T;

        fn deref(&self) -> &Self::Target {
            unsafe { &*self.0.as_ptr() }
        }
    }

    impl<T> DerefMut for SendPtr<T> {
        fn deref_mut(&mut self) -> &mut Self::Target {
            unsafe { &mut *self.0.as_ptr() }
        }
    }

    /// `xorshift64*` pseudorandom number generator.
    pub struct XorShift64Star {
        state: Cell<u64>,
    }

    impl XorShift64Star {
        /// Initializes the prng with a seed. The provided seed must be nonzero.
        pub fn new(seed: u64) -> Self {
            XorShift64Star {
                state: Cell::new(seed),
            }
        }

        /// Returns a pseudorandom number.
        pub fn next(&self) -> u64 {
            let mut x = self.state.get();
            debug_assert_ne!(x, 0);
            x ^= x >> 12;
            x ^= x << 25;
            x ^= x >> 27;
            self.state.set(x);
            x.wrapping_mul(0x2545_f491_4f6c_dd1d)
        }

        /// Returns a pseudorandom number from `0..n`.
        pub fn next_usize(&self, n: usize) -> usize {
            (self.next() % n as u64) as usize
        }
    }
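
    // Sketch: the generator is deterministic for a fixed nonzero seed;
    // `SharedContext` below seeds its `rng` with a constant.
    #[cfg(test)]
    mod xorshift_sketch {
        use super::XorShift64Star;

        #[test]
        fn deterministic_and_bounded() {
            let a = XorShift64Star::new(37);
            let b = XorShift64Star::new(37);
            assert_eq!(a.next(), b.next());

            for _ in 0..100 {
                assert!(a.next_usize(8) < 8);
            }
        }
    }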
}

mod job {
    use std::{
        any::Any,
        cell::{Cell, UnsafeCell},
        fmt::Debug,
        marker::PhantomPinned,
        mem::{self, ManuallyDrop, MaybeUninit},
        pin::Pin,
        ptr::{self, NonNull},
        sync::atomic::{AtomicPtr, AtomicU8, Ordering},
        thread::Thread,
    };

    use parking_lot_core::SpinWait;

    use super::util::{SendPtr, TaggedAtomicPtr};

    #[cfg_attr(target_pointer_width = "64", repr(align(16)))]
    #[cfg_attr(target_pointer_width = "32", repr(align(8)))]
    #[derive(Debug, Default, Clone, Copy)]
    struct Size2([usize; 2]);

    /// Storage for a job's result: kept inline in the slot when `T` fits,
    /// boxed otherwise.
    pub struct Value<T>(pub MaybeUninit<Box<MaybeUninit<T>>>);

    impl<T> Value<T> {
        /// must only be called once. takes a reference so this can be called
        /// in drop()
        unsafe fn get_unchecked(&self, inline: bool) -> T {
            if inline {
                unsafe { mem::transmute_copy::<MaybeUninit<Box<MaybeUninit<T>>>, T>(&self.0) }
            } else {
                unsafe {
                    let inner = *self.0.assume_init_read();
                    inner.assume_init()
                }
            }
        }

        pub fn get(self) -> T {
            let this = ManuallyDrop::new(self);
            let inline = Self::is_inline();

            // SAFETY: inline is correctly calculated and this function
            // consumes `self`
            unsafe { this.get_unchecked(inline) }
        }

        pub fn is_inline() -> bool {
            // the value can be stored inline iff the size of T is equal to or
            // smaller than the size of the boxed type and the alignment of the
            // boxed type is an integer multiple of the alignment of T
            mem::size_of::<T>() <= mem::size_of::<Box<MaybeUninit<T>>>()
                && mem::align_of::<Box<MaybeUninit<T>>>() % mem::align_of::<T>() == 0
        }

        pub fn new(value: T) -> Self {
            let inline = Self::is_inline();

            if inline {
                let mut this = Self(MaybeUninit::uninit());
                // SAFETY: the value fits inline, so write it directly over the
                // uninitialized storage without reading or dropping the old
                // contents.
                unsafe {
                    (&raw mut this.0).cast::<T>().write(value);
                }
                this
            } else {
                Self(MaybeUninit::new(Box::new(MaybeUninit::new(value))))
            }
        }
    }

    impl<T> Drop for Value<T> {
        fn drop(&mut self) {
            // SAFETY: `drop` runs at most once, so the value is taken out of
            // the storage exactly once.
            unsafe {
                // drop contained value.
                _ = self.get_unchecked(Self::is_inline());
            }
        }
    }
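
    // Sketch of the two storage paths: a `u8` fits inline in the
    // `MaybeUninit<Box<...>>` slot, while a value larger than a pointer takes
    // the boxed path; both round-trip through `get`.
    #[cfg(test)]
    mod value_sketch {
        use super::Value;

        #[test]
        fn inline_and_boxed_roundtrip() {
            assert!(Value::<u8>::is_inline());
            assert_eq!(Value::new(42u8).get(), 42);

            assert!(!Value::<[u64; 4]>::is_inline());
            assert_eq!(Value::new([1u64, 2, 3, 4]).get(), [1, 2, 3, 4]);
        }
    }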

    #[repr(u8)]
    #[derive(Debug, PartialEq, Eq, Clone, Copy)]
    pub enum JobState {
        Empty,
        Locked = 1,
        Pending,
        Finished,
        // Inline = 1 << (u8::BITS - 1),
        // IsError = 1 << (u8::BITS - 2),
    }

    impl JobState {
        const MASK: u8 = 0; // Self::Inline as u8 | Self::IsError as u8;

        fn from_u8(v: u8) -> Option<Self> {
            match v {
                0 => Some(Self::Empty),
                1 => Some(Self::Locked),
                2 => Some(Self::Pending),
                3 => Some(Self::Finished),
                _ => None,
            }
        }
    }

    /// Intrusive doubly linked list of jobs, bracketed by `head` and `tail`
    /// sentinel nodes.
    #[derive(Debug)]
    pub struct JobList {
        head: Pin<Box<Job>>,
        tail: Pin<Box<Job>>,
    }

    impl JobList {
        pub fn new() -> JobList {
            let head = Box::pin(Job::empty());
            let tail = Box::pin(Job::empty());

            // head and tail point at each other
            unsafe {
                (&mut *head.err_or_link.get()).link.next = None;
                (&mut *head.err_or_link.get()).link.prev =
                    Some(NonNull::new_unchecked((&raw const *tail).cast_mut()));

                (&mut *tail.err_or_link.get()).link.next =
                    Some(NonNull::new_unchecked((&raw const *head).cast_mut()));
                (&mut *tail.err_or_link.get()).link.prev = None;
            }

            Self { head, tail }
        }

        fn head_ptr(&self) -> *const Job {
            &raw const *self.head
        }

        fn tail_ptr(&self) -> *const Job {
            &raw const *self.tail
        }

        fn head(&self) -> NonNull<Job> {
            unsafe { NonNull::new_unchecked(self.head_ptr().cast_mut()) }
        }

        fn tail(&self) -> NonNull<Job> {
            unsafe { NonNull::new_unchecked(self.tail_ptr().cast_mut()) }
        }

        /// elem must be valid until it is popped.
        pub unsafe fn push_front<T>(&mut self, elem: Pin<&Job<T>>) {
            let head_link = unsafe { self.head.link_mut() };

            // SAFETY: head will always have a previous element.
            let prev = head_link.prev.unwrap();
            let prev_link = unsafe { prev.as_ref().link_mut() };

            let elem_ptr = unsafe { NonNull::new_unchecked(&*elem as *const Job<T> as *mut Job) };
            head_link.prev = Some(elem_ptr);
            prev_link.next = Some(elem_ptr);

            let elem_link = unsafe { elem.link_mut() };
            elem_link.prev = Some(prev);
            elem_link.next = Some(self.head());
        }

        /// elem must be valid until it is popped.
        pub unsafe fn push_back<T>(&mut self, elem: Pin<&Job<T>>) {
            let tail_link = unsafe { self.tail.link_mut() };

            // SAFETY: tail will always have a next element.
            let next = tail_link.next.unwrap();
            let next_link = unsafe { next.as_ref().link_mut() };

            let elem_ptr = unsafe { NonNull::new_unchecked(&*elem as *const Job<T> as *mut Job) };
            tail_link.next = Some(elem_ptr);
            next_link.prev = Some(elem_ptr);

            let elem_link = unsafe { elem.link_mut() };
            elem_link.next = Some(next);
            elem_link.prev = Some(self.tail());
        }

        pub fn pop_front(&mut self) -> Option<NonNull<Job>> {
            let head_link = unsafe { self.head.link_mut() };

            // SAFETY: head will always have a previous element.
            let elem = head_link.prev.unwrap();
            let elem_link = unsafe { elem.as_ref().link_mut() };

            // if `elem` is the tail sentinel, the list is empty.
            let prev = elem_link.prev?.as_ptr();
            head_link.prev = unsafe { Some(NonNull::new_unchecked(prev)) };

            let prev_link = unsafe { (&*prev).link_mut() };
            prev_link.next = Some(self.head());

            Some(elem)
        }

        pub fn pop_back(&mut self) -> Option<NonNull<Job>> {
            // TODO: next and elem might be the same
            let tail_link = unsafe { self.tail.link_mut() };

            // SAFETY: tail will always have a next element.
            let elem = tail_link.next.unwrap();
            let elem_link = unsafe { elem.as_ref().link_mut() };

            // if `elem` is the head sentinel, the list is empty.
            let next = elem_link.next?.as_ptr();
            tail_link.next = unsafe { Some(NonNull::new_unchecked(next)) };

            let next_link = unsafe { (&*next).link_mut() };
            next_link.prev = Some(self.tail());

            Some(elem)
        }
    }
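
    // Sketch of the deque discipline: `push_front`/`pop_front` is the owning
    // thread's LIFO end, while `pop_back` is the end `heartbeat_cold` steals
    // the oldest job from.
    #[cfg(test)]
    mod job_list_sketch {
        use super::{Job, JobList};
        use std::pin::pin;

        #[test]
        fn owner_lifo_thief_fifo() {
            let mut list = JobList::new();
            let a = pin!(Job::<()>::empty());
            let b = pin!(Job::<()>::empty());

            unsafe {
                list.push_front(a.as_ref());
                list.push_front(b.as_ref());

                // the owner pops the job it pushed most recently...
                assert_eq!(list.pop_front().unwrap().as_ptr(), &raw const *b as *mut Job);
                // ...while a thief at the other end gets the oldest one.
                assert_eq!(list.pop_back().unwrap().as_ptr(), &raw const *a as *mut Job);
            }
        }
    }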

    union ValueOrThis<T> {
        uninit: (),
        value: ManuallyDrop<Value<T>>,
        this: NonNull<()>,
    }

    #[derive(Debug, PartialEq, Eq)]
    struct Link<T> {
        prev: Option<NonNull<T>>,
        next: Option<NonNull<T>>,
    }

    impl<T> Clone for Link<T> {
        fn clone(&self) -> Self {
            Self {
                prev: self.prev.clone(),
                next: self.next.clone(),
            }
        }
    }

    // implemented manually because the derive would add an unnecessary
    // `T: Copy` bound; only the `NonNull<T>` fields are copied.
    impl<T> Copy for Link<T> {}

    union LinkOrError<T> {
        link: Link<T>,
        waker: ManuallyDrop<Option<Thread>>,
        error: ManuallyDrop<Option<Box<dyn Any + Send + 'static>>>,
    }

    pub struct Job<T = ()> {
        /// tagged pointer, 8-aligned
        harness_and_state: TaggedAtomicPtr<usize, 3>,
        /// `NonNull<()>` before `execute()`, `Value<T>` after
        val_or_this: UnsafeCell<ValueOrThis<T>>,
        /// `(prev, next)` before `execute()`, `Box<...>` after
        err_or_link: UnsafeCell<LinkOrError<Job>>,
        _phantom: PhantomPinned,
    }

    impl<T> Debug for Job<T> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let state =
                JobState::from_u8(self.harness_and_state.tag(Ordering::Relaxed) as u8).unwrap();
            let mut debug = f.debug_struct("Job");
            debug.field("state", &state).field_with("harness", |f| {
                write!(f, "{:?}", self.harness_and_state.ptr(Ordering::Relaxed))
            });

            match state {
                JobState::Empty => {
                    debug
                        .field_with("this", |f| {
                            write!(f, "{:?}", unsafe { &(&*self.val_or_this.get()).this })
                        })
                        .field_with("link", |f| {
                            write!(f, "{:?}", unsafe { &(&*self.err_or_link.get()).link })
                        });
                }
                JobState::Locked => {
                    #[derive(Debug)]
                    struct Locked;
                    debug.field("locked", &Locked);
                }
                JobState::Pending => {
                    debug
                        .field_with("this", |f| {
                            write!(f, "{:?}", unsafe { &(&*self.val_or_this.get()).this })
                        })
                        .field_with("waker", |f| {
                            write!(f, "{:?}", unsafe { &(&*self.err_or_link.get()).waker })
                        });
                }
                JobState::Finished => {
                    let err = unsafe { &(&*self.err_or_link.get()).error };

                    let result = match err.as_ref() {
                        Some(err) => Err(err),
                        None => Ok(unsafe { (&*self.val_or_this.get()).value.0.as_ptr() }),
                    };

                    debug.field("result", &result);
                }
            }

            debug.finish()
        }
    }

    unsafe impl<T> Send for Job<T> {}

    impl<T> Job<T> {
        pub fn new(harness: unsafe fn(*const (), *const Job<T>), this: NonNull<()>) -> Job<T> {
            Self {
                harness_and_state: TaggedAtomicPtr::new(
                    unsafe { mem::transmute(harness) },
                    JobState::Empty as usize,
                ),
                val_or_this: UnsafeCell::new(ValueOrThis { this }),
                err_or_link: UnsafeCell::new(LinkOrError {
                    link: Link {
                        prev: None,
                        next: None,
                    },
                }),
                _phantom: PhantomPinned,
            }
        }

        pub fn empty() -> Job<T> {
            Self {
                harness_and_state: TaggedAtomicPtr::new(
                    ptr::dangling_mut(),
                    JobState::Empty as usize,
                ),
                val_or_this: UnsafeCell::new(ValueOrThis {
                    this: NonNull::dangling(),
                }),
                err_or_link: UnsafeCell::new(LinkOrError {
                    link: Link {
                        prev: None,
                        next: None,
                    },
                }),
                _phantom: PhantomPinned,
            }
        }

        unsafe fn link_mut(&self) -> &mut Link<Job> {
            unsafe { &mut (&mut *self.err_or_link.get()).link }
        }

        /// assumes job is in joblist
        pub unsafe fn unlink(&self) -> Option<()> {
            unsafe {
                let link = self.link_mut();
                link.prev?.as_ref().link_mut().next = link.next;
                link.next?.as_ref().link_mut().prev = link.prev;
            }
            Some(())
        }

        pub fn state(&self) -> u8 {
            self.harness_and_state.tag(Ordering::Relaxed) as u8
        }

        pub fn wait(&self) -> std::thread::Result<T> {
            let mut spin = SpinWait::new();
            loop {
                match self.harness_and_state.compare_exchange_weak_tag(
                    JobState::Pending as usize,
                    JobState::Locked as usize,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    // if still pending, register the current thread as the
                    // waker and sleep until completed.
                    Ok(state) => {
                        assert_eq!(state, JobState::Pending as usize);
                        unsafe {
                            *(&mut *self.err_or_link.get()).waker = Some(std::thread::current());
                        }

                        self.harness_and_state.set_tag(
                            JobState::Pending as usize,
                            Ordering::Release,
                            Ordering::Relaxed,
                        );

                        std::thread::park();
                        spin.reset();

                        // after sleeping, state should be `Finished`
                    }
                    Err(state) => {
                        assert_ne!(state, JobState::Pending as usize);

                        if state == JobState::Finished as usize {
                            let err = unsafe { (&mut *self.err_or_link.get()).error.take() };

                            let result: std::thread::Result<T> = if let Some(err) = err {
                                Err(err)
                            } else {
                                let val = unsafe {
                                    ManuallyDrop::take(&mut (&mut *self.val_or_this.get()).value)
                                };

                                Ok(val.get())
                            };

                            return result;
                        } else {
                            // spin until lock is released.
                            // eprintln!(
                            //     "wait({:?}): spinning ({:?})",
                            //     self as *const _,
                            //     JobState::from_u8(state as u8).unwrap()
                            // );
                            spin.spin();
                        }
                    }
                }
            }
        }

        /// call this when popping the job from a local queue
        pub fn set_pending(&self) {
            let mut spin = SpinWait::new();
            loop {
                match self.harness_and_state.compare_exchange_weak_tag(
                    JobState::Empty as usize,
                    JobState::Pending as usize,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(state) => {
                        assert_eq!(state, JobState::Empty as usize);
                        // set waker to None
                        unsafe {
                            (&mut *self.err_or_link.get()).waker = ManuallyDrop::new(None);
                        }
                        return;
                    }
                    Err(state) => {
                        assert_ne!(state, JobState::Empty as usize);

                        // unexpected: only the thread that popped the job
                        // should transition it out of `Empty`.
                        eprintln!("set_pending: job was not in the `Empty` state");
                        spin.spin();
                    }
                }
            }
        }

        pub fn execute(&self) {
            // SAFETY: the job is pending, so `harness` and `this` are still
            // the values set by `Job::new`.
            unsafe {
                let (ptr, state) = self.harness_and_state.ptr_and_tag(Ordering::Relaxed);
                assert_eq!(state, JobState::Pending as usize);

                let harness: unsafe fn(*const (), *const Self) = mem::transmute(ptr.as_ptr());
                let this = (*self.val_or_this.get()).this;

                harness(this.as_ptr().cast(), (self as *const Self).cast());
            }
        }

        fn complete(&self, result: std::thread::Result<T>) {
            let mut spin = SpinWait::new();
            loop {
                match self.harness_and_state.compare_exchange_weak_tag(
                    JobState::Pending as usize,
                    JobState::Locked as usize,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(state) => {
                        assert_eq!(state, JobState::Pending as usize);
                        break;
                    }
                    Err(state) => {
                        assert_ne!(state, JobState::Pending as usize);
                        spin.spin();
                    }
                }
            }

            let waker = unsafe { (&mut *self.err_or_link.get()).waker.take() };

            match result {
                Ok(val) => unsafe {
                    (&mut *self.val_or_this.get()).value = ManuallyDrop::new(Value::new(val));
                    (&mut *self.err_or_link.get()).error = ManuallyDrop::new(None);
                },
                Err(err) => unsafe {
                    (&mut *self.val_or_this.get()).uninit = ();
                    (&mut *self.err_or_link.get()).error = ManuallyDrop::new(Some(err));
                },
            }

            if let Some(thread) = waker {
                thread.unpark();
            }

            self.harness_and_state.set_tag(
                JobState::Finished as usize,
                Ordering::Release,
                Ordering::Relaxed,
            );
        }
    }

    pub struct HeapJob<F> {
        f: F,
    }

    impl<F> HeapJob<F> {
        pub fn new(f: F) -> Box<Self> {
            Box::new(Self { f })
        }

        pub fn into_boxed_job<T>(self: Box<Self>) -> Box<Job<()>>
        where
            F: FnOnce() -> T + Send,
            T: Send,
        {
            unsafe fn harness<F, T>(this: *const (), job: *const Job<()>)
            where
                F: FnOnce() -> T + Send,
                T: Sized + Send,
            {
                let job = unsafe { &*job.cast::<Job<T>>() };

                let this = unsafe { Box::from_raw(this.cast::<HeapJob<F>>().cast_mut()) };
                let f = this.f;

                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));

                job.complete(result);
            }

            Box::new(Job::new(harness::<F, T>, unsafe {
                NonNull::new_unchecked(Box::into_raw(self)).cast()
            }))
        }
    }

    pub struct StackJob<F> {
        f: UnsafeCell<ManuallyDrop<F>>,
    }

    impl<F> StackJob<F> {
        pub fn new(f: F) -> Self {
            Self {
                f: UnsafeCell::new(ManuallyDrop::new(f)),
            }
        }

        /// must only be called once.
        pub unsafe fn unwrap(&self) -> F {
            unsafe { ManuallyDrop::take(&mut *self.f.get()) }
        }

        pub fn as_job<T>(&self) -> Job<()>
        where
            F: FnOnce() -> T + Send,
            T: Send,
        {
            unsafe fn harness<F, T>(this: *const (), job: *const Job<()>)
            where
                F: FnOnce() -> T + Send,
                T: Sized + Send,
            {
                let this = unsafe { &*this.cast::<StackJob<F>>() };
                let f = unsafe { this.unwrap() };

                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));

                let job_ref = unsafe { &*job.cast::<Job<T>>() };
                job_ref.complete(result);
            }

            Job::new(harness::<F, T>, unsafe {
                NonNull::new_unchecked(self as *const _ as *mut ())
            })
        }
    }
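
    // Sketch of a job's full life cycle outside the pool: a `StackJob` is
    // turned into a `Job`, marked pending, executed, and joined with `wait`;
    // this is the same sequence `join_heartbeat` and the workers perform
    // between them, including the `Job<()>` to `Job<T>` cast.
    #[cfg(test)]
    mod job_lifecycle_sketch {
        use super::{Job, StackJob};
        use std::pin::{pin, Pin};

        #[test]
        fn run_to_completion() {
            let stack = StackJob::new(|| 6 * 7);
            let job = pin!(stack.as_job());

            job.set_pending();
            job.execute();

            let result = unsafe {
                core::mem::transmute::<Pin<&Job<()>>, Pin<&Job<i32>>>(job.as_ref())
            }
            .wait();
            assert_eq!(result.unwrap(), 42);
        }
    }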
}

use std::{
    cell::{Cell, UnsafeCell},
    collections::BTreeMap,
    mem,
    pin::{pin, Pin},
    ptr::NonNull,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, OnceLock, Weak,
    },
    time::Duration,
};

use job::*;
use parking_lot::{Condvar, Mutex};
use util::DropGuard;

pub struct Scope {
    join_count: Cell<usize>,
    context: Arc<Context>,
    index: usize,
    heartbeat: Arc<AtomicBool>,
    queue: UnsafeCell<JobList>,
}

thread_local! {
    static SCOPE: UnsafeCell<Option<NonNull<Scope>>> = const { UnsafeCell::new(None) };
}

impl Scope {
    /// locks shared context
    fn new() -> Self {
        let context = Context::global().clone();
        Self::new_in(context)
    }

    /// locks shared context
    fn new_in(context: Arc<Context>) -> Self {
        let (heartbeat, index) = context.shared.lock().new_heartbeat();

        Self {
            context,
            index,
            heartbeat,
            join_count: Cell::new(0),
            queue: UnsafeCell::new(JobList::new()),
        }
    }

    fn with_in<T, F: FnOnce(&Scope) -> T>(ctx: Arc<Context>, f: F) -> T {
        let mut guard = Option::<DropGuard<Box<dyn FnOnce()>>>::None;

        let scope = match Self::current_ref() {
            // the current thread already has a scope on this context.
            Some(scope) if Arc::ptr_eq(&scope.context, &ctx) => scope,
            // a scope exists but on a different context: temporarily install
            // a fresh scope and restore the old one afterwards.
            Some(_) => {
                let old = unsafe { Self::unset_current().unwrap().as_ptr() };
                guard = Some(DropGuard::new(Box::new(move || unsafe {
                    _ = Box::from_raw(Self::unset_current().unwrap().as_ptr());
                    Self::set_current(old.cast_const());
                })));
                let current = Box::into_raw(Box::new(Self::new_in(ctx)));
                unsafe {
                    Self::set_current(current.cast_const());
                    &*current
                }
            }
            // no scope yet: install a fresh one and tear it down afterwards.
            None => {
                let current = Box::into_raw(Box::new(Self::new_in(ctx)));

                guard = Some(DropGuard::new(Box::new(|| unsafe {
                    _ = Box::from_raw(Self::unset_current().unwrap().as_ptr());
                })));

                unsafe {
                    Self::set_current(current.cast_const());

                    &*current
                }
            }
        };

        let t = f(scope);
        drop(guard);
        t
    }

    pub fn with<T, F: FnOnce(&Scope) -> T>(f: F) -> T {
        Self::with_in(Context::global().clone(), f)
    }

    unsafe fn set_current(scope: *const Scope) {
        SCOPE.with(|ptr| unsafe {
            _ = (&mut *ptr.get()).insert(NonNull::new_unchecked(scope.cast_mut()));
        })
    }

    unsafe fn unset_current() -> Option<NonNull<Scope>> {
        SCOPE.with(|ptr| unsafe { (&mut *ptr.get()).take() })
    }

    fn current() -> Option<NonNull<Scope>> {
        SCOPE.with(|ptr| unsafe { *ptr.get() })
    }

    fn current_ref<'a>() -> Option<&'a Scope> {
        SCOPE.with(|ptr| unsafe { (&*ptr.get()).map(|ptr| ptr.as_ref()) })
    }

    fn push_front<T>(&self, job: Pin<&Job<T>>) {
        unsafe {
            self.queue.as_mut_unchecked().push_front(job);
        }
    }

    fn push_back<T>(&self, job: Pin<&Job<T>>) {
        unsafe {
            self.queue.as_mut_unchecked().push_back(job);
        }
    }

    fn pop_back(&self) -> Option<NonNull<Job>> {
        unsafe { self.queue.as_mut_unchecked().pop_back() }
    }

    fn pop_front(&self) -> Option<NonNull<Job>> {
        unsafe { self.queue.as_mut_unchecked().pop_front() }
    }

    pub fn join<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
    where
        RA: Send,
        RB: Send,
        A: FnOnce() -> RA + Send,
        B: FnOnce() -> RB + Send,
    {
        self.join_heartbeat_every::<_, _, _, _, 64>(a, b)
    }

    pub fn join_seq<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
    where
        RA: Send,
        RB: Send,
        A: FnOnce() -> RA + Send,
        B: FnOnce() -> RB + Send,
    {
        (a(), b())
    }

    /// Calls `join_heartbeat` once per `TIMES` invocations and `join_seq`
    /// otherwise, amortizing the cost of pushing and popping the stack job.
    pub fn join_heartbeat_every<A, B, RA, RB, const TIMES: usize>(&self, a: A, b: B) -> (RA, RB)
    where
        RA: Send,
        RB: Send,
        A: FnOnce() -> RA + Send,
        B: FnOnce() -> RB + Send,
    {
        let count = self.join_count.get();
        self.join_count.set(count.wrapping_add(1) % TIMES);

        if count == 1 {
            self.join_heartbeat(a, b)
        } else {
            self.join_seq(a, b)
        }
    }

    pub fn join_heartbeat<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
    where
        RA: Send,
        RB: Send,
        A: FnOnce() -> RA + Send,
        B: FnOnce() -> RB + Send,
    {
        let b = StackJob::new(b);

        let job = pin!(b.as_job());
        self.push_front(job.as_ref());

        let ra = a();

        let rb = if job.state() == JobState::Empty as u8 {
            // the job was never shared: unlink it from the local queue and
            // run it inline.
            unsafe {
                job.unlink();
            }

            self.tick();
            unsafe { b.unwrap()() }
        } else {
            // the job was promoted to the shared queue: run other jobs until
            // it completes, or reclaim it if nobody has started it yet.
            match self.wait_until::<RB>(unsafe {
                mem::transmute::<Pin<&Job<()>>, Pin<&Job<RB>>>(job.as_ref())
            }) {
                Some(Ok(t)) => t,
                Some(Err(payload)) => std::panic::resume_unwind(payload),
                None => unsafe { b.unwrap()() },
            }
        };

        drop(b);
        (ra, rb)
    }

    #[inline]
    fn tick(&self) {
        if self.heartbeat.load(Ordering::Relaxed) {
            self.heartbeat_cold();
        }
    }

    #[inline]
    fn execute(&self, job: &Job) {
        self.tick();
        job.execute();
    }

    /// Promotes the oldest job in the local queue to the shared context so an
    /// idle worker can pick it up, then clears the heartbeat flag.
    #[cold]
    fn heartbeat_cold(&self) {
        let mut guard = self.context.shared.lock();

        if !guard.jobs.contains_key(&self.index) {
            if let Some(job) = self.pop_back() {
                unsafe {
                    job.as_ref().set_pending();
                }
                guard.jobs.insert(self.index, job);
                self.context.shared_job.notify_one();
            }
        }

        self.heartbeat.store(false, Ordering::Relaxed);
    }

    /// Runs other jobs while waiting for `job` to finish; returns `None` if
    /// the job was shared but never claimed, in which case the caller
    /// reclaims and runs it inline.
    #[cold]
    pub fn wait_until<T>(&self, job: Pin<&Job<T>>) -> Option<std::thread::Result<T>> {
        let shared_job = self.context.shared.lock().jobs.remove(&self.index);

        if let Some(ptr) = shared_job {
            if ptr.as_ptr() == &*job as *const _ as *mut _ {
                return None;
            } else {
                unsafe {
                    self.execute(ptr.as_ref());
                }
            }
        }

        // while job isn't done, run other jobs.
        while job.state() != JobState::Finished as u8 {
            let Some(job) = self
                .context
                .shared
                .lock()
                .jobs
                .pop_first()
                .map(|(_, job)| job)
                .or_else(|| {
                    self.pop_front().inspect(|job| unsafe {
                        job.as_ref().set_pending();
                    })
                })
            else {
                break;
            };

            unsafe {
                self.execute(job.as_ref());
            }
        }

        Some(job.wait())
    }
}

fn join<A, B, RA, RB>(a: A, b: B) -> (RA, RB)
where
    RA: Send,
    RB: Send,
    A: FnOnce() -> RA + Send,
    B: FnOnce() -> RB + Send,
{
    Scope::with(|scope| scope.join_heartbeat(a, b))
}
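
// Sketch: a recursive divide-and-conquer sum built on `join`. Each level
// splits the slice in half; the heartbeat decides whether the second half is
// promoted to the shared queue or simply run inline after the first.
#[cfg(test)]
mod join_sketch {
    #[test]
    fn parallel_sum() {
        fn sum(v: &[u64]) -> u64 {
            if v.len() <= 1024 {
                return v.iter().sum();
            }
            let (left, right) = v.split_at(v.len() / 2);
            let (a, b) = super::join(|| sum(left), || sum(right));
            a + b
        }

        let v: Vec<u64> = (0..100_000).collect();
        assert_eq!(sum(&v), v.iter().sum::<u64>());
    }
}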

struct Heartbeat {
    weak: Arc<AtomicBool>,
    index: usize,
}

pub struct ThreadPool {
    context: Arc<Context>,
}

impl ThreadPool {
    pub fn new() -> ThreadPool {
        Self {
            context: Context::new(),
        }
    }

    pub fn global() -> ThreadPool {
        ThreadPool {
            context: Context::global().clone(),
        }
    }

    pub fn scope<T, F: FnOnce(&Scope) -> T>(&self, f: F) -> T {
        Scope::with_in(self.context.clone(), f)
    }
}
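
// Sketch: using a dedicated pool instead of the global one. `scope` pins the
// calling thread to this pool's context for the duration of the closure, so
// nested joins inside it stay on the same context.
#[cfg(test)]
mod thread_pool_sketch {
    use super::ThreadPool;

    #[test]
    fn scoped_join() {
        let pool = ThreadPool::new();
        let (a, b) = pool.scope(|scope| scope.join(|| 1 + 1, || 2 + 2));
        assert_eq!((a, b), (2, 4));
    }
}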

struct Context {
    shared: Mutex<SharedContext>,
    shared_job: Condvar,
}

struct SharedContext {
    jobs: BTreeMap<usize, NonNull<Job>>,
    heartbeats: BTreeMap<usize, Weak<AtomicBool>>,
    // monotonically increasing id
    heartbeats_id: usize,
    should_stop: bool,
    rng: util::XorShift64Star,
}

unsafe impl Send for SharedContext {}

impl SharedContext {
    fn new_heartbeat(&mut self) -> (Arc<AtomicBool>, usize) {
        let index = self.heartbeats_id;
        self.heartbeats_id = self.heartbeats_id.checked_add(1).unwrap();

        let is_set = Arc::new(AtomicBool::new(false));
        let weak = Arc::downgrade(&is_set);

        self.heartbeats.insert(index, weak);

        (is_set, index)
    }
}

impl Context {
    fn new() -> Arc<Context> {
        let this = Arc::new(Self {
            shared: Mutex::new(SharedContext {
                jobs: BTreeMap::new(),
                heartbeats: BTreeMap::new(),
                heartbeats_id: 0,
                should_stop: false,
                rng: util::XorShift64Star::new(37),
            }),
            shared_job: Condvar::new(),
        });

        eprintln!("created threadpool {:?}", Arc::as_ptr(&this));

        let num_threads = available_parallelism();
        // let num_threads = 2;
        let barrier = Arc::new(std::sync::Barrier::new(num_threads + 1));

        for _ in 0..num_threads {
            let ctx = this.clone();
            let barrier = barrier.clone();
            std::thread::spawn(|| worker(ctx, barrier));
        }

        let ctx = this.clone();
        std::thread::spawn(|| heartbeat_worker(ctx));

        barrier.wait();

        this
    }

    pub fn global() -> &'static Arc<Self> {
        GLOBAL_CONTEXT.get_or_init(Self::new)
    }
}

static GLOBAL_CONTEXT: OnceLock<Arc<Context>> = OnceLock::new();

const HEARTBEAT_INTERVAL: Duration = Duration::from_micros(100);

fn available_parallelism() -> usize {
    std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
}

fn worker(ctx: Arc<Context>, barrier: Arc<std::sync::Barrier>) {
    unsafe {
        Scope::set_current(Box::into_raw(Box::new(Scope::new_in(ctx.clone()))).cast_const());
    }
    let _guard =
        DropGuard::new(|| unsafe { drop(Box::from_raw(Scope::unset_current().unwrap().as_ptr())) });

    let scope = Scope::current_ref().unwrap();

    barrier.wait();

    let mut job = ctx.shared.lock().jobs.pop_first();
    loop {
        if let Some((_, job)) = job {
            unsafe {
                scope.execute(job.as_ref());
            }
        }

        let mut guard = ctx.shared.lock();
        if guard.should_stop {
            break;
        }

        ctx.shared_job.wait(&mut guard);
        job = guard.jobs.pop_first();
    }
}

fn heartbeat_worker(ctx: Arc<Context>) {
    let mut i = 0;
    loop {
        let sleep_for = {
            let mut guard = ctx.shared.lock();
            if guard.should_stop {
                break;
            }

            // wake the `i`-th live heartbeat and prune any whose owning scope
            // has been dropped.
            let mut n = 0;
            guard.heartbeats.retain(|_, b| {
                b.upgrade()
                    .inspect(|heartbeat| {
                        if n == i {
                            heartbeat.store(true, Ordering::Relaxed);
                        }
                        n += 1;
                    })
                    .is_some()
            });
            let num_heartbeats = guard.heartbeats.len();

            drop(guard);

            if i >= num_heartbeats {
                i = 0;
            } else {
                i += 1;
            }

            // sleep so that a full round over all heartbeats takes roughly
            // `HEARTBEAT_INTERVAL`.
            HEARTBEAT_INTERVAL.checked_div(num_heartbeats as u32)
        };

        if let Some(duration) = sleep_for {
            std::thread::sleep(duration);
        }
    }
}

#[cfg(test)]
mod tests;