diff --git a/src/praetor/mod.rs b/src/praetor/mod.rs index 28c9d0b..974147a 100644 --- a/src/praetor/mod.rs +++ b/src/praetor/mod.rs @@ -71,6 +71,8 @@ mod util { None => None, } } + + #[allow(dead_code)] pub const unsafe fn new_unchecked(ptr: *mut T) -> Self { unsafe { Self(NonNull::new_unchecked(ptr)) } } @@ -79,6 +81,7 @@ mod util { Self::new(ptr.cast_mut()) } + #[allow(dead_code)] pub const unsafe fn new_const_unchecked(ptr: *const T) -> Self { Self::new_unchecked(ptr.cast_mut()) } @@ -682,6 +685,8 @@ mod job { mem::transmute::<&Job, &Job>(self) } + /// unwraps the `this` pointer, which is only valid if the job is in the empty state. + #[allow(dead_code)] pub unsafe fn unwrap_this(&self) -> NonNull<()> { assert!(self.state() == JobState::Empty as u8); unsafe { (&*self.val_or_this.get()).this } @@ -900,6 +905,8 @@ mod job { }) } + /// unwraps the job into it's closure. + #[allow(dead_code)] pub fn into_inner(self) -> F { self.f } @@ -994,6 +1001,8 @@ mod job { Self { result } } + /// convert JobResult into a thread result. + #[allow(dead_code)] pub fn into_inner(self) -> std::thread::Result { self.result } @@ -1121,6 +1130,7 @@ impl WorkerThread { } } + #[allow(dead_code)] fn drop_current_guard(new: Option>) -> DropGuard { DropGuard::new(move || unsafe { if let Some(old) = Self::unset_current() { @@ -1185,15 +1195,6 @@ impl WorkerThread { unsafe { self.queue.as_mut_unchecked().pop_front() } } - fn complete_jobs(&self) { - while let Some(job) = self.pop_front() { - unsafe { - job.as_ref().set_pending(); - } - self.execute(job); - } - } - #[inline(always)] fn tick(&self) { if self.heartbeat.load(Ordering::Relaxed) { @@ -1402,14 +1403,13 @@ where } impl<'scope> Scope<'scope> { - fn wait_for_jobs(&self) { - let thread = WorkerThread::current_ref().unwrap(); + fn wait_for_jobs(&self, worker: &WorkerThread) { tracing::trace!("waiting for {} jobs to finish.", self.job_counter.count()); - tracing::trace!("thread id: {:?}, jobs: {:?}", thread.index, unsafe { - thread.queue.as_ref_unchecked() + tracing::trace!("thread id: {:?}, jobs: {:?}", worker.index, unsafe { + worker.queue.as_ref_unchecked() }); - thread.wait_until_latch(&self.job_counter); + worker.wait_until_latch(&self.job_counter); unsafe { self.job_counter.wait() }; } @@ -1418,9 +1418,9 @@ impl<'scope> Scope<'scope> { F: FnOnce(&Self) -> R + Send, R: Send, { - run_in_worker(|thread| { - let this = Self::from_context(thread.context.clone()); - this.complete(|| f(&this)) + run_in_worker(|worker| { + let this = Self::from_context(worker.context.clone()); + this.complete(worker, || f(&this)) }) } @@ -1429,14 +1429,14 @@ impl<'scope> Scope<'scope> { F: FnOnce(&Self) -> R + Send, R: Send, { - context.run_in_worker(|_| { + context.run_in_worker(|worker| { let this = Self::from_context(context.clone()); - this.complete(|| f(&this)) + this.complete(worker, || f(&this)) }) } /// should be called from within a worker thread. - fn complete(&self, f: F) -> R + fn complete(&self, worker: &WorkerThread, f: F) -> R where F: FnOnce() -> R + Send, R: Send, @@ -1468,7 +1468,7 @@ impl<'scope> Scope<'scope> { } }; - self.wait_for_jobs(); + self.wait_for_jobs(worker); self.maybe_propagate_panic(); // SAFETY: if result panicked, we would have propagated the panic above. @@ -1642,7 +1642,7 @@ where /// run two closures potentially in parallel, in the global threadpool. #[allow(dead_code)] -pub fn join_in(context: Arc, a: A, b: B) -> (RA, RB) +fn join_in(context: Arc, a: A, b: B) -> (RA, RB) where RA: Send, RB: Send, diff --git a/src/praetor/tests.rs b/src/praetor/tests.rs index 2da68ca..efe1abf 100644 --- a/src/praetor/tests.rs +++ b/src/praetor/tests.rs @@ -451,7 +451,7 @@ fn join() { #[test] #[traced_test] fn join_many() { - use crate::util::tree::{Tree, TREE_SIZE}; + use crate::util::tree::Tree; let pool = ThreadPool::new();