add job count to queue, loop over jobs when woken by heartbeat

This commit is contained in:
Janis 2025-06-23 13:54:48 +02:00
parent bfbcc2868f
commit f09d4e05d5

View file

@ -435,6 +435,7 @@ mod job {
// these cannot be boxes because boxes are noalias. // these cannot be boxes because boxes are noalias.
head: NonNull<Job>, head: NonNull<Job>,
tail: NonNull<Job>, tail: NonNull<Job>,
job_count: usize,
} }
impl Debug for JobList { impl Debug for JobList {
@ -488,6 +489,7 @@ mod job {
Self { Self {
head: NonNull::new_unchecked(head), head: NonNull::new_unchecked(head),
tail: NonNull::new_unchecked(tail), tail: NonNull::new_unchecked(tail),
job_count: 0,
} }
} }
} }
@ -501,6 +503,7 @@ mod job {
/// elem must be valid until it is popped. /// elem must be valid until it is popped.
pub unsafe fn push_front<T>(&mut self, elem: *const Job<T>) { pub unsafe fn push_front<T>(&mut self, elem: *const Job<T>) {
self.job_count += 1;
let head_link = unsafe { self.head.as_ref().link_mut() }; let head_link = unsafe { self.head.as_ref().link_mut() };
// SAFETY: head will always have a previous element. // SAFETY: head will always have a previous element.
@ -518,6 +521,7 @@ mod job {
/// elem must be valid until it is popped. /// elem must be valid until it is popped.
pub unsafe fn push_back<T>(&mut self, elem: *const Job<T>) { pub unsafe fn push_back<T>(&mut self, elem: *const Job<T>) {
self.job_count += 1;
let tail_link = unsafe { self.tail.as_ref().link_mut() }; let tail_link = unsafe { self.tail.as_ref().link_mut() };
// SAFETY: tail will always have a previous element. // SAFETY: tail will always have a previous element.
@ -535,6 +539,7 @@ mod job {
#[allow(dead_code)] #[allow(dead_code)]
pub fn pop_front(&mut self) -> Option<NonNull<Job>> { pub fn pop_front(&mut self) -> Option<NonNull<Job>> {
self.job_count -= 1;
let head_link = unsafe { self.head.as_ref().link_mut() }; let head_link = unsafe { self.head.as_ref().link_mut() };
// SAFETY: head will always have a previous element. // SAFETY: head will always have a previous element.
@ -551,6 +556,7 @@ mod job {
} }
pub fn pop_back(&mut self) -> Option<NonNull<Job>> { pub fn pop_back(&mut self) -> Option<NonNull<Job>> {
self.job_count -= 1;
// TODO: next and elem might be the same // TODO: next and elem might be the same
let tail_link = unsafe { self.tail.as_ref().link_mut() }; let tail_link = unsafe { self.tail.as_ref().link_mut() };
@ -566,6 +572,15 @@ mod job {
Some(elem) Some(elem)
} }
#[allow(dead_code)]
pub fn is_empty(&self) -> bool {
self.job_count == 0
}
pub fn len(&self) -> usize {
self.job_count
}
} }
impl Drop for JobList { impl Drop for JobList {
@ -1272,7 +1287,10 @@ impl WorkerThread {
// TODO: add counter to job queue, check for low job count to decide whether to use heartbeat or seq. // TODO: add counter to job queue, check for low job count to decide whether to use heartbeat or seq.
// see: chili // see: chili
if self.join_count.get() == 1 {
// SAFETY: this function runs in a worker thread, so we can access the queue safely.
if count == 0 || unsafe { self.queue.as_ref_unchecked().len() } < 3 {
cold_path();
self.join_heartbeat(a, b) self.join_heartbeat(a, b)
} else { } else {
self.join_seq(a, b) self.join_seq(a, b)
@ -1946,16 +1964,33 @@ fn worker(ctx: Arc<Context>, barrier: Arc<std::sync::Barrier>) {
barrier.wait(); barrier.wait();
let mut job = ctx.shared.lock().pop_job(); let mut job = ctx.shared.lock().pop_job();
loop { 'outer: loop {
let mut guard = loop {
if let Some(job) = job { if let Some(job) = job {
worker.execute(job); worker.execute(job);
} }
let mut guard = ctx.shared.lock(); let mut guard = ctx.shared.lock();
if guard.should_stop { if guard.should_stop {
break; // if the context is stopped, break out of the outer loop which
// will exit the thread.
break 'outer;
} }
match guard.pop_job() {
Some(job) => {
tracing::trace!("worker: popping job: {:?}", job);
// found job, continue inner loop
continue;
}
None => {
tracing::trace!("worker: no job, waiting for shared job");
// no more jobs, break out of inner loop and wait for shared job
break guard;
}
}
};
ctx.shared_job.wait(&mut guard); ctx.shared_job.wait(&mut guard);
job = guard.pop_job(); job = guard.pop_job();
} }