always use heartbeat join

This commit is contained in:
Janis 2025-06-28 14:07:51 +02:00
parent 8b4eba5a19
commit 2a0372a8a0
2 changed files with 15 additions and 13 deletions

View file

@ -24,14 +24,20 @@ impl WorkerThread {
(ra, rb) (ra, rb)
} }
/// This function must be called from a worker thread.
#[inline] #[inline]
#[tracing::instrument(level = "trace", skip_all)] pub(crate) fn join_heartbeat_every<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
pub(crate) fn join_heartbeat_every<A, B, RA, RB, const TIMES: usize>( where
&self, A: FnOnce() -> RA + Send,
a: A, B: FnOnce() -> RB,
b: B, RA: Send,
) -> (RA, RB) {
// self.join_heartbeat_every_inner::<A, B, RA, RB, 2>(a, b)
self.join_heartbeat(a, b)
}
/// This function must be called from a worker thread.
#[inline(always)]
fn join_heartbeat_every_inner<A, B, RA, RB, const TIMES: usize>(&self, a: A, b: B) -> (RA, RB)
where where
RA: Send, RA: Send,
A: FnOnce() -> RA + Send, A: FnOnce() -> RA + Send,
@ -48,10 +54,6 @@ impl WorkerThread {
// SAFETY: this function runs in a worker thread, so we can access the queue safely. // SAFETY: this function runs in a worker thread, so we can access the queue safely.
if count == 0 || queue_len < 3 { if count == 0 || queue_len < 3 {
cold_path(); cold_path();
tracing::trace!(
queue_len = queue_len,
"join_heartbeat_every: using heartbeat join",
);
self.join_heartbeat(a, b) self.join_heartbeat(a, b)
} else { } else {
self.join_seq(a, b) self.join_seq(a, b)
@ -122,7 +124,7 @@ impl Context {
RB: Send, RB: Send,
{ {
// SAFETY: join_heartbeat_every is safe to call from a worker thread. // SAFETY: join_heartbeat_every is safe to call from a worker thread.
self.run_in_worker(move |worker| worker.join_heartbeat_every::<_, _, _, _, 64>(a, b)) self.run_in_worker(move |worker| worker.join_heartbeat_every::<_, _, _, _>(a, b))
} }
} }

View file

@ -277,7 +277,7 @@ impl<'scope, 'env> Scope<'scope, 'env> {
let worker = WorkerThread::current_ref().expect("join is run in workerthread."); let worker = WorkerThread::current_ref().expect("join is run in workerthread.");
let this = SendPtr::new_const(self).unwrap(); let this = SendPtr::new_const(self).unwrap();
worker.join_heartbeat_every::<_, _, _, _, 64>( worker.join_heartbeat_every::<_, _, _, _>(
{ {
let this = this; let this = this;
move || a(unsafe { this.as_ref() }) move || a(unsafe { this.as_ref() })