more garbage, but what did you expect

This commit is contained in:
Janis 2025-07-05 14:20:42 +02:00
parent 268879d97e
commit 26b6ef264c
8 changed files with 224 additions and 309 deletions

View file

@ -17,62 +17,9 @@ enum State {
    Taken,
}

-// taken from `std`
-#[derive(Debug)]
-#[repr(transparent)]
-pub struct Parker {
-    mutex: AtomicU32,
-}
-
-impl Parker {
-    const PARKED: u32 = u32::MAX;
-    const EMPTY: u32 = 0;
-    const NOTIFIED: u32 = 1;
-
-    pub fn new() -> Self {
-        Self {
-            mutex: AtomicU32::new(Self::EMPTY),
-        }
-    }
-
-    pub fn is_parked(&self) -> bool {
-        self.mutex.load(Ordering::Acquire) == Self::PARKED
-    }
-
-    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all, fields(this = self as *const Self as usize)))]
-    pub fn park(&self) {
-        if self.mutex.fetch_sub(1, Ordering::Acquire) == Self::NOTIFIED {
-            // The thread was notified, so we can return immediately.
-            return;
-        }
-
-        loop {
-            atomic_wait::wait(&self.mutex, Self::PARKED);
-
-            // We check whether we were notified or woke up spuriously with
-            // acquire ordering in order to make-visible any writes made by the
-            // thread that notified us.
-            if self.mutex.swap(Self::EMPTY, Ordering::Acquire) == Self::NOTIFIED {
-                // The thread was notified, so we can return immediately.
-                return;
-            } else {
-                // spurious wakeup, so we need to re-park.
-                continue;
-            }
-        }
-    }
-
-    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all, fields(this = self as *const Self as usize)))]
-    pub fn unpark(&self) {
-        // write with Release ordering to ensure that any writes made by this
-        // thread are made-available to the unparked thread.
-        if self.mutex.swap(Self::NOTIFIED, Ordering::Release) == Self::PARKED {
-            // The thread was parked, so we need to notify it.
-            atomic_wait::wake_one(&self.mutex);
-        } else {
-            // The thread was not parked, so we don't need to do anything.
-        }
-    }
-}
+pub use werkzeug::sync::Parker;
+
+use crate::queue::Queue;

#[derive(Debug)]
#[repr(C)]
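The hand-rolled futex parker above is swapped for werkzeug::sync::Parker; the rest of this commit only relies on the usual parking-token semantics, which std's own thread parker also provides. A minimal, self-contained sketch of those semantics (std only; it assumes the werkzeug parker behaves the same way, which this diff does not show):

use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;

static READY: AtomicU32 = AtomicU32::new(0);

fn main() {
    let worker = thread::spawn(|| {
        // Park in a loop: `park` may return spuriously, which is exactly the
        // case the removed `Parker::park` handled by re-parking.
        while READY.load(Ordering::Acquire) == 0 {
            thread::park();
        }
        println!("worker saw the flag");
    });

    READY.store(1, Ordering::Release);
    // Unparking a thread that is not parked yet just stores the token, so the
    // worker's next `park` returns immediately; the notification is not lost.
    worker.thread().unpark();
    worker.join().unwrap();
}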
@ -104,6 +51,10 @@ impl<T: Send> Receiver<T> {
self.0.state.load(Ordering::Acquire) != State::Ready as u8 self.0.state.load(Ordering::Acquire) != State::Ready as u8
} }
pub fn sender(&self) -> Sender<T> {
Sender(self.0.clone())
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
pub fn wait(&self) { pub fn wait(&self) {
loop { loop {
@ -182,15 +133,15 @@ impl<T: Send> Receiver<T> {
// `State::Ready`. // `State::Ready`.
// //
// In either case, this thread now has unique access to `val`. // In either case, this thread now has unique access to `val`.
unsafe { self.take() }
}
unsafe fn take(&self) -> thread::Result<T> {
assert_eq!( assert_eq!(
self.0.state.swap(State::Taken as u8, Ordering::Acquire), self.0.state.swap(State::Taken as u8, Ordering::Acquire),
State::Ready as u8 State::Ready as u8
); );
unsafe { self.take() }
}
unsafe fn take(&self) -> thread::Result<T> {
let result = unsafe { (*self.0.val.get()).take().map(|b| *b).unwrap() }; let result = unsafe { (*self.0.val.get()).take().map(|b| *b).unwrap() };
result result
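The wait/take pair above drains a one-shot result slot: the sender publishes the value and flips the state to Ready with Release, and the receiver swaps the state to Taken with Acquire before reading, which is what gives it unique access to `val`. A simplified, self-contained sketch of that slot (names are illustrative; the real channel boxes the value and also keeps a parker for the waiting thread):

use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU8, Ordering};

const EMPTY: u8 = 0;
const READY: u8 = 1;
const TAKEN: u8 = 2;

struct OneShot<T> {
    state: AtomicU8,
    val: UnsafeCell<Option<T>>,
}

unsafe impl<T: Send> Sync for OneShot<T> {}

impl<T> OneShot<T> {
    fn new() -> Self {
        Self { state: AtomicU8::new(EMPTY), val: UnsafeCell::new(None) }
    }

    /// Caller must ensure this runs at most once (the `send_as_ref` contract).
    unsafe fn send(&self, value: T) {
        unsafe { *self.val.get() = Some(value) };
        // Release makes the write to `val` visible to whoever observes READY.
        self.state.store(READY, Ordering::Release);
    }

    fn try_take(&self) -> Option<T> {
        if self.state.load(Ordering::Acquire) != READY {
            return None;
        }
        // Swapping to TAKEN with Acquire gives this thread unique access to
        // `val`, mirroring the `state.swap(State::Taken, Acquire)` above.
        assert_eq!(self.state.swap(TAKEN, Ordering::Acquire), READY);
        unsafe { (*self.val.get()).take() }
    }
}

fn main() {
    let slot = OneShot::new();
    unsafe { slot.send(123) };
    assert_eq!(slot.try_take(), Some(123));
    assert_eq!(slot.try_take(), None);
}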
@ -222,6 +173,10 @@ impl<T: Send> Sender<T> {
} }
} }
pub unsafe fn parker(&self) -> &Parker {
unsafe { self.0.waiting_thread.as_ref() }
}
/// The caller must ensure that this function or `send` are only ever called once. /// The caller must ensure that this function or `send` are only ever called once.
pub unsafe fn send_as_ref(&self, val: thread::Result<T>) { pub unsafe fn send_as_ref(&self, val: thread::Result<T>) {
// SAFETY: // SAFETY:

View file

@ -1,7 +1,7 @@
use std::{ use std::{
cell::UnsafeCell, cell::UnsafeCell,
marker::PhantomPinned, marker::PhantomPinned,
mem::ManuallyDrop, mem::{self, ManuallyDrop},
panic::{AssertUnwindSafe, catch_unwind}, panic::{AssertUnwindSafe, catch_unwind},
pin::Pin, pin::Pin,
ptr::NonNull, ptr::NonNull,
@ -14,7 +14,6 @@ use std::{
use alloc::collections::BTreeMap; use alloc::collections::BTreeMap;
use async_task::Runnable; use async_task::Runnable;
use parking_lot::{Condvar, Mutex};
use crate::{ use crate::{
channel::{Parker, Sender}, channel::{Parker, Sender},
@ -34,7 +33,7 @@ pub struct Context {
pub(crate) enum Message {
    Shared(SharedJob),
-    Finished(werkzeug::util::Send<NonNull<std::thread::Result<()>>>),
+    WakeUp,
    Exit,
    ScopeFinished,
}
@ -146,10 +145,11 @@ impl Context {
        let job = unsafe { Pin::new_unchecked(&_pinned) };
        let job = Job::from_stackjob(&job);

-        self.inject_job(job.share(Some(worker.receiver.get_token())));
-        let t = worker.wait_until_shared_job(&job);
+        unsafe {
+            self.inject_job(job.share(Some(worker.receiver.get_token().as_parker())));
+        }
+
+        let t = worker.wait_until_recv(job.take_receiver().expect("Job should have a receiver"));

        // touch the job to ensure it is dropped after we are done with it.
        drop(_pinned);
@ -165,24 +165,21 @@ impl Context {
        {
            // current thread isn't a worker thread, create job and inject into context
            let parker = Parker::new();
-            let (send, recv) = crate::channel::channel::<T>(NonNull::from(&parker));

-            struct CrossJob<F, T> {
+            struct CrossJob<F> {
                f: UnsafeCell<ManuallyDrop<F>>,
-                send: Sender<T>,
                _pin: PhantomPinned,
            }

-            impl<F, T> CrossJob<F, T> {
-                fn new(f: F, send: Sender<T>) -> Self {
+            impl<F> CrossJob<F> {
+                fn new(f: F) -> Self {
                    Self {
                        f: UnsafeCell::new(ManuallyDrop::new(f)),
-                        send,
                        _pin: PhantomPinned,
                    }
                }

-                fn into_job(self: Pin<&Self>) -> Job<T>
+                fn into_job<T>(self: &Self) -> Job<T>
                where
                    F: FnOnce(&WorkerThread) -> T + Send,
                    T: Send,
@ -194,30 +191,41 @@ impl Context {
                    unsafe { ManuallyDrop::take(&mut *self.f.get()) }
                }

-                unsafe fn harness(worker: &WorkerThread, this: NonNull<()>, _: Option<ReceiverToken>)
+                #[align(8)]
+                unsafe fn harness<T>(worker: &WorkerThread, this: NonNull<()>, sender: Option<Sender>)
                where
                    F: FnOnce(&WorkerThread) -> T + Send,
                    T: Send,
                {
-                    let this: &CrossJob<F, T> = unsafe { this.cast().as_ref() };
+                    let this: &CrossJob<F> = unsafe { this.cast().as_ref() };
                    let f = unsafe { this.unwrap() };
+                    let sender: Option<Sender<T>> = unsafe { mem::transmute(sender) };
+
+                    let result = catch_unwind(AssertUnwindSafe(|| f(worker)));
+
+                    let sender = sender.unwrap();
                    unsafe {
-                        this.send
-                            .send_as_ref(catch_unwind(AssertUnwindSafe(|| f(worker))));
+                        sender.send_as_ref(result);
+                        worker
+                            .context
+                            .queue
+                            .as_sender()
+                            .unicast(Message::WakeUp, ReceiverToken::from_parker(sender.parker()));
                    }
                }
            }

-            let _pinned = CrossJob::new(move |worker: &WorkerThread| f(worker), send);
-            let job = unsafe { Pin::new_unchecked(&_pinned) };
-            self.inject_job(job.into_job().share(None));
+            let pinned = CrossJob::new(move |worker: &WorkerThread| f(worker));
+            let job2 = pinned.into_job();
+            self.inject_job(job2.share(Some(&parker)));
+            let recv = job2.take_receiver().unwrap();

            let out = crate::util::unwrap_or_panic(recv.recv());

            // touch the job to ensure it is dropped after we are done with it.
-            drop(_pinned);
+            drop(pinned);
            drop(parker);

            out
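CrossJob, like Job2 below, relies on type erasure: the queue only ever sees an untyped harness fn pointer plus a NonNull<()> to the job, and the monomorphised harness restores the concrete types, which is also why the untyped Option<Sender> argument is transmuted back to Option<Sender<T>> inside it. A self-contained sketch of just the erasure step (names are illustrative, not the crate's):

use std::ptr::NonNull;

// Illustrative only: an erased job is a harness fn pointer plus a raw pointer.
struct ErasedJob {
    harness: unsafe fn(NonNull<()>),
    this: NonNull<()>,
}

struct TypedJob<F> {
    f: Option<F>,
}

impl<F: FnOnce() -> i32> TypedJob<F> {
    unsafe fn harness(this: NonNull<()>) {
        // SAFETY: `this` was produced by `erase` from a live `TypedJob<F>`.
        let mut ptr = this.cast::<TypedJob<F>>();
        let job = unsafe { ptr.as_mut() };
        let f = job.f.take().expect("job already executed");
        println!("job result: {}", f());
    }

    fn erase(&mut self) -> ErasedJob {
        ErasedJob {
            // Monomorphised for this `F`, but stored as a plain fn pointer.
            harness: Self::harness,
            this: NonNull::from(&mut *self).cast(),
        }
    }
}

fn main() {
    let mut job = TypedJob { f: Some(|| 40 + 2) };
    let erased = job.erase();
    // Whoever pops the job later only sees the erased form; calling through
    // the fn pointer puts the types back.
    unsafe { (erased.harness)(erased.this) };
}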
@ -276,7 +284,7 @@ impl Context {
{ {
let schedule = move |runnable: Runnable| { let schedule = move |runnable: Runnable| {
#[align(8)] #[align(8)]
unsafe fn harness<T>(_: &WorkerThread, this: NonNull<()>, _: Option<ReceiverToken>) { unsafe fn harness<T>(_: &WorkerThread, this: NonNull<()>, _: Option<Sender>) {
unsafe { unsafe {
let runnable = Runnable::<()>::from_raw(this); let runnable = Runnable::<()>::from_raw(this);
runnable.run(); runnable.run();
@ -379,7 +387,6 @@ mod tests {
let counter = Arc::new(AtomicU8::new(0)); let counter = Arc::new(AtomicU8::new(0));
let parker = Parker::new(); let parker = Parker::new();
let receiver = ctx.queue.new_receiver();
let job = StackJob::new({ let job = StackJob::new({
let counter = counter.clone(); let counter = counter.clone();
@ -405,15 +412,14 @@ mod tests {
        assert!(heartbeat.is_waiting());
    });

-    ctx.inject_job(job.share(Some(receiver.get_token())));
+    ctx.inject_job(job.share(Some(&parker)));

    // Wait for the job to be executed
-    assert!(job.is_shared());
-    let Message::Finished(werkzeug::util::Send(result)) = receiver.recv() else {
+    let recv = job.take_receiver().expect("Job should have a receiver");
+    let Some(result) = recv.poll() else {
        panic!("Expected a finished message");
    };

-    let result = unsafe { *Box::from_non_null(result.cast()) };
    let result = crate::util::unwrap_or_panic::<i32>(result);

    assert_eq!(result, 42);
    assert_eq!(counter.load(Ordering::SeqCst), 1);

View file

@ -10,7 +10,7 @@ use alloc::boxed::Box;
use crate::{ use crate::{
WorkerThread, WorkerThread,
channel::{Parker, Sender}, channel::{Parker, Receiver, Sender},
context::Message, context::Message,
queue::ReceiverToken, queue::ReceiverToken,
}; };
@ -45,31 +45,36 @@ impl<F> HeapJob<F> {
    }
}

-type JobHarness =
-    unsafe fn(&WorkerThread, this: NonNull<()>, sender: Option<crate::queue::ReceiverToken>);
+type JobHarness = unsafe fn(&WorkerThread, this: NonNull<()>, sender: Option<Sender>);

#[repr(C)]
pub struct Job2<T = ()> {
-    harness: Cell<Option<JobHarness>>,
-    this: NonNull<()>,
-    _phantom: core::marker::PhantomData<fn(T)>,
-    _pin: PhantomPinned,
+    inner: UnsafeCell<Job2Inner<T>>,
}

impl<T> Debug for Job2<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("Job2")
-            .field("harness", &self.harness)
-            .field("this", &self.this)
-            .finish_non_exhaustive()
+        f.debug_struct("Job2").field("inner", &self.inner).finish()
    }
}

+#[repr(C)]
+pub enum Job2Inner<T = ()> {
+    Local {
+        harness: JobHarness,
+        this: NonNull<()>,
+        _pin: PhantomPinned,
+    },
+    Shared {
+        receiver: Cell<Option<Receiver<T>>>,
+    },
+}

#[derive(Debug)]
pub struct SharedJob {
    harness: JobHarness,
    this: NonNull<()>,
-    sender: Option<crate::queue::ReceiverToken>,
+    sender: Option<Sender<()>>,
}

unsafe impl Send for SharedJob {}
@ -77,37 +82,52 @@ unsafe impl Send for SharedJob {}
impl<T: Send> Job2<T> {
    fn new(harness: JobHarness, this: NonNull<()>) -> Self {
        let this = Self {
-            harness: Cell::new(Some(harness)),
-            this,
-            _phantom: core::marker::PhantomData,
-            _pin: PhantomPinned,
+            inner: UnsafeCell::new(Job2Inner::Local {
+                harness: harness,
+                this,
+                _pin: PhantomPinned,
+            }),
        };

-        #[cfg(feature = "tracing")]
-        tracing::trace!("new job: {:?}", this);

        this
    }

-    pub fn share(&self, parker: Option<crate::queue::ReceiverToken>) -> SharedJob {
-        #[cfg(feature = "tracing")]
-        tracing::trace!("sharing job: {:?}", self);
-
-        // let (sender, receiver) = parker
-        //     .map(|parker| crate::channel::channel::<T>(parker.into()))
-        //     .unzip();
+    pub fn share(&self, parker: Option<&Parker>) -> SharedJob {
+        let (sender, receiver) = parker
+            .map(|parker| crate::channel::channel::<T>(parker.into()))
+            .unzip();

        // self.receiver.set(receiver);

-        SharedJob {
-            harness: self.harness.take().unwrap(),
-            this: self.this,
-            sender: parker,
-        }
+        if let Job2Inner::Local {
+            harness,
+            this,
+            _pin: _,
+        } = unsafe {
+            self.inner.replace(Job2Inner::Shared {
+                receiver: Cell::new(receiver),
+            })
+        } {
+            // SAFETY: `this` is a valid pointer to the job.
+            unsafe {
+                SharedJob {
+                    harness,
+                    this,
+                    sender: mem::transmute(sender), // Convert `Option<Sender<T>>` to `Option<Sender<()>>`
+                }
+            }
+        } else {
+            panic!("Job2 is already shared");
+        }
    }

-    pub fn is_shared(&self) -> bool {
-        self.harness.clone().get().is_none()
+    pub fn take_receiver(&self) -> Option<Receiver<T>> {
+        unsafe {
+            if let Job2Inner::Shared { receiver } = self.inner.as_ref_unchecked() {
+                receiver.take()
+            } else {
+                None
+            }
+        }
    }

    pub fn from_stackjob<F>(job: &StackJob<F>) -> Self
@ -119,15 +139,13 @@ impl<T: Send> Job2<T> {
feature = "tracing", feature = "tracing",
tracing::instrument(level = "trace", skip_all, name = "stack_job_harness") tracing::instrument(level = "trace", skip_all, name = "stack_job_harness")
)] )]
unsafe fn harness<F, T>( unsafe fn harness<F, T>(worker: &WorkerThread, this: NonNull<()>, sender: Option<Sender>)
worker: &WorkerThread, where
this: NonNull<()>,
sender: Option<ReceiverToken>,
) where
F: FnOnce(&WorkerThread) -> T + Send, F: FnOnce(&WorkerThread) -> T + Send,
T: Send, T: Send,
{ {
use std::panic::{AssertUnwindSafe, catch_unwind}; use std::panic::{AssertUnwindSafe, catch_unwind};
let sender: Option<Sender<T>> = unsafe { mem::transmute(sender) };
let f = unsafe { this.cast::<StackJob<F>>().as_ref().unwrap() }; let f = unsafe { this.cast::<StackJob<F>>().as_ref().unwrap() };
@ -142,15 +160,15 @@ impl<T: Send> Job2<T> {
        let result = catch_unwind(AssertUnwindSafe(|| f(worker)));

-        if let Some(token) = sender {
-            worker.context.queue.as_sender().unicast(
-                Message::Finished(werkzeug::util::Send(
-                    // SAFETY: T is guaranteed to be `Sized`, so
-                    // `NonNull<T>` is the same size for any `T`.
-                    Box::into_non_null(Box::new(result)).cast(),
-                )),
-                token,
-            );
+        if let Some(sender) = sender {
+            unsafe {
+                sender.send_as_ref(result);
+                worker
+                    .context
+                    .queue
+                    .as_sender()
+                    .unicast(Message::WakeUp, ReceiverToken::from_parker(sender.parker()));
+            }
        }
    }
@ -166,15 +184,13 @@ impl<T: Send> Job2<T> {
feature = "tracing", feature = "tracing",
tracing::instrument(level = "trace", skip_all, name = "heap_job_harness") tracing::instrument(level = "trace", skip_all, name = "heap_job_harness")
)] )]
unsafe fn harness<F, T>( unsafe fn harness<F, T>(worker: &WorkerThread, this: NonNull<()>, sender: Option<Sender>)
worker: &WorkerThread, where
this: NonNull<()>,
sender: Option<ReceiverToken>,
) where
F: FnOnce(&WorkerThread) -> T + Send, F: FnOnce(&WorkerThread) -> T + Send,
T: Send, T: Send,
{ {
use std::panic::{AssertUnwindSafe, catch_unwind}; use std::panic::{AssertUnwindSafe, catch_unwind};
let sender: Option<Sender<T>> = unsafe { mem::transmute(sender) };
// expect MIRI to complain about this, but it is actually correct. // expect MIRI to complain about this, but it is actually correct.
// because I am so much smarter than MIRI, naturally, obviously. // because I am so much smarter than MIRI, naturally, obviously.
@ -182,15 +198,15 @@ impl<T: Send> Job2<T> {
        let f = unsafe { (*Box::from_non_null(this.cast::<HeapJob<F>>())).into_inner() };
        let result = catch_unwind(AssertUnwindSafe(|| f(worker)));

-        if let Some(token) = sender {
-            worker.context.queue.as_sender().unicast(
-                Message::Finished(werkzeug::util::Send(
-                    // SAFETY: T is guaranteed to be `Sized`, so
-                    // `NonNull<T>` is the same size for any `T`.
-                    Box::into_non_null(Box::new(result)).cast(),
-                )),
-                token,
-            );
+        if let Some(sender) = sender {
+            unsafe {
+                sender.send_as_ref(result);
+                _ = worker
+                    .context
+                    .queue
+                    .as_sender()
+                    .unicast(Message::WakeUp, ReceiverToken::from_parker(sender.parker()));
+            }
        }
    }
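Both harnesses above finish a job in two steps: first write the result into the one-shot channel, then unicast Message::WakeUp keyed by the sender's parker so the waiting worker re-polls its receiver; the wake-up itself carries no data. A sketch of the same store-then-wake ordering using only std types (Mutex and Condvar standing in for the channel and the queue):

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let slot = Arc::new((Mutex::new(None::<i32>), Condvar::new()));

    let worker = {
        let slot = Arc::clone(&slot);
        thread::spawn(move || {
            let result = 6 * 7; // run the job
            // Publish the result first, then wake the waiter; a wake-up with
            // nothing stored would only make the waiter loop and block again,
            // which is why `Message::WakeUp` itself carries no payload.
            *slot.0.lock().unwrap() = Some(result);
            slot.1.notify_one();
        })
    };

    let (lock, cvar) = &*slot;
    let mut guard = lock.lock().unwrap();
    while guard.is_none() {
        // Spurious wake-ups are handled by re-checking, just like the worker
        // re-polls its receiver after a WakeUp message.
        guard = cvar.wait(guard).unwrap();
    }
    println!("job finished with {}", guard.take().unwrap());
    drop(guard);
    worker.join().unwrap();
}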

View file

@ -118,16 +118,16 @@ impl WorkerThread {
                    cold_path();

                    // if b panicked, we need to wait for a to finish
-                    if job.is_shared() {
-                        _ = self.wait_until_recv::<RA>();
+                    if let Some(recv) = job.take_receiver() {
+                        _ = self.wait_until_recv(recv);
                    }

                    resume_unwind(payload);
                }
            };

-        let ra = if job.is_shared() {
-            crate::util::unwrap_or_panic(self.wait_until_recv())
+        let ra = if let Some(recv) = job.take_receiver() {
+            crate::util::unwrap_or_panic(self.wait_until_recv(recv))
        } else {
            self.pop_back();
@ -171,16 +171,16 @@ impl WorkerThread {
                    cold_path();

                    // if b panicked, we need to wait for a to finish
-                    if job.is_shared() {
-                        _ = self.wait_until_recv::<RA>();
+                    if let Some(recv) = job.take_receiver() {
+                        _ = self.wait_until_recv(recv);
                    }

                    resume_unwind(payload);
                }
            };

-        let ra = if job.is_shared() {
-            crate::util::unwrap_or_panic(self.wait_until_recv())
+        let ra = if let Some(recv) = job.take_receiver() {
+            crate::util::unwrap_or_panic(self.wait_until_recv(recv))
        } else {
            self.pop_back();

View file

@ -183,6 +183,23 @@ impl<T> Drop for Slot<T> {
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct ReceiverToken(werkzeug::util::Send<NonNull<u32>>); pub struct ReceiverToken(werkzeug::util::Send<NonNull<u32>>);
impl ReceiverToken {
pub fn as_ptr(&self) -> *mut u32 {
self.0.into_inner().as_ptr()
}
pub unsafe fn as_parker(&self) -> &Parker {
// SAFETY: The pointer is guaranteed to be valid and aligned, as it comes from a pinned Parker.
unsafe { Parker::from_ptr(self.as_ptr()) }
}
pub unsafe fn from_parker(parker: &Parker) -> Self {
// SAFETY: The pointer is guaranteed to be valid and aligned, as it comes from a pinned Parker.
let ptr = NonNull::from(parker).cast::<u32>();
ReceiverToken(werkzeug::util::Send(ptr))
}
}
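ReceiverToken round-trips through a raw pointer to the parker's futex word, which works because the parker is a repr(transparent) wrapper around its AtomicU32 (as the Parker removed from channel.rs was; the werkzeug parker is assumed here to make the same layout guarantee). A self-contained sketch of that round trip with a stand-in Parker:

use std::ptr::NonNull;
use std::sync::atomic::AtomicU32;

// Stand-in with the layout guarantee the real parker is relied on for.
#[repr(transparent)]
struct Parker {
    mutex: AtomicU32,
}

impl Parker {
    fn new() -> Self {
        Self { mutex: AtomicU32::new(0) }
    }

    unsafe fn from_ptr<'a>(ptr: *mut u32) -> &'a Parker {
        // SAFETY: caller guarantees `ptr` points at the futex word of a live
        // Parker; repr(transparent) makes the cast valid.
        unsafe { &*(ptr as *const Parker) }
    }
}

fn main() {
    let parker = Parker::new();

    // from_parker: the token is just the address of the futex word.
    let token: NonNull<u32> = NonNull::from(&parker).cast();

    // as_parker: recover the parker from the token.
    let same: &Parker = unsafe { Parker::from_ptr(token.as_ptr()) };
    assert!(std::ptr::eq(&parker, same));
    println!("token round-trips to the same parker");
}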
impl<T> Queue<T> { impl<T> Queue<T> {
pub fn new() -> Arc<Self> { pub fn new() -> Arc<Self> {
Arc::new(Self { Arc::new(Self {
@ -299,9 +316,12 @@ impl<T: Send> Receiver<T> {
                // there was no message for this receiver, so we need to park it
                queue.receivers.get_mut(&token).unwrap().1 = true; // mark the slot as parked

-                // wait for a message to be sent to this receiver
-                drop(_guard);
-                self.lock.0.park();
+                self.lock.0.park_with_callback(move || {
+                    // drop the lock guard after having set the lock state to waiting.
+                    // this avoids a deadlock if the sender tries to send a message
+                    // while the receiver is in the process of parking (I think..)
+                    drop(_guard);
+                });
            }
        }
@ -390,7 +410,7 @@ impl<T: Send> Sender<T> {
let _guard = self.queue.lock(); let _guard = self.queue.lock();
let queue = self.queue.inner(); let queue = self.queue.inner();
let Some(CachePadded((slot, is_parked))) = queue.receivers.get_mut(&receiver) else { let Some(CachePadded((slot, _))) = queue.receivers.get_mut(&receiver) else {
return Err(value); return Err(value);
}; };
@ -398,13 +418,10 @@ impl<T: Send> Sender<T> {
slot.push(value); slot.push(value);
} }
// check if the receiver is parked
if *is_parked {
// wake the receiver // wake the receiver
unsafe { unsafe {
Parker::from_ptr(receiver.0.into_inner().as_ptr()).unpark(); Parker::from_ptr(receiver.0.into_inner().as_ptr()).unpark();
} }
}
Ok(()) Ok(())
} }
@ -419,20 +436,17 @@ impl<T: Send> Sender<T> {
let queue = self.queue.inner(); let queue = self.queue.inner();
// send the message to all receivers // send the message to all receivers
for (token, CachePadded((slot, is_parked))) in queue.receivers.iter() { for (token, CachePadded((slot, _))) in queue.receivers.iter() {
// SAFETY: The slot is owned by this receiver. // SAFETY: The slot is owned by this receiver.
unsafe { slot.push(value.clone()) }; unsafe { slot.push(value.clone()) };
// check if the receiver is parked
if *is_parked {
// wake the receiver // wake the receiver
unsafe { unsafe {
Parker::from_ptr(token.0.into_inner().as_ptr()).unpark(); Parker::from_ptr(token.0.into_inner().as_ptr()).unpark();
} }
} }
} }
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
pub fn broadcast_with<F>(&self, mut f: F) pub fn broadcast_with<F>(&self, mut f: F)
@ -444,20 +458,18 @@ impl<T: Send> Sender<T> {
let queue = self.queue.inner(); let queue = self.queue.inner();
// send the message to all receivers // send the message to all receivers
for (token, CachePadded((slot, is_parked))) in queue.receivers.iter() { for (token, CachePadded((slot, _))) in queue.receivers.iter() {
// SAFETY: The slot is owned by this receiver. // SAFETY: The slot is owned by this receiver.
unsafe { slot.push(f()) }; unsafe { slot.push(f()) };
// check if the receiver is parked // check if the receiver is parked
if *is_parked {
// wake the receiver // wake the receiver
unsafe { unsafe {
Parker::from_ptr(token.0.into_inner().as_ptr()).unpark(); Parker::from_ptr(token.0.into_inner().as_ptr()).unpark();
} }
} }
} }
}
} }
#[cfg(test)] #[cfg(test)]
@ -597,88 +609,31 @@ mod tests {
"All DropCheck instances should have been dropped" "All DropCheck instances should have been dropped"
); );
} }
#[test]
fn send_self() {
// Test that sending a message to self works
let queue = Queue::<i32>::new();
let sender = queue.new_sender();
let receiver = queue.new_receiver();
sender.unicast(42, receiver.get_token()).unwrap();
assert_eq!(receiver.recv(), 42);
}
#[test]
fn send_self_many() {
// Test that sending multiple messages to self works
let queue = Queue::<i32>::new();
let sender = queue.new_sender();
let receiver = queue.new_receiver();
for i in 0..10 {
sender.unicast(i, receiver.get_token()).unwrap();
}
for i in (0..10).rev() {
assert_eq!(receiver.recv(), i);
}
}
} }
// struct AtomicLIFO<T> {
// cell: AtomicOption<T>,
// next: AtomicPtr<Self>,
// }
// impl<T> AtomicLIFO<T> {
// fn new() -> Self {
// Self {
// cell: AtomicOption::new(),
// next: AtomicPtr::new(ptr::null_mut()),
// }
// }
// fn new_with_value(value: T) -> Self {
// Self {
// cell: AtomicOption::from_option(Some(value)),
// next: AtomicPtr::new(ptr::null_mut()),
// }
// }
// /// inserts a value into the chain, either inserting it into the current
// /// cell, or allocating a new cell to store it in and atomically linking it
// /// to the chain.
// fn push(&self, value: T) {
// match self.cell.swap(Some(value)) {
// Some(old) => {
// // there was previously a value in this cell, so we have to
// // allocate a new cell and link it
// let next = Box::into_raw(Box::new(Self::new_with_value(old)));
// let mut current_next = self.next.load(Ordering::Acquire);
// loop {
// unsafe {
// (&*next).next.store(current_next, Ordering::Relaxed);
// }
// match self.next.compare_exchange_weak(
// current_next,
// next,
// Ordering::Release,
// Ordering::Relaxed,
// ) {
// Ok(_) => {
// // we successfully linked the new cell, so we can return
// return;
// }
// Err(next) => {
// // the next pointer was changed, so we need to try again
// current_next = next;
// }
// }
// }
// }
// None => {}
// }
// }
// fn pop(&self) -> Option<T> {
// // try to take the value from the current cell
// // if there is no value, then we are done, because there will also be no next cell
// let value = self.cell.take()?;
// // try to atomically swap
// }
// fn alloc_next(&self) -> NonNull<Self> {
// let next = Box::into_raw(Box::new(Self::new()));
// match self.next.compare_exchange_weak(
// ptr::null_mut(),
// next,
// Ordering::Release,
// Ordering::Relaxed,
// ) {
// Ok(next) => unsafe { NonNull::new_unchecked(next) },
// Err(other) => {
// // next was allocated under us, so we need to drop the slot we just allocated again.
// _ = unsafe { Box::from_raw(next) };
// unsafe { NonNull::new_unchecked(other) }
// }
// }
// }
// }

View file

@ -212,20 +212,13 @@ impl<'scope, 'env> Scope<'scope, 'env> {
                Message::Shared(shared_job) => unsafe {
                    SharedJob::execute(shared_job, self.worker());
                },
-                Message::Finished(util::Send(result)) => {
-                    #[cfg(feature = "tracing")]
-                    tracing::error!(
-                        "received result when waiting for jobs to finish: {:p}.",
-                        result
-                    );
-                }
-                Message::Exit => {}
                Message::ScopeFinished => {
                    #[cfg(feature = "tracing")]
                    tracing::trace!("scope finished, decrementing outstanding jobs.");
                    assert_eq!(self.inner().outstanding_jobs.load(Ordering::Acquire), 0);
                    break;
                }
+                Message::WakeUp | Message::Exit => {}
            }
        }
    }
@ -266,10 +259,11 @@ impl<'scope, 'env> Scope<'scope, 'env> {
) )
} }
#[align(8)]
unsafe fn harness<'scope, 'env, T>( unsafe fn harness<'scope, 'env, T>(
worker: &WorkerThread, worker: &WorkerThread,
this: NonNull<()>, this: NonNull<()>,
_: Option<ReceiverToken>, _: Option<Sender>,
) where ) where
F: FnOnce(Scope<'scope, 'env>) -> T + Send, F: FnOnce(Scope<'scope, 'env>) -> T + Send,
'env: 'scope, 'env: 'scope,
@ -341,7 +335,7 @@ impl<'scope, 'env> Scope<'scope, 'env> {
let schedule = move |runnable: Runnable| { let schedule = move |runnable: Runnable| {
#[align(8)] #[align(8)]
unsafe fn harness(_: &WorkerThread, this: NonNull<()>, _: Option<ReceiverToken>) { unsafe fn harness(_: &WorkerThread, this: NonNull<()>, _: Option<Sender>) {
unsafe { unsafe {
let runnable = Runnable::<()>::from_raw(this.cast()); let runnable = Runnable::<()>::from_raw(this.cast());
runnable.run(); runnable.run();
@ -406,26 +400,32 @@ impl<'scope, 'env> Scope<'scope, 'env> {
            unsafe { ManuallyDrop::take(&mut *self.f.get()) }
        }

+        #[align(8)]
        unsafe fn harness<'scope, 'env, T>(
            worker: &WorkerThread,
            this: NonNull<()>,
-            sender: Option<ReceiverToken>,
+            sender: Option<Sender>,
        ) where
            F: FnOnce(Scope<'scope, 'env>) -> T + Send,
            'env: 'scope,
            T: Send,
        {
            let this: &ScopeJob<F> = unsafe { this.cast().as_ref() };
+            let sender: Option<Sender<T>> = unsafe { mem::transmute(sender) };
            let f = unsafe { this.unwrap() };
            let scope = unsafe { Scope::<'scope, 'env>::new_unchecked(worker, this.inner) };

-            _ = worker.context.queue.as_sender().unicast(
-                Message::Finished(werkzeug::util::Send(
-                    Box::into_non_null(Box::new(catch_unwind(AssertUnwindSafe(|| f(scope)))))
-                        .cast(),
-                )),
-                sender.unwrap(),
-            );
+            let result = catch_unwind(AssertUnwindSafe(|| f(scope)));
+
+            let sender = sender.unwrap();
+            unsafe {
+                sender.send_as_ref(result);
+                worker
+                    .context
+                    .queue
+                    .as_sender()
+                    .unicast(Message::WakeUp, ReceiverToken::from_parker(sender.parker()));
+            }
        }
} }
@ -451,7 +451,7 @@ impl<'scope, 'env> Scope<'scope, 'env> {
} }
} }
let mut _pinned = ScopeJob::new(a, self.inner); let _pinned = ScopeJob::new(a, self.inner);
let job = unsafe { Pin::new_unchecked(&_pinned) }; let job = unsafe { Pin::new_unchecked(&_pinned) };
let (a, b) = worker.join_heartbeat2(job, |_| b(*self)); let (a, b) = worker.join_heartbeat2(job, |_| b(*self));

View file

@ -95,15 +95,8 @@ impl WorkerThread {
                Message::Shared(shared_job) => {
                    self.execute(shared_job);
                }
-                Message::Finished(werkzeug::util::Send(ptr)) => {
-                    #[cfg(feature = "tracing")]
-                    tracing::error!(
-                        "WorkerThread::run_inner: received finished message: {:?}",
-                        ptr
-                    );
-                }
                Message::Exit => break,
-                Message::ScopeFinished => {}
+                Message::WakeUp | Message::ScopeFinished => {}
            }
        }
    }
@ -149,7 +142,8 @@ impl WorkerThread {
.queue .queue
.as_sender() .as_sender()
.try_anycast(Message::Shared(unsafe { .try_anycast(Message::Shared(unsafe {
job.as_ref().share(Some(self.receiver.get_token())) job.as_ref()
.share(Some(self.receiver.get_token().as_parker()))
})) }))
{ {
unsafe { unsafe {
@ -270,30 +264,18 @@ impl HeartbeatThread {
}

impl WorkerThread {
-    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
-    pub fn wait_until_shared_job<T: Send>(&self, job: &Job<T>) -> std::thread::Result<T> {
-        loop {
-            match self.receiver.recv() {
-                Message::Shared(shared_job) => unsafe {
-                    SharedJob::execute(shared_job, self);
-                },
-                Message::Finished(send) => {
-                    break unsafe { *Box::from_non_null(send.0.cast()) };
-                }
-                Message::Exit | Message::ScopeFinished => {}
-            }
-        }
-    }
-
    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
-    pub fn wait_until_recv<T: Send>(&self) -> std::thread::Result<T> {
+    pub fn wait_until_recv<T: Send>(&self, recv: Receiver<T>) -> std::thread::Result<T> {
        loop {
+            if let Some(result) = recv.poll() {
+                break result;
+            }
+
            match self.receiver.recv() {
                Message::Shared(shared_job) => unsafe {
                    SharedJob::execute(shared_job, self);
                },
-                Message::Finished(send) => break unsafe { *Box::from_non_null(send.0.cast()) },
-                Message::Exit | Message::ScopeFinished => {}
+                Message::WakeUp | Message::Exit | Message::ScopeFinished => {}
            }
        }
    }
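wait_until_recv above makes the waiting worker help instead of block: it polls its own receiver, and while the result is not there it keeps pulling messages off the queue, executing shared jobs and treating WakeUp as nothing more than a hint to poll again. A sketch of that loop with std::sync::mpsc stand-ins for the crate's queue and channel:

use std::sync::mpsc;

// Stand-ins for the crate's message enum and shared jobs.
enum Msg {
    Job(Box<dyn FnOnce() + Send>),
    WakeUp,
}

// Keep making progress on other jobs until our own result shows up.
fn wait_until_recv(result: &mpsc::Receiver<i32>, queue: &mpsc::Receiver<Msg>) -> i32 {
    loop {
        // First: has our own job finished? (recv.poll() in the real code)
        if let Ok(value) = result.try_recv() {
            return value;
        }
        // Otherwise block on the worker queue; execute whatever arrives,
        // and treat a WakeUp as nothing more than "poll again".
        match queue.recv().expect("queue closed") {
            Msg::Job(job) => job(),
            Msg::WakeUp => {}
        }
    }
}

fn main() {
    let (result_tx, result_rx) = mpsc::channel();
    let (queue_tx, queue_rx) = mpsc::channel();

    // While we wait, someone else's job sits in the queue; here it happens to
    // be the job that produces our result and then sends the wake-up.
    let wake = queue_tx.clone();
    queue_tx
        .send(Msg::Job(Box::new(move || {
            result_tx.send(42).unwrap();
            wake.send(Msg::WakeUp).unwrap();
        })))
        .unwrap();

    assert_eq!(wait_until_recv(&result_rx, &queue_rx), 42);
    println!("got the result while helping with other work");
}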

View file

@ -87,6 +87,7 @@ fn join_distaff(tree_size: usize) {
let sum = sum(&tree, tree.root().unwrap(), s); let sum = sum(&tree, tree.root().unwrap(), s);
sum sum
}); });
eprintln!("sum: {sum}");
std::hint::black_box(sum); std::hint::black_box(sum);
} }
} }
@ -134,7 +135,7 @@ fn join_rayon(tree_size: usize) {
} }
fn main() { fn main() {
//tracing_subscriber::fmt::init(); // tracing_subscriber::fmt::init();
use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::layer::SubscriberExt;
tracing::subscriber::set_global_default( tracing::subscriber::set_global_default(
tracing_subscriber::registry().with(tracing_tracy::TracyLayer::default()), tracing_subscriber::registry().with(tracing_tracy::TracyLayer::default()),