From 1baf870d9c3fbfca9057f30ce46c0113513e1213 Mon Sep 17 00:00:00 2001
From: Janis
Date: Tue, 17 Jun 2025 14:47:17 +0200
Subject: [PATCH] lots + removed pin in joblist and jobs + running closures in
 worker functions

---
 src/praetor/mod.rs | 704 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 508 insertions(+), 196 deletions(-)

diff --git a/src/praetor/mod.rs b/src/praetor/mod.rs
index 1e24237..afe0ba1 100644
--- a/src/praetor/mod.rs
+++ b/src/praetor/mod.rs
@@ -240,10 +240,9 @@ mod job {
         cell::UnsafeCell,
         fmt::{Debug, Display},
         hint::cold_path,
-        marker::PhantomPinned,
         mem::{self, ManuallyDrop, MaybeUninit},
         ops::{Deref, DerefMut},
-        pin::Pin,
+        panic::resume_unwind,
         ptr::{self, NonNull},
         sync::atomic::Ordering,
         thread::Thread,
@@ -251,6 +250,8 @@ mod job {
 
     use parking_lot_core::SpinWait;
 
+    use crate::latch::Latch;
+
     use super::util::TaggedAtomicPtr;
 
     #[derive(Debug)]
@@ -427,14 +428,14 @@ mod job {
 
     #[derive(Debug)]
     pub struct JobList {
-        head: Pin<Box<Job>>,
-        tail: Pin<Box<Job>>,
+        head: Box<Job>,
+        tail: Box<Job>,
     }
 
     impl JobList {
         pub fn new() -> JobList {
-            let head = Box::pin(Job::empty());
-            let tail = Box::pin(Job::empty());
+            let head = Box::new(Job::empty());
+            let tail = Box::new(Job::empty());
 
             // head and tail point at themselves
             unsafe {
@@ -464,7 +465,7 @@ mod job {
         }
 
         /// elem must be valid until it is popped.
-        pub unsafe fn push_front<T>(&mut self, elem: Pin<&Job<T>>) {
+        pub unsafe fn push_front<T>(&mut self, elem: &Job<T>) {
             let head_link = unsafe { self.head.link_mut() };
 
             // SAFETY: head will always have a previous element.
@@ -481,7 +482,7 @@ mod job {
         }
 
         /// elem must be valid until it is popped.
-        pub unsafe fn push_back<T>(&mut self, elem: Pin<&Job<T>>) {
+        pub unsafe fn push_back<T>(&mut self, elem: &Job<T>) {
             let tail_link = unsafe { self.tail.link_mut() };
 
             // SAFETY: tail will always have a previous element.
@@ -569,7 +570,7 @@ mod job {
         val_or_this: UnsafeCell<ValueOrThis<T>>,
         /// (prev,next) before execute(), Box<...> after
         err_or_link: UnsafeCell<LinkOrError<T>>,
-        _phantom: PhantomPinned,
+        // _phantom: PhantomPinned,
     }
 
     impl<T> Debug for Job<T> {
@@ -637,10 +638,21 @@ mod job {
                     next: None,
                 },
             }),
-            _phantom: PhantomPinned,
+            // _phantom: PhantomPinned,
         }
     }
 
+        // Job is passed around type-erased as `Job<()>`, to complete the job we
+        // need to cast it back to the original type.
+        pub unsafe fn transmute_ref<U>(&self) -> &Job<U> {
+            mem::transmute::<&Job<T>, &Job<U>>(self)
+        }
+
+        pub unsafe fn unwrap_this(&self) -> NonNull<()> {
+            assert!(self.state() == JobState::Empty as u8);
+            unsafe { (&*self.val_or_this.get()).this }
+        }
+
         pub fn empty() -> Job<T> {
             Self {
                 harness_and_state: TaggedAtomicPtr::new(
@@ -656,7 +668,7 @@ mod job {
                     next: None,
                 },
             }),
-            _phantom: PhantomPinned,
+            // _phantom: PhantomPinned,
         }
     }
 
@@ -684,7 +696,7 @@ mod job {
             self.harness_and_state.tag(Ordering::Relaxed) as u8
         }
 
-        pub fn wait(&self) -> std::thread::Result<T> {
+        pub fn wait(&self) -> JobResult<T> {
             let mut spin = SpinWait::new();
             loop {
                 match self.harness_and_state.compare_exchange_weak_tag(
@@ -728,7 +740,7 @@ mod job {
                         Ok(val.into_inner())
                     };
 
-                    return result;
+                    return JobResult::new(result);
                 } else {
                     // spin until lock is released.
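+                    // State protocol, reconstructed from the visible transitions
+                    // (a sketch, not introduced by this patch): Empty -> Pending
+                    // when queued, Pending -> Running in execute(), and Running
+                    // -> Finished once the result is written. Locked is a brief
+                    // guard tag held while the result is moved out of the job,
+                    // so a waiter can only observe a fully written result.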
                    spin.spin();
@@ -826,10 +838,16 @@ mod job {
         }
     }
 
+    impl<T> crate::Probe for Job<T> {
+        fn probe(&self) -> bool {
+            self.state() == JobState::Finished as u8
+        }
+    }
+
     #[allow(dead_code)]
     pub struct HeapJob<F> {
         f: F,
-        _phantom: PhantomPinned,
+        // _phantom: PhantomPinned,
     }
 
     impl<F> HeapJob<F> {
@@ -837,14 +855,16 @@ mod job {
         pub fn new(f: F) -> Box<Self> {
             Box::new(Self {
                 f,
-                _phantom: PhantomPinned,
+                // _phantom: PhantomPinned,
             })
         }
 
+        pub fn into_inner(self) -> F {
+            self.f
+        }
+
         #[allow(dead_code)]
-        pub fn into_boxed_job<T>(self: Box<Self>) -> Pin<Box<Job<T>>>
+        pub fn into_boxed_job<T>(self: Box<Self>) -> Box<Job<T>>
         where
             F: FnOnce() -> T + Send,
             T: Send,
@@ -855,71 +875,104 @@ mod job {
                 F: FnOnce() -> T + Send,
                 T: Sized + Send,
             {
+                eprintln!("heapjob harness");
+
+                let job = job.cast_mut();
                 let this = unsafe { Box::from_raw(this.cast::<HeapJob<F>>().cast_mut()) };
                 let f = this.f;
                 _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f()));
 
-                _ = unsafe { Box::from_raw(job.cast_mut()) };
+                // drop job (this is fine because the job of a HeapJob is pure POD).
+                ptr::drop_in_place(job);
+
+                // free box that was allocated at (1)
+                _ = unsafe { Box::<MaybeUninit<Job<T>>>::from_raw(job.cast()) };
             }
 
-            Box::pin(Job::new(harness::<F, T>, unsafe {
+            // (1) allocate box for job
+            Box::new(Job::new(harness::<F, T>, unsafe {
                 NonNull::new_unchecked(Box::into_raw(self)).cast()
             }))
         }
     }
 
-    pub struct StackJob<F> {
+    pub struct StackJob<L, F> {
+        latch: L,
         f: UnsafeCell<ManuallyDrop<F>>,
-        _phantom: PhantomPinned,
+        // _phantom: PhantomPinned,
     }
 
-    impl<F> StackJob<F> {
-        pub fn new(f: F) -> Self {
+    impl<L, F> StackJob<L, F> {
+        pub fn new(f: F, latch: L) -> Self {
             Self {
+                latch,
                 f: UnsafeCell::new(ManuallyDrop::new(f)),
-                _phantom: PhantomPinned,
+                // _phantom: PhantomPinned,
             }
         }
 
        pub unsafe fn unwrap(&self) -> F {
            unsafe { ManuallyDrop::take(&mut *self.f.get()) }
        }
+    }
 
-        pub fn as_job<T>(self: Pin<&Self>) -> Job<()>
+    impl<L: Latch, F> StackJob<L, F> {
+        pub fn as_job<T>(&self) -> Job<()>
         where
             F: FnOnce() -> T + Send,
             T: Send,
         {
             #[repr(align(8))]
-            unsafe fn harness<F, T>(this: *const (), job: *const Job<()>)
+            unsafe fn harness<L: Latch, F, T>(this: *const (), job: *const Job<()>)
             where
                 F: FnOnce() -> T + Send,
                 T: Sized + Send,
             {
-                let this = unsafe { &*this.cast::<StackJob<F>>() };
+                let this = unsafe { &*this.cast::<StackJob<L, F>>() };
                 let f = unsafe { this.unwrap() };
                 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f()));
 
                 let job_ref = unsafe { &*job.cast::<Job<T>>() };
                 job_ref.complete(result);
+                crate::latch::Latch::set_raw(&this.latch);
             }
 
-            Job::new(harness::<F, T>, unsafe {
+            Job::new(harness::<L, F, T>, unsafe {
                 NonNull::new_unchecked(&*self as *const _ as *mut ())
             })
         }
     }
+
+    pub struct JobResult<T> {
+        result: std::thread::Result<T>,
+    }
+
+    impl<T> JobResult<T> {
+        pub fn new(result: std::thread::Result<T>) -> Self {
+            Self { result }
+        }
+
+        // unwraps the result, propagating panics
+        pub fn into_result(self) -> T {
+            match self.result {
+                Ok(val) => val,
+                Err(payload) => {
+                    cold_path();
+                    resume_unwind(payload);
+                }
+            }
+        }
+    }
 }
 
 use std::{
-    cell::{Cell, UnsafeCell},
+    cell::UnsafeCell,
     collections::BTreeMap,
     future::Future,
+    hint::cold_path,
     marker::PhantomData,
     mem::{self, MaybeUninit},
-    pin::{pin, Pin},
     ptr::NonNull,
     sync::{
         atomic::{AtomicBool, AtomicUsize, Ordering},
@@ -934,6 +987,8 @@
 use job::*;
 use parking_lot::{Condvar, Mutex};
 use util::{DropGuard, SendPtr};
 
+use crate::latch::{AtomicLatch, LatchRef, NopLatch};
+
 #[derive(Debug, Default)]
 pub struct JobCounter {
     jobs_pending: AtomicUsize,
@@ -945,6 +1000,10 @@ impl JobCounter {
         self.jobs_pending.fetch_add(1, Ordering::Relaxed);
     }
 
+    pub fn count(&self) -> usize {
+        self.jobs_pending.load(Ordering::Relaxed)
+    }
+
     pub fn decrement(&self) {
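+        // NOTE: `fetch_sub` returns the previous value, so seeing 1 here means
+        // this was the last pending job and the parked waiter can be woken.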
         if self.jobs_pending.fetch_sub(1, Ordering::SeqCst) == 1 {
             if let Some(thread) = self.waker.lock().take() {
@@ -972,21 +1031,21 @@ struct WorkerThread {
 }
 
 pub struct Scope<'scope> {
-    join_count: Cell<usize>,
-    context: Arc<Context>,
-    index: usize,
-    heartbeat: Arc<AtomicBool>,
-    queue: UnsafeCell<JobList>,
-
+    // counter for join_every_* function
+    join_count: AtomicUsize,
+    // latch to wait on before the scope finishes
     job_counter: JobCounter,
-    _pd: PhantomData<&'scope ()>,
+    // local threadpool
+    context: Arc<Context>,
+    // variant lifetime
+    _pd: PhantomData<fn(&'scope ())>,
 }
 
 thread_local! {
-    static SCOPE: UnsafeCell<Option<NonNull<Scope<'static>>>> = const { UnsafeCell::new(None) };
+    static WORKER: UnsafeCell<Option<NonNull<WorkerThread>>> = const { UnsafeCell::new(None) };
 }
 
-impl<'scope> Scope<'scope> {
+impl WorkerThread {
     /// locks shared context
     #[allow(dead_code)]
     fn new() -> Self {
@@ -1002,14 +1061,28 @@ impl<'scope> Scope<'scope> {
             context,
             index,
             heartbeat,
-            join_count: Cell::new(0),
             queue: UnsafeCell::new(JobList::new()),
-            job_counter: JobCounter::default(),
-            _pd: PhantomData,
+            // join_count: Cell::new(0),
+            // _pd: PhantomData,
         }
     }
 
-    unsafe fn drop_in_place_and_dealloc(this: NonNull<Self>) {
+    fn drop_current_guard(new: Option<NonNull<Self>>) -> DropGuard<impl FnOnce()> {
+        DropGuard::new(move || unsafe {
+            if let Some(old) = Self::unset_current() {
+                Self::drop_in_place_and_dealloc(old);
+            } else {
+                cold_path();
+                tracing::error!("WorkerThread drop guard tried to drop None.");
+            }
+
+            if let Some(new) = new {
+                Self::set_current(new.as_ptr().cast_const());
+            }
+        })
+    }
+
+    unsafe fn drop_in_place_and_dealloc(this: NonNull<Self>) {
         unsafe {
             let ptr = this.as_ptr();
             ptr.drop_in_place();
@@ -1018,18 +1091,17 @@ impl<'scope> Scope<'scope> {
         }
     }
 
-    fn with_in<T, F: FnOnce(&Self) -> T>(ctx: &Arc<Context>, f: F) -> T {
-        let mut _guard = Option::<DropGuard<Box<dyn FnOnce()>>>::None;
+    fn with_in<T, F: FnOnce(&WorkerThread) -> T>(ctx: &Arc<Context>, f: F) -> T {
+        let mut _guard = None;
 
-        let scope = match Self::current_ref() {
-            Some(scope) if Arc::ptr_eq(&scope.context, ctx) => scope,
+        let worker = match Self::current_ref() {
+            Some(current) if Arc::ptr_eq(&current.context, ctx) => current,
             Some(_) => {
-                let old = unsafe { Self::unset_current().unwrap().as_ptr() };
-                _guard = Some(DropGuard::new(Box::new(move || unsafe {
-                    Self::drop_in_place_and_dealloc(Self::unset_current().unwrap());
-
-                    Self::set_current(old.cast_const());
-                })));
+                // this thread's worker isn't in the same threadpool as us:
+                // - create a new worker in our threadpool and set it as the
+                //   current, then make sure to drop that worker when we're done
+                //   and replace the old worker.
+                _guard = Some(Self::drop_current_guard(unsafe { Self::unset_current() }));
 
                 let current = Box::into_raw(Box::new(Self::new_in(ctx.clone())));
                 unsafe {
@@ -1040,9 +1112,8 @@ impl<'scope> Scope<'scope> {
             None => {
                 let current = Box::into_raw(Box::new(Self::new_in(ctx.clone())));
 
-                _guard = Some(DropGuard::new(Box::new(|| unsafe {
-                    Self::drop_in_place_and_dealloc(Self::unset_current().unwrap());
-                })));
+                // drop the newly created worker thread once we're done.
+                _guard = Some(Self::drop_current_guard(None));
 
                 unsafe {
                     Self::set_current(current.cast_const());
@@ -1052,40 +1123,42 @@ impl<'scope> Scope<'scope> {
             }
         };
 
-        let t = f(scope);
+        let t = f(worker);
 
         t
     }
 
-    pub fn with<T, F: FnOnce(&Self) -> T>(f: F) -> T {
+    pub fn with<T, F: FnOnce(&WorkerThread) -> T>(f: F) -> T {
         Self::with_in(Context::global(), f)
     }
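+
+    // Usage sketch (hypothetical example, not part of this patch): callers get
+    // a worker that is guaranteed to belong to `ctx`; if the thread has no
+    // worker, or one from another pool, a temporary worker is installed and
+    // torn down again by the drop guard.
+    //
+    //     let three = WorkerThread::with_in(&ctx, |worker| 1 + 2);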
 
-    unsafe fn set_current(scope: *const Scope<'static>) {
-        SCOPE.with(|ptr| unsafe {
-            _ = (&mut *ptr.get()).insert(NonNull::new_unchecked(scope.cast_mut()));
+    /// sets the thread-local worker to this.
+    unsafe fn set_current(this: *const WorkerThread) {
+        WORKER.with(|ptr| unsafe {
+            _ = (&mut *ptr.get()).insert(NonNull::new_unchecked(this.cast_mut()));
         })
     }
 
-    unsafe fn unset_current() -> Option<NonNull<Scope<'static>>> {
-        SCOPE.with(|ptr| unsafe { (&mut *ptr.get()).take() })
+    /// sets the thread-local worker to None and returns it, if it was occupied.
+    unsafe fn unset_current() -> Option<NonNull<Self>> {
+        WORKER.with(|ptr| unsafe { (&mut *ptr.get()).take() })
     }
 
     #[allow(dead_code)]
-    fn current() -> Option<NonNull<Scope<'scope>>> {
-        SCOPE.with(|ptr| unsafe { *ptr.get() })
+    fn current() -> Option<NonNull<Self>> {
+        WORKER.with(|ptr| unsafe { *ptr.get() })
     }
 
-    fn current_ref<'a>() -> Option<&'a Scope<'scope>> {
-        SCOPE.with(|ptr| unsafe { (&*ptr.get()).map(|ptr| ptr.as_ref()) })
+    fn current_ref<'a>() -> Option<&'a WorkerThread> {
+        WORKER.with(|ptr| unsafe { (&*ptr.get()).map(|ptr| ptr.as_ref()) })
     }
 
-    fn push_front<T>(&self, job: Pin<&Job<T>>) {
+    fn push_front<T>(&self, job: &Job<T>) {
         unsafe {
             self.queue.as_mut_unchecked().push_front(job);
         }
     }
 
     #[allow(dead_code)]
-    fn push_back<T>(&self, job: Pin<&Job<T>>) {
+    fn push_back<T>(&self, job: &Job<T>) {
         unsafe {
             self.queue.as_mut_unchecked().push_back(job);
         }
@@ -1105,28 +1178,201 @@ impl<'scope> Scope<'scope> {
         }
         self.execute(job);
     }
+
+    #[inline(always)]
+    fn tick(&self) {
+        if self.heartbeat.load(Ordering::Relaxed) {
+            self.heartbeat_cold();
+        }
+    }
+
+    #[inline]
+    fn execute(&self, job: NonNull<Job>) {
+        self.tick();
+        Job::execute(job);
+    }
+
+    #[cold]
+    fn heartbeat_cold(&self) {
+        let mut guard = self.context.shared.lock();
+
+        if !guard.jobs.contains_key(&self.index) {
+            if let Some(job) = self.pop_back() {
+                unsafe {
+                    job.as_ref().set_pending();
+                }
+                guard.jobs.insert(self.index, job);
+                self.context.shared_job.notify_one();
+            }
+        }
+
+        self.heartbeat.store(false, Ordering::Relaxed);
+    }
+
+    #[cold]
+    fn wait_until_latch_cold<Latch: crate::Probe>(&self, latch: &Latch) {
+        // does this optimise?
+        assert!(!latch.probe());
+
+        'outer: while !latch.probe() {
+            // take the shared job, if it exists
+            if let Some(shared_job) = self.context.shared.lock().jobs.remove(&self.index) {
+                self.execute(shared_job);
+            }
+
+            while !latch.probe() {
+                let mut guard = self.context.shared.lock();
+
+                match guard.jobs.pop_first().map(|(_, job)| job) {
+                    Some(job) => {
+                        drop(guard);
+                        self.execute(job);
+                        continue 'outer;
+                    }
+                    None => {
+                        // TODO: spin2win
+                        self.context.shared_job.wait(&mut guard);
+                    }
+                }
+            }
+        }
+    }
+
+    pub fn wait_until_latch<Latch: crate::Probe>(&self, latch: &Latch) {
+        if !latch.probe() {
+            self.wait_until_latch_cold(latch);
+        }
+    }
+
+    pub fn wait_until_job<T>(&self, job: &Job<T>) -> Option<JobResult<T>> {
+        // take the shared job and check if it is our job
+        let shared_job = self.context.shared.lock().jobs.remove(&self.index);
+
+        if let Some(ptr) = shared_job {
+            if ptr.as_ptr() == &*job as *const _ as *mut _ {
+                // we can more efficiently run the job inline
+                return None;
+            } else {
+                // execute this job since it hasn't been taken
+                self.execute(ptr);
+            }
+        }
+
+        while job.state() != JobState::Finished as u8 {
+            let Some(job) = self
+                .context
+                .shared
+                .lock()
+                .jobs
+                .pop_first()
+                .map(|(_, job)| job)
+                // .or_else(|| {
+                //     self.pop_front().inspect(|job| unsafe {
+                //         job.as_ref().set_pending();
+                //     })
+                // })
+            else {
+                break;
+            };
+
+            self.execute(job);
+        }
+
+        // someone else has this job and is working on it,
+        // while job isn't done, suspend thread.
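+        // Return contract (relied on by join_heartbeat): None means the job
+        // was still in our shared slot, so the caller should reclaim it and
+        // run the closure inline; Some(result) means another worker claimed
+        // it, and we parked in Job::wait for the result instead.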
+
+        Some(job.wait())
+    }
+}
+
+impl<'scope> Scope<'scope> {
+    fn wait_for_jobs(&self) {
         unsafe {
+            tracing::trace!("waiting for {} jobs to finish.", self.job_counter.count());
             self.job_counter.wait();
         }
     }
 
+    pub fn scope<F, R>(&self, f: F) -> R
+    where
+        F: FnOnce(&Self) -> R + Send,
+        R: Send,
+    {
+        self.complete(|| f(self))
+    }
+
+    fn complete<F, R>(&self, f: F) -> R
+    where
+        F: FnOnce() -> R + Send,
+        R: Send,
+    {
+        use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
+        #[repr(align(8))]
+        unsafe fn harness<F: FnOnce() -> T, T>(this: *const (), job: *const Job<T>) {
+            let f = unsafe { Box::from_raw(this.cast::<F>().cast_mut()) };
+
+            let result = catch_unwind(AssertUnwindSafe(move || f()));
+
+            let job = unsafe { Box::from_raw(job.cast_mut()) };
+            job.complete(result);
+        }
+
+        fn make_job<F: FnOnce() -> T, T>(f: F) -> Job<T> {
+            Job::<T>::new(harness::<F, T>, unsafe {
+                NonNull::new_unchecked(Box::into_raw(Box::new(f))).cast()
+            })
+        }
+
+        let result = WorkerThread::with_in(&self.context, |worker| {
+            let mut job = make_job(f);
+
+            unsafe {
+                _ = worker;
+                job.set_pending();
+                Job::execute(NonNull::new_unchecked(&mut job));
+            }
+
+            // let this = SendPtr::new_const(self).unwrap();
+            //
+            // worker.push_front(&job);
+            //
+            // match worker.wait_until(&job) {
+            //     Some(result) => result,
+            //     None => unsafe {
+            //         let f = Box::<F>::from_non_null(job.unwrap_this().cast());
+            //         Ok(f(this.as_ref()))
+            //     },
+            // }
 
+            job.wait()
+        });
+
+        self.wait_for_jobs();
+
+        result.into_result()
+    }
+
     pub fn spawn<F>(&self, f: F)
     where
-        F: FnOnce(&Scope<'scope>) + Send + 'scope,
+        F: FnOnce(&Scope<'scope>) + Send,
     {
-        self.job_counter.increment();
+        WorkerThread::with_in(&self.context, |worker| {
+            self.job_counter.increment();
 
-        let this = SendPtr::new_const(self).unwrap();
+            let this = SendPtr::new_const(self).unwrap();
 
-        let job = HeapJob::new(move || unsafe {
-            f(this.as_ref());
-            this.as_ref().job_counter.decrement();
-        })
-        .into_boxed_job();
+            let job = HeapJob::new(move || unsafe {
+                _ = f(this.as_ref());
+                this.as_ref().job_counter.decrement();
+            })
+            .into_boxed_job();
 
-        self.push_front(Pin::as_ref(&job));
-        mem::forget(job);
+            tracing::trace!("allocated heapjob");
+
+            worker.push_front(&job);
+            Box::leak(job);
+            tracing::trace!("leaked heapjob");
+        });
     }
 
     pub fn spawn_future<T, F>(&self, future: F) -> async_task::Task<T>
@@ -1134,40 +1380,41 @@ impl<'scope> Scope<'scope> {
     where
         F: Future<Output = T> + Send + 'scope,
         T: Send + 'scope,
     {
-        self.job_counter.increment();
+        WorkerThread::with_in(&self.context, |worker| {
+            self.job_counter.increment();
 
-        let this = SendPtr::new_const(&self.job_counter).unwrap();
+            let this = SendPtr::new_const(&self.job_counter).unwrap();
 
-        let future = async move {
-            let _guard = DropGuard::new(move || unsafe {
-                this.as_ref().decrement();
-            });
-            future.await
-        };
+            let future = async move {
+                let _guard = DropGuard::new(move || unsafe {
+                    this.as_ref().decrement();
+                });
+                future.await
+            };
 
-        let this = SendPtr::new_const(self).unwrap();
-        let schedule = move |runnable: Runnable| {
-            #[repr(align(8))]
-            unsafe fn harness<T>(this: *const (), job: *const Job<T>) {
-                let runnable = Runnable::<()>::from_raw(NonNull::new_unchecked(this.cast_mut()));
-                runnable.run();
+            let this = SendPtr::new_const(self).unwrap();
+            let schedule = move |runnable: Runnable| {
+                #[repr(align(8))]
+                unsafe fn harness<T>(this: *const (), job: *const Job<T>) {
+                    let runnable =
+                        Runnable::<()>::from_raw(NonNull::new_unchecked(this.cast_mut()));
+                    runnable.run();
 
-                drop(Box::from_raw(job.cast_mut()));
-            }
+                    drop(Box::from_raw(job.cast_mut()));
+                }
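+
+                // Each wake allocates a fresh Job whose `this` pointer carries
+                // the Runnable; the harness runs it once and then frees that
+                // Job box again.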
 
-            let job =
-                Box::pin(Job::<T>::new(harness::<T>, runnable.into_raw()));
+                let job = Box::new(Job::<T>::new(harness::<T>, runnable.into_raw()));
 
-            unsafe {
-                this.as_ref().push_front(job.as_ref());
-            }
-            mem::forget(job);
-        };
+                worker.push_front(job.as_ref());
+                mem::forget(job);
+            };
 
-        let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
+            let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
 
-        runnable.schedule();
+            runnable.schedule();
 
-        task
+            task
+        })
     }
 
     #[allow(dead_code)]
@@ -1217,7 +1464,11 @@ impl<'scope> Scope<'scope> {
     {
         // let count = self.join_count.get();
         // self.join_count.set(count.wrapping_add(1) % TIMES);
-        let count = self.join_count.update(|n| n.wrapping_add(1) % TIMES);
+        let count = self
+            .join_count
+            .update(Ordering::Relaxed, Ordering::Relaxed, |n| {
+                n.wrapping_add(1) % TIMES
+            });
 
         if count == 1 {
             self.join_heartbeat(a, b)
@@ -1233,16 +1484,23 @@ impl<'scope> Scope<'scope> {
         A: FnOnce(&Self) -> RA + Send,
         B: FnOnce(&Self) -> RB + Send,
     {
+        let worker = WorkerThread::current_ref().expect("join is run in workerthread.");
+
         let this = SendPtr::new_const(self).unwrap();
-        let a = pin!(StackJob::new(move || unsafe {
-            let scope = this.as_ref();
-            scope.tick();
 
-            a(scope)
-        }));
+        let a = StackJob::new(
+            move || unsafe {
+                WorkerThread::current_ref()
+                    .expect("stackjob is run in workerthread.")
+                    .tick();
 
-        let job = pin!(a.as_ref().as_job());
-        self.push_front(job.as_ref());
+                let scope = this.as_ref();
+                a(scope)
+            },
+            NopLatch,
+        );
+
+        let job = a.as_job();
+        worker.push_front(&job);
 
         let rb = b(self);
 
@@ -1253,11 +1511,8 @@ impl<'scope> Scope<'scope> {
             unsafe { a.unwrap()() }
         } else {
-            match self.wait_until::<RA>(unsafe {
-                mem::transmute::<Pin<&Job<RA>>, Pin<&Job>>(job.as_ref())
-            }) {
-                Some(Ok(t)) => t,
-                Some(Err(payload)) => std::panic::resume_unwind(payload),
+            match worker.wait_until_job::<RA>(unsafe { job.transmute_ref::<RA>() }) {
+                Some(t) => t.into_result(), // propagate panic here
                 None => unsafe { a.unwrap()() },
             }
         };
@@ -1266,81 +1521,26 @@ impl<'scope> Scope<'scope> {
 
         (ra, rb)
     }
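+
+    // Heartbeat-join in short (summary comment, not new behavior): `a` is
+    // published as a StackJob so another worker can steal it while we run `b`
+    // inline; afterwards we either pop `a` back and run it on this thread, or
+    // wait on the thief via wait_until_job and propagate panics through
+    // JobResult::into_result.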
 
-    #[inline(always)]
-    fn tick(&self) {
-        if self.heartbeat.load(Ordering::Relaxed) {
-            self.heartbeat_cold();
+    fn from_context(ctx: Arc<Context>) -> Self {
+        Self {
+            context: ctx,
+            join_count: AtomicUsize::new(0),
+            job_counter: JobCounter::default(),
+            _pd: PhantomData,
         }
     }
-
-    #[inline]
-    fn execute(&self, job: NonNull<Job>) {
-        self.tick();
-        Job::execute(job);
-    }
-
-    #[cold]
-    fn heartbeat_cold(&self) {
-        let mut guard = self.context.shared.lock();
-
-        if !guard.jobs.contains_key(&self.index) {
-            if let Some(job) = self.pop_back() {
-                unsafe {
-                    job.as_ref().set_pending();
-                }
-                guard.jobs.insert(self.index, job);
-                self.context.shared_job.notify_one();
-            }
-        }
-
-        self.heartbeat.store(false, Ordering::Relaxed);
-    }
-
-    pub fn wait_until<T>(&self, job: Pin<&Job<T>>) -> Option<std::thread::Result<T>> {
-        let shared_job = self.context.shared.lock().jobs.remove(&self.index);
-
-        if let Some(ptr) = shared_job {
-            if ptr.as_ptr() == &*job as *const _ as *mut _ {
-                return None;
-            } else {
-                self.execute(ptr);
-            }
-        }
-
-        while job.state() != JobState::Finished as u8 {
-            let Some(job) = self
-                .context
-                .shared
-                .lock()
-                .jobs
-                .pop_first()
-                .map(|(_, job)| job)
-                // .or_else(|| {
-                //     self.pop_front().inspect(|job| unsafe {
-                //         job.as_ref().set_pending();
-                //     })
-                // })
-            else {
-                break;
-            };
-
-            self.execute(job);
-        }
-        // while job isn't done, run other jobs.
-        Some(job.wait())
-    }
 }
 
-#[allow(dead_code)]
-pub fn join<A, B, RA, RB>(a: A, b: B) -> (RA, RB)
-where
-    RA: Send,
-    RB: Send,
-    A: FnOnce() -> RA + Send,
-    B: FnOnce() -> RB + Send,
-{
-    Scope::with(|scope| scope.join(|_| a(), |_| b()))
-}
+// #[allow(dead_code)]
+// pub fn join<A, B, RA, RB>(a: A, b: B) -> (RA, RB)
+// where
+//     RA: Send,
+//     RB: Send,
+//     A: FnOnce() -> RA + Send,
+//     B: FnOnce() -> RB + Send,
+// {
+//     Scope::with(|scope| scope.join(|_| a(), |_| b()))
+// }
 
 pub struct ThreadPool {
     context: Arc<Context>,
@@ -1359,8 +1559,13 @@ impl ThreadPool {
         }
     }
 
-    pub fn scope<T, F: FnOnce(&Scope) -> T>(&self, f: F) -> T {
-        Scope::with_in(&self.context, f)
+    pub fn scope<'scope, R, F>(&self, f: F) -> R
+    where
+        F: FnOnce(&Scope<'scope>) -> R + Send,
+        R: Send,
+    {
+        let scope = Scope::from_context(self.context.clone());
+        scope.scope(f)
     }
 }
 
@@ -1372,6 +1577,7 @@ struct Context {
 struct SharedContext {
     jobs: BTreeMap<usize, NonNull<Job>>,
     heartbeats: BTreeMap<usize, Weak<AtomicBool>>,
+    injected_jobs: Vec<NonNull<Job>>,
     // monotonic increasing id
     heartbeats_id: usize,
     should_stop: bool,
@@ -1391,6 +1597,21 @@ impl SharedContext {
 
         (is_set, index)
     }
+
+    fn pop_job(&mut self) -> Option<NonNull<Job>> {
+        // this is unlikely, so make the function cold?
+        // TODO: profile this
+        if !self.injected_jobs.is_empty() {
+            return Some(unsafe { self.pop_injected_job() });
+        }
+
+        self.jobs.pop_first().map(|(_, job)| job)
+    }
+
+    #[cold]
+    unsafe fn pop_injected_job(&mut self) -> NonNull<Job> {
+        self.injected_jobs.pop().unwrap()
+    }
 }
 
 impl Context {
@@ -1399,13 +1620,14 @@ impl Context {
             shared: Mutex::new(SharedContext {
                 jobs: BTreeMap::new(),
                 heartbeats: BTreeMap::new(),
+                injected_jobs: Vec::new(),
                 heartbeats_id: 0,
                 should_stop: false,
             }),
             shared_job: Condvar::new(),
        });
 
-        eprintln!("created threadpool {:?}", Arc::as_ptr(&this));
+        tracing::trace!("created threadpool {:?}", Arc::as_ptr(&this));
 
         let num_threads = available_parallelism();
         // let num_threads = 2;
@@ -1428,6 +1650,89 @@ impl Context {
     pub fn global() -> &'static Arc<Self> {
         GLOBAL_CONTEXT.get_or_init(|| Self::new())
     }
+
+    pub fn inject_job(&self, job: NonNull<Job>) {
+        let mut guard = self.shared.lock();
+        guard.injected_jobs.push(job);
+        self.shared_job.notify_one();
+    }
+
+    fn run_in_worker_cross<T, F>(self: &Arc<Self>, worker: &WorkerThread, f: F) -> T
+    where
+        F: FnOnce(&WorkerThread) -> T + Send,
+        T: Send,
+    {
+        // current thread is not in the same context; create a job, inject it
+        // into the other thread's context, then wait while working on our jobs.
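+
+        // An AtomicLatch suffices here (contrast MutexLatch in
+        // run_in_worker_cold below): this thread is itself a worker, so it
+        // keeps draining jobs in wait_until_latch instead of blocking on a
+        // mutex until the injected job completes.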
+
+        let latch = AtomicLatch::new();
+
+        let job = StackJob::new(
+            move || {
+                let worker = WorkerThread::current_ref()
+                    .expect("WorkerThread::run_in_worker called outside of worker thread");
+
+                f(worker)
+            },
+            LatchRef::new(&latch),
+        );
+
+        let job = job.as_job();
+
+        self.inject_job(Into::into(&job));
+        worker.wait_until_latch(&latch);
+
+        let t = unsafe { job.transmute_ref::<T>().wait().into_result() };
+
+        t
+    }
+
+    pub fn run_in_worker_cold<T, F>(self: &Arc<Self>, f: F) -> T
+    where
+        F: FnOnce(&WorkerThread) -> T + Send,
+        T: Send,
+    {
+        use crate::latch::MutexLatch;
+        // current thread isn't a worker thread, create job and inject into
+        // global context
+
+        let latch = MutexLatch::new();
+
+        let job = StackJob::new(
+            move || {
+                let worker = WorkerThread::current_ref()
+                    .expect("WorkerThread::run_in_worker called outside of worker thread");
+
+                f(worker)
+            },
+            LatchRef::new(&latch),
+        );
+
+        let job = job.as_job();
+
+        self.inject_job(Into::into(&job));
+        latch.wait();
+
+        let t = unsafe { job.transmute_ref::<T>().wait().into_result() };
+
+        t
+    }
+
+    pub fn run_in_worker<T, F>(self: &Arc<Self>, f: F) -> T
+    where
+        T: Send,
+        F: FnOnce(&WorkerThread) -> T + Send,
+    {
+        match WorkerThread::current_ref() {
+            Some(worker) => {
+                // check if worker is in the same context
+                if Arc::ptr_eq(&worker.context, self) {
+                    f(worker)
+                } else {
+                    self.run_in_worker_cross(worker, f)
+                }
+            }
+            None => self.run_in_worker_cold(f),
+        }
+    }
 }
 
 static GLOBAL_CONTEXT: OnceLock<Arc<Context>> = OnceLock::new();
@@ -1440,21 +1745,26 @@ fn available_parallelism() -> usize {
 }
 
 fn worker(ctx: Arc<Context>, barrier: Arc<Barrier>) {
+    tracing::trace!("new worker thread {:?}", std::thread::current());
+
     unsafe {
-        Scope::set_current(Box::into_raw(Box::new(Scope::new_in(ctx.clone()))).cast_const());
+        WorkerThread::set_current(
+            Box::into_raw(Box::new(WorkerThread::new_in(ctx.clone()))).cast_const(),
+        );
     }
 
     let _guard = DropGuard::new(|| unsafe {
-        Scope::drop_in_place_and_dealloc(Scope::unset_current().unwrap());
+        WorkerThread::drop_in_place_and_dealloc(WorkerThread::unset_current().unwrap());
     });
 
-    let scope = Scope::current_ref().unwrap();
+    let scope = WorkerThread::current_ref().unwrap();
 
     barrier.wait();
 
-    let mut job = ctx.shared.lock().jobs.pop_first();
+    let mut job = ctx.shared.lock().pop_job();
     loop {
-        if let Some((_, job)) = job {
+        tracing::trace!("worker({:?}): new job {:?}", std::thread::current(), job);
+        if let Some(job) = job {
             scope.execute(job);
         }
 
@@ -1464,11 +1774,13 @@ fn worker(ctx: Arc<Context>, barrier: Arc<Barrier>) {
         }
 
         ctx.shared_job.wait(&mut guard);
-        job = guard.jobs.pop_first();
+        job = guard.pop_job();
     }
 }
 
 fn heartbeat_worker(ctx: Arc<Context>) {
+    tracing::trace!("new heartbeat thread {:?}", std::thread::current());
+
     let mut i = 0;
     loop {
         let sleep_for = {
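+            // Heartbeat cadence (sketch of intent; the rest of this function
+            // is truncated in this patch): compute how long to sleep before
+            // setting the next worker's heartbeat flag, so that local work is
+            // promoted for sharing at a bounded, round-robin rate.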