diff --git a/distaff/Cargo.toml b/distaff/Cargo.toml index 6398fe3..eb54ac8 100644 --- a/distaff/Cargo.toml +++ b/distaff/Cargo.toml @@ -18,4 +18,5 @@ async-task = "4.7.1" [dev-dependencies] tracing-test = "0.2.5" +tracing-tracy = "0.11.4" futures = "0.3" \ No newline at end of file diff --git a/distaff/src/context.rs b/distaff/src/context.rs index fc2b9ed..bfae9e2 100644 --- a/distaff/src/context.rs +++ b/distaff/src/context.rs @@ -130,6 +130,7 @@ impl Context { pub fn set_should_exit(&self) { self.should_exit.store(true, Ordering::Relaxed); + self.heartbeats.notify_all(); } pub fn should_exit(&self) -> bool { @@ -227,6 +228,7 @@ impl Context { } /// Run closure in this context. + #[tracing::instrument(level = "trace", skip_all)] pub fn run_in_worker(self: &Arc, f: F) -> T where T: Send, diff --git a/distaff/src/job.rs b/distaff/src/job.rs index a0056b5..70d3d26 100644 --- a/distaff/src/job.rs +++ b/distaff/src/job.rs @@ -1052,6 +1052,7 @@ const FINISHED: usize = 1 << 0; const ERROR: usize = 1 << 1; impl JobSender { + #[tracing::instrument(level = "trace", skip_all)] pub fn send(&self, result: std::thread::Result, mutex: *const WorkerLatch) { // We want to lock here so that we can be sure that we wake the worker // only if it was waiting, and not immediately after having received the @@ -1125,6 +1126,12 @@ impl JobSender { } impl JobReceiver { + #[tracing::instrument(level = "trace", skip_all)] + pub fn is_finished(&self) -> bool { + self.channel.tag.tag(Ordering::Acquire) & FINISHED != 0 + } + + #[tracing::instrument(level = "trace", skip_all)] pub fn poll(&self) -> Option> { let tag = self.channel.tag.take_tag(Ordering::Acquire); @@ -1169,6 +1176,7 @@ impl QueuedJob { T: Send, { #[align(8)] + #[tracing::instrument(level = "trace", skip_all, name = "stack_job_harness")] unsafe fn harness( this: *const (), sender: *const JobSender, @@ -1200,6 +1208,7 @@ impl QueuedJob { T: Send, { #[align(8)] + #[tracing::instrument(level = "trace", skip_all, name = "heap_job_harness")] unsafe fn harness( this: *const (), sender: *const JobSender, @@ -1258,6 +1267,7 @@ impl QueuedJob { } /// this function will drop `_self` and execute the job. + #[tracing::instrument(level = "trace", skip_all)] pub unsafe fn execute(_self: *mut Self) { let (harness, this, sender, mutex) = unsafe { let job = &*_self; diff --git a/distaff/src/join.rs b/distaff/src/join.rs index 542f4f9..29f6a78 100644 --- a/distaff/src/join.rs +++ b/distaff/src/join.rs @@ -9,13 +9,15 @@ use crate::{ impl WorkerThread { #[inline] + #[tracing::instrument(level = "trace", skip_all)] fn join_seq(&self, a: A, b: B) -> (RA, RB) where - RA: Send, - RB: Send, - A: FnOnce() -> RA + Send, - B: FnOnce() -> RB + Send, + A: FnOnce() -> RA, + B: FnOnce() -> RB, { + let span = tracing::trace_span!("join_seq"); + let _guard = span.enter(); + let rb = b(); let ra = a(); @@ -24,6 +26,7 @@ impl WorkerThread { /// This function must be called from a worker thread. #[inline] + #[tracing::instrument(level = "trace", skip_all)] pub(crate) fn join_heartbeat_every( &self, a: A, @@ -31,20 +34,24 @@ impl WorkerThread { ) -> (RA, RB) where RA: Send, - RB: Send, A: FnOnce() -> RA + Send, - B: FnOnce() -> RB + Send, + B: FnOnce() -> RB, { // SAFETY: each worker is only ever used by one thread, so this is safe. let count = self.join_count.get(); + let queue_len = unsafe { self.queue.as_ref_unchecked().len() }; self.join_count.set(count.wrapping_add(1) % TIMES as u8); // TODO: add counter to job queue, check for low job count to decide whether to use heartbeat or seq. // see: chili // SAFETY: this function runs in a worker thread, so we can access the queue safely. - if count == 0 || unsafe { self.queue.as_ref_unchecked().len() } < 3 { + if count == 0 || queue_len < 3 { cold_path(); + tracing::trace!( + queue_len = queue_len, + "join_heartbeat_every: using heartbeat join", + ); self.join_heartbeat(a, b) } else { self.join_seq(a, b) @@ -53,12 +60,12 @@ impl WorkerThread { /// This function must be called from a worker thread. #[inline] + #[tracing::instrument(level = "trace", skip_all)] fn join_heartbeat(&self, a: A, b: B) -> (RA, RB) where RA: Send, - RB: Send, A: FnOnce() -> RA + Send, - B: FnOnce() -> RB + Send, + B: FnOnce() -> RB, { use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind}; @@ -72,8 +79,8 @@ impl WorkerThread { let rb = match catch_unwind(AssertUnwindSafe(|| b())) { Ok(val) => val, Err(payload) => { - cold_path(); tracing::debug!("join_heartbeat: b panicked, waiting for a to finish"); + cold_path(); // if b panicked, we need to wait for a to finish self.wait_until_latch(&job); resume_unwind(payload); @@ -109,10 +116,10 @@ impl Context { #[inline] pub fn join(self: &Arc, a: A, b: B) -> (RA, RB) where - RA: Send, - RB: Send, A: FnOnce() -> RA + Send, B: FnOnce() -> RB + Send, + RA: Send, + RB: Send, { // SAFETY: join_heartbeat_every is safe to call from a worker thread. self.run_in_worker(move |worker| worker.join_heartbeat_every::<_, _, _, _, 64>(a, b)) @@ -123,10 +130,10 @@ impl Context { #[allow(dead_code)] pub fn join(a: A, b: B) -> (RA, RB) where - RA: Send, - RB: Send, A: FnOnce() -> RA + Send, B: FnOnce() -> RB + Send, + RA: Send, + RB: Send, { join_in(Context::global_context().clone(), a, b) } @@ -135,10 +142,10 @@ where #[allow(dead_code)] fn join_in(context: Arc, a: A, b: B) -> (RA, RB) where - RA: Send, - RB: Send, A: FnOnce() -> RA + Send, B: FnOnce() -> RB + Send, + RA: Send, + RB: Send, { context.join(a, b) } diff --git a/distaff/src/latch.rs b/distaff/src/latch.rs index bd39273..17f843a 100644 --- a/distaff/src/latch.rs +++ b/distaff/src/latch.rs @@ -337,6 +337,7 @@ impl AsCoreLatch for MutexLatch { } // The worker waits on this latch whenever it has nothing to do. +#[derive(Debug)] pub struct WorkerLatch { // this boolean is set when the worker is waiting. mutex: Mutex, @@ -468,6 +469,7 @@ impl WorkerLatch { *guard = false; // reset the mutex to false after waking up } + #[tracing::instrument(level = "trace", skip(other))] pub fn wait_with_lock(&self, other: &mut parking_lot::MutexGuard<'_, T>) { self.wait_with_lock_internal(other); } @@ -481,6 +483,7 @@ impl WorkerLatch { } } + #[tracing::instrument(level = "trace", skip(f))] pub fn wait_until(&self, mut f: F) -> T where F: FnMut() -> Option, @@ -498,6 +501,7 @@ impl WorkerLatch { *self.mutex.lock() } + #[tracing::instrument(level = "trace")] fn notify(&self) { let key = &self.condvar as *const _ as usize; diff --git a/distaff/src/scope.rs b/distaff/src/scope.rs index 025e322..6195f74 100644 --- a/distaff/src/scope.rs +++ b/distaff/src/scope.rs @@ -86,6 +86,7 @@ where } impl<'scope, 'env> Scope<'scope, 'env> { + #[tracing::instrument(level = "trace", skip_all)] fn wait_for_jobs(&self, worker: &WorkerThread) { self.job_counter.set_inner(worker.heartbeat.raw_latch()); if self.job_counter.count() > 0 { @@ -102,6 +103,7 @@ impl<'scope, 'env> Scope<'scope, 'env> { } /// should be called from within a worker thread. + #[tracing::instrument(level = "trace", skip_all)] fn complete(&self, worker: &WorkerThread, f: F) -> R where F: FnOnce() -> R + Send, @@ -125,6 +127,7 @@ impl<'scope, 'env> Scope<'scope, 'env> { } /// resumes the panic if one happened in this scope. + #[tracing::instrument(level = "trace", skip_all)] fn maybe_propagate_panic(&self) { let err_ptr = self.panic.load(Ordering::Relaxed); if !err_ptr.is_null() { @@ -136,6 +139,7 @@ impl<'scope, 'env> Scope<'scope, 'env> { } /// stores the first panic that happened in this scope. + #[tracing::instrument(level = "trace", skip_all)] fn panicked(&self, err: Box) { tracing::debug!("panicked in scope, storing error: {:?}", err); self.panic.load(Ordering::Relaxed).is_null().then(|| { @@ -349,8 +353,8 @@ mod tests { } pool.scope(|scope| { - let total = sum(scope, 5); - assert_eq!(total, 31); + let total = sum(scope, 10); + assert_eq!(total, 1023); // eprintln!("Total sum: {}", total); }); } diff --git a/distaff/src/workerthread.rs b/distaff/src/workerthread.rs index ea412f9..b709a03 100644 --- a/distaff/src/workerthread.rs +++ b/distaff/src/workerthread.rs @@ -41,6 +41,7 @@ impl WorkerThread { } impl WorkerThread { + #[tracing::instrument(level = "trace", skip_all)] pub fn run(self: Box) { let this = Box::into_raw(self); unsafe { @@ -62,6 +63,7 @@ impl WorkerThread { tracing::trace!("WorkerThread::run: worker thread finished"); } + #[tracing::instrument(level = "trace", skip_all)] fn run_inner(&self) { let mut job = None; 'outer: loop { @@ -91,6 +93,7 @@ impl WorkerThread { /// which it returns `None`. /// The caller should then check for `should_exit` to determine if the /// thread should exit, or look for work again. + #[tracing::instrument(level = "trace", skip_all)] pub(crate) fn find_work_or_wait(&self) -> Option> { match self.find_work_inner() { either::Either::Left(job) => { @@ -139,6 +142,7 @@ impl WorkerThread { } #[inline] + #[tracing::instrument(level = "trace", skip(self))] fn execute(&self, job: NonNull) { self.tick(); unsafe { Job::execute(job.as_ptr()) }; @@ -236,6 +240,7 @@ impl HeartbeatThread { Self { ctx } } + #[tracing::instrument(level = "trace", skip(self))] pub fn run(self) { tracing::trace!("new heartbeat thread {:?}", std::thread::current()); @@ -266,6 +271,7 @@ impl HeartbeatThread { } impl WorkerThread { + #[tracing::instrument(level = "trace", skip(self))] pub fn wait_until_queued_job( &self, job: *const QueuedJob, @@ -273,19 +279,31 @@ impl WorkerThread { let recv = unsafe { (*job).as_receiver::() }; // we've already checked that the job was popped from the queue // check if shared job is our job - if let Some(shared_job) = self.context.shared().jobs.remove(&self.heartbeat.id()) { - if core::ptr::eq(shared_job.as_ptr(), job as *const Job as _) { - // this is the job we are looking for, so we want to - // short-circuit and call it inline - return None; - } else { - // this isn't the job we are looking for, but we still need to - // execute it - unsafe { Job::execute(shared_job.as_ptr()) }; - } - } - // do the usual thing and wait for the job's latch + // if let Some(shared_job) = self.context.shared().jobs.remove(&self.heartbeat.id()) { + // if core::ptr::eq(shared_job.as_ptr(), job as *const Job as _) { + // // this is the job we are looking for, so we want to + // // short-circuit and call it inline + // tracing::trace!( + // thread = self.heartbeat.index(), + // "reclaiming shared job: {:?}", + // shared_job + // ); + + // return None; + // } else { + // // this isn't the job we are looking for, but we still need to + // // execute it + // tracing::trace!( + // thread = self.heartbeat.index(), + // "executing reclaimed shared job: {:?}", + // shared_job + // ); + + // unsafe { Job::execute(shared_job.as_ptr()) }; + // } + // } + loop { match recv.poll() { Some(t) => { @@ -316,6 +334,7 @@ impl WorkerThread { } } + #[tracing::instrument(level = "trace", skip_all)] pub fn wait_until_latch(&self, latch: &L) where L: Probe,