From 26b6ef264c2b92515764fad62b67925936d4c366 Mon Sep 17 00:00:00 2001
From: Janis
Date: Sat, 5 Jul 2025 14:20:42 +0200
Subject: [PATCH] more garbage, but what did you expect

---
 distaff/src/channel.rs      |  73 +++-------------
 distaff/src/context.rs      |  60 +++++++------
 distaff/src/job.rs          | 138 ++++++++++++++++-------
 distaff/src/join.rs         |  16 ++--
 distaff/src/queue.rs        | 169 +++++++++++++-----------------
 distaff/src/scope.rs        |  38 ++++----
 distaff/src/workerthread.rs |  36 ++------
 examples/join.rs            |   3 +-
 8 files changed, 224 insertions(+), 309 deletions(-)

diff --git a/distaff/src/channel.rs b/distaff/src/channel.rs
index fc7ca2d..b5f98a8 100644
--- a/distaff/src/channel.rs
+++ b/distaff/src/channel.rs
@@ -17,62 +17,9 @@ enum State {
     Taken,
 }
 
-// taken from `std`
-#[derive(Debug)]
-#[repr(transparent)]
-pub struct Parker {
-    mutex: AtomicU32,
-}
+pub use werkzeug::sync::Parker;
 
-impl Parker {
-    const PARKED: u32 = u32::MAX;
-    const EMPTY: u32 = 0;
-    const NOTIFIED: u32 = 1;
-    pub fn new() -> Self {
-        Self {
-            mutex: AtomicU32::new(Self::EMPTY),
-        }
-    }
-
-    pub fn is_parked(&self) -> bool {
-        self.mutex.load(Ordering::Acquire) == Self::PARKED
-    }
-
-    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all, fields(this = self as *const Self as usize)))]
-    pub fn park(&self) {
-        if self.mutex.fetch_sub(1, Ordering::Acquire) == Self::NOTIFIED {
-            // The thread was notified, so we can return immediately.
-            return;
-        }
-
-        loop {
-            atomic_wait::wait(&self.mutex, Self::PARKED);
-
-            // We check whether we were notified or woke up spuriously with
-            // acquire ordering in order to make-visible any writes made by the
-            // thread that notified us.
-            if self.mutex.swap(Self::EMPTY, Ordering::Acquire) == Self::NOTIFIED {
-                // The thread was notified, so we can return immediately.
-                return;
-            } else {
-                // spurious wakeup, so we need to re-park.
-                continue;
-            }
-        }
-    }
-
-    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all, fields(this = self as *const Self as usize)))]
-    pub fn unpark(&self) {
-        // write with Release ordering to ensure that any writes made by this
-        // thread are made-available to the unparked thread.
-        if self.mutex.swap(Self::NOTIFIED, Ordering::Release) == Self::PARKED {
-            // The thread was parked, so we need to notify it.
-            atomic_wait::wake_one(&self.mutex);
-        } else {
-            // The thread was not parked, so we don't need to do anything.
-        }
-    }
-}
+use crate::queue::Queue;
 
 #[derive(Debug)]
 #[repr(C)]
@@ -104,6 +51,10 @@ impl Receiver {
         self.0.state.load(Ordering::Acquire) != State::Ready as u8
     }
 
