Commit f34dc61984 (parent b83bfeca51)
Author: Janis, 2025-02-07 07:36:37 +01:00
7 changed files with 396 additions and 35 deletions


@@ -32,6 +32,7 @@ anyhow = "1.0.89"
thiserror = "2.0"
bitflags = "2.6"
core_affinity = "0.8.1"
parking_lot_core = "0.9.10"
# derive_more = "1.0.0"
[dev-dependencies]


@@ -3,6 +3,9 @@ use std::{cell::UnsafeCell, marker::PhantomPinned, sync::atomic::AtomicBool};
use crate::latch::Latch;
pub mod spice;
pub mod v2;
pub trait Job<Args = ()> {
unsafe fn execute(this: *const (), args: Args);
}
@@ -35,23 +38,15 @@ impl<Args> JobRef<Args> {
}
}
pub struct StackJob<F, L>
where
L: Latch + Sync,
{
pub struct StackJob<F> {
task: UnsafeCell<Option<F>>,
latch: L,
_phantom: PhantomPinned,
}
impl<F, L> StackJob<F, L>
where
L: Latch + Sync,
{
pub fn new(task: F, latch: L) -> StackJob<F, L> {
impl<F> StackJob<F> {
pub fn new(task: F) -> StackJob<F> {
Self {
task: UnsafeCell::new(Some(task)),
latch,
_phantom: PhantomPinned,
}
}
@@ -77,16 +72,14 @@ where
}
}
impl<Args, F, L> Job<Args> for StackJob<F, L>
impl<Args, F> Job<Args> for StackJob<F>
where
F: FnOnce(Args),
L: Latch + Sync,
{
unsafe fn execute(this: *const (), args: Args) {
let this = &*this.cast::<Self>();
let func = (*this.task.get()).take().unwrap();
func(args);
Latch::set_raw(&this.latch);
// set internal latch here?
}
}
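With the latch field gone from StackJob, executing the job no longer signals completion by itself (hence the "// set internal latch here?" note above); the caller now owns that responsibility, typically by folding the latch set into the closure, which is exactly what the worker's join path does later in this commit. A minimal sketch of the pattern, assuming the crate's AtomicLatch and a hypothetical do_work function:

// Sketch: the caller owns the latch; the closure signals completion itself.
let latch = AtomicLatch::new();
let job = StackJob::new(|args: &mut WorkerThread| {
    do_work(args);                      // hypothetical work
    unsafe { Latch::set_raw(&latch) };  // completion signalled by the closure, not by StackJob
});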

src/job/spice.rs (new file, +1)

@@ -0,0 +1 @@
use std::{cell::UnsafeCell, sync::atomic::AtomicU8};

src/job/v2.rs (new file, +256)

@@ -0,0 +1,256 @@
use core::{
cell::UnsafeCell,
mem::{self, ManuallyDrop, MaybeUninit},
sync::atomic::{AtomicU8, Ordering},
};
use std::thread::Thread;
use parking_lot_core::SpinWait;
use crate::util::SendPtr;
#[allow(dead_code)]
#[cfg_attr(target_pointer_width = "64", repr(align(16)))]
#[cfg_attr(target_pointer_width = "32", repr(align(8)))]
#[derive(Debug, Default, Clone, Copy)]
struct Size2([usize; 2]);
struct Value<T>(pub MaybeUninit<Box<MaybeUninit<T>>>);
impl<T> Value<T> {
unsafe fn get(self, inline: bool) -> T {
if inline {
unsafe { mem::transmute_copy(&self.0) }
} else {
unsafe { (*self.0.assume_init()).assume_init() }
}
}
}
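Value<T> always occupies a single pointer-sized slot: either a Box<MaybeUninit<T>> on the heap or, for sufficiently small T, the bits of the value itself smuggled in via transmute_copy. Whether a type qualifies is the same size/align check that into_boxed_job and as_boxed_job apply further down when they pick the initial state; a sketch of that predicate (the helper name fits_inline is illustrative):

// Illustrative helper mirroring the inline/boxed decision below:
// T is stored inline when it fits the size and alignment of the Box<T> slot.
fn fits_inline<T>() -> bool {
    core::mem::size_of::<T>() <= core::mem::size_of::<Box<T>>()
        && core::mem::align_of::<T>() <= core::mem::align_of::<Box<T>>()
}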
#[repr(u8)]
pub enum JobState {
Empty,
Locked = 1,
Pending,
Finished,
Inline = 1 << (u8::BITS - 1),
}
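A single AtomicU8 carries both facts about a job: the low bits hold the lifecycle state (Empty, Locked, Pending, Finished) while the top bit (Inline) records where the result is stored. Every method below masks the flag off before comparing lifecycle states; for illustration:

// Illustrative: unpacking the state byte the way state() and wait() do.
let packed = JobState::Pending as u8 | JobState::Inline as u8;
let is_inline = packed & JobState::Inline as u8 != 0;  // true: result stored inline
let lifecycle = packed & !(JobState::Inline as u8);    // equals JobState::Pending as u8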
pub struct Job<T = ()> {
state: AtomicU8,
this: SendPtr<()>,
harness: unsafe fn(*const (), *const Job<()>),
maybe_boxed_val: UnsafeCell<MaybeUninit<Value<T>>>,
waiting_thread: UnsafeCell<Option<Thread>>,
}
impl<T> Job<T> {
pub fn state(&self) -> u8 {
self.state.load(Ordering::Relaxed) & !(JobState::Inline as u8)
}
pub fn wait(&self) -> Option<T> {
let mut state = self.state.load(Ordering::Relaxed);
let mask = JobState::Inline as u8;
let mut spin = SpinWait::new();
loop {
match self.state.compare_exchange(
JobState::Pending as u8 | (state & mask),
JobState::Locked as u8 | (state & mask),
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(x) => {
state = x;
unsafe {
*self.waiting_thread.get() = Some(std::thread::current());
}
self.state
.store(JobState::Pending as u8 | (state & mask), Ordering::Release);
std::thread::park();
spin.reset();
break;
}
Err(x) => {
if x & JobState::Finished as u8 != 0 {
let val = unsafe {
let value = (&*self.maybe_boxed_val.get()).assume_init_read();
value.get(state & JobState::Inline as u8 != 0)
};
return Some(val);
} else {
spin.spin();
}
}
}
}
return None;
}
/// call this when popping value from local queue
pub fn set_pending(&self) {
let state = self.state.load(Ordering::Relaxed);
let mask = JobState::Inline as u8;
let mut spin = SpinWait::new();
loop {
match self.state.compare_exchange(
JobState::Empty as u8 | (state & mask),
JobState::Pending as u8 | (state & mask),
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => {
return;
}
Err(_) => {
spin.spin();
}
}
}
}
pub fn execute(&self) {
// SAFETY: self is non-null
unsafe { (self.harness)(self.this.as_ptr().cast(), (self as *const Self).cast()) };
}
fn complete(&self, result: T) {
let mut state = self.state.load(Ordering::Relaxed);
let mask = JobState::Inline as u8;
let mut spin = SpinWait::new();
loop {
match self.state.compare_exchange(
JobState::Pending as u8 | (state & mask),
JobState::Locked as u8 | (state & mask),
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(x) => {
state = x;
break;
}
Err(_) => {
spin.spin();
}
}
}
unsafe {
let value = (&mut *self.maybe_boxed_val.get()).assume_init_mut();
// SAFETY: we know the box is allocated if state was `Pending`.
if state & JobState::Inline as u8 == 0 {
value.0 = MaybeUninit::new(Box::new(MaybeUninit::new(result)));
} else {
*mem::transmute::<_, &mut T>(&mut value.0) = result;
}
}
if let Some(thread) = unsafe { &mut *self.waiting_thread.get() }.take() {
thread.unpark();
}
self.state
.store(JobState::Finished as u8 | (state & mask), Ordering::Release);
}
}
impl Job {}
pub struct HeapJob<F> {
f: F,
}
impl<F> HeapJob<F> {
pub fn new(f: F) -> Box<Self> {
Box::new(Self { f })
}
pub fn into_boxed_job<T>(self: Box<Self>) -> Box<Job<()>>
where
F: FnOnce() -> T + Send,
T: Send,
{
unsafe fn harness<F, T>(this: *const (), job: *const Job<()>)
where
F: FnOnce() -> T + Send,
T: Sized + Send,
{
let job = unsafe { &*job.cast::<Job<T>>() };
let this = unsafe { Box::from_raw(this.cast::<HeapJob<F>>().cast_mut()) };
let f = this.f;
job.complete(f());
}
let size = mem::size_of::<T>();
let align = mem::align_of::<T>();
let new_state = if size > mem::size_of::<Box<T>>() || align > mem::align_of::<Box<T>>() {
JobState::Empty as u8
} else {
JobState::Inline as u8
};
Box::new(Job {
state: AtomicU8::new(new_state),
this: SendPtr::new(Box::into_raw(self)).unwrap().cast(),
waiting_thread: UnsafeCell::new(None),
harness: harness::<F, T>,
maybe_boxed_val: UnsafeCell::new(MaybeUninit::uninit()),
})
}
}
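Because into_boxed_job hands the HeapJob allocation over via Box::into_raw and the harness reclaims it with Box::from_raw, each heap job must be executed exactly once: never executing it leaks the closure, executing it twice would free the allocation twice. A hedged usage sketch (fire-and-forget style; the closure is illustrative):

// Illustrative: a heap-allocated job that owns its closure.
let job = HeapJob::new(move || println!("ran on some worker")).into_boxed_job();
job.set_pending();  // normally done when a worker pops the job from its queue
job.execute();      // the harness reclaims the HeapJob box and runs the closure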
pub struct StackJob<F> {
f: UnsafeCell<ManuallyDrop<F>>,
}
impl<F> StackJob<F> {
pub fn new(f: F) -> Self {
Self {
f: UnsafeCell::new(ManuallyDrop::new(f)),
}
}
pub unsafe fn unwrap(&self) -> F {
unsafe { ManuallyDrop::take(&mut *self.f.get()) }
}
pub fn as_boxed_job<T>(&self) -> Box<Job<()>>
where
F: FnOnce() -> T + Send,
T: Send,
{
unsafe fn harness<F, T>(this: *const (), job: *const Job<()>)
where
F: FnOnce() -> T + Send,
T: Sized + Send,
{
let job = unsafe { &*job.cast::<Job<T>>() };
let this = unsafe { &*this.cast::<StackJob<F>>() };
let f = unsafe { this.unwrap() };
job.complete(f());
}
let size = mem::size_of::<T>();
let align = mem::align_of::<T>();
let new_state = if size > mem::size_of::<Box<T>>() || align > mem::align_of::<Box<T>>() {
JobState::Empty as u8
} else {
JobState::Inline as u8
};
Box::new(Job {
state: AtomicU8::new(new_state),
this: SendPtr::new(self).unwrap().cast(),
waiting_thread: UnsafeCell::new(None),
harness: harness::<F, T>,
maybe_boxed_val: UnsafeCell::new(MaybeUninit::uninit()),
})
}
}
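Putting the pieces together, the intended flow is: build a StackJob around a closure, type-erase it with as_boxed_job, flip it to Pending when it is popped from a queue, let a worker call execute() (whose harness recovers the closure and calls complete(), storing the result and unparking any waiter), then read the result back through wait(). A single-threaded sketch under those assumptions; the Job<()>-to-Job<i32> cast mirrors what the harness itself does:

// Illustrative, single-threaded walk-through of the v2 job lifecycle.
let stack = StackJob::new(|| 21 * 2);
let job: Box<Job<()>> = stack.as_boxed_job::<i32>();
job.set_pending();   // "call this when popping value from local queue"
job.execute();       // harness takes the closure and calls complete(42)
// The result sits behind the erased Job<()>; view it as Job<i32> to read it,
// exactly as the harness viewed it when completing.
let typed = unsafe { &*(&*job as *const Job<()>).cast::<Job<i32>>() };
assert_eq!(typed.wait(), Some(42));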


@@ -28,6 +28,7 @@ use task::{HeapTask, StackTask, TaskRef};
use tracing::debug;
pub mod job;
pub mod util;
pub mod task {
use std::{cell::UnsafeCell, marker::PhantomPinned, pin::Pin};


@@ -3,20 +3,19 @@ use std::{
collections::VecDeque,
marker::PhantomPinned,
ops::{Deref, DerefMut},
pin::pin,
ptr::{self, NonNull},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Weak,
},
thread,
time::{Duration, Instant},
time::Duration,
};
use crossbeam::utils::CachePadded;
use parking_lot::{Condvar, Mutex};
use crate::{latch::*, task::*, ThreadControl, ThreadStatus};
use crate::{latch::*, ThreadControl};
mod job {
use std::{
cell::{Cell, UnsafeCell},
@@ -304,7 +303,8 @@
}
//use job::{Future, Job, JobQueue, JobStack};
use crate::job::{Job, JobRef, StackJob};
use crate::job::v2::{Job, JobState, StackJob};
// use crate::job::{Job, JobRef, StackJob};
struct ThreadState {
control: ThreadControl,
@@ -312,11 +312,10 @@ struct ThreadState {
struct Heartbeat {
is_set: Weak<AtomicBool>,
last_time: Cell<Instant>,
}
pub struct SharedContext {
shared_tasks: Vec<Option<JobRef>>,
shared_tasks: Vec<Option<NonNull<Job>>>,
heartbeats: Vec<Option<Heartbeat>>,
rng: crate::rng::XorShift64Star,
}
@@ -337,7 +336,6 @@ impl SharedContext {
let is_set = Arc::new(AtomicBool::new(true));
let heartbeat = Heartbeat {
is_set: Arc::downgrade(&is_set),
last_time: Cell::new(Instant::now()),
};
let index = match self.heartbeats.iter().position(|a| a.is_none()) {
@@ -355,7 +353,7 @@
(is_set, index)
}
fn pop_first_task(&mut self) -> Option<JobRef> {
fn pop_first_task(&mut self) -> Option<NonNull<Job>> {
self.shared_tasks
.iter_mut()
.filter_map(|task| task.take())
@@ -376,7 +374,7 @@ std::thread_local! {
pub struct WorkerThread {
context: Arc<Context>,
index: usize,
queue: VecDeque<JobRef>,
queue: VecDeque<NonNull<Job>>,
heartbeat: Arc<AtomicBool>,
join_count: u8,
sleep_count: usize,
@@ -504,8 +502,10 @@
}
}
fn execute_job(&mut self, job: JobRef) {
unsafe { core::mem::transmute::<JobRef, JobRef<&mut WorkerThread>>(job).execute(self) };
fn execute_job(&mut self, job: NonNull<Job>) {
unsafe {
job.as_ref().execute();
}
}
#[cold]
@@ -569,18 +569,19 @@
RB: Send,
{
let mut ra = None;
let latch = AtomicLatch::new();
let a = |scope: &mut WorkerThread| {
if scope.heartbeat.load(Ordering::Relaxed) {
scope.heartbeat_cold();
}
ra = Some(a(scope));
unsafe {
Latch::set_raw(&latch);
}
};
let latch = AtomicLatch::new();
let ctx = self.context.clone();
let idx = self.index;
let stack = StackJob::new(a, latch);
let stack = StackJob::new(a);
let task: JobRef =
unsafe { core::mem::transmute::<JobRef<_>, JobRef>(stack.as_task_ref()) };
@@ -631,27 +632,33 @@
impl Context {
fn heartbeat(self: Arc<Self>, interaval: Duration) {
let mut n = 0;
loop {
if self.heartbeat_control.should_terminate.probe() {
break;
}
let sleep_for = {
let guard = self.shared.lock();
let now = Instant::now();
let num_heartbeats = guard
.heartbeats
.iter()
.filter_map(Option::as_ref)
.filter_map(|h| h.is_set.upgrade().map(|is_set| (is_set, &h.last_time)))
.inspect(|(is_set, last_time)| {
if now.duration_since(last_time.get()) >= interaval {
.filter_map(|h| h.is_set.upgrade().map(|is_set| is_set))
.enumerate()
.inspect(|(i, is_set)| {
if *i == n {
is_set.store(true, Ordering::Relaxed);
last_time.set(now);
}
})
.count();
if n >= num_heartbeats {
n = 0;
} else {
n += 1;
}
interaval.checked_div(num_heartbeats as u32)
};
@@ -660,6 +667,41 @@
}
}
}
fn heartbeat2(self: Arc<Self>, interval: Duration) {
let mut i = 0;
loop {
if self.heartbeat_control.should_terminate.probe() {
break;
}
let sleep_for = {
let guard = self.shared.lock();
let mut num = 0;
for is_set in guard
.heartbeats
.iter()
.filter_map(Option::as_ref)
.filter_map(|h| h.is_set.upgrade())
{
if num == i {
is_set.store(true, Ordering::Relaxed);
}
num += 1;
}
if num >= i {
i = 0;
}
interval.checked_div(num)
};
if let Some(duration) = sleep_for {
thread::sleep(duration);
}
}
}
}
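Compared to heartbeat, heartbeat2 drops the per-worker timestamps: it walks the live heartbeat flags, sets the one whose position matches the round-robin counter, and sleeps for interval / num_workers, so each worker is still poked roughly once per interval. A rough sketch of the per-tick sleep computation (the numbers are illustrative):

// Illustrative: with a 100 µs interval and 4 live workers, the heartbeat
// thread wakes every 25 µs and targets one worker per wake-up.
use std::time::Duration;
let interval = Duration::from_micros(100);
let num_workers: u32 = 4;
let sleep_for = interval.checked_div(num_workers);  // None when there are no workers
assert_eq!(sleep_for, Some(Duration::from_micros(25)));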
impl Drop for Context {
@@ -714,7 +756,7 @@ impl ThreadPool {
let ctx = this.context.clone();
std::thread::spawn(|| {
ctx.heartbeat(Duration::from_micros(100));
ctx.heartbeat2(Duration::from_micros(100));
});
this

src/util.rs (new file, +67)

@@ -0,0 +1,67 @@
use core::{
cell::Cell,
ops::{Deref, DerefMut},
ptr::NonNull,
};
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[repr(transparent)]
pub struct SendPtr<T>(NonNull<T>);
impl<T> SendPtr<T> {
pub fn as_ptr(&self) -> *mut T {
self.0.as_ptr()
}
pub unsafe fn new_unchecked(t: *const T) -> Self {
unsafe { Self(NonNull::new_unchecked(t.cast_mut())) }
}
pub fn new(t: *const T) -> Option<Self> {
NonNull::new(t.cast_mut()).map(Self)
}
pub fn cast<U>(self) -> SendPtr<U> {
SendPtr(self.0.cast::<U>())
}
}
impl<T> Deref for SendPtr<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.0.as_ptr() }
}
}
impl<T> DerefMut for SendPtr<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.0.as_ptr() }
}
}
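SendPtr is a thin NonNull wrapper used to carry type-erased raw pointers (for example the `this` field of Job) across closure and harness boundaries. No `unsafe impl Send` appears in the lines shown here; such an impl, asserting that the pointee really is safe to touch from another thread, is what would normally make the wrapper useful across threads, so treat that as an assumption. A small usage sketch:

// Illustrative: erase a pointer's type and recover it later.
// Assumes something like `unsafe impl<T> Send for SendPtr<T> {}` accompanies the type.
let value = 7usize;
let erased: SendPtr<()> = SendPtr::new(&value).unwrap().cast::<()>();
let back = erased.cast::<usize>().as_ptr();
assert_eq!(unsafe { *back }, 7);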
pub struct XorShift64Star {
state: Cell<u64>,
}
impl XorShift64Star {
/// Initializes the prng with a seed. Provided seed must be nonzero.
pub fn new(seed: u64) -> Self {
XorShift64Star {
state: Cell::new(seed),
}
}
/// Returns a pseudorandom number.
pub fn next(&self) -> u64 {
let mut x = self.state.get();
debug_assert_ne!(x, 0);
x ^= x >> 12;
x ^= x << 25;
x ^= x >> 27;
self.state.set(x);
x.wrapping_mul(0x2545_f491_4f6c_dd1d)
}
/// Return a pseudorandom number from `0..n`.
pub fn next_usize(&self, n: usize) -> usize {
(self.next() % n as u64) as usize
}
}
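XorShift64Star is a standard xorshift* generator (three xor-shifts followed by a multiply by 0x2545_f491_4f6c_dd1d), and next_usize reduces the 64-bit output to an index in 0..n. A small usage sketch (seed and worker count are illustrative):

// Illustrative: pick a random worker index, e.g. when choosing a steal victim.
let rng = XorShift64Star::new(0x9E37_79B9_7F4A_7C15);  // any nonzero seed works
let num_workers = 8;
let victim = rng.next_usize(num_workers);
assert!(victim < num_workers);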