Compare commits
4c70dbfc71...568d14aa9c (2 commits)
Author | SHA1 | Date
---|---|---
 | 568d14aa9c |
 | 5a3b6447f3 |
src/sync.rs (153 changed lines)
```diff
@@ -3,9 +3,6 @@ use core::{
     sync::atomic::{AtomicU32, Ordering},
 };
 
-const LOCKED_BIT: u32 = 0b001;
-const EMPTY: u32 = 0;
-
 /// A simple lock implementation using an atomic u32.
 #[repr(transparent)]
 pub struct Lock {
@@ -13,6 +10,9 @@ pub struct Lock {
 }
 
 impl Lock {
+    const LOCKED: u32 = 0b001;
+    const EMPTY: u32 = 0;
+
     /// Creates a new lock in the unlocked state.
     pub const fn new() -> Self {
         Self {
@@ -35,7 +35,12 @@ impl Lock {
         // attempt acquiring the lock with no contention.
         if self
             .inner
-            .compare_exchange_weak(EMPTY, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
+            .compare_exchange_weak(
+                Self::EMPTY,
+                Self::LOCKED,
+                Ordering::Acquire,
+                Ordering::Relaxed,
+            )
             .is_ok()
         {
             // We successfully acquired the lock.
@@ -48,7 +53,7 @@ impl Lock {
     pub fn unlock(&self) {
         // use release semantics to ensure that all previous writes are
         // available to other threads.
-        self.inner.fetch_and(!LOCKED_BIT, Ordering::Release);
+        self.inner.fetch_and(!Self::LOCKED, Ordering::Release);
     }
 
     fn lock_slow(&self) {
@@ -58,11 +63,11 @@ impl Lock {
         let mut state = self.inner.load(Ordering::Acquire);
         loop {
             // If the lock isn't locked, we can try to acquire it.
-            if state & LOCKED_BIT == 0 {
+            if state & Self::LOCKED == 0 {
                 // Try to acquire the lock.
                 match self.inner.compare_exchange_weak(
                     state,
-                    state | LOCKED_BIT,
+                    state | Self::LOCKED,
                     Ordering::Acquire,
                     Ordering::Relaxed,
                 ) {
@@ -97,13 +102,13 @@ impl Lock {
             }
 
             // If we reach here, we need to park the thread.
-            atomic_wait::wait(&self.inner, LOCKED_BIT);
+            atomic_wait::wait(&self.inner, Self::LOCKED);
 
             if self
                 .inner
                 .compare_exchange_weak(
                     state,
-                    state | LOCKED_BIT,
+                    state | Self::LOCKED,
                     Ordering::Acquire,
                     Ordering::Relaxed,
                 )
```
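The hunks above only move the module-level `LOCKED_BIT`/`EMPTY` constants into `Lock` as the associated constants `Self::LOCKED`/`Self::EMPTY`; the locking protocol itself is unchanged. For readers skimming the diff, here is a minimal, self-contained sketch of that acquire/release protocol using only `std` atomics. The type and method names are illustrative, not this crate's API, and it spins under contention instead of parking the thread via `atomic_wait` as the real code does:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Illustrative spin-only variant of the fast path shown in the diff.
pub struct MiniLock {
    inner: AtomicU32,
}

impl MiniLock {
    const LOCKED: u32 = 0b001;
    const EMPTY: u32 = 0;

    pub const fn new() -> Self {
        Self { inner: AtomicU32::new(Self::EMPTY) }
    }

    pub fn lock(&self) {
        // Same uncontended compare-exchange as the diff's fast path; on failure we
        // simply retry here, whereas the crate falls back to a slow path that parks.
        while self
            .inner
            .compare_exchange_weak(Self::EMPTY, Self::LOCKED, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            std::hint::spin_loop();
        }
    }

    pub fn unlock(&self) {
        // Release ordering so writes made while holding the lock are visible
        // to whichever thread acquires it next.
        self.inner.fetch_and(!Self::LOCKED, Ordering::Release);
    }
}
```

In this sketch, `lock()`/`unlock()` bracket a critical section exactly as the crate's `Lock` does; the only difference is that contention burns CPU instead of futex-waiting.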
```diff
@@ -273,3 +278,133 @@ impl Parker {
         }
     }
 }
+
+#[cfg(feature = "alloc")]
+pub mod channel {
+    use alloc::sync::Arc;
+    use core::{
+        cell::{Cell, UnsafeCell},
+        marker::PhantomData,
+        mem::MaybeUninit,
+        sync::atomic::{AtomicU32, Ordering},
+    };
+
+    #[repr(C)]
+    #[derive(Debug)]
+    struct Channel<T = ()> {
+        state: AtomicU32,
+        val: UnsafeCell<MaybeUninit<T>>,
+    }
+
+    unsafe impl<T: Send> Send for Channel<T> {}
+    unsafe impl<T: Send> Sync for Channel<T> {}
+
+    impl<T> Channel<T> {
+        const OCCUPIED_BIT: u32 = 0b01;
+        const WAITING_BIT: u32 = 0b10;
+
+        fn new() -> Self {
+            Self {
+                state: AtomicU32::new(0),
+                val: UnsafeCell::new(MaybeUninit::uninit()),
+            }
+        }
+    }
+
+    pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+        let channel = Arc::new(Channel::<T>::new());
+        let receiver = Receiver(channel.clone(), PhantomData);
+        let sender = Sender(channel);
+        (sender, receiver)
+    }
+
+    #[derive(Debug)]
+    #[repr(transparent)]
+    // `PhantomData<Cell<()>>` is used to ensure that `Receiver` is `!Sync` but `Send`.
+    pub struct Receiver<T = ()>(Arc<Channel<T>>, PhantomData<Cell<()>>);
+
+    #[derive(Debug)]
+    #[repr(transparent)]
+    pub struct Sender<T = ()>(Arc<Channel<T>>);
+
+    impl<T> Receiver<T> {
+        pub fn is_empty(&self) -> bool {
+            self.0.state.load(Ordering::Acquire) & Channel::<T>::OCCUPIED_BIT == 0
+        }
+
+        pub fn as_sender(self) -> Sender<T> {
+            Sender(self.0.clone())
+        }
+
+        fn wait(&mut self) {
+            loop {
+                let state = self
+                    .0
+                    .state
+                    .fetch_or(Channel::<T>::WAITING_BIT, Ordering::Acquire);
+                if state & Channel::<T>::OCCUPIED_BIT == 0 {
+                    // The channel is empty, so we need to wait for a value to be sent.
+                    // We will block until the sender wakes us up.
+                    atomic_wait::wait(&self.0.state, Channel::<T>::WAITING_BIT);
+                } else {
+                    // The channel is occupied, so we can return.
+                    self.0
+                        .state
+                        .fetch_and(!Channel::<T>::WAITING_BIT, Ordering::Release);
+                    break;
+                }
+            }
+        }
+
+        /// Takes the value from the channel, if it is present.
+        fn take(&mut self) -> Option<T> {
+            // unset the OCCUPIED_BIT to indicate that we are taking the value, if any is present.
+            if self
+                .0
+                .state
+                .fetch_and(!Channel::<T>::OCCUPIED_BIT, Ordering::Acquire)
+                & Channel::<T>::OCCUPIED_BIT
+                == 0
+            {
+                // The channel was empty, so we return None.
+                None
+            } else {
+                unsafe { Some(self.0.val.get().read().assume_init_read()) }
+            }
+        }
+
+        pub fn recv(mut self) -> T {
+            loop {
+                if let Some(t) = self.take() {
+                    return t;
+                }
+
+                self.wait();
+            }
+        }
+    }
+
+    impl<T> Sender<T> {
+        pub fn send(self, value: T) {
+            unsafe {
+                self.0.val.get().write(MaybeUninit::new(value));
+            }
+
+            // Set the OCCUPIED_BIT to indicate that a value is present.
+            let state = self
+                .0
+                .state
+                .fetch_or(Channel::<T>::OCCUPIED_BIT, Ordering::Release);
+            assert!(
+                state & Channel::<T>::OCCUPIED_BIT == 0,
+                "Channel is already occupied"
+            );
+
+            // If there are any receivers waiting, we need to wake them up.
+            if state & Channel::<T>::WAITING_BIT != 0 {
+                // There are receivers waiting, so we need to wake them up.
+                atomic_wait::wake_all(&self.0.state);
+            }
+        }
+    }
+}
```
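The new `channel` module (gated behind the `alloc` feature) is a one-shot channel: `Sender::send` consumes the sender, stores a single value, and wakes any waiter, while `Receiver::recv` consumes the receiver and blocks via `atomic_wait` until that value arrives. A minimal usage sketch follows; the `mycrate::sync` import path is an assumption (the diff only shows `pub mod channel` inside `src/sync.rs`), and it assumes the consuming binary has `std` threads available:

```rust
use std::thread;

// Hypothetical path to the module added in this diff.
use mycrate::sync::channel::channel;

fn main() {
    // One-shot channel: exactly one value crosses from the sender to the receiver.
    let (sender, receiver) = channel::<u64>();

    let producer = thread::spawn(move || {
        // `send` consumes the sender, sets OCCUPIED_BIT, and wakes a waiting receiver.
        sender.send(42);
    });

    // `recv` consumes the receiver, waiting (futex-style) until the value is present.
    assert_eq!(receiver.recv(), 42);
    producer.join().unwrap();
}
```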