This commit is contained in:
Janis 2025-06-20 21:41:52 +02:00
parent 448d2d02b4
commit 3730952cad
2 changed files with 23 additions and 23 deletions

View file

@ -71,6 +71,8 @@ mod util {
None => None,
}
}
#[allow(dead_code)]
pub const unsafe fn new_unchecked(ptr: *mut T) -> Self {
unsafe { Self(NonNull::new_unchecked(ptr)) }
}
@ -79,6 +81,7 @@ mod util {
Self::new(ptr.cast_mut())
}
#[allow(dead_code)]
pub const unsafe fn new_const_unchecked(ptr: *const T) -> Self {
Self::new_unchecked(ptr.cast_mut())
}
@ -682,6 +685,8 @@ mod job {
mem::transmute::<&Job<T>, &Job<U>>(self)
}
/// unwraps the `this` pointer, which is only valid if the job is in the empty state.
#[allow(dead_code)]
pub unsafe fn unwrap_this(&self) -> NonNull<()> {
assert!(self.state() == JobState::Empty as u8);
unsafe { (&*self.val_or_this.get()).this }
@ -900,6 +905,8 @@ mod job {
})
}
/// unwraps the job into it's closure.
#[allow(dead_code)]
pub fn into_inner(self) -> F {
self.f
}
@ -994,6 +1001,8 @@ mod job {
Self { result }
}
/// convert JobResult into a thread result.
#[allow(dead_code)]
pub fn into_inner(self) -> std::thread::Result<T> {
self.result
}
@ -1121,6 +1130,7 @@ impl WorkerThread {
}
}
#[allow(dead_code)]
fn drop_current_guard(new: Option<NonNull<Self>>) -> DropGuard<impl FnOnce()> {
DropGuard::new(move || unsafe {
if let Some(old) = Self::unset_current() {
@ -1185,15 +1195,6 @@ impl WorkerThread {
unsafe { self.queue.as_mut_unchecked().pop_front() }
}
fn complete_jobs(&self) {
while let Some(job) = self.pop_front() {
unsafe {
job.as_ref().set_pending();
}
self.execute(job);
}
}
#[inline(always)]
fn tick(&self) {
if self.heartbeat.load(Ordering::Relaxed) {
@ -1402,14 +1403,13 @@ where
}
impl<'scope> Scope<'scope> {
fn wait_for_jobs(&self) {
let thread = WorkerThread::current_ref().unwrap();
fn wait_for_jobs(&self, worker: &WorkerThread) {
tracing::trace!("waiting for {} jobs to finish.", self.job_counter.count());
tracing::trace!("thread id: {:?}, jobs: {:?}", thread.index, unsafe {
thread.queue.as_ref_unchecked()
tracing::trace!("thread id: {:?}, jobs: {:?}", worker.index, unsafe {
worker.queue.as_ref_unchecked()
});
thread.wait_until_latch(&self.job_counter);
worker.wait_until_latch(&self.job_counter);
unsafe { self.job_counter.wait() };
}
@ -1418,9 +1418,9 @@ impl<'scope> Scope<'scope> {
F: FnOnce(&Self) -> R + Send,
R: Send,
{
run_in_worker(|thread| {
let this = Self::from_context(thread.context.clone());
this.complete(|| f(&this))
run_in_worker(|worker| {
let this = Self::from_context(worker.context.clone());
this.complete(worker, || f(&this))
})
}
@ -1429,14 +1429,14 @@ impl<'scope> Scope<'scope> {
F: FnOnce(&Self) -> R + Send,
R: Send,
{
context.run_in_worker(|_| {
context.run_in_worker(|worker| {
let this = Self::from_context(context.clone());
this.complete(|| f(&this))
this.complete(worker, || f(&this))
})
}
/// should be called from within a worker thread.
fn complete<F, R>(&self, f: F) -> R
fn complete<F, R>(&self, worker: &WorkerThread, f: F) -> R
where
F: FnOnce() -> R + Send,
R: Send,
@ -1468,7 +1468,7 @@ impl<'scope> Scope<'scope> {
}
};
self.wait_for_jobs();
self.wait_for_jobs(worker);
self.maybe_propagate_panic();
// SAFETY: if result panicked, we would have propagated the panic above.
@ -1642,7 +1642,7 @@ where
/// run two closures potentially in parallel, in the global threadpool.
#[allow(dead_code)]
pub fn join_in<A, B, RA, RB>(context: Arc<Context>, a: A, b: B) -> (RA, RB)
fn join_in<A, B, RA, RB>(context: Arc<Context>, a: A, b: B) -> (RA, RB)
where
RA: Send,
RB: Send,

View file

@ -451,7 +451,7 @@ fn join() {
#[test]
#[traced_test]
fn join_many() {
use crate::util::tree::{Tree, TREE_SIZE};
use crate::util::tree::Tree;
let pool = ThreadPool::new();