+    pub fn sender(&self) -> Sender<T> {
+        Sender(self.0.clone())
+    }
+
     #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
     pub fn wait(&self) {
         loop {
@@ -182,15 +133,15 @@ impl Receiver {
         // `State::Ready`.
         //
         // In either case, this thread now has unique access to `val`.
-        unsafe { self.take() }
-    }
-
-    unsafe fn take(&self) -> thread::Result<T> {
         assert_eq!(
             self.0.state.swap(State::Taken as u8, Ordering::Acquire),
             State::Ready as u8
         );
 
+        unsafe { self.take() }
+    }
+
+    unsafe fn take(&self) -> thread::Result<T> {
         let result = unsafe { (*self.0.val.get()).take().map(|b| *b).unwrap() };
 
         result
@@ -222,6 +173,10 @@ impl Sender {
         }
     }
 
+    pub unsafe fn parker(&self) -> &Parker {
+        unsafe { self.0.waiting_thread.as_ref() }
+    }
+
     /// The caller must ensure that this function or `send` are only ever called once.
     pub unsafe fn send_as_ref(&self, val: thread::Result<T>) {
         // SAFETY:
diff --git a/distaff/src/context.rs b/distaff/src/context.rs
index aadf1f1..f878b58 100644
--- a/distaff/src/context.rs
+++ b/distaff/src/context.rs
@@ -1,7 +1,7 @@
 use std::{
     cell::UnsafeCell,
     marker::PhantomPinned,
-    mem::ManuallyDrop,
+    mem::{self, ManuallyDrop},
     panic::{AssertUnwindSafe, catch_unwind},
     pin::Pin,
     ptr::NonNull,
@@ -14,7 +14,6 @@ use std::{
 
 use alloc::collections::BTreeMap;
 use async_task::Runnable;
-use parking_lot::{Condvar, Mutex};
 
 use crate::{
     channel::{Parker, Sender},
@@ -34,7 +33,7 @@ pub struct Context {
 
 pub(crate) enum Message {
     Shared(SharedJob),
-    Finished(werkzeug::util::Send<NonNull<()>>),
+    WakeUp,
     Exit,
     ScopeFinished,
 }
@@ -146,10 +145,11 @@ impl Context {
         let job = unsafe { Pin::new_unchecked(&_pinned) };
 
         let job = Job::from_stackjob(&job);
 
+        unsafe {
+            self.inject_job(job.share(Some(worker.receiver.get_token().as_parker())));
+        }
-        self.inject_job(job.share(Some(worker.receiver.get_token())));
-
-        let t = worker.wait_until_shared_job(&job);
+        let t = worker.wait_until_recv(job.take_receiver().expect("Job should have a receiver"));
 
         // touch the job to ensure it is dropped after we are done with it.
         drop(_pinned);
@@ -165,24 +165,21 @@ impl Context {
     {
         // current thread isn't a worker thread, create job and inject into context
        let parker = Parker::new();
-        let (send, recv) = crate::channel::channel::<T>(NonNull::from(&parker));
 
-        struct CrossJob<T, F> {
+        struct CrossJob<F> {
             f: UnsafeCell<ManuallyDrop<F>>,
-            send: Sender<T>,
             _pin: PhantomPinned,
         }
 
-        impl<T, F> CrossJob<T, F> {
-            fn new(f: F, send: Sender<T>) -> Self {
+        impl<F> CrossJob<F> {
+            fn new(f: F) -> Self {
                 Self {
                     f: UnsafeCell::new(ManuallyDrop::new(f)),
-                    send,
                     _pin: PhantomPinned,
                 }
             }
 
-            fn into_job(self: Pin<&Self>) -> Job<T>
+            fn into_job(self: &Self) -> Job<T>
             where
                 F: FnOnce(&WorkerThread) -> T + Send,
                 T: Send,
@@ -194,30 +191,41 @@ impl Context {
                 unsafe { ManuallyDrop::take(&mut *self.f.get()) }
             }
 
-            unsafe fn harness<F, T>(worker: &WorkerThread, this: NonNull<()>, _: Option<ReceiverToken>)
+            #[align(8)]
+            unsafe fn harness<F, T>(worker: &WorkerThread, this: NonNull<()>, sender: Option<Sender>)
             where
                 F: FnOnce(&WorkerThread) -> T + Send,
                 T: Send,
             {
-                let this: &CrossJob<T, F> = unsafe { this.cast().as_ref() };
+                let this: &CrossJob<F> = unsafe { this.cast().as_ref() };
                 let f = unsafe { this.unwrap() };
+                let sender: Option<Sender<T>> = unsafe { mem::transmute(sender) };
+
+                let result = catch_unwind(AssertUnwindSafe(|| f(worker)));
+
+                let sender = sender.unwrap();
 
                 unsafe {
-                    this.send
-                        .send_as_ref(catch_unwind(AssertUnwindSafe(|| f(worker))));
+                    sender.send_as_ref(result);
+                    worker
+                        .context
+                        .queue
+                        .as_sender()
+                        .unicast(Message::WakeUp, ReceiverToken::from_parker(sender.parker()));
                 }
             }
         }
 
-        let _pinned = CrossJob::new(move |worker: &WorkerThread| f(worker), send);
-        let job = unsafe { Pin::new_unchecked(&_pinned) };
+        let pinned = CrossJob::new(move |worker: &WorkerThread| f(worker));
+        let job2 = pinned.into_job();
 
-        self.inject_job(job.into_job().share(None));
+        self.inject_job(job2.share(Some(&parker)));
+
+        let recv = job2.take_receiver().unwrap();
 
         let out = crate::util::unwrap_or_panic(recv.recv());
 
         // touch the job to ensure it is dropped after we are done with it.
-        drop(_pinned);
+        drop(pinned);
         drop(parker);
 
         out
@@ -276,7 +284,7 @@ impl Context {
     {
         let schedule = move |runnable: Runnable| {
             #[align(8)]
-            unsafe fn harness(_: &WorkerThread, this: NonNull<()>, _: Option<ReceiverToken>) {
+            unsafe fn harness(_: &WorkerThread, this: NonNull<()>, _: Option<Sender>) {
                 unsafe {
                     let runnable = Runnable::<()>::from_raw(this);
                     runnable.run();
@@ -379,7 +387,6 @@ mod tests {
         let counter = Arc::new(AtomicU8::new(0));
 
         let parker = Parker::new();
-        let receiver = ctx.queue.new_receiver();
 
         let job = StackJob::new({
             let counter = counter.clone();
@@ -405,15 +412,14 @@ mod tests {
             assert!(heartbeat.is_waiting());
         });
 
-        ctx.inject_job(job.share(Some(receiver.get_token())));
+        ctx.inject_job(job.share(Some(&parker)));
 
         // Wait for the job to be executed
-        assert!(job.is_shared());
-        let Message::Finished(werkzeug::util::Send(result)) = receiver.recv() else {
+        let recv = job.take_receiver().expect("Job should have a receiver");
+        let Some(result) = recv.poll() else {
            panic!("Expected a finished message");
        };
-        let result = unsafe { *Box::from_non_null(result.cast()) };
 
        let result = crate::util::unwrap_or_panic::<i32>(result);
        assert_eq!(result, 42);
        assert_eq!(counter.load(Ordering::SeqCst), 1);
diff --git a/distaff/src/job.rs b/distaff/src/job.rs
index 5e0751b..7a6ed21 100644
--- a/distaff/src/job.rs
+++ b/distaff/src/job.rs
@@ -10,7 +10,7 @@ use alloc::boxed::Box;
 
 use crate::{
     WorkerThread,
-    channel::{Parker, Sender},
+    channel::{Parker, Receiver, Sender},
     context::Message,
     queue::ReceiverToken,
 };
@@ -45,31 +45,36 @@ impl HeapJob {
     }
 }
 
-type JobHarness =
-    unsafe fn(&WorkerThread, this: NonNull<()>, sender: Option<ReceiverToken>);
+type JobHarness = unsafe fn(&WorkerThread, this: NonNull<()>, sender: Option<Sender>);
 
 #[repr(C)]
 pub struct Job2<T = ()> {
-    harness: Cell<Option<JobHarness>>,
-    this: NonNull<()>,
-    _phantom: core::marker::PhantomData<T>,
-    _pin: PhantomPinned,
+    inner: UnsafeCell<Job2Inner<T>>,
 }
 
 impl<T> Debug for Job2<T> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("Job2")
-            .field("harness", &self.harness)
-            .field("this", &self.this)
-            .finish_non_exhaustive()
+        f.debug_struct("Job2").field("inner", &self.inner).finish()
     }
 }
 
+#[repr(C)]
+pub enum Job2Inner<T> {
+    Local {
+        harness: JobHarness,
+        this: NonNull<()>,
+        _pin: PhantomPinned,
+    },
+    Shared {
+        receiver: Cell<Option<Receiver<T>>>,
+    },
+}
 
 #[derive(Debug)]
 pub struct SharedJob {
     harness: JobHarness,
     this: NonNull<()>,
-    sender: Option<ReceiverToken>,
+    sender: Option<Sender<()>>,
 }
 
 unsafe impl Send for SharedJob {}
@@ -77,37 +82,52 @@ impl Job2 {
     fn new(harness: JobHarness, this: NonNull<()>) -> Self {
         let this = Self {
-            harness: Cell::new(Some(harness)),
-            this,
-            _phantom: core::marker::PhantomData,
-            _pin: PhantomPinned,
+            inner: UnsafeCell::new(Job2Inner::Local {
+                harness: harness,
+                this,
+                _pin: PhantomPinned,
+            }),
         };
 
-        #[cfg(feature = "tracing")]
-        tracing::trace!("new job: {:?}", this);
-
         this
     }
 
-    pub fn share(&self, parker: Option<ReceiverToken>) -> SharedJob {
-        #[cfg(feature = "tracing")]
-        tracing::trace!("sharing job: {:?}", self);
-
-        // let (sender, receiver) = parker
-        //     .map(|parker| crate::channel::channel::<T>(parker.into()))
-        //     .unzip();
+    pub fn share(&self, parker: Option<&Parker>) -> SharedJob {
+        let (sender, receiver) = parker
+            .map(|parker| crate::channel::channel::<T>(parker.into()))
+            .unzip();
 
         // self.receiver.set(receiver);
-
-        SharedJob {
-            harness: self.harness.take().unwrap(),
-            this: self.this,
-            sender: parker,
+        if let Job2Inner::Local {
+            harness,
+            this,
+            _pin: _,
+        } = unsafe {
+            self.inner.replace(Job2Inner::Shared {
+                receiver: Cell::new(receiver),
+            })
+        } {
+            // SAFETY: `this` is a valid pointer to the job.
+            unsafe {
+                SharedJob {
+                    harness,
+                    this,
+                    sender: mem::transmute(sender), // Convert `Option<Sender<T>>` to `Option<Sender<()>>`
+                }
+            }
+        } else {
+            panic!("Job2 is already shared");
         }
     }
 
-    pub fn is_shared(&self) -> bool {
-        self.harness.clone().get().is_none()
+    pub fn take_receiver(&self) -> Option<Receiver<T>> {
+        unsafe {
+            if let Job2Inner::Shared { receiver } = self.inner.as_ref_unchecked() {
+                receiver.take()
+            } else {
+                None
+            }
+        }
     }
 
     pub fn from_stackjob<F>(job: &StackJob<F>) -> Self
     where
@@ -119,15 +139,13 @@ impl Job2 {
            feature = "tracing",
            tracing::instrument(level = "trace", skip_all, name = "stack_job_harness")
        )]
-        unsafe fn harness<F, T>(
-            worker: &WorkerThread,
-            this: NonNull<()>,
-            sender: Option<ReceiverToken>,
-        ) where
+        unsafe fn harness<F, T>(worker: &WorkerThread, this: NonNull<()>, sender: Option<Sender>)
+        where
            F: FnOnce(&WorkerThread) -> T + Send,
            T: Send,
        {
            use std::panic::{AssertUnwindSafe, catch_unwind};
+            let sender: Option<Sender<T>> = unsafe { mem::transmute(sender) };
 
            let f = unsafe { this.cast::<StackJob<F>>().as_ref().unwrap() };
@@ -142,15 +160,15 @@ impl Job2 {
            let result = catch_unwind(AssertUnwindSafe(|| f(worker)));
 
-            if let Some(token) = sender {
-                worker.context.queue.as_sender().unicast(
-                    Message::Finished(werkzeug::util::Send(
-                        // SAFETY: T is guaranteed to be `Sized`, so
-                        // `NonNull<T>` is the same size for any `T`.
-                        Box::into_non_null(Box::new(result)).cast(),
-                    )),
-                    token,
-                );
+            if let Some(sender) = sender {
+                unsafe {
+                    sender.send_as_ref(result);
+                    worker
+                        .context
+                        .queue
+                        .as_sender()
+                        .unicast(Message::WakeUp, ReceiverToken::from_parker(sender.parker()));
+                }
            }
        }
 
@@ -166,15 +184,13 @@ impl Job2 {
            feature = "tracing",
            tracing::instrument(level = "trace", skip_all, name = "heap_job_harness")
        )]
-        unsafe fn harness<F, T>(
-            worker: &WorkerThread,
-            this: NonNull<()>,
-            sender: Option<ReceiverToken>,
-        ) where
+        unsafe fn harness<F, T>(worker: &WorkerThread, this: NonNull<()>, sender: Option<Sender>)
+        where
            F: FnOnce(&WorkerThread) -> T + Send,
            T: Send,
        {
            use std::panic::{AssertUnwindSafe, catch_unwind};
+            let sender: Option<Sender<T>> = unsafe { mem::transmute(sender) };
 
            // expect MIRI to complain about this, but it is actually correct.
            // because I am so much smarter than MIRI, naturally, obviously.
            let f = unsafe { (*Box::from_non_null(this.cast::<HeapJob<F>>())).into_inner() };
            let result = catch_unwind(AssertUnwindSafe(|| f(worker)));
 
-            if let Some(token) = sender {
-                worker.context.queue.as_sender().unicast(
-                    Message::Finished(werkzeug::util::Send(
-                        // SAFETY: T is guaranteed to be `Sized`, so
-                        // `NonNull<T>` is the same size for any `T`.
-                        Box::into_non_null(Box::new(result)).cast(),
-                    )),
-                    token,
-                );
+            if let Some(sender) = sender {
+                unsafe {
+                    sender.send_as_ref(result);
+                    _ = worker
+                        .context
+                        .queue
+                        .as_sender()
+                        .unicast(Message::WakeUp, ReceiverToken::from_parker(sender.parker()));
+                }
            }
        }
diff --git a/distaff/src/join.rs b/distaff/src/join.rs
index 784829c..d5834a6 100644
--- a/distaff/src/join.rs
+++ b/distaff/src/join.rs
@@ -118,16 +118,16 @@ impl WorkerThread {
                 cold_path();
 
                 // if b panicked, we need to wait for a to finish
-                if job.is_shared() {
-                    _ = self.wait_until_recv::<RA>();
+                if let Some(recv) = job.take_receiver() {
+                    _ = self.wait_until_recv(recv);
                 }
 
                 resume_unwind(payload);
             }
         };
 
-        let ra = if job.is_shared() {
-            crate::util::unwrap_or_panic(self.wait_until_recv())
+        let ra = if let Some(recv) = job.take_receiver() {
+            crate::util::unwrap_or_panic(self.wait_until_recv(recv))
         } else {
             self.pop_back();
 
@@ -171,16 +171,16 @@ impl WorkerThread {
                 cold_path();
 
                 // if b panicked, we need to wait for a to finish
-                if job.is_shared() {
-                    _ = self.wait_until_recv::<RA>();
+                if let Some(recv) = job.take_receiver() {
+                    _ = self.wait_until_recv(recv);
                 }
 
                 resume_unwind(payload);
             }
         };
 
-        let ra = if job.is_shared() {
-            crate::util::unwrap_or_panic(self.wait_until_recv())
+        let ra = if let Some(recv) = job.take_receiver() {
+            crate::util::unwrap_or_panic(self.wait_until_recv(recv))
         } else {
             self.pop_back();
 
diff --git a/distaff/src/queue.rs b/distaff/src/queue.rs
index 6d41070..2975e18 100644
--- a/distaff/src/queue.rs
+++ b/distaff/src/queue.rs
@@ -183,6 +183,23 @@ impl Drop for Slot {
 #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
 pub struct ReceiverToken(werkzeug::util::Send<NonNull<u32>>);
 
+impl ReceiverToken {
+    pub fn as_ptr(&self) -> *mut u32 {
+        self.0.into_inner().as_ptr()
+    }
+
+    pub unsafe fn as_parker(&self) -> &Parker {
+        // SAFETY: The pointer is guaranteed to be valid and aligned, as it comes from a pinned Parker.
+        unsafe { Parker::from_ptr(self.as_ptr()) }
+    }
+
+    pub unsafe fn from_parker(parker: &Parker) -> Self {
+        // SAFETY: The pointer is guaranteed to be valid and aligned, as it comes from a pinned Parker.
+        let ptr = NonNull::from(parker).cast::<u32>();
+        ReceiverToken(werkzeug::util::Send(ptr))
+    }
+}
+
 impl<T> Queue<T> {
     pub fn new() -> Arc<Self> {
         Arc::new(Self {
@@ -299,9 +316,12 @@ impl Receiver {
                 // there was no message for this receiver, so we need to park it
                 queue.receivers.get_mut(&token).unwrap().1 = true; // mark the slot as parked
 
-                // wait for a message to be sent to this receiver
-                drop(_guard);
-                self.lock.0.park();
+                self.lock.0.park_with_callback(move || {
+                    // drop the lock guard after having set the lock state to waiting.
+                    // this avoids a deadlock if the sender tries to send a message
+                    // while the receiver is in the process of parking (I think..)
+                    drop(_guard);
+                });
            }
        }
 
@@ -390,7 +410,7 @@ impl Sender {
        let _guard = self.queue.lock();
        let queue = self.queue.inner();
 
-        let Some(CachePadded((slot, is_parked))) = queue.receivers.get_mut(&receiver) else {
+        let Some(CachePadded((slot, _))) = queue.receivers.get_mut(&receiver) else {
            return Err(value);
        };
@@ -398,12 +418,9 @@ impl Sender {
            slot.push(value);
        }
 
-        // check if the receiver is parked
-        if *is_parked {
-            // wake the receiver
-            unsafe {
-                Parker::from_ptr(receiver.0.into_inner().as_ptr()).unpark();
-            }
+        // wake the receiver
+        unsafe {
+            Parker::from_ptr(receiver.0.into_inner().as_ptr()).unpark();
        }
 
        Ok(())
@@ -419,17 +436,14 @@ impl Sender {
        let queue = self.queue.inner();
 
        // send the message to all receivers
-        for (token, CachePadded((slot, is_parked))) in queue.receivers.iter() {
+        for (token, CachePadded((slot, _))) in queue.receivers.iter() {
            // SAFETY: The slot is owned by this receiver.
            unsafe { slot.push(value.clone()) };
 
-            // check if the receiver is parked
-            if *is_parked {
-                // wake the receiver
-                unsafe {
-                    Parker::from_ptr(token.0.into_inner().as_ptr()).unpark();
-                }
+            // wake the receiver
+            unsafe {
+                Parker::from_ptr(token.0.into_inner().as_ptr()).unpark();
            }
        }
    }
@@ -444,17 +458,15 @@ impl Sender {
        let queue = self.queue.inner();
 
        // send the message to all receivers
-        for (token, CachePadded((slot, is_parked))) in queue.receivers.iter() {
+        for (token, CachePadded((slot, _))) in queue.receivers.iter() {
            // SAFETY: The slot is owned by this receiver.
            unsafe { slot.push(f()) };
 
            // check if the receiver is parked
-            if *is_parked {
-                // wake the receiver
-                unsafe {
-                    Parker::from_ptr(token.0.into_inner().as_ptr()).unpark();
-                }
+            // wake the receiver
+            unsafe {
+                Parker::from_ptr(token.0.into_inner().as_ptr()).unpark();
            }
        }
    }
@@ -597,88 +609,31 @@ mod tests {
            "All DropCheck instances should have been dropped"
        );
    }
+
+    #[test]
+    fn send_self() {
+        // Test that sending a message to self works
+        let queue = Queue::<i32>::new();
+        let sender = queue.new_sender();
+        let receiver = queue.new_receiver();
+
+        sender.unicast(42, receiver.get_token()).unwrap();
+        assert_eq!(receiver.recv(), 42);
+    }
+
+    #[test]
+    fn send_self_many() {
+        // Test that sending multiple messages to self works
+        let queue = Queue::<i32>::new();
+        let sender = queue.new_sender();
+        let receiver = queue.new_receiver();
+
+        for i in 0..10 {
+            sender.unicast(i, receiver.get_token()).unwrap();
+        }
+
+        for i in (0..10).rev() {
+            assert_eq!(receiver.recv(), i);
+        }
+    }
 }
-
-// struct AtomicLIFO<T> {
-//     cell: AtomicOption<T>,
-//     next: AtomicPtr<Self>,
-// }
-
-// impl<T> AtomicLIFO<T> {
-//     fn new() -> Self {
-//         Self {
-//             cell: AtomicOption::new(),
-//             next: AtomicPtr::new(ptr::null_mut()),
-//         }
-//     }
-
-//     fn new_with_value(value: T) -> Self {
-//         Self {
-//             cell: AtomicOption::from_option(Some(value)),
-//             next: AtomicPtr::new(ptr::null_mut()),
-//         }
-//     }
-
-//     /// inserts a value into the chain, either inserting it into the current
-//     /// cell, or allocating a new cell to store it in and atomically linking it
-//     /// to the chain.
-//     fn push(&self, value: T) {
-//         match self.cell.swap(Some(value)) {
-//             Some(old) => {
-//                 // there was previously a value in this cell, so we have to
-//                 // allocate a new cell and link it
-//                 let next = Box::into_raw(Box::new(Self::new_with_value(old)));
-
-//                 let mut current_next = self.next.load(Ordering::Acquire);
-//                 loop {
-//                     unsafe {
-//                         (&*next).next.store(current_next, Ordering::Relaxed);
-//                     }
-
-//                     match self.next.compare_exchange_weak(
-//                         current_next,
-//                         next,
-//                         Ordering::Release,
-//                         Ordering::Relaxed,
-//                     ) {
-//                         Ok(_) => {
-//                             // we successfully linked the new cell, so we can return
-//                             return;
-//                         }
-//                         Err(next) => {
-//                             // the next pointer was changed, so we need to try again
-//                             current_next = next;
-//                         }
-//                     }
-//                 }
-//             }
-//             None => {}
-//         }
-//     }
-
-//     fn pop(&self) -> Option<T> {
-//         // try to take the value from the current cell
-//         // if there is no value, then we are done, because there will also be no next cell
-//         let value = self.cell.take()?;
-
-//         // try to atomically swap
-//     }
-
-//     fn alloc_next(&self) -> NonNull<Self> {
-//         let next = Box::into_raw(Box::new(Self::new()));
-
-//         match self.next.compare_exchange_weak(
-//             ptr::null_mut(),
-//             next,
-//             Ordering::Release,
-//             Ordering::Relaxed,
-//         ) {
-//             Ok(next) => unsafe { NonNull::new_unchecked(next) },
-//             Err(other) => {
-//                 // next was allocated under us, so we need to drop the slot we just allocated again.
-//                 _ = unsafe { Box::from_raw(next) };
-//                 unsafe { NonNull::new_unchecked(other) }
-//             }
-//         }
-//     }
-// }
diff --git a/distaff/src/scope.rs b/distaff/src/scope.rs
index bcc5125..9301d76 100644
--- a/distaff/src/scope.rs
+++ b/distaff/src/scope.rs
@@ -212,20 +212,13 @@ impl<'scope, 'env> Scope<'scope, 'env> {
                Message::Shared(shared_job) => unsafe {
                    SharedJob::execute(shared_job, self.worker());
                },
-                Message::Finished(util::Send(result)) => {
-                    #[cfg(feature = "tracing")]
-                    tracing::error!(
-                        "received result when waiting for jobs to finish: {:p}.",
-                        result
-                    );
-                }
-                Message::Exit => {}
                Message::ScopeFinished => {
                    #[cfg(feature = "tracing")]
                    tracing::trace!("scope finished, decrementing outstanding jobs.");
                    assert_eq!(self.inner().outstanding_jobs.load(Ordering::Acquire), 0);
                    break;
                }
+                Message::WakeUp | Message::Exit => {}
            }
        }
    }
@@ -266,10 +259,11 @@ impl<'scope, 'env> Scope<'scope, 'env> {
            )
        }
 
+        #[align(8)]
        unsafe fn harness<'scope, 'env, T>(
            worker: &WorkerThread,
            this: NonNull<()>,
-            _: Option<ReceiverToken>,
+            _: Option<Sender>,
        ) where
            F: FnOnce(Scope<'scope, 'env>) -> T + Send,
            'env: 'scope,
            T: Send,
@@ -341,7 +335,7 @@ impl<'scope, 'env> Scope<'scope, 'env> {
 
        let schedule = move |runnable: Runnable| {
            #[align(8)]
-            unsafe fn harness(_: &WorkerThread, this: NonNull<()>, _: Option<ReceiverToken>) {
+            unsafe fn harness(_: &WorkerThread, this: NonNull<()>, _: Option<Sender>) {
                unsafe {
                    let runnable = Runnable::<()>::from_raw(this.cast());
                    runnable.run();
@@ -406,26 +400,32 @@ impl<'scope, 'env> Scope<'scope, 'env> {
            unsafe { ManuallyDrop::take(&mut *self.f.get()) }
        }
 
+        #[align(8)]
        unsafe fn harness<'scope, 'env, T>(
            worker: &WorkerThread,
            this: NonNull<()>,
-            sender: Option<ReceiverToken>,
+            sender: Option<Sender>,
        ) where
            F: FnOnce(Scope<'scope, 'env>) -> T + Send,
            'env: 'scope,
            T: Send,
        {
            let this: &ScopeJob<F> = unsafe { this.cast().as_ref() };
+            let sender: Option<Sender<T>> = unsafe { mem::transmute(sender) };
            let f = unsafe { this.unwrap() };
 
            let scope = unsafe { Scope::<'scope, 'env>::new_unchecked(worker, this.inner) };
 
-            _ = worker.context.queue.as_sender().unicast(
-                Message::Finished(werkzeug::util::Send(
-                    Box::into_non_null(Box::new(catch_unwind(AssertUnwindSafe(|| f(scope)))))
-                        .cast(),
-                )),
-                sender.unwrap(),
-            );
+            let result = catch_unwind(AssertUnwindSafe(|| f(scope)));
+
+            let sender = sender.unwrap();
+            unsafe {
+                sender.send_as_ref(result);
+                worker
+                    .context
+                    .queue
+                    .as_sender()
+                    .unicast(Message::WakeUp, ReceiverToken::from_parker(sender.parker()));
+            }
        }
    }
 
@@ -451,7 +451,7 @@ impl<'scope, 'env> Scope<'scope, 'env> {
            }
        }
 
-        let mut _pinned = ScopeJob::new(a, self.inner);
+        let _pinned = ScopeJob::new(a, self.inner);
        let job = unsafe { Pin::new_unchecked(&_pinned) };
 
        let (a, b) = worker.join_heartbeat2(job, |_| b(*self));
diff --git a/distaff/src/workerthread.rs b/distaff/src/workerthread.rs
index 5c418f2..f83201e 100644
--- a/distaff/src/workerthread.rs
+++ b/distaff/src/workerthread.rs
@@ -95,15 +95,8 @@ impl WorkerThread {
                    Message::Shared(shared_job) => {
                        self.execute(shared_job);
                    }
-                    Message::Finished(werkzeug::util::Send(ptr)) => {
-                        #[cfg(feature = "tracing")]
-                        tracing::error!(
-                            "WorkerThread::run_inner: received finished message: {:?}",
-                            ptr
-                        );
-                    }
                    Message::Exit => break,
-                    Message::ScopeFinished => {}
+                    Message::WakeUp | Message::ScopeFinished => {}
                }
            }
        }
@@ -149,7 +142,8 @@ impl WorkerThread {
                .queue
                .as_sender()
                .try_anycast(Message::Shared(unsafe {
-                    job.as_ref().share(Some(self.receiver.get_token()))
+                    job.as_ref()
+                        .share(Some(self.receiver.get_token().as_parker()))
                }))
            {
                unsafe {
@@ -270,30 +264,18 @@ impl HeartbeatThread {
 }
 
 impl WorkerThread {
-    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
-    pub fn wait_until_shared_job<T: Send>(&self, job: &Job<T>) -> std::thread::Result<T> {
-        loop {
-            match self.receiver.recv() {
-                Message::Shared(shared_job) => unsafe {
-                    SharedJob::execute(shared_job, self);
-                },
-                Message::Finished(send) => {
-                    break unsafe { *Box::from_non_null(send.0.cast()) };
-                }
-                Message::Exit | Message::ScopeFinished => {}
-            }
-        }
-    }
-
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
-    pub fn wait_until_recv<T: Send>(&self) -> std::thread::Result<T> {
+    pub fn wait_until_recv<T: Send>(&self, recv: Receiver<T>) -> std::thread::Result<T> {
        loop {
+            if let Some(result) = recv.poll() {
+                break result;
+            }
+
            match self.receiver.recv() {
                Message::Shared(shared_job) => unsafe {
                    SharedJob::execute(shared_job, self);
                },
-                Message::Finished(send) => break unsafe { *Box::from_non_null(send.0.cast()) },
-                Message::Exit | Message::ScopeFinished => {}
+                Message::WakeUp | Message::Exit | Message::ScopeFinished => {}
            }
        }
    }
diff --git a/examples/join.rs b/examples/join.rs
index 238cc0e..5939c9f 100644
--- a/examples/join.rs
+++ b/examples/join.rs
@@ -87,6 +87,7 @@ fn join_distaff(tree_size: usize) {
            let sum = sum(&tree, tree.root().unwrap(), s);
            sum
        });
+        eprintln!("sum: {sum}");
        std::hint::black_box(sum);
    }
 }
@@ -134,7 +135,7 @@ fn join_rayon(tree_size: usize) {
 }
 
 fn main() {
-    //tracing_subscriber::fmt::init();
+    // tracing_subscriber::fmt::init();
 
    use tracing_subscriber::layer::SubscriberExt;
    tracing::subscriber::set_global_default(
        tracing_subscriber::registry().with(tracing_tracy::TracyLayer::default()),