diff --git a/distaff/Cargo.toml b/distaff/Cargo.toml index 08bb343..9f05591 100644 --- a/distaff/Cargo.toml +++ b/distaff/Cargo.toml @@ -5,13 +5,14 @@ edition = "2024" [features] default = ["metrics"] +tracing = ["dep:tracing"] std = [] metrics = [] [dependencies] parking_lot = {version = "0.12.3"} atomic-wait = "1.1.0" -tracing = "0.1.40" +tracing = {version = "0.1", optional = true} parking_lot_core = "0.9.10" crossbeam-utils = "0.8.21" either = "1.15.0" @@ -19,6 +20,6 @@ either = "1.15.0" async-task = "4.7.1" [dev-dependencies] -tracing-test = "0.2.5" -tracing-tracy = "0.11.4" +tracing-test = {version = "0.2"} +tracing-tracy = {version = "0.11"} futures = "0.3" \ No newline at end of file diff --git a/distaff/src/channel.rs b/distaff/src/channel.rs index 082610b..4774284 100644 --- a/distaff/src/channel.rs +++ b/distaff/src/channel.rs @@ -38,6 +38,7 @@ impl Parker { self.mutex.load(Ordering::Acquire) == Self::PARKED } + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all, fields(this = self as *const Self as usize)))] pub fn park(&self) { if self.mutex.fetch_sub(1, Ordering::Acquire) == Self::NOTIFIED { // The thread was notified, so we can return immediately. @@ -60,6 +61,7 @@ impl Parker { } } + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all, fields(this = self as *const Self as usize)))] pub fn unpark(&self) { // write with Release ordering to ensure that any writes made by this // thread are made-available to the unparked thread. @@ -102,6 +104,7 @@ impl Receiver { self.0.state.load(Ordering::Acquire) != State::Ready as u8 } + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] pub fn wait(&self) { loop { match self.0.state.compare_exchange( @@ -146,6 +149,7 @@ impl Receiver { } } + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] pub fn poll(&self) -> Option> { if self .0 @@ -164,6 +168,7 @@ impl Receiver { } } + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] pub fn recv(self) -> thread::Result { self.wait(); @@ -194,6 +199,7 @@ impl Receiver { pub struct Sender(Arc>); impl Sender { + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] pub fn send(self, val: thread::Result) { // SAFETY: // Only this thread can write to `val` and none can read it diff --git a/distaff/src/context.rs b/distaff/src/context.rs index 40742d6..72a6d76 100644 --- a/distaff/src/context.rs +++ b/distaff/src/context.rs @@ -59,6 +59,7 @@ impl Context { } pub fn new_with_threads(num_threads: usize) -> Arc { + #[cfg(feature = "tracing")] tracing::trace!("Creating context with {} threads", num_threads); let this = Arc::new(Self { @@ -142,9 +143,11 @@ impl Context { .iter() .find(|(_, heartbeat)| heartbeat.is_waiting()) { + #[cfg(feature = "tracing")] tracing::trace!("Notifying worker thread {} about job sharing", i); sender.wake(); } else { + #[cfg(feature = "tracing")] tracing::warn!("No worker found to notify about job sharing"); } } @@ -206,29 +209,33 @@ impl Context { } /// Run closure in this context. - #[tracing::instrument(level = "trace", skip_all)] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] pub fn run_in_worker(self: &Arc, f: F) -> T where T: Send, F: FnOnce(&WorkerThread) -> T + Send, { let _guard = DropGuard::new(|| { + #[cfg(feature = "tracing")] tracing::trace!("run_in_worker: finished"); }); match WorkerThread::current_ref() { Some(worker) => { // check if worker is in the same context if Arc::ptr_eq(&worker.context, self) { + #[cfg(feature = "tracing")] tracing::trace!("run_in_worker: current thread"); f(worker) } else { // current thread is a worker for a different context + #[cfg(feature = "tracing")] tracing::trace!("run_in_worker: cross-context"); self.run_in_worker_cross(worker, f) } } None => { // current thread is not a worker for any context + #[cfg(feature = "tracing")] tracing::trace!("run_in_worker: inject into context"); self.run_in_worker_cold(f) } @@ -242,6 +249,7 @@ impl Context { F: FnOnce() + Send + 'static, { let job = Job::from_heapjob(Box::new(HeapJob::new(f))); + #[cfg(feature = "tracing")] tracing::trace!("Context::spawn: spawning job: {:?}", job); self.inject_job(job.share(None)); } @@ -297,12 +305,10 @@ where mod tests { use std::sync::atomic::AtomicU8; - use tracing_test::traced_test; - use super::*; #[test] - #[cfg_attr(not(miri), traced_test)] + #[cfg_attr(all(not(miri), feature = "tracing"), tracing_test::traced_test)] fn run_in_worker() { let ctx = Context::global_context().clone(); let result = ctx.run_in_worker(|_| 42); @@ -310,7 +316,7 @@ mod tests { } #[test] - #[cfg_attr(not(miri), traced_test)] + #[cfg_attr(all(not(miri), feature = "tracing"), tracing_test::traced_test)] fn context_spawn_future() { let ctx = Context::global_context().clone(); let task = ctx.spawn_future(async { 42 }); @@ -321,7 +327,7 @@ mod tests { } #[test] - #[cfg_attr(not(miri), traced_test)] + #[cfg_attr(all(not(miri), feature = "tracing"), tracing_test::traced_test)] fn context_spawn_async() { let ctx = Context::global_context().clone(); let task = ctx.spawn_async(|| async { 42 }); @@ -332,7 +338,7 @@ mod tests { } #[test] - #[cfg_attr(not(miri), traced_test)] + #[cfg_attr(all(not(miri), feature = "tracing"), tracing_test::traced_test)] fn context_spawn() { let ctx = Context::global_context().clone(); let counter = Arc::new(AtomicU8::new(0)); @@ -352,7 +358,7 @@ mod tests { } #[test] - #[cfg_attr(not(miri), traced_test)] + #[cfg_attr(all(not(miri), feature = "tracing"), tracing_test::traced_test)] fn inject_job_and_wake_worker() { let ctx = Context::new_with_threads(1); let counter = Arc::new(AtomicU8::new(0)); @@ -363,6 +369,7 @@ mod tests { { let counter = counter.clone(); move || { + #[cfg(feature = "tracing")] tracing::info!("Job running"); counter.fetch_add(1, Ordering::SeqCst); diff --git a/distaff/src/job.rs b/distaff/src/job.rs index 47fe44e..dc86272 100644 --- a/distaff/src/job.rs +++ b/distaff/src/job.rs @@ -514,6 +514,7 @@ impl Job { return JobResult::new(result); } else { // spin until lock is released. + #[cfg(feature = "tracing")] tracing::trace!("spin-waiting for job: {:?}", self); spin.spin(); } @@ -543,6 +544,7 @@ impl Job { Err(_) => { // debug_assert_ne!(state, JobState::Empty as usize); + #[cfg(feature = "tracing")] tracing::error!("######## what the sigma?"); spin.spin(); } @@ -551,6 +553,7 @@ impl Job { } pub fn execute(job: NonNull) { + #[cfg(feature = "tracing")] tracing::trace!("executing job: {:?}", job); // SAFETY: self is non-null @@ -655,6 +658,7 @@ mod stackjob { let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f())); + #[cfg(feature = "tracing")] tracing::trace!("stack job completed: {:?}", job); let job = unsafe { &*job.cast::>() }; job.complete(result); @@ -720,6 +724,7 @@ mod heapjob { ptr::drop_in_place(job); } + #[cfg(feature = "tracing")] tracing::trace!("heap job completed: {:?}", job); // free box that was allocated at (1) @@ -1018,12 +1023,14 @@ impl Job2 { receiver: Cell::new(None), }; + #[cfg(feature = "tracing")] tracing::trace!("new job: {:?}", this); this } pub fn share(&self, parker: Option<&Parker>) -> SharedJob { + #[cfg(feature = "tracing")] tracing::trace!("sharing job: {:?}", self); let (sender, receiver) = parker @@ -1048,7 +1055,10 @@ impl Job2 { F: FnOnce() -> T + Send, { #[align(8)] - #[tracing::instrument(level = "trace", skip_all, name = "stack_job_harness")] + #[cfg_attr( + feature = "tracing", + tracing::instrument(level = "trace", skip_all, name = "stack_job_harness") + )] unsafe fn harness( _worker: &WorkerThread, this: NonNull<()>, @@ -1082,7 +1092,10 @@ impl Job2 { F: FnOnce() -> T + Send, { #[align(8)] - #[tracing::instrument(level = "trace", skip_all, name = "heap_job_harness")] + #[cfg_attr( + feature = "tracing", + tracing::instrument(level = "trace", skip_all, name = "heap_job_harness") + )] unsafe fn harness(_worker: &WorkerThread, this: NonNull<()>, sender: Option) where F: FnOnce() -> T + Send, @@ -1121,6 +1134,7 @@ impl Job2 { impl SharedJob { pub unsafe fn execute(self, worker: &WorkerThread) { + #[cfg(feature = "tracing")] tracing::trace!("executing shared job: {:?}", self); let Self { diff --git a/distaff/src/join.rs b/distaff/src/join.rs index 5989ade..4e8e4ee 100644 --- a/distaff/src/join.rs +++ b/distaff/src/join.rs @@ -1,5 +1,6 @@ #[cfg(feature = "metrics")] use std::sync::atomic::Ordering; + use std::{hint::cold_path, sync::Arc}; use crate::{ @@ -11,15 +12,12 @@ use crate::{ impl WorkerThread { #[inline] - #[tracing::instrument(level = "trace", skip_all)] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] fn join_seq(&self, a: A, b: B) -> (RA, RB) where A: FnOnce() -> RA, B: FnOnce() -> RB, { - let span = tracing::trace_span!("join_seq"); - let _guard = span.enter(); - let rb = b(); let ra = a(); @@ -64,7 +62,7 @@ impl WorkerThread { /// This function must be called from a worker thread. #[inline] - #[tracing::instrument(level = "trace", skip_all)] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] fn join_heartbeat(&self, a: A, b: B) -> (RA, RB) where RA: Send, @@ -86,6 +84,7 @@ impl WorkerThread { let rb = match catch_unwind(AssertUnwindSafe(|| b())) { Ok(val) => val, Err(payload) => { + #[cfg(feature = "tracing")] tracing::debug!("join_heartbeat: b panicked, waiting for a to finish"); cold_path(); @@ -107,6 +106,7 @@ impl WorkerThread { match self.wait_until_recv(recv) { Some(t) => crate::util::unwrap_or_panic(t), None => { + #[cfg(feature = "tracing")] tracing::trace!( "join_heartbeat: job was shared, but reclaimed, running a() inline" ); @@ -120,6 +120,7 @@ impl WorkerThread { unsafe { // SAFETY: we just popped the job from the queue, so it is safe to unwrap. + #[cfg(feature = "tracing")] tracing::trace!("join_heartbeat: job was not shared, running a() inline"); a.unwrap()() } diff --git a/distaff/src/latch.rs b/distaff/src/latch.rs index dafea62..81d1bd1 100644 --- a/distaff/src/latch.rs +++ b/distaff/src/latch.rs @@ -229,6 +229,7 @@ impl Latch for CountLatch { unsafe fn set_raw(this: *const Self) { unsafe { if (&*this).count.fetch_sub(1, Ordering::Relaxed) == 1 { + #[cfg(feature = "tracing")] tracing::trace!("CountLatch set_raw: count was 1, setting inner latch"); // If the count was 1, we need to set the inner latch. let inner = (*this).inner.load(Ordering::Relaxed); @@ -343,12 +344,19 @@ impl WorkerLatch { } } - #[tracing::instrument(level = "trace", skip_all, fields( - this = self as *const Self as usize, - ))] + #[cfg_attr( + feature = "tracing", + tracing::instrument( + level = "trace", + skip_all, + fields(this = self as *const Self as usize) + ) + )] pub fn lock(&self) -> parking_lot::MutexGuard<'_, bool> { + #[cfg(feature = "tracing")] tracing::trace!("aquiring mutex.."); let guard = self.mutex.lock(); + #[cfg(feature = "tracing")] tracing::trace!("mutex acquired."); guard @@ -374,9 +382,11 @@ impl WorkerLatch { **guard = false; } - #[tracing::instrument(level = "trace", skip_all, fields( + #[cfg_attr( + feature = "tracing", + tracing::instrument(level = "trace", skip_all, fields( this = self as *const Self as usize, - ))] + )))] pub fn wait_unless(&self, mut f: F) where F: FnMut() -> bool, @@ -387,9 +397,11 @@ impl WorkerLatch { } } - #[tracing::instrument(level = "trace", skip_all, fields( - this = self as *const Self as usize, - ))] + #[cfg_attr( + feature = "tracing", + tracing::instrument(level = "trace", skip_all, fields( + this = self as *const Self as usize, + )))] pub fn wait_until(&self, mut f: F) -> T where F: FnMut() -> Option, @@ -407,11 +419,14 @@ impl WorkerLatch { *self.mutex.lock() } - #[tracing::instrument(level = "trace", skip_all, fields( + #[cfg_attr( + feature = "tracing", +tracing::instrument(level = "trace", skip_all, fields( this = self as *const Self as usize, - ))] + )))] fn notify(&self) { let n = self.condvar.notify_all(); + #[cfg(feature = "tracing")] tracing::trace!("WorkerLatch notify: notified {} threads", n); } @@ -424,8 +439,6 @@ impl WorkerLatch { mod tests { use std::{ptr, sync::Arc}; - use tracing_test::traced_test; - use super::*; #[test] @@ -491,7 +504,7 @@ mod tests { } #[test] - #[cfg_attr(not(miri), traced_test)] + #[cfg_attr(all(not(miri), feature = "tracing"), tracing_test::traced_test)] fn mutex_latch() { let latch = Arc::new(MutexLatch::new()); assert!(!latch.probe()); @@ -503,8 +516,10 @@ mod tests { // Test wait functionality let latch_clone = latch.clone(); let handle = std::thread::spawn(move || { + #[cfg(feature = "tracing")] tracing::info!("Thread waiting on latch"); latch_clone.wait_and_reset(); + #[cfg(feature = "tracing")] tracing::info!("Thread woke up from latch"); }); @@ -512,8 +527,10 @@ mod tests { std::thread::sleep(std::time::Duration::from_millis(100)); assert!(!latch.probe()); + #[cfg(feature = "tracing")] tracing::info!("Setting latch from main thread"); latch.set(); + #[cfg(feature = "tracing")] tracing::info!("Latch set, joining waiting thread"); handle.join().expect("Thread should join successfully"); } diff --git a/distaff/src/scope.rs b/distaff/src/scope.rs index beb5803..18e3ffc 100644 --- a/distaff/src/scope.rs +++ b/distaff/src/scope.rs @@ -87,11 +87,13 @@ where } impl<'scope, 'env> Scope<'scope, 'env> { - #[tracing::instrument(level = "trace", skip_all)] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] fn wait_for_jobs(&self, worker: &WorkerThread) { self.job_counter.set_inner(worker.heartbeat.parker()); if self.job_counter.count() > 0 { + #[cfg(feature = "tracing")] tracing::trace!("waiting for {} jobs to finish.", self.job_counter.count()); + #[cfg(feature = "tracing")] tracing::trace!( "thread id: {:?}, jobs: {:?}", worker.heartbeat.index(), @@ -104,7 +106,7 @@ impl<'scope, 'env> Scope<'scope, 'env> { } /// should be called from within a worker thread. - #[tracing::instrument(level = "trace", skip_all)] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] fn complete(&self, worker: &WorkerThread, f: F) -> R where F: FnOnce() -> R + Send, @@ -128,7 +130,7 @@ impl<'scope, 'env> Scope<'scope, 'env> { } /// resumes the panic if one happened in this scope. - #[tracing::instrument(level = "trace", skip_all)] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] fn maybe_propagate_panic(&self) { let err_ptr = self.panic.load(Ordering::Relaxed); if !err_ptr.is_null() { @@ -140,8 +142,9 @@ impl<'scope, 'env> Scope<'scope, 'env> { } /// stores the first panic that happened in this scope. - #[tracing::instrument(level = "trace", skip_all)] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] fn panicked(&self, err: Box) { + #[cfg(feature = "tracing")] tracing::debug!("panicked in scope, storing error: {:?}", err); self.panic.load(Ordering::Relaxed).is_null().then(|| { use core::mem::ManuallyDrop; @@ -292,13 +295,11 @@ impl<'scope, 'env> Scope<'scope, 'env> { mod tests { use std::sync::atomic::AtomicU8; - use tracing_test::traced_test; - use super::*; use crate::ThreadPool; #[test] - #[cfg_attr(not(miri), traced_test)] + #[cfg_attr(all(not(miri), feature = "tracing"), tracing_test::traced_test)] fn scope_spawn_sync() { let pool = ThreadPool::new_with_threads(1); let count = Arc::new(AtomicU8::new(0)); @@ -313,7 +314,7 @@ mod tests { } #[test] - #[cfg_attr(not(miri), traced_test)] + #[cfg_attr(all(not(miri), feature = "tracing"), tracing_test::traced_test)] fn scope_join_one() { let pool = ThreadPool::new_with_threads(1); @@ -326,7 +327,7 @@ mod tests { } #[test] - #[cfg_attr(not(miri), traced_test)] + #[cfg_attr(all(not(miri), feature = "tracing"), tracing_test::traced_test)] fn scope_join_many() { let pool = ThreadPool::new_with_threads(1); @@ -348,7 +349,7 @@ mod tests { } #[test] - #[cfg_attr(not(miri), traced_test)] + #[cfg_attr(all(not(miri), feature = "tracing"), tracing_test::traced_test)] fn scope_spawn_future() { let pool = ThreadPool::new_with_threads(1); let mut x = 0; @@ -364,7 +365,7 @@ mod tests { } #[test] - #[cfg_attr(not(miri), traced_test)] + #[cfg_attr(all(not(miri), feature = "tracing"), tracing_test::traced_test)] fn scope_spawn_many() { let pool = ThreadPool::new_with_threads(1); let count = Arc::new(AtomicU8::new(0)); diff --git a/distaff/src/threadpool.rs b/distaff/src/threadpool.rs index 8185750..31b16dd 100644 --- a/distaff/src/threadpool.rs +++ b/distaff/src/threadpool.rs @@ -53,17 +53,16 @@ impl ThreadPool { #[cfg(test)] mod tests { - use tracing_test::traced_test; - use super::*; #[test] - #[cfg_attr(not(miri), traced_test)] + #[cfg_attr(all(not(miri), feature = "tracing"), tracing_test::traced_test)] fn pool_spawn_borrow() { let pool = ThreadPool::new_with_threads(1); let mut x = 0; pool.scope(|scope| { scope.spawn(|_| { + #[cfg(feature = "tracing")] tracing::info!("Incrementing x"); x += 1; }); @@ -72,7 +71,7 @@ mod tests { } #[test] - #[cfg_attr(not(miri), traced_test)] + #[cfg_attr(all(not(miri), feature = "tracing"), tracing_test::traced_test)] fn pool_spawn_future() { let pool = ThreadPool::new_with_threads(1); let mut x = 0; @@ -89,7 +88,7 @@ mod tests { } #[test] - #[cfg_attr(not(miri), traced_test)] + #[cfg_attr(all(not(miri), feature = "tracing"), tracing_test::traced_test)] fn pool_join() { let pool = ThreadPool::new_with_threads(1); let (a, b) = pool.join(|| 3 + 4, || 5 * 6); diff --git a/distaff/src/workerthread.rs b/distaff/src/workerthread.rs index 79918b2..42fea7a 100644 --- a/distaff/src/workerthread.rs +++ b/distaff/src/workerthread.rs @@ -1,5 +1,6 @@ #[cfg(feature = "metrics")] use std::sync::atomic::Ordering; + use std::{ cell::{Cell, UnsafeCell}, ptr::NonNull, @@ -22,6 +23,7 @@ pub struct WorkerThread { pub(crate) queue: UnsafeCell, pub(crate) heartbeat: OwnedHeartbeatReceiver, pub(crate) join_count: Cell, + #[cfg(feature = "metrics")] pub(crate) metrics: CachePadded, } @@ -46,9 +48,9 @@ impl WorkerThread { } impl WorkerThread { - #[tracing::instrument(level = "trace", skip_all, fields( + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all, fields( worker = self.heartbeat.index(), - ))] + )))] pub fn run(self: Box, barrier: Arc) { let this = Box::into_raw(self); unsafe { @@ -61,6 +63,7 @@ impl WorkerThread { Self::drop_in_place(this); }); + #[cfg(feature = "tracing")] tracing::trace!("WorkerThread::run: starting worker thread"); barrier.wait(); @@ -73,10 +76,11 @@ impl WorkerThread { eprintln!("{:?}", (&*this).metrics); } + #[cfg(feature = "tracing")] tracing::trace!("WorkerThread::run: worker thread finished"); } - #[tracing::instrument(level = "trace", skip_all)] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] fn run_inner(&self) { let mut job = None; 'outer: loop { @@ -104,19 +108,21 @@ impl WorkerThread { /// which it returns `None`. /// The caller should then check for `should_exit` to determine if the /// thread should exit, or look for work again. - #[tracing::instrument(level = "trace", skip_all)] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] pub(crate) fn find_work_or_wait(&self) -> Option { if let Some(job) = self.find_work() { return Some(job); } + #[cfg(feature = "tracing")] tracing::trace!("waiting for new job"); self.heartbeat.parker().park(); + #[cfg(feature = "tracing")] tracing::trace!("woken up from wait"); None } - #[tracing::instrument(level = "trace", skip_all)] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] pub(crate) fn find_work_or_wait_unless(&self, mut pred: F) -> Option where F: FnMut() -> bool, @@ -130,10 +136,12 @@ impl WorkerThread { // job we scheduled. // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! // no jobs found, wait for a heartbeat or a new job + #[cfg(feature = "tracing")] tracing::trace!(worker = self.heartbeat.index(), "waiting for new job"); if !pred() { self.heartbeat.parker().park(); } + #[cfg(feature = "tracing")] tracing::trace!(worker = self.heartbeat.index(), "woken up from wait"); None @@ -146,6 +154,7 @@ impl WorkerThread { if let Some(job) = guard.pop_job() { #[cfg(feature = "metrics")] self.metrics.num_jobs_stolen.fetch_add(1, Ordering::Relaxed); + #[cfg(feature = "tracing")] tracing::trace!("WorkerThread::find_work_inner: found shared job: {:?}", job); return Some(job); } @@ -158,6 +167,7 @@ impl WorkerThread { if self.heartbeat.take() { #[cfg(feature = "metrics")] self.metrics.num_heartbeats.fetch_add(1, Ordering::Relaxed); + #[cfg(feature = "tracing")] tracing::trace!( "received heartbeat, thread id: {:?}", self.heartbeat.index() @@ -167,7 +177,7 @@ impl WorkerThread { } #[inline] - #[tracing::instrument(level = "trace", skip(self))] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))] fn execute(&self, job: SharedJob) { unsafe { SharedJob::execute(job, self) }; self.tick(); @@ -179,6 +189,7 @@ impl WorkerThread { if !guard.jobs.contains_key(&self.heartbeat.id()) { if let Some(job) = self.pop_back() { + #[cfg(feature = "tracing")] tracing::trace!("heartbeat: sharing job: {:?}", job); #[cfg(feature = "metrics")] @@ -270,8 +281,9 @@ impl HeartbeatThread { Self { ctx } } - #[tracing::instrument(level = "trace", skip(self))] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))] pub fn run(self, barrier: Arc) { + #[cfg(feature = "tracing")] tracing::trace!("new heartbeat thread {:?}", std::thread::current()); barrier.wait(); @@ -302,7 +314,7 @@ impl HeartbeatThread { } impl WorkerThread { - #[tracing::instrument(level = "trace", skip(self))] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))] pub fn wait_until_shared_job(&self, job: &Job) -> Option> { let recv = (*job).take_receiver().unwrap(); @@ -321,7 +333,7 @@ impl WorkerThread { out } - #[tracing::instrument(level = "trace", skip_all)] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] pub fn wait_until_recv(&self, recv: Receiver) -> Option> { if self .context @@ -330,6 +342,7 @@ impl WorkerThread { .remove(&self.heartbeat.id()) .is_some() { + #[cfg(feature = "tracing")] tracing::trace!("reclaiming shared job"); return None; } @@ -347,12 +360,13 @@ impl WorkerThread { Some(recv.recv()) } - #[tracing::instrument(level = "trace", skip_all)] + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] pub fn wait_until_pred(&self, mut pred: F) where F: FnMut() -> bool, { if !pred() { + #[cfg(feature = "tracing")] tracing::trace!("thread {:?} waiting on predicate", self.heartbeat.index()); self.wait_until_latch_cold(pred); } @@ -364,6 +378,7 @@ impl WorkerThread { F: FnMut() -> bool, { if let Some(shared_job) = self.context.shared().jobs.remove(&self.heartbeat.id()) { + #[cfg(feature = "tracing")] tracing::trace!( "thread {:?} reclaiming shared job: {:?}", self.heartbeat.index(),