use parker instead of lock for queue receiver

This commit is contained in:
Janis 2025-07-04 18:46:27 +02:00
parent 7c6e338b77
commit 268879d97e
4 changed files with 12 additions and 12 deletions

View file

@ -218,6 +218,7 @@ impl Context {
// touch the job to ensure it is dropped after we are done with it.
drop(_pinned);
drop(parker);
out
}

View file

@ -1,6 +1,6 @@
use std::{
cell::UnsafeCell,
collections::{HashMap, HashSet},
collections::HashMap,
marker::{PhantomData, PhantomPinned},
mem::{self, MaybeUninit},
pin::Pin,
@ -12,6 +12,7 @@ use std::{
};
use werkzeug::CachePadded;
use werkzeug::sync::Parker;
use werkzeug::ptr::TaggedAtomicPtr;
@ -39,7 +40,7 @@ enum SlotKey {
pub struct Receiver<T> {
queue: Arc<Queue<T>>,
lock: Pin<Box<(AtomicU32, PhantomPinned)>>,
lock: Pin<Box<(Parker, PhantomPinned)>>,
}
#[repr(transparent)]
@ -212,7 +213,7 @@ impl<T> Queue<T> {
pub fn new_receiver(self: &Arc<Self>) -> Receiver<T> {
let recv = Receiver {
queue: self.clone(),
lock: Box::pin((AtomicU32::new(0), PhantomPinned)),
lock: Box::pin((Parker::new(), PhantomPinned)),
};
// allocate slot for the receiver
@ -300,10 +301,7 @@ impl<T: Send> Receiver<T> {
// wait for a message to be sent to this receiver
drop(_guard);
unsafe {
let lock = werkzeug::sync::Lock::from_ptr(token.0.into_inner().as_ptr());
lock.wait();
}
self.lock.0.park();
}
}
@ -376,7 +374,7 @@ impl<T: Send> Sender<T> {
slot.value.as_mut_unchecked().write(value);
slot.next_and_state
.set_tag(1, Ordering::Release, Ordering::Relaxed);
werkzeug::sync::Lock::from_ptr(token.0.into_inner().as_ptr()).wake_one();
Parker::from_ptr(token.0.into_inner().as_ptr()).unpark();
}
return Ok(());
@ -404,7 +402,7 @@ impl<T: Send> Sender<T> {
if *is_parked {
// wake the receiver
unsafe {
werkzeug::sync::Lock::from_ptr(receiver.0.into_inner().as_ptr()).wake_one();
Parker::from_ptr(receiver.0.into_inner().as_ptr()).unpark();
}
}
@ -430,7 +428,7 @@ impl<T: Send> Sender<T> {
if *is_parked {
// wake the receiver
unsafe {
werkzeug::sync::Lock::from_ptr(token.0.into_inner().as_ptr()).wake_one();
Parker::from_ptr(token.0.into_inner().as_ptr()).unpark();
}
}
}
@ -455,7 +453,7 @@ impl<T: Send> Sender<T> {
if *is_parked {
// wake the receiver
unsafe {
werkzeug::sync::Lock::from_ptr(token.0.into_inner().as_ptr()).wake_one();
Parker::from_ptr(token.0.into_inner().as_ptr()).unpark();
}
}
}

View file

@ -454,7 +454,7 @@ impl<'scope, 'env> Scope<'scope, 'env> {
let mut _pinned = ScopeJob::new(a, self.inner);
let job = unsafe { Pin::new_unchecked(&_pinned) };
let (a, b) = worker.join_heartbeat2_every::<_, _, _, _, 64>(job, |_| b(*self));
let (a, b) = worker.join_heartbeat2(job, |_| b(*self));
// touch job here to ensure it is not dropped before we run the join.
drop(_pinned);

View file

@ -166,6 +166,7 @@ fn main() {
}
eprintln!("Done!");
println!("Done!");
// // wait for user input before exiting
// std::io::stdin().read_line(&mut String::new()).unwrap();
}