diff --git a/src/praetor/mod.rs b/src/praetor/mod.rs index ebad441..90191cd 100644 --- a/src/praetor/mod.rs +++ b/src/praetor/mod.rs @@ -435,6 +435,7 @@ mod job { // these cannot be boxes because boxes are noalias. head: NonNull, tail: NonNull, + job_count: usize, } impl Debug for JobList { @@ -488,6 +489,7 @@ mod job { Self { head: NonNull::new_unchecked(head), tail: NonNull::new_unchecked(tail), + job_count: 0, } } } @@ -501,6 +503,7 @@ mod job { /// elem must be valid until it is popped. pub unsafe fn push_front(&mut self, elem: *const Job) { + self.job_count += 1; let head_link = unsafe { self.head.as_ref().link_mut() }; // SAFETY: head will always have a previous element. @@ -518,6 +521,7 @@ mod job { /// elem must be valid until it is popped. pub unsafe fn push_back(&mut self, elem: *const Job) { + self.job_count += 1; let tail_link = unsafe { self.tail.as_ref().link_mut() }; // SAFETY: tail will always have a previous element. @@ -535,6 +539,7 @@ mod job { #[allow(dead_code)] pub fn pop_front(&mut self) -> Option> { + self.job_count -= 1; let head_link = unsafe { self.head.as_ref().link_mut() }; // SAFETY: head will always have a previous element. @@ -551,6 +556,7 @@ mod job { } pub fn pop_back(&mut self) -> Option> { + self.job_count -= 1; // TODO: next and elem might be the same let tail_link = unsafe { self.tail.as_ref().link_mut() }; @@ -566,6 +572,15 @@ mod job { Some(elem) } + + #[allow(dead_code)] + pub fn is_empty(&self) -> bool { + self.job_count == 0 + } + + pub fn len(&self) -> usize { + self.job_count + } } impl Drop for JobList { @@ -1272,7 +1287,10 @@ impl WorkerThread { // TODO: add counter to job queue, check for low job count to decide whether to use heartbeat or seq. // see: chili - if self.join_count.get() == 1 { + + // SAFETY: this function runs in a worker thread, so we can access the queue safely. + if count == 0 || unsafe { self.queue.as_ref_unchecked().len() } < 3 { + cold_path(); self.join_heartbeat(a, b) } else { self.join_seq(a, b) @@ -1946,15 +1964,32 @@ fn worker(ctx: Arc, barrier: Arc) { barrier.wait(); let mut job = ctx.shared.lock().pop_job(); - loop { - if let Some(job) = job { - worker.execute(job); - } + 'outer: loop { + let mut guard = loop { + if let Some(job) = job { + worker.execute(job); + } - let mut guard = ctx.shared.lock(); - if guard.should_stop { - break; - } + let mut guard = ctx.shared.lock(); + if guard.should_stop { + // if the context is stopped, break out of the outer loop which + // will exit the thread. + break 'outer; + } + + match guard.pop_job() { + Some(job) => { + tracing::trace!("worker: popping job: {:?}", job); + // found job, continue inner loop + continue; + } + None => { + tracing::trace!("worker: no job, waiting for shared job"); + // no more jobs, break out of inner loop and wait for shared job + break guard; + } + } + }; ctx.shared_job.wait(&mut guard); job = guard.pop_job();