tracing-tracy/instrumentation

This commit is contained in:
Janis 2025-06-28 01:24:22 +02:00
parent 5fae03dc06
commit a1e1c90f90
7 changed files with 77 additions and 30 deletions

View file

@ -18,4 +18,5 @@ async-task = "4.7.1"
[dev-dependencies]
tracing-test = "0.2.5"
tracing-tracy = "0.11.4"
futures = "0.3"

View file

@ -130,6 +130,7 @@ impl Context {
pub fn set_should_exit(&self) {
self.should_exit.store(true, Ordering::Relaxed);
self.heartbeats.notify_all();
}
pub fn should_exit(&self) -> bool {
@ -227,6 +228,7 @@ impl Context {
}
/// Run closure in this context.
#[tracing::instrument(level = "trace", skip_all)]
pub fn run_in_worker<T, F>(self: &Arc<Self>, f: F) -> T
where
T: Send,

View file

@ -1052,6 +1052,7 @@ const FINISHED: usize = 1 << 0;
const ERROR: usize = 1 << 1;
impl<T> JobSender<T> {
#[tracing::instrument(level = "trace", skip_all)]
pub fn send(&self, result: std::thread::Result<T>, mutex: *const WorkerLatch) {
// We want to lock here so that we can be sure that we wake the worker
// only if it was waiting, and not immediately after having received the
@ -1125,6 +1126,12 @@ impl<T> JobSender<T> {
}
impl<T> JobReceiver<T> {
#[tracing::instrument(level = "trace", skip_all)]
pub fn is_finished(&self) -> bool {
self.channel.tag.tag(Ordering::Acquire) & FINISHED != 0
}
#[tracing::instrument(level = "trace", skip_all)]
pub fn poll(&self) -> Option<std::thread::Result<T>> {
let tag = self.channel.tag.take_tag(Ordering::Acquire);
@ -1169,6 +1176,7 @@ impl QueuedJob {
T: Send,
{
#[align(8)]
#[tracing::instrument(level = "trace", skip_all, name = "stack_job_harness")]
unsafe fn harness<F, T, L>(
this: *const (),
sender: *const JobSender,
@ -1200,6 +1208,7 @@ impl QueuedJob {
T: Send,
{
#[align(8)]
#[tracing::instrument(level = "trace", skip_all, name = "heap_job_harness")]
unsafe fn harness<F, T>(
this: *const (),
sender: *const JobSender,
@ -1258,6 +1267,7 @@ impl QueuedJob {
}
/// this function will drop `_self` and execute the job.
#[tracing::instrument(level = "trace", skip_all)]
pub unsafe fn execute(_self: *mut Self) {
let (harness, this, sender, mutex) = unsafe {
let job = &*_self;

View file

@ -9,13 +9,15 @@ use crate::{
impl WorkerThread {
#[inline]
#[tracing::instrument(level = "trace", skip_all)]
fn join_seq<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
where
RA: Send,
RB: Send,
A: FnOnce() -> RA + Send,
B: FnOnce() -> RB + Send,
A: FnOnce() -> RA,
B: FnOnce() -> RB,
{
let span = tracing::trace_span!("join_seq");
let _guard = span.enter();
let rb = b();
let ra = a();
@ -24,6 +26,7 @@ impl WorkerThread {
/// This function must be called from a worker thread.
#[inline]
#[tracing::instrument(level = "trace", skip_all)]
pub(crate) fn join_heartbeat_every<A, B, RA, RB, const TIMES: usize>(
&self,
a: A,
@ -31,20 +34,24 @@ impl WorkerThread {
) -> (RA, RB)
where
RA: Send,
RB: Send,
A: FnOnce() -> RA + Send,
B: FnOnce() -> RB + Send,
B: FnOnce() -> RB,
{
// SAFETY: each worker is only ever used by one thread, so this is safe.
let count = self.join_count.get();
let queue_len = unsafe { self.queue.as_ref_unchecked().len() };
self.join_count.set(count.wrapping_add(1) % TIMES as u8);
// TODO: add counter to job queue, check for low job count to decide whether to use heartbeat or seq.
// see: chili
// SAFETY: this function runs in a worker thread, so we can access the queue safely.
if count == 0 || unsafe { self.queue.as_ref_unchecked().len() } < 3 {
if count == 0 || queue_len < 3 {
cold_path();
tracing::trace!(
queue_len = queue_len,
"join_heartbeat_every: using heartbeat join",
);
self.join_heartbeat(a, b)
} else {
self.join_seq(a, b)
@ -53,12 +60,12 @@ impl WorkerThread {
/// This function must be called from a worker thread.
#[inline]
#[tracing::instrument(level = "trace", skip_all)]
fn join_heartbeat<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
where
RA: Send,
RB: Send,
A: FnOnce() -> RA + Send,
B: FnOnce() -> RB + Send,
B: FnOnce() -> RB,
{
use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
@ -72,8 +79,8 @@ impl WorkerThread {
let rb = match catch_unwind(AssertUnwindSafe(|| b())) {
Ok(val) => val,
Err(payload) => {
cold_path();
tracing::debug!("join_heartbeat: b panicked, waiting for a to finish");
cold_path();
// if b panicked, we need to wait for a to finish
self.wait_until_latch(&job);
resume_unwind(payload);
@ -109,10 +116,10 @@ impl Context {
#[inline]
pub fn join<A, B, RA, RB>(self: &Arc<Self>, a: A, b: B) -> (RA, RB)
where
RA: Send,
RB: Send,
A: FnOnce() -> RA + Send,
B: FnOnce() -> RB + Send,
RA: Send,
RB: Send,
{
// SAFETY: join_heartbeat_every is safe to call from a worker thread.
self.run_in_worker(move |worker| worker.join_heartbeat_every::<_, _, _, _, 64>(a, b))
@ -123,10 +130,10 @@ impl Context {
#[allow(dead_code)]
pub fn join<A, B, RA, RB>(a: A, b: B) -> (RA, RB)
where
RA: Send,
RB: Send,
A: FnOnce() -> RA + Send,
B: FnOnce() -> RB + Send,
RA: Send,
RB: Send,
{
join_in(Context::global_context().clone(), a, b)
}
@ -135,10 +142,10 @@ where
#[allow(dead_code)]
fn join_in<A, B, RA, RB>(context: Arc<Context>, a: A, b: B) -> (RA, RB)
where
RA: Send,
RB: Send,
A: FnOnce() -> RA + Send,
B: FnOnce() -> RB + Send,
RA: Send,
RB: Send,
{
context.join(a, b)
}

View file

@ -337,6 +337,7 @@ impl AsCoreLatch for MutexLatch {
}
// The worker waits on this latch whenever it has nothing to do.
#[derive(Debug)]
pub struct WorkerLatch {
// this boolean is set when the worker is waiting.
mutex: Mutex<bool>,
@ -468,6 +469,7 @@ impl WorkerLatch {
*guard = false; // reset the mutex to false after waking up
}
#[tracing::instrument(level = "trace", skip(other))]
pub fn wait_with_lock<T>(&self, other: &mut parking_lot::MutexGuard<'_, T>) {
self.wait_with_lock_internal(other);
}
@ -481,6 +483,7 @@ impl WorkerLatch {
}
}
#[tracing::instrument(level = "trace", skip(f))]
pub fn wait_until<F, T>(&self, mut f: F) -> T
where
F: FnMut() -> Option<T>,
@ -498,6 +501,7 @@ impl WorkerLatch {
*self.mutex.lock()
}
#[tracing::instrument(level = "trace")]
fn notify(&self) {
let key = &self.condvar as *const _ as usize;

View file

@ -86,6 +86,7 @@ where
}
impl<'scope, 'env> Scope<'scope, 'env> {
#[tracing::instrument(level = "trace", skip_all)]
fn wait_for_jobs(&self, worker: &WorkerThread) {
self.job_counter.set_inner(worker.heartbeat.raw_latch());
if self.job_counter.count() > 0 {
@ -102,6 +103,7 @@ impl<'scope, 'env> Scope<'scope, 'env> {
}
/// should be called from within a worker thread.
#[tracing::instrument(level = "trace", skip_all)]
fn complete<F, R>(&self, worker: &WorkerThread, f: F) -> R
where
F: FnOnce() -> R + Send,
@ -125,6 +127,7 @@ impl<'scope, 'env> Scope<'scope, 'env> {
}
/// resumes the panic if one happened in this scope.
#[tracing::instrument(level = "trace", skip_all)]
fn maybe_propagate_panic(&self) {
let err_ptr = self.panic.load(Ordering::Relaxed);
if !err_ptr.is_null() {
@ -136,6 +139,7 @@ impl<'scope, 'env> Scope<'scope, 'env> {
}
/// stores the first panic that happened in this scope.
#[tracing::instrument(level = "trace", skip_all)]
fn panicked(&self, err: Box<dyn Any + Send + 'static>) {
tracing::debug!("panicked in scope, storing error: {:?}", err);
self.panic.load(Ordering::Relaxed).is_null().then(|| {
@ -349,8 +353,8 @@ mod tests {
}
pool.scope(|scope| {
let total = sum(scope, 5);
assert_eq!(total, 31);
let total = sum(scope, 10);
assert_eq!(total, 1023);
// eprintln!("Total sum: {}", total);
});
}

View file

@ -41,6 +41,7 @@ impl WorkerThread {
}
impl WorkerThread {
#[tracing::instrument(level = "trace", skip_all)]
pub fn run(self: Box<Self>) {
let this = Box::into_raw(self);
unsafe {
@ -62,6 +63,7 @@ impl WorkerThread {
tracing::trace!("WorkerThread::run: worker thread finished");
}
#[tracing::instrument(level = "trace", skip_all)]
fn run_inner(&self) {
let mut job = None;
'outer: loop {
@ -91,6 +93,7 @@ impl WorkerThread {
/// which it returns `None`.
/// The caller should then check for `should_exit` to determine if the
/// thread should exit, or look for work again.
#[tracing::instrument(level = "trace", skip_all)]
pub(crate) fn find_work_or_wait(&self) -> Option<NonNull<Job>> {
match self.find_work_inner() {
either::Either::Left(job) => {
@ -139,6 +142,7 @@ impl WorkerThread {
}
#[inline]
#[tracing::instrument(level = "trace", skip(self))]
fn execute(&self, job: NonNull<Job>) {
self.tick();
unsafe { Job::execute(job.as_ptr()) };
@ -236,6 +240,7 @@ impl HeartbeatThread {
Self { ctx }
}
#[tracing::instrument(level = "trace", skip(self))]
pub fn run(self) {
tracing::trace!("new heartbeat thread {:?}", std::thread::current());
@ -266,6 +271,7 @@ impl HeartbeatThread {
}
impl WorkerThread {
#[tracing::instrument(level = "trace", skip(self))]
pub fn wait_until_queued_job<T>(
&self,
job: *const QueuedJob,
@ -273,19 +279,31 @@ impl WorkerThread {
let recv = unsafe { (*job).as_receiver::<T>() };
// we've already checked that the job was popped from the queue
// check if shared job is our job
if let Some(shared_job) = self.context.shared().jobs.remove(&self.heartbeat.id()) {
if core::ptr::eq(shared_job.as_ptr(), job as *const Job as _) {
// this is the job we are looking for, so we want to
// short-circuit and call it inline
return None;
} else {
// this isn't the job we are looking for, but we still need to
// execute it
unsafe { Job::execute(shared_job.as_ptr()) };
}
}
// do the usual thing and wait for the job's latch
// if let Some(shared_job) = self.context.shared().jobs.remove(&self.heartbeat.id()) {
// if core::ptr::eq(shared_job.as_ptr(), job as *const Job as _) {
// // this is the job we are looking for, so we want to
// // short-circuit and call it inline
// tracing::trace!(
// thread = self.heartbeat.index(),
// "reclaiming shared job: {:?}",
// shared_job
// );
// return None;
// } else {
// // this isn't the job we are looking for, but we still need to
// // execute it
// tracing::trace!(
// thread = self.heartbeat.index(),
// "executing reclaimed shared job: {:?}",
// shared_job
// );
// unsafe { Job::execute(shared_job.as_ptr()) };
// }
// }
loop {
match recv.poll() {
Some(t) => {
@ -316,6 +334,7 @@ impl WorkerThread {
}
}
#[tracing::instrument(level = "trace", skip_all)]
pub fn wait_until_latch<L>(&self, latch: &L)
where
L: Probe,