#[cfg(feature = "metrics")] use std::sync::atomic::Ordering; use std::{ cell::{Cell, UnsafeCell}, ptr::NonNull, sync::{Arc, Barrier}, time::Duration, }; use crossbeam_utils::CachePadded; use crate::{ context::Context, heartbeat::OwnedHeartbeatReceiver, job::{Job2 as Job, JobQueue as JobList, SharedJob}, latch::Probe, util::DropGuard, }; pub struct WorkerThread { pub(crate) context: Arc, pub(crate) queue: UnsafeCell, pub(crate) heartbeat: OwnedHeartbeatReceiver, pub(crate) join_count: Cell, #[cfg(feature = "metrics")] pub(crate) metrics: CachePadded, } thread_local! { static WORKER: UnsafeCell>> = const { UnsafeCell::new(None) }; } impl WorkerThread { pub fn new_in(context: Arc) -> Self { let heartbeat = context.heartbeats.new_heartbeat(); Self { context, queue: UnsafeCell::new(JobList::new()), heartbeat, join_count: Cell::new(0), #[cfg(feature = "metrics")] metrics: CachePadded::new(crate::metrics::WorkerMetrics::default()), } } } impl WorkerThread { #[tracing::instrument(level = "trace", skip_all, fields( worker = self.heartbeat.index(), ))] pub fn run(self: Box, barrier: Arc) { let this = Box::into_raw(self); unsafe { Self::set_current(this); } let _guard = DropGuard::new(|| unsafe { // SAFETY: this is only called when the thread is exiting Self::unset_current(); Self::drop_in_place(this); }); tracing::trace!("WorkerThread::run: starting worker thread"); barrier.wait(); unsafe { (&*this).run_inner(); } #[cfg(feature = "metrics")] unsafe { eprintln!("{:?}", (&*this).metrics); } tracing::trace!("WorkerThread::run: worker thread finished"); } #[tracing::instrument(level = "trace", skip_all)] fn run_inner(&self) { let mut job = None; 'outer: loop { if let Some(job) = job.take() { self.execute(job); } // no more jobs, wait to be notified of a new job or a heartbeat. while job.is_none() { if self.context.should_exit() { // if the context is stopped, break out of the outer loop which // will exit the thread. break 'outer; } job = self.find_work_or_wait(); } } } } impl WorkerThread { /// Looks for work in the local queue, then in the shared context, and if no /// work is found, waits for the thread to be notified of a new job, after /// which it returns `None`. /// The caller should then check for `should_exit` to determine if the /// thread should exit, or look for work again. #[tracing::instrument(level = "trace", skip_all)] pub(crate) fn find_work_or_wait(&self) -> Option { if let Some(job) = self.find_work() { return Some(job); } tracing::trace!("waiting for new job"); self.heartbeat.parker().park(); tracing::trace!("woken up from wait"); None } #[tracing::instrument(level = "trace", skip_all)] pub(crate) fn find_work_or_wait_unless(&self, mut pred: F) -> Option where F: FnMut() -> bool, { if let Some(job) = self.find_work() { return Some(job); } // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! // Check the predicate while holding the lock. This is very important, // because the lock must be held when notifying us of the result of a // job we scheduled. // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! // no jobs found, wait for a heartbeat or a new job tracing::trace!(worker = self.heartbeat.index(), "waiting for new job"); if !pred() { self.heartbeat.parker().park(); } tracing::trace!(worker = self.heartbeat.index(), "woken up from wait"); None } #[inline] fn find_work(&self) -> Option { let mut guard = self.context.shared(); if let Some(job) = guard.pop_job() { #[cfg(feature = "metrics")] self.metrics.num_jobs_stolen.fetch_add(1, Ordering::Relaxed); tracing::trace!("WorkerThread::find_work_inner: found shared job: {:?}", job); return Some(job); } None } #[inline(always)] pub(crate) fn tick(&self) { if self.heartbeat.take() { #[cfg(feature = "metrics")] self.metrics.num_heartbeats.fetch_add(1, Ordering::Relaxed); tracing::trace!( "received heartbeat, thread id: {:?}", self.heartbeat.index() ); self.heartbeat_cold(); } } #[inline] #[tracing::instrument(level = "trace", skip(self))] fn execute(&self, job: SharedJob) { unsafe { SharedJob::execute(job, self) }; self.tick(); } #[cold] fn heartbeat_cold(&self) { let mut guard = self.context.shared(); if !guard.jobs.contains_key(&self.heartbeat.id()) { if let Some(job) = self.pop_back() { tracing::trace!("heartbeat: sharing job: {:?}", job); #[cfg(feature = "metrics")] self.metrics.num_jobs_shared.fetch_add(1, Ordering::Relaxed); unsafe { guard.jobs.insert( self.heartbeat.id(), job.as_ref().share(Some(self.heartbeat.parker())), ); // SAFETY: we are holding the lock on the shared context. self.context.notify_job_shared(); } } } } } impl WorkerThread { #[inline] pub fn pop_back(&self) -> Option> { unsafe { self.queue.as_mut_unchecked().pop_back() } } #[inline] pub fn push_back(&self, job: *const Job) { unsafe { self.queue.as_mut_unchecked().push_back(job.cast()) } } #[inline] pub fn push_front(&self, job: *const Job) { unsafe { self.queue.as_mut_unchecked().push_front(job.cast()) } } #[inline] pub fn pop_front(&self) -> Option> { unsafe { self.queue.as_mut_unchecked().pop_front() } } } impl WorkerThread { #[inline] pub fn current_ref<'a>() -> Option<&'a Self> { unsafe { (*WORKER.with(UnsafeCell::get)).map(|ptr| ptr.as_ref()) } } unsafe fn set_current(this: *const Self) { WORKER.with(|cell| { unsafe { // SAFETY: this cell is only ever accessed from the current thread assert!( (&mut *cell.get()) .replace(NonNull::new_unchecked( this as *const WorkerThread as *mut WorkerThread, )) .is_none() ); } }); } unsafe fn unset_current() { WORKER.with(|cell| { unsafe { // SAFETY: this cell is only ever accessed from the current thread (&mut *cell.get()).take(); } }); } unsafe fn drop_in_place(this: *mut Self) { unsafe { // SAFETY: this is only called when the thread is exiting, so we can // safely drop the thread. We use `drop_in_place` to prevent `Box` // from creating a no-alias reference to the worker thread. core::ptr::drop_in_place(this); _ = Box::>::from_raw(this as _); } } } pub struct HeartbeatThread { ctx: Arc, } impl HeartbeatThread { const HEARTBEAT_INTERVAL: Duration = Duration::from_micros(100); pub fn new(ctx: Arc) -> Self { Self { ctx } } #[tracing::instrument(level = "trace", skip(self))] pub fn run(self, barrier: Arc) { tracing::trace!("new heartbeat thread {:?}", std::thread::current()); barrier.wait(); let mut i = 0; loop { let sleep_for = { if self.ctx.should_exit() { break; } self.ctx.heartbeats.notify_nth(i); let num_heartbeats = self.ctx.heartbeats.len(); if i >= num_heartbeats { i = 0; } else { i += 1; } Self::HEARTBEAT_INTERVAL.checked_div(num_heartbeats as u32) }; if let Some(duration) = sleep_for { std::thread::sleep(duration); } } } } impl WorkerThread { #[tracing::instrument(level = "trace", skip(self))] pub fn wait_until_shared_job(&self, job: &Job) -> Option> { let recv = (*job).take_receiver()?; let mut out = recv.poll(); while std::hint::unlikely(out.is_none()) { if let Some(job) = self.find_work() { unsafe { SharedJob::execute(job, self); } } out = recv.poll(); } out } #[tracing::instrument(level = "trace", skip_all)] pub fn wait_until_pred(&self, mut pred: F) where F: FnMut() -> bool, { if !pred() { tracing::trace!("thread {:?} waiting on predicate", self.heartbeat.index()); self.wait_until_latch_cold(pred); } } #[cold] fn wait_until_latch_cold(&self, mut pred: F) where F: FnMut() -> bool, { if let Some(shared_job) = self.context.shared().jobs.remove(&self.heartbeat.id()) { tracing::trace!( "thread {:?} reclaiming shared job: {:?}", self.heartbeat.index(), shared_job ); unsafe { SharedJob::execute(shared_job, self) }; } // do the usual thing and wait for the job's latch // do the usual thing??? chatgipity really said this.. while !pred() { // check local jobs before locking shared context if let Some(job) = self.find_work() { unsafe { SharedJob::execute(job, self); } } } } }