diff --git a/Cargo.toml b/Cargo.toml
index 8e5c658..649e93e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -32,6 +32,7 @@ anyhow = "1.0.89"
 thiserror = "2.0"
 bitflags = "2.6"
 core_affinity = "0.8.1"
+parking_lot_core = "0.9.10"
 # derive_more = "1.0.0"
 
 [dev-dependencies]
diff --git a/src/job/mod.rs b/src/job/mod.rs
index 386ce78..6d09c1d 100644
--- a/src/job/mod.rs
+++ b/src/job/mod.rs
@@ -3,6 +3,9 @@ use std::{cell::UnsafeCell, marker::PhantomPinned, sync::atomic::AtomicBool};
 
 use crate::latch::Latch;
 
+pub mod spice;
+pub mod v2;
+
 pub trait Job<Args> {
     unsafe fn execute(this: *const (), args: Args);
 }
@@ -35,23 +38,15 @@ impl JobRef {
     }
 }
 
-pub struct StackJob<F, L>
-where
-    L: Latch + Sync,
-{
+pub struct StackJob<F> {
     task: UnsafeCell<Option<F>>,
-    latch: L,
     _phantom: PhantomPinned,
 }
 
-impl<F, L> StackJob<F, L>
-where
-    L: Latch + Sync,
-{
-    pub fn new(task: F, latch: L) -> StackJob<F, L> {
+impl<F> StackJob<F> {
+    pub fn new(task: F) -> StackJob<F> {
         Self {
             task: UnsafeCell::new(Some(task)),
-            latch,
             _phantom: PhantomPinned,
         }
     }
@@ -77,16 +72,14 @@ where
     }
 }
 
-impl<F, L, Args> Job<Args> for StackJob<F, L>
+impl<F, Args> Job<Args> for StackJob<F>
 where
     F: FnOnce(Args),
-    L: Latch + Sync,
 {
     unsafe fn execute(this: *const (), args: Args) {
         let this = &*this.cast::<Self>();
         let func = (*this.task.get()).take().unwrap();
         func(args);
-        Latch::set_raw(&this.latch); // set internal latch here?
     }
 }
diff --git a/src/job/spice.rs b/src/job/spice.rs
new file mode 100644
index 0000000..9cb6cba
--- /dev/null
+++ b/src/job/spice.rs
@@ -0,0 +1 @@
+use std::{cell::UnsafeCell, sync::atomic::AtomicU8};
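Note on the src/job/mod.rs hunk above: `StackJob` no longer owns a latch and `Job::execute` no longer sets one, so completion is signalled by whatever the caller captures in the wrapped closure itself (the join hunk in src/melange.rs below does exactly that with an `AtomicLatch`). A minimal sketch of that calling pattern, with a plain `AtomicBool` standing in for the crate's latch types:

    use std::sync::atomic::{AtomicBool, Ordering};

    fn main() {
        // Stand-in for the crate's latch: the closure captures it and sets it
        // itself, mirroring the `Latch::set_raw` call that moved out of
        // `StackJob::execute` and into the caller-built closure.
        let done = AtomicBool::new(false);

        let work = || {
            // ... the actual job body would run here ...
            done.store(true, Ordering::Release);
        };

        work(); // a worker thread would execute the wrapped job instead
        assert!(done.load(Ordering::Acquire));
    }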
diff --git a/src/job/v2.rs b/src/job/v2.rs
new file mode 100644
index 0000000..b3c6adb
--- /dev/null
+++ b/src/job/v2.rs
@@ -0,0 +1,256 @@
+use core::{
+    cell::UnsafeCell,
+    mem::{self, ManuallyDrop, MaybeUninit},
+    sync::atomic::{AtomicU8, Ordering},
+};
+use std::thread::Thread;
+
+use parking_lot_core::SpinWait;
+
+use crate::util::SendPtr;
+
+#[allow(dead_code)]
+#[cfg_attr(target_pointer_width = "64", repr(align(16)))]
+#[cfg_attr(target_pointer_width = "32", repr(align(8)))]
+#[derive(Debug, Default, Clone, Copy)]
+struct Size2([usize; 2]);
+
+struct Value<T>(pub MaybeUninit<Box<MaybeUninit<T>>>);
+
+impl<T> Value<T> {
+    unsafe fn get(self, inline: bool) -> T {
+        if inline {
+            unsafe { mem::transmute_copy(&self.0) }
+        } else {
+            unsafe { (*self.0.assume_init()).assume_init() }
+        }
+    }
+}
+
+#[repr(u8)]
+pub enum JobState {
+    Empty,
+    Locked = 1,
+    Pending,
+    Finished,
+    Inline = 1 << (u8::BITS - 1),
+}
+
+pub struct Job<T = ()> {
+    state: AtomicU8,
+    this: SendPtr<()>,
+    harness: unsafe fn(*const (), *const Job<()>),
+    maybe_boxed_val: UnsafeCell<MaybeUninit<Value<T>>>,
+    waiting_thread: UnsafeCell<Option<Thread>>,
+}
+
+impl<T> Job<T> {
+    pub fn state(&self) -> u8 {
+        self.state.load(Ordering::Relaxed) & !(JobState::Inline as u8)
+    }
+
+    pub fn wait(&self) -> Option<T> {
+        let mut state = self.state.load(Ordering::Relaxed);
+        let mask = JobState::Inline as u8;
+
+        let mut spin = SpinWait::new();
+        loop {
+            match self.state.compare_exchange(
+                JobState::Pending as u8 | (state & mask),
+                JobState::Locked as u8 | (state & mask),
+                Ordering::Acquire,
+                Ordering::Relaxed,
+            ) {
+                Ok(x) => {
+                    state = x;
+                    unsafe {
+                        *self.waiting_thread.get() = Some(std::thread::current());
+                    }
+
+                    self.state
+                        .store(JobState::Pending as u8 | (state & mask), Ordering::Release);
+                    std::thread::park();
+                    spin.reset();
+                    break;
+                }
+                Err(x) => {
+                    if x & JobState::Finished as u8 != 0 {
+                        let val = unsafe {
+                            let value = (&*self.maybe_boxed_val.get()).assume_init_read();
+                            value.get(state & JobState::Inline as u8 != 0)
+                        };
+                        return Some(val);
+                    } else {
+                        spin.spin();
+                    }
+                }
+            }
+        }
+        return None;
+    }
+
+    /// call this when popping value from local queue
+    pub fn set_pending(&self) {
+        let state = self.state.load(Ordering::Relaxed);
+        let mask = JobState::Inline as u8;
+
+        let mut spin = SpinWait::new();
+        loop {
+            match self.state.compare_exchange(
+                JobState::Empty as u8 | (state & mask),
+                JobState::Pending as u8 | (state & mask),
+                Ordering::Acquire,
+                Ordering::Relaxed,
+            ) {
+                Ok(_) => {
+                    return;
+                }
+                Err(_) => {
+                    spin.spin();
+                }
+            }
+        }
+    }
+
+    pub fn execute(&self) {
+        // SAFETY: self is non-null
+        unsafe { (self.harness)(self.this.as_ptr().cast(), (self as *const Self).cast()) };
+    }
+
+    fn complete(&self, result: T) {
+        let mut state = self.state.load(Ordering::Relaxed);
+        let mask = JobState::Inline as u8;
+
+        let mut spin = SpinWait::new();
+        loop {
+            match self.state.compare_exchange(
+                JobState::Pending as u8 | (state & mask),
+                JobState::Locked as u8 | (state & mask),
+                Ordering::Acquire,
+                Ordering::Relaxed,
+            ) {
+                Ok(x) => {
+                    state = x;
+                    break;
+                }
+                Err(_) => {
+                    spin.spin();
+                }
+            }
+        }
+
+        unsafe {
+            let value = (&mut *self.maybe_boxed_val.get()).assume_init_mut();
+            // SAFETY: we know the box is allocated if state was `Pending`.
+            if state & JobState::Inline as u8 == 0 {
+                value.0 = MaybeUninit::new(Box::new(MaybeUninit::new(result)));
+            } else {
+                *mem::transmute::<_, &mut T>(&mut value.0) = result;
+            }
+        }
+
+        if let Some(thread) = unsafe { &mut *self.waiting_thread.get() }.take() {
+            thread.unpark();
+        }
+
+        self.state
+            .store(JobState::Finished as u8 | (state & mask), Ordering::Release);
+    }
+}
+
+impl Job {}
+
+pub struct HeapJob<F> {
+    f: F,
+}
+
+impl<F> HeapJob<F> {
+    pub fn new(f: F) -> Box<Self> {
+        Box::new(Self { f })
+    }
+
+    pub fn into_boxed_job<T>(self: Box<Self>) -> Box<Job<T>>
+    where
+        F: FnOnce() -> T + Send,
+        T: Send,
+    {
+        unsafe fn harness<F, T>(this: *const (), job: *const Job<()>)
+        where
+            F: FnOnce() -> T + Send,
+            T: Sized + Send,
+        {
+            let job = unsafe { &*job.cast::<Job<T>>() };
+
+            let this = unsafe { Box::from_raw(this.cast::<HeapJob<F>>().cast_mut()) };
+            let f = this.f;
+
+            job.complete(f());
+        }
+
+        let size = mem::size_of::<T>();
+        let align = mem::align_of::<T>();
+
+        let new_state = if size > mem::size_of::<Box<T>>() || align > mem::align_of::<Box<T>>() {
+            JobState::Empty as u8
+        } else {
+            JobState::Inline as u8
+        };
+
+        Box::new(Job {
+            state: AtomicU8::new(new_state),
+            this: SendPtr::new(Box::into_raw(self)).unwrap().cast(),
+            waiting_thread: UnsafeCell::new(None),
+            harness: harness::<F, T>,
+            maybe_boxed_val: UnsafeCell::new(MaybeUninit::uninit()),
+        })
+    }
+}
+
+pub struct StackJob<F> {
+    f: UnsafeCell<ManuallyDrop<F>>,
+}
+
+impl<F> StackJob<F> {
+    pub fn new(f: F) -> Self {
+        Self {
+            f: UnsafeCell::new(ManuallyDrop::new(f)),
+        }
+    }
+
+    pub unsafe fn unwrap(&self) -> F {
+        unsafe { ManuallyDrop::take(&mut *self.f.get()) }
+    }
+
+    pub fn as_boxed_job<T>(&self) -> Box<Job<T>>
+    where
+        F: FnOnce() -> T + Send,
+        T: Send,
+    {
+        unsafe fn harness<F, T>(this: *const (), job: *const Job<()>)
+        where
+            F: FnOnce() -> T + Send,
+            T: Sized + Send,
+        {
+            let job = unsafe { &*job.cast::<Job<T>>() };
+
+            let this = unsafe { &*this.cast::<StackJob<F>>() };
+            let f = unsafe { this.unwrap() };
+
+            job.complete(f());
+        }
+
+        let size = mem::size_of::<T>();
+        let align = mem::align_of::<T>();
+
+        let new_state = if size > mem::size_of::<Box<T>>() || align > mem::align_of::<Box<T>>() {
+            JobState::Empty as u8
+        } else {
+            JobState::Inline as u8
+        };
+
+        Box::new(Job {
+            state: AtomicU8::new(new_state),
+            this: SendPtr::new(self).unwrap().cast(),
+            waiting_thread: UnsafeCell::new(None),
+            harness: harness::<F, T>,
+            maybe_boxed_val: UnsafeCell::new(MaybeUninit::uninit()),
+        })
+    }
+}
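Note on the new src/job/v2.rs above: a finished result is written either into a freshly allocated `Box` or inline into the slot that would otherwise hold that `Box`, with the `JobState::Inline` bit recording which case applies. Assuming the stripped generics compare against `Box<T>` as reconstructed above, the decision boils down to a size/alignment check; `stores_inline` below is a hypothetical standalone helper, not part of the crate:

    use std::mem;

    // Returns true when a result of type T can live in the pointer-sized slot
    // (the space of the Box) instead of being heap-allocated, i.e. when the
    // harness would start the job with `JobState::Inline` set.
    fn stores_inline<T>() -> bool {
        mem::size_of::<T>() <= mem::size_of::<Box<T>>()
            && mem::align_of::<T>() <= mem::align_of::<Box<T>>()
    }

    fn main() {
        assert!(stores_inline::<u32>());        // fits in the pointer slot
        assert!(stores_inline::<()>());         // zero-sized results fit trivially
        assert!(!stores_inline::<[u64; 4]>());  // too big, result gets boxed
    }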
diff --git a/src/lib.rs b/src/lib.rs
index 1b5c87d..0b953ec 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -28,6 +28,7 @@ use task::{HeapTask, StackTask, TaskRef};
 use tracing::debug;
 
 pub mod job;
+pub mod util;
 
 pub mod task {
     use std::{cell::UnsafeCell, marker::PhantomPinned, pin::Pin};
diff --git a/src/melange.rs b/src/melange.rs
index 40b1ce9..da69c0d 100644
--- a/src/melange.rs
+++ b/src/melange.rs
@@ -3,20 +3,19 @@ use std::{
     collections::VecDeque,
     marker::PhantomPinned,
     ops::{Deref, DerefMut},
-    pin::pin,
     ptr::{self, NonNull},
     sync::{
        atomic::{AtomicBool, Ordering},
         Arc, Weak,
     },
     thread,
-    time::{Duration, Instant},
+    time::Duration,
 };
 
 use crossbeam::utils::CachePadded;
 use parking_lot::{Condvar, Mutex};
 
-use crate::{latch::*, task::*, ThreadControl, ThreadStatus};
+use crate::{latch::*, ThreadControl};
 
 mod job {
     use std::{
         cell::{Cell, UnsafeCell},
@@ -304,7 +303,8 @@ mod job {
 }
 
 //use job::{Future, Job, JobQueue, JobStack};
-use crate::job::{Job, JobRef, StackJob};
+use crate::job::v2::{Job, JobState, StackJob};
+// use crate::job::{Job, JobRef, StackJob};
 
 struct ThreadState {
     control: ThreadControl,
@@ -312,11 +312,10 @@
 
 struct Heartbeat {
     is_set: Weak<AtomicBool>,
-    last_time: Cell<Instant>,
 }
 
 pub struct SharedContext {
-    shared_tasks: Vec<Option<JobRef>>,
+    shared_tasks: Vec<Option<NonNull<Job>>>,
     heartbeats: Vec<Option<Heartbeat>>,
     rng: crate::rng::XorShift64Star,
 }
@@ -337,7 +336,6 @@ impl SharedContext {
         let is_set = Arc::new(AtomicBool::new(true));
         let heartbeat = Heartbeat {
             is_set: Arc::downgrade(&is_set),
-            last_time: Cell::new(Instant::now()),
         };
 
         let index = match self.heartbeats.iter().position(|a| a.is_none()) {
@@ -355,7 +353,7 @@
         (is_set, index)
     }
 
-    fn pop_first_task(&mut self) -> Option<JobRef> {
+    fn pop_first_task(&mut self) -> Option<NonNull<Job>> {
         self.shared_tasks
             .iter_mut()
             .filter_map(|task| task.take())
             .next()
     }
@@ -376,7 +374,7 @@ std::thread_local! {
 pub struct WorkerThread {
     context: Arc<Context>,
     index: usize,
-    queue: VecDeque<JobRef>,
+    queue: VecDeque<NonNull<Job>>,
     heartbeat: Arc<AtomicBool>,
     join_count: u8,
     sleep_count: usize,
@@ -504,8 +502,10 @@ impl WorkerThread {
         }
     }
 
-    fn execute_job(&mut self, job: JobRef) {
-        unsafe { core::mem::transmute::<JobRef, JobRef<&mut WorkerThread>>(job).execute(self) };
+    fn execute_job(&mut self, job: NonNull<Job>) {
+        unsafe {
+            job.as_ref().execute();
+        }
     }
 
     #[cold]
@@ -569,18 +569,19 @@ impl WorkerThread {
         RB: Send,
     {
         let mut ra = None;
+        let latch = AtomicLatch::new();
         let a = |scope: &mut WorkerThread| {
             if scope.heartbeat.load(Ordering::Relaxed) {
                 scope.heartbeat_cold();
             }
 
             ra = Some(a(scope));
+            unsafe {
+                Latch::set_raw(&latch);
+            }
         };
 
-        let latch = AtomicLatch::new();
-        let ctx = self.context.clone();
-        let idx = self.index;
-
-        let stack = StackJob::new(a, latch);
+        let stack = StackJob::new(a);
 
         let task: JobRef = unsafe {
             core::mem::transmute::<TaskRef<'_>, JobRef>(stack.as_task_ref())
         };
@@ -631,27 +632,33 @@ impl WorkerThread {
 
 impl Context {
     fn heartbeat(self: Arc<Self>, interaval: Duration) {
+        let mut n = 0;
         loop {
             if self.heartbeat_control.should_terminate.probe() {
                 break;
             }
 
             let sleep_for = {
                 let guard = self.shared.lock();
-                let now = Instant::now();
 
                 let num_heartbeats = guard
                     .heartbeats
                     .iter()
                     .filter_map(Option::as_ref)
-                    .filter_map(|h| h.is_set.upgrade().map(|is_set| (is_set, &h.last_time)))
-                    .inspect(|(is_set, last_time)| {
-                        if now.duration_since(last_time.get()) >= interaval {
+                    .filter_map(|h| h.is_set.upgrade().map(|is_set| is_set))
+                    .enumerate()
+                    .inspect(|(i, is_set)| {
+                        if *i == n {
                             is_set.store(true, Ordering::Relaxed);
-                            last_time.set(now);
                         }
                     })
                     .count();
 
+                if n >= num_heartbeats {
+                    n = 0;
+                } else {
+                    n += 1;
+                }
+
                 interaval.checked_div(num_heartbeats as u32)
             };
@@ -660,6 +667,41 @@ impl Context {
             }
         }
     }
+
+    fn heartbeat2(self: Arc<Self>, interval: Duration) {
+        let mut i = 0;
+        loop {
+            if self.heartbeat_control.should_terminate.probe() {
+                break;
+            }
+            let sleep_for = {
+                let guard = self.shared.lock();
+
+                let mut num = 0;
+                for is_set in guard
+                    .heartbeats
+                    .iter()
+                    .filter_map(Option::as_ref)
+                    .filter_map(|h| h.is_set.upgrade())
+                {
+                    if num == i {
+                        is_set.store(true, Ordering::Relaxed);
+                    }
+                    num += 1;
+                }
+
+                if num >= i {
+                    i = 0;
+                }
+
+                interval.checked_div(num)
+            };
+
+            if let Some(duration) = sleep_for {
+                thread::sleep(duration);
+            }
+        }
+    }
 }
 
 impl Drop for Context {
@@ -714,7 +756,7 @@ impl ThreadPool {
         let ctx = this.context.clone();
 
         std::thread::spawn(|| {
-            ctx.heartbeat(Duration::from_micros(100));
+            ctx.heartbeat2(Duration::from_micros(100));
         });
 
         this
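Note on the heartbeat changes in src/melange.rs above: instead of timestamping every worker and flagging whichever is stale, the loop now walks the registered heartbeats round-robin, flags one worker per tick, and divides the base interval by the worker count so each worker is still flagged roughly once per full interval. A distilled, self-contained sketch of a single tick (`heartbeat_tick` is an illustrative name; locking, worker registration and termination handling are omitted):

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    // Flags exactly one worker's heartbeat and reports how long to sleep before
    // the next tick: the base interval divided by the number of live workers.
    fn heartbeat_tick(
        heartbeats: &[AtomicBool],
        next: &mut usize,
        interval: Duration,
    ) -> Option<Duration> {
        if heartbeats.is_empty() {
            return Some(interval);
        }
        heartbeats[*next].store(true, Ordering::Relaxed);
        *next = (*next + 1) % heartbeats.len();
        interval.checked_div(heartbeats.len() as u32)
    }

    fn main() {
        let heartbeats = [AtomicBool::new(false), AtomicBool::new(false)];
        let mut next = 0;

        let sleep = heartbeat_tick(&heartbeats, &mut next, Duration::from_micros(100));
        assert_eq!(sleep, Some(Duration::from_micros(50)));
        assert!(heartbeats[0].load(Ordering::Relaxed));
        assert!(!heartbeats[1].load(Ordering::Relaxed));
    }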
diff --git a/src/util.rs b/src/util.rs
new file mode 100644
index 0000000..f903d8f
--- /dev/null
+++ b/src/util.rs
@@ -0,0 +1,67 @@
+use core::{
+    cell::Cell,
+    ops::{Deref, DerefMut},
+    ptr::NonNull,
+};
+
+#[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
+#[repr(transparent)]
+pub struct SendPtr<T>(NonNull<T>);
+
+impl<T> SendPtr<T> {
+    pub fn as_ptr(&self) -> *mut T {
+        self.0.as_ptr()
+    }
+
+    pub unsafe fn new_unchecked(t: *const T) -> Self {
+        unsafe { Self(NonNull::new_unchecked(t.cast_mut())) }
+    }
+
+    pub fn new(t: *const T) -> Option<Self> {
+        NonNull::new(t.cast_mut()).map(Self)
+    }
+
+    pub fn cast<U>(self) -> SendPtr<U> {
+        SendPtr(self.0.cast::<U>())
+    }
+}
+
+impl<T> Deref for SendPtr<T> {
+    type Target = T;
+
+    fn deref(&self) -> &Self::Target {
+        unsafe { &*self.0.as_ptr() }
+    }
+}
+
+impl<T> DerefMut for SendPtr<T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        unsafe { &mut *self.0.as_ptr() }
+    }
+}
+
+pub struct XorShift64Star {
+    state: Cell<u64>,
+}
+
+impl XorShift64Star {
+    /// Initializes the prng with a seed. Provided seed must be nonzero.
+    pub fn new(seed: u64) -> Self {
+        XorShift64Star {
+            state: Cell::new(seed),
+        }
+    }
+
+    /// Returns a pseudorandom number.
+    pub fn next(&self) -> u64 {
+        let mut x = self.state.get();
+        debug_assert_ne!(x, 0);
+        x ^= x >> 12;
+        x ^= x << 25;
+        x ^= x >> 27;
+        self.state.set(x);
+        x.wrapping_mul(0x2545_f491_4f6c_dd1d)
+    }
+
+    /// Return a pseudorandom number from `0..n`.
+    pub fn next_usize(&self, n: usize) -> usize {
+        (self.next() % n as u64) as usize
+    }
+}
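Note on src/util.rs above: `XorShift64Star` is a xorshift* generator; the xorshift step maps zero to zero and never maps a nonzero state to zero, which is why the seed must be nonzero. A quick self-contained check using the same shift and multiply constants:

    fn main() {
        // Same constants as `XorShift64Star::next`, with the state threaded
        // through a local variable instead of a Cell.
        let mut state: u64 = 0x9e37_79b9_7f4a_7c15; // arbitrary nonzero seed

        for _ in 0..1_000 {
            let mut x = state;
            x ^= x >> 12;
            x ^= x << 25;
            x ^= x >> 27;
            state = x;
            let value = x.wrapping_mul(0x2545_f491_4f6c_dd1d);

            // A nonzero state never collapses to zero, so the generator keeps
            // producing new values as long as the seed is nonzero.
            assert_ne!(state, 0);

            // Equivalent of `next_usize(8)`: always lands in 0..8.
            assert!(((value % 8) as usize) < 8);
        }
    }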