sync channel inspired by chili

This commit is contained in:
Janis 2025-07-06 14:26:51 +02:00
parent 5a3b6447f3
commit 568d14aa9c

View file

@ -278,3 +278,133 @@ impl Parker {
}
}
}
#[cfg(feature = "alloc")]
pub mod channel {
use alloc::sync::Arc;
use core::{
cell::{Cell, UnsafeCell},
marker::PhantomData,
mem::MaybeUninit,
sync::atomic::{AtomicU32, Ordering},
};
#[repr(C)]
#[derive(Debug)]
struct Channel<T = ()> {
state: AtomicU32,
val: UnsafeCell<MaybeUninit<T>>,
}
unsafe impl<T: Send> Send for Channel<T> {}
unsafe impl<T: Send> Sync for Channel<T> {}
impl<T> Channel<T> {
const OCCUPIED_BIT: u32 = 0b01;
const WAITING_BIT: u32 = 0b10;
fn new() -> Self {
Self {
state: AtomicU32::new(0),
val: UnsafeCell::new(MaybeUninit::uninit()),
}
}
}
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let channel = Arc::new(Channel::<T>::new());
let receiver = Receiver(channel.clone(), PhantomData);
let sender = Sender(channel);
(sender, receiver)
}
#[derive(Debug)]
#[repr(transparent)]
// `PhantomData<Cell<()>>` is used to ensure that `Receiver` is `!Sync` but `Send`.
pub struct Receiver<T = ()>(Arc<Channel<T>>, PhantomData<Cell<()>>);
#[derive(Debug)]
#[repr(transparent)]
pub struct Sender<T = ()>(Arc<Channel<T>>);
impl<T> Receiver<T> {
pub fn is_empty(&self) -> bool {
self.0.state.load(Ordering::Acquire) & Channel::<T>::OCCUPIED_BIT == 0
}
pub fn as_sender(self) -> Sender<T> {
Sender(self.0.clone())
}
fn wait(&mut self) {
loop {
let state = self
.0
.state
.fetch_or(Channel::<T>::WAITING_BIT, Ordering::Acquire);
if state & Channel::<T>::OCCUPIED_BIT == 0 {
// The channel is empty, so we need to wait for a value to be sent.
// We will block until the sender wakes us up.
atomic_wait::wait(&self.0.state, Channel::<T>::WAITING_BIT);
} else {
// The channel is occupied, so we can return.
self.0
.state
.fetch_and(!Channel::<T>::WAITING_BIT, Ordering::Release);
break;
}
}
}
/// Takes the value from the channel, if it is present.
fn take(&mut self) -> Option<T> {
// unset the OCCUPIED_BIT to indicate that we are taking the value, if any is present.
if self
.0
.state
.fetch_and(!Channel::<T>::OCCUPIED_BIT, Ordering::Acquire)
& Channel::<T>::OCCUPIED_BIT
== 0
{
// The channel was empty, so we return None.
None
} else {
unsafe { Some(self.0.val.get().read().assume_init_read()) }
}
}
pub fn recv(mut self) -> T {
loop {
if let Some(t) = self.take() {
return t;
}
self.wait();
}
}
}
impl<T> Sender<T> {
pub fn send(self, value: T) {
unsafe {
self.0.val.get().write(MaybeUninit::new(value));
}
// Set the OCCUPIED_BIT to indicate that a value is present.
let state = self
.0
.state
.fetch_or(Channel::<T>::OCCUPIED_BIT, Ordering::Release);
assert!(
state & Channel::<T>::OCCUPIED_BIT == 0,
"Channel is already occupied"
);
// If there are any receivers waiting, we need to wake them up.
if state & Channel::<T>::WAITING_BIT != 0 {
// There are receivers waiting, so we need to wake them up.
atomic_wait::wake_all(&self.0.state);
}
}
}
}