Queue: link per-receiver slots into TaggedAtomicPtr chains so multiple messages can be queued for a single receiver

This commit is contained in:
Janis 2025-07-04 13:30:47 +02:00
parent d1244026ca
commit b635ea5579
2 changed files with 288 additions and 46 deletions

View file

@ -69,6 +69,8 @@ pub struct SharedJob {
sender: Option<crate::channel::Sender>,
}
unsafe impl Send for SharedJob {}
impl<T: Send> Job2<T> {
fn new(harness: JobHarness, this: NonNull<()>) -> Self {
let this = Self {

View file

@ -4,14 +4,17 @@ use std::{
marker::{PhantomData, PhantomPinned},
mem::{self, MaybeUninit},
pin::Pin,
ptr::{self, NonNull},
sync::{
Arc,
atomic::{AtomicU8, AtomicU32, Ordering},
atomic::{AtomicU32, Ordering},
},
};
use crossbeam_utils::CachePadded;
use werkzeug::ptr::TaggedAtomicPtr;
// A Queue with multiple receivers and multiple producers, where a producer can send a message to one of any of the receivers (any-cast), or one of the receivers (uni-cast).
// After being woken up from waiting on a message, the receiver will look up the index of the message in the queue and return it.
@ -22,11 +25,14 @@ struct QueueInner<T> {
_phantom: std::marker::PhantomData<T>,
}
struct Queue<T> {
pub struct Queue<T> {
inner: UnsafeCell<QueueInner<T>>,
lock: AtomicU32,
}
unsafe impl<T> Send for Queue<T> {}
unsafe impl<T> Sync for Queue<T> where T: Send {}
enum SlotKey {
Owned(ReceiverToken),
Indexed(usize),
@ -37,32 +43,120 @@ struct Receiver<T> {
lock: Pin<Box<(AtomicU32, PhantomPinned)>>,
}
#[repr(transparent)]
struct Sender<T> {
queue: Arc<Queue<T>>,
}
// TODO: make this a linked list of slots so we can queue multiple messages for
// a single receiver
const SLOT_ALIGN: u8 = core::mem::align_of::<usize>().ilog2() as u8;
struct Slot<T> {
value: UnsafeCell<MaybeUninit<T>>,
state: AtomicU8,
next_and_state: TaggedAtomicPtr<Self, SLOT_ALIGN>,
_phantom: PhantomData<Self>,
}
impl<T> Slot<T> {
fn new() -> Self {
Self {
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicU8::new(0), // 0 means empty
next_and_state: TaggedAtomicPtr::new(ptr::null_mut(), 0), // 0 means empty
_phantom: PhantomData,
}
}
fn set(&self, value: T) {}
fn is_set(&self) -> bool {
self.next_and_state.tag(Ordering::Acquire) == 1
}
unsafe fn pop(&self) -> Option<T> {
NonNull::new(self.next_and_state.ptr(Ordering::Acquire))
.and_then(|next| {
// SAFETY: The next slot is a valid pointer to a Slot<T> that was allocated by us.
unsafe { next.as_ref().pop() }
})
.or_else(|| {
if self
.next_and_state
.swap_tag(0, Ordering::Acquire, Ordering::Relaxed)
== 1
{
// SAFETY: The value is only initialized when the state is set to 1.
Some(unsafe { self.value.as_ref_unchecked().assume_init_read() })
} else {
None
}
})
}
/// the caller must ensure that they have exclusive access to the slot
unsafe fn push(&self, value: T) {
if self.is_set() {
let next = self.next_ptr();
unsafe {
(next.as_ref()).push(value);
}
} else {
// SAFETY: The value is only initialized when the state is set to 1.
unsafe { self.value.as_mut_unchecked().write(value) };
self.next_and_state
.set_tag(1, Ordering::Release, Ordering::Relaxed);
}
}
fn next_ptr(&self) -> NonNull<Slot<T>> {
if let Some(next) = NonNull::new(self.next_and_state.ptr(Ordering::Acquire)) {
next.cast()
} else {
self.alloc_next()
}
}
fn alloc_next(&self) -> NonNull<Slot<T>> {
let next = Box::into_raw(Box::new(Slot::new()));
let next = loop {
match self.next_and_state.compare_exchange_weak_ptr(
ptr::null_mut(),
next,
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => break next,
Err(other) => {
// next was allocated under us, so we need to drop the slot we just allocated again.
_ = unsafe { Box::from_raw(next) };
break other;
}
}
};
unsafe {
// SAFETY: The next slot is a valid pointer to a Slot<T> that was allocated by us.
NonNull::new_unchecked(next)
}
}
}
impl<T> Drop for Slot<T> {
fn drop(&mut self) {
// drop next chain
if let Some(next) = NonNull::new(self.next_and_state.swap_ptr(
ptr::null_mut(),
Ordering::Release,
Ordering::Relaxed,
)) {
// SAFETY: The next slot is a valid pointer to a Slot<T> that was allocated by us.
// We drop this in place because idk..
unsafe {
next.drop_in_place();
_ = Box::<mem::ManuallyDrop<Self>>::from_non_null(next.cast());
}
}
// SAFETY: The value is only initialized when the state is set to 1.
if mem::needs_drop::<T>() && self.state.load(Ordering::Acquire) == 1 {
if mem::needs_drop::<T>() && self.next_and_state.tag(Ordering::Acquire) == 1 {
unsafe { self.value.as_mut_unchecked().assume_init_drop() };
}
}
@ -80,8 +174,8 @@ impl<T> Drop for Slot<T> {
pub struct ReceiverToken(werkzeug::util::Send<*const u32>);
impl<T> Queue<T> {
pub fn new() -> Self {
Self {
pub fn new() -> Arc<Self> {
Arc::new(Self {
inner: UnsafeCell::new(QueueInner {
parked: HashSet::new(),
messages: Vec::new(),
@ -89,7 +183,7 @@ impl<T> Queue<T> {
_phantom: PhantomData,
}),
lock: AtomicU32::new(0),
}
})
}
pub fn new_sender(self: &Arc<Self>) -> Sender<T> {
@ -98,6 +192,10 @@ impl<T> Queue<T> {
}
}
pub fn as_sender(self: &Arc<Self>) -> &Sender<T> {
unsafe { mem::transmute::<&Arc<Self>, &Sender<T>>(self) }
}
pub fn new_receiver(self: &Arc<Self>) -> Receiver<T> {
let recv = Receiver {
queue: self.clone(),
@ -107,13 +205,10 @@ impl<T> Queue<T> {
// allocate slot for the receiver
let token = recv.get_token();
let _guard = recv.queue.lock();
recv.queue.inner().owned.insert(
token,
CachePadded::new(Slot {
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicU8::new(0), // 0 means empty
}),
);
recv.queue
.inner()
.owned
.insert(token, CachePadded::new(Slot::new()));
drop(_guard);
recv
@ -137,14 +232,11 @@ impl<T> QueueInner<T> {
fn poll(&mut self, token: ReceiverToken) -> Option<T> {
// check if someone has sent a message to this receiver
let slot = self.owned.get(&token).unwrap();
if slot.state.swap(0, Ordering::Acquire) == 1 {
// SAFETY: the slot is owned by this receiver and contains a message.
return Some(unsafe { slot.value.as_ref_unchecked().assume_init_read() });
} else if let Some(t) = self.messages.pop() {
return Some(t);
} else {
None
}
unsafe { slot.pop() }.or_else(|| {
// if the slot is empty, we can check the indexed messages
self.messages.pop()
})
}
}
@ -220,18 +312,16 @@ impl<T: Send> Sender<T> {
let queue = self.queue.inner();
if let Some((token, slot)) = queue.parked.iter().find_map(|token| {
// ensure the slot is available
queue.owned.get(token).and_then(|s| {
if s.state.load(Ordering::Acquire) == 0 {
Some((*token, s))
} else {
None
}
})
queue
.owned
.get(token)
.and_then(|s| if !s.is_set() { Some((*token, s)) } else { None })
}) {
// we found a receiver that is parked, so we can send the message to it
unsafe {
slot.value.as_mut_unchecked().write(value);
slot.state.store(1, Ordering::Release);
slot.next_and_state
.set_tag(1, Ordering::Release, Ordering::Relaxed);
werkzeug::sync::Lock::from_ptr(token.0.into_inner().cast_mut()).wake_one();
}
@ -254,9 +344,10 @@ impl<T: Send> Sender<T> {
let Some(slot) = queue.owned.get_mut(&receiver) else {
return Err(value);
};
// SAFETY: The slot is owned by this receiver.
unsafe { slot.value.as_mut_unchecked().write(value) };
slot.state.store(1, Ordering::Release);
unsafe {
slot.push(value);
}
// check if the receiver is parked
if queue.parked.contains(&receiver) {
@ -281,15 +372,7 @@ impl<T: Send> Sender<T> {
for (token, slot) in queue.owned.iter() {
// SAFETY: The slot is owned by this receiver.
if slot.state.load(Ordering::Acquire) != 0 {
// the slot is not available, so we skip it
continue;
}
unsafe {
slot.value.as_mut_unchecked().write(value.clone());
}
slot.state.store(1, Ordering::Release);
unsafe { slot.push(value.clone()) };
// check if the receiver is parked
if queue.parked.contains(token) {
@ -308,13 +391,12 @@ mod tests {
#[test]
fn test_queue() {
let queue = Arc::new(Queue::<i32>::new());
let queue = Queue::<i32>::new();
let sender = queue.new_sender();
let receiver1 = queue.new_receiver();
let receiver2 = queue.new_receiver();
let token1 = receiver1.get_token();
let token2 = receiver2.get_token();
sender.anycast(42);
@ -325,4 +407,162 @@ mod tests {
assert_eq!(receiver1.try_recv(), None);
assert_eq!(receiver2.recv(), 100);
}
#[test]
fn queue_broadcast() {
    // A broadcast message must be observed by every registered receiver.
    let queue = Queue::<i32>::new();
    let tx = queue.new_sender();
    let rx_a = queue.new_receiver();
    let rx_b = queue.new_receiver();

    tx.broadcast(42);

    for rx in [&rx_a, &rx_b] {
        assert_eq!(rx.recv(), 42);
    }
}
#[test]
fn queue_multiple_messages() {
    // A receiver drains its own (unicast) slot before touching the shared
    // anycast backlog, so the later unicast message arrives first.
    let queue = Queue::<i32>::new();
    let tx = queue.new_sender();
    let rx = queue.new_receiver();

    tx.anycast(1);
    tx.unicast(2, rx.get_token()).unwrap();

    assert_eq!(rx.recv(), 2);
    assert_eq!(rx.recv(), 1);
}
#[test]
fn queue_threaded() {
    // Worker protocol: process `Send` payloads until an `Exit` arrives.
    #[derive(Debug, Clone, Copy)]
    enum Message {
        Send(i32),
        Exit,
    }

    let queue = Queue::<Message>::new();
    let sender = queue.new_sender();

    // Spawn five workers, each owning its own receiver on the shared queue.
    let mut threads = Vec::new();
    for _ in 0..5 {
        let q = queue.clone();
        let receiver = q.new_receiver();
        threads.push(std::thread::spawn(move || {
            loop {
                match receiver.recv() {
                    Message::Send(value) => {
                        println!("Receiver {:?} Received: {}", receiver.get_token(), value);
                    }
                    Message::Exit => {
                        println!("Exiting thread");
                        break;
                    }
                }
            }
        }));
    }

    // Ten anycast messages, distributed among whichever workers grab them.
    for i in 0..10 {
        sender.anycast(Message::Send(i));
    }

    // Tell every worker to shut down, then wait for all of them.
    sender.broadcast(Message::Exit);
    for thread in threads {
        thread.join().unwrap();
    }
    println!("All threads have exited.");
}
}
// struct AtomicLIFO<T> {
// cell: AtomicOption<T>,
// next: AtomicPtr<Self>,
// }
// impl<T> AtomicLIFO<T> {
// fn new() -> Self {
// Self {
// cell: AtomicOption::new(),
// next: AtomicPtr::new(ptr::null_mut()),
// }
// }
// fn new_with_value(value: T) -> Self {
// Self {
// cell: AtomicOption::from_option(Some(value)),
// next: AtomicPtr::new(ptr::null_mut()),
// }
// }
// /// inserts a value into the chain, either inserting it into the current
// /// cell, or allocating a new cell to store it in and atomically linking it
// /// to the chain.
// fn push(&self, value: T) {
// match self.cell.swap(Some(value)) {
// Some(old) => {
// // there was previously a value in this cell, so we have to
// // allocate a new cell and link it
// let next = Box::into_raw(Box::new(Self::new_with_value(old)));
// let mut current_next = self.next.load(Ordering::Acquire);
// loop {
// unsafe {
// (&*next).next.store(current_next, Ordering::Relaxed);
// }
// match self.next.compare_exchange_weak(
// current_next,
// next,
// Ordering::Release,
// Ordering::Relaxed,
// ) {
// Ok(_) => {
// // we successfully linked the new cell, so we can return
// return;
// }
// Err(next) => {
// // the next pointer was changed, so we need to try again
// current_next = next;
// }
// }
// }
// }
// None => {}
// }
// }
// fn pop(&self) -> Option<T> {
// // try to take the value from the current cell
// // if there is no value, then we are done, because there will also be no next cell
// let value = self.cell.take()?;
// // try to atomically swap
// }
// fn alloc_next(&self) -> NonNull<Self> {
// let next = Box::into_raw(Box::new(Self::new()));
// match self.next.compare_exchange_weak(
// ptr::null_mut(),
// next,
// Ordering::Release,
// Ordering::Relaxed,
// ) {
// Ok(next) => unsafe { NonNull::new_unchecked(next) },
// Err(other) => {
// // next was allocated under us, so we need to drop the slot we just allocated again.
// _ = unsafe { Box::from_raw(next) };
// unsafe { NonNull::new_unchecked(other) }
// }
// }
// }
// }