This commit is contained in:
Janis 2025-06-24 19:12:27 +02:00
parent ed4acbfbd7
commit a3b9222ed9
5 changed files with 104 additions and 42 deletions

View file

@ -14,7 +14,7 @@ use parking_lot::{Condvar, Mutex};
use crate::{ use crate::{
job::{HeapJob, Job, StackJob}, job::{HeapJob, Job, StackJob},
latch::{LatchRef, MutexLatch, WakeLatch}, latch::{LatchRef, MutexLatch, UnsafeWakeLatch},
workerthread::{HeartbeatThread, WorkerThread}, workerthread::{HeartbeatThread, WorkerThread},
}; };
@ -196,7 +196,8 @@ impl Context {
{ {
// current thread is not in the same context, create a job and inject it into the other thread's context, then wait while working on our jobs. // current thread is not in the same context, create a job and inject it into the other thread's context, then wait while working on our jobs.
let latch = WakeLatch::new(self.clone(), worker.index); // SAFETY: we are waiting on this latch in this thread.
let latch = unsafe { UnsafeWakeLatch::new(&raw const worker.heartbeat.latch) };
let job = StackJob::new( let job = StackJob::new(
move || { move || {

View file

@ -1,9 +1,10 @@
use std::{hint::cold_path, sync::Arc}; use std::{hint::cold_path, ptr::NonNull, sync::Arc};
use crate::{ use crate::{
context::Context, context::Context,
job::{JobState, StackJob}, job::{JobState, StackJob},
latch::{AsCoreLatch, LatchRef, WakeLatch}, latch::{AsCoreLatch, LatchRef, UnsafeWakeLatch, WakeLatch},
util::SendPtr,
workerthread::WorkerThread, workerthread::WorkerThread,
}; };
@ -62,7 +63,10 @@ impl WorkerThread {
{ {
use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind}; use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
let latch = WakeLatch::new(self.context.clone(), self.index); // SAFETY: this thread's heartbeat latch is valid until the job sets it
// because we will be waiting on it.
let latch = unsafe { UnsafeWakeLatch::new(&raw const (*self.heartbeat).latch) };
let a = StackJob::new( let a = StackJob::new(
move || { move || {
// TODO: bench whether tick'ing here is good. // TODO: bench whether tick'ing here is good.

View file

@ -6,7 +6,7 @@ use std::sync::{Arc, atomic::AtomicU8};
use parking_lot::{Condvar, Mutex}; use parking_lot::{Condvar, Mutex};
use crate::context::Context; use crate::{WorkerThread, context::Context};
pub trait Latch { pub trait Latch {
unsafe fn set_raw(this: *const Self); unsafe fn set_raw(this: *const Self);
@ -325,17 +325,16 @@ impl Probe for MutexLatch {
} }
} }
/// Must only be `set` from a worker thread.
pub struct WakeLatch { pub struct WakeLatch {
inner: AtomicLatch, inner: AtomicLatch,
context: Arc<Context>,
worker_index: AtomicUsize, worker_index: AtomicUsize,
} }
impl WakeLatch { impl WakeLatch {
pub fn new(context: Arc<Context>, worker_index: usize) -> Self { pub fn new(worker_index: usize) -> Self {
Self { Self {
inner: AtomicLatch::new(), inner: AtomicLatch::new(),
context,
worker_index: AtomicUsize::new(worker_index), worker_index: AtomicUsize::new(worker_index),
} }
} }
@ -349,10 +348,10 @@ impl Latch for WakeLatch {
#[inline] #[inline]
unsafe fn set_raw(this: *const Self) { unsafe fn set_raw(this: *const Self) {
unsafe { unsafe {
let ctx = (&*this).context.clone();
let worker_index = (&*this).worker_index.load(Ordering::Relaxed); let worker_index = (&*this).worker_index.load(Ordering::Relaxed);
if CoreLatch::set(&(&*this).inner) { if CoreLatch::set(&(&*this).inner) {
let ctx = WorkerThread::current_ref().unwrap().context.clone();
// If the latch was sleeping, wake the worker thread // If the latch was sleeping, wake the worker thread
ctx.shared().heartbeats.get(&worker_index).and_then(|weak| { ctx.shared().heartbeats.get(&worker_index).and_then(|weak| {
weak.upgrade() weak.upgrade()
@ -377,6 +376,48 @@ impl AsCoreLatch for WakeLatch {
} }
} }
pub struct UnsafeWakeLatch {
inner: AtomicLatch,
waker: *const MutexLatch,
}
impl UnsafeWakeLatch {
/// # Safety
/// The `waker` must be valid until the latch is set.
pub unsafe fn new(waker: *const MutexLatch) -> Self {
Self {
inner: AtomicLatch::new(),
waker,
}
}
}
impl Latch for UnsafeWakeLatch {
#[inline]
unsafe fn set_raw(this: *const Self) {
unsafe {
let waker = (*this).waker;
if CoreLatch::set(&(&*this).inner) {
Latch::set_raw(waker);
}
}
}
}
impl Probe for UnsafeWakeLatch {
#[inline]
fn probe(&self) -> bool {
self.inner.probe()
}
}
impl AsCoreLatch for UnsafeWakeLatch {
#[inline]
fn as_core_latch(&self) -> &CoreLatch {
&self.inner
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::Barrier; use std::sync::Barrier;
@ -505,17 +546,18 @@ mod tests {
let barrier = Arc::new(Barrier::new(2)); let barrier = Arc::new(Barrier::new(2));
tracing::info!("running scope in worker thread"); tracing::info!("running scope in worker thread");
let latch = context.run_in_worker(|worker| { context.run_in_worker(|worker| {
tracing::info!("worker thread started: {:?}", worker.index); tracing::info!("worker thread started: {:?}", worker.index);
let latch = WakeLatch::new(worker.context.clone(), worker.index); let latch = Arc::new(WakeLatch::new(worker.index));
worker.context.spawn({ worker.context.spawn({
let heartbeat = worker.heartbeat.clone(); let heartbeat = worker.heartbeat.clone();
let barrier = barrier.clone(); let barrier = barrier.clone();
let count = count.clone(); let count = count.clone();
// set sleeping outside of the closure so we don't have to deal with lifetimes let latch = latch.clone();
latch.as_core_latch().set_sleeping();
move || { move || {
tracing::info!("sleeping workerthread"); tracing::info!("sleeping workerthread");
latch.as_core_latch().set_sleeping();
heartbeat.latch.wait_and_reset(); heartbeat.latch.wait_and_reset();
tracing::info!("woken up workerthread"); tracing::info!("woken up workerthread");
count.fetch_add(1, Ordering::SeqCst); count.fetch_add(1, Ordering::SeqCst);
@ -524,13 +566,15 @@ mod tests {
} }
}); });
latch worker.context.spawn({
}); move || {
tracing::info!("setting latch in worker thread");
tracing::info!("setting latch in main thread");
unsafe { unsafe {
Latch::set_raw(&latch); Latch::set_raw(&*latch);
} }
}
});
});
tracing::info!("main thread set latch, waiting for worker thread to wake up"); tracing::info!("main thread set latch, waiting for worker thread to wake up");
barrier.wait(); barrier.wait();

View file

@ -256,10 +256,10 @@ impl<'scope, 'env> Scope<'scope, 'env> {
) )
} }
unsafe fn from_context(ctx: Arc<Context>) -> Self { unsafe fn from_context(context: Arc<Context>) -> Self {
Self { Self {
context: ctx.clone(), context,
job_counter: CountLatch::new(WakeLatch::new(ctx, 0)), job_counter: CountLatch::new(WakeLatch::new(0)),
panic: AtomicPtr::new(ptr::null_mut()), panic: AtomicPtr::new(ptr::null_mut()),
_scope: PhantomData, _scope: PhantomData,
_env: PhantomData, _env: PhantomData,

View file

@ -9,7 +9,7 @@ use crossbeam_utils::CachePadded;
use crate::{ use crate::{
context::{Context, Heartbeat}, context::{Context, Heartbeat},
job::{Job, JobResult, JobVec as JobList}, job::{Job, JobList, JobResult},
latch::{AsCoreLatch, CoreLatch, Probe}, latch::{AsCoreLatch, CoreLatch, Probe},
util::DropGuard, util::DropGuard,
}; };
@ -70,30 +70,43 @@ impl WorkerThread {
self.execute(job); self.execute(job);
} }
// we executed the shared job, now we want to check for any
// local jobs which this job might have spawned.
let next = self
.pop_front()
.map(|job| (Some(job), None))
.unwrap_or_else(|| {
let mut guard = self.context.shared(); let mut guard = self.context.shared();
(guard.pop_job(), Some(guard))
});
match next {
// no job, but guard => check if we should exit
(None, Some(guard)) => {
tracing::trace!("worker: no local job, waiting for shared job");
if guard.should_exit() { if guard.should_exit() {
// if the context is stopped, break out of the outer loop which // if the context is stopped, break out of the outer loop which
// will exit the thread. // will exit the thread.
break 'outer; break 'outer;
} }
// TODO: also check the local queue? // no local jobs, wait for shared job
match guard.pop_job() {
Some(popped) => {
tracing::trace!("worker: popping job: {:?}", popped);
job = Some(popped);
// found job, continue inner loop
continue;
}
None => {
tracing::trace!("worker: no job, waiting for shared job");
// no more jobs, break out of inner loop and wait for shared job
break guard; break guard;
} }
// some job => drop guard, continue inner loop
(Some(next), _) => {
tracing::trace!("worker: executing job: {:?}", next);
job = Some(next);
continue;
}
// no job, no guard ought to be unreachable.
_ => unreachable!(),
} }
}; };
self.context.shared_job.wait(&mut guard); self.context.shared_job.wait(&mut guard);
// a job was shared and we were notified, so we want to execute that job before any possible local jobs.
job = guard.pop_job(); job = guard.pop_job();
} }
} }
@ -101,7 +114,7 @@ impl WorkerThread {
impl WorkerThread { impl WorkerThread {
#[inline(always)] #[inline(always)]
fn tick(&self) { pub(crate) fn tick(&self) {
if self.heartbeat.is_pending() { if self.heartbeat.is_pending() {
tracing::trace!("received heartbeat, thread id: {:?}", self.index); tracing::trace!("received heartbeat, thread id: {:?}", self.index);
self.heartbeat_cold(); self.heartbeat_cold();