From 9e3fa2cdb0a634151cd8b121cac44b8cc4429a4a Mon Sep 17 00:00:00 2001 From: Janis Date: Wed, 6 Aug 2025 22:51:27 +0200 Subject: [PATCH] u32,u64,u16 conversion functions, sync queue, random utilities - `NextIf` iterator trait that yields the next element iff some predicate holds - `u64_from_u32s`, `u32s_from_u64`, `u32_from_u16s`, `u16s_from_u32` functions - moved `can_transmute` to `mem` module, added `is_same_size` and `is_aligned` functions - `copy_from` function for `TaggedAtomicPtr` - attempt at a sync queue? - `is_whitespace` function --- Cargo.lock | 30 +++ Cargo.toml | 5 +- src/bytes.rs | 56 +++++ src/iter.rs | 44 ++++ src/lib.rs | 5 +- src/mem.rs | 22 ++ src/ptr.rs | 12 + src/sync.rs | 671 ++++++++++++++++++++++++++++++++++++++++++++++++++- src/util.rs | 41 +++- 9 files changed, 877 insertions(+), 9 deletions(-) create mode 100644 src/bytes.rs create mode 100644 src/iter.rs create mode 100644 src/mem.rs diff --git a/Cargo.lock b/Cargo.lock index 9d1a574..36af936 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "atomic-wait" version = "1.1.0" @@ -12,6 +18,29 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + +[[package]] +name = "hashbrown" +version = "0.15.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] + [[package]] name = "libc" version = "0.2.174" @@ -23,6 +52,7 @@ name = "werkzeug" version = "0.1.0" dependencies = [ "atomic-wait", + "hashbrown", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index b9adc3b..82f95d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ edition = "2024" [features] default = ["alloc"] -alloc = [] +alloc = ["dep:hashbrown"] std = ["alloc"] transposed-option = ["nightly"] nightly = [] @@ -15,4 +15,5 @@ nightly = [] # While I could use libc / windows for this, why not just use this tiny crate # which does exactly and only a futex -atomic-wait = "1.1.0" \ No newline at end of file +atomic-wait = "1.1.0" +hashbrown = {version = "0.15", optional = true} \ No newline at end of file diff --git a/src/bytes.rs b/src/bytes.rs new file mode 100644 index 0000000..a7251a3 --- /dev/null +++ b/src/bytes.rs @@ -0,0 +1,56 @@ +//! Collection of utilities for working with primitive integral types in Rust, and converting between them. + +/// interprets an array of two `u32`s as a `u64`. +/// Importantly, this does not account for endianness. +/// This is the inverse of `u32s_from_u64`. +pub fn u64_from_u32s(array: [u32; 2]) -> u64 { + // SAFETY: `out` and `array` are guaranteed not to overlap, we assert that + // we can transmute between the two types which guarantees that they have + // the same size. Both are well aligned and valid values for values for + // `u32` and `u64`. 
+    unsafe {
+        let mut out: u64 = 0;
+
+        assert!(crate::mem::is_same_size::<[u32; 2], u64>());
+
+        core::ptr::copy_nonoverlapping(array.as_ptr(), &raw mut out as *mut u32, 2);
+        out
+    }
+}
+
+/// Interprets a `u64` as an array of two `u32`s.
+/// Importantly, this does not account for endianness.
+/// This is the inverse of `u64_from_u32s`.
+pub fn u32s_from_u64(value: u64) -> [u32; 2] {
+    // SAFETY: `value` is guaranteed to be a valid `u64`, and we are creating an
+    // array of two `u32`s, which is also 8 bytes.
+    assert!(crate::mem::can_transmute::<u64, [u32; 2]>());
+    unsafe { core::ptr::read(&raw const value as *const [u32; 2]) }
+}
+
+/// Interprets an array of two `u16`s as a `u32`.
+/// Importantly, this does not account for endianness.
+/// This is the inverse of `u16s_from_u32`.
+pub fn u32_from_u16s(array: [u16; 2]) -> u32 {
+    // SAFETY: `out` and `array` are guaranteed not to overlap, and the two
+    // types have the same size: `[u16; 2]` and `u32` are both 4 bytes.
+    // Both are well aligned, and all bit patterns are valid values for
+    // `u32` and `u16`.
+    unsafe {
+        let mut out = 0u32;
+
+        // we can't use read here because [u16; 2] is not sufficiently aligned for u32
+        core::ptr::copy_nonoverlapping(array.as_ptr(), &raw mut out as *mut u16, 2);
+        out
+    }
+}
+
+/// Interprets a `u32` as an array of two `u16`s.
+/// Importantly, this does not account for endianness.
+/// This is the inverse of `u32_from_u16s`.
+pub fn u16s_from_u32(value: u32) -> [u16; 2] {
+    // SAFETY: `value` is guaranteed to be a valid `u32`, and we are creating an
+    // array of two `u16`s, which is also 4 bytes.
+    assert!(crate::mem::can_transmute::<u32, [u16; 2]>());
+    unsafe { core::ptr::read(&raw const value as *const [u16; 2]) }
+}
diff --git a/src/iter.rs b/src/iter.rs
new file mode 100644
index 0000000..25ecdca
--- /dev/null
+++ b/src/iter.rs
@@ -0,0 +1,44 @@
+/// Trait for only yielding the next item in the iterator if it satisfies some predicate.
+pub trait NextIf: Iterator + Clone {
+    /// Yields the next item if `pred` returns `true`.
+    /// If `pred` returns `false`, the iterator is not advanced.
+    #[must_use]
+    fn next_if<F>(&mut self, pred: F) -> Option<Self::Item>
+    where
+        F: FnOnce(&Self::Item) -> bool,
+    {
+        let old = self.clone();
+        match self.next() {
+            Some(item) => {
+                if pred(&item) {
+                    Some(item)
+                } else {
+                    *self = old;
+                    None
+                }
+            }
+            None => None,
+        }
+    }
+    /// Yields the next item if `pred` returns `Some(T)`.
+    /// If `pred` returns `None`, the iterator is not advanced.
+    #[must_use]
+    fn next_if_map<F, T>(&mut self, pred: F) -> Option<T>
+    where
+        F: FnOnce(Self::Item) -> Option<T>,
+    {
+        let old = self.clone();
+        match self.next() {
+            Some(item) => match pred(item) {
+                None => {
+                    *self = old;
+                    None
+                }
+                some => some,
+            },
+            None => None,
+        }
+    }
+}
+
+impl<T> NextIf for T where T: Iterator + Clone {}
diff --git a/src/lib.rs b/src/lib.rs
index a4d7735..d2777ad 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,8 +9,11 @@ extern crate alloc;
 extern crate std;
 
 pub mod atomic;
+pub mod bytes;
 pub mod cachepadded;
 pub mod drop_guard;
+pub mod iter;
+pub mod mem;
 #[cfg(feature = "transposed-option")]
 pub mod option;
 pub mod ptr;
@@ -21,4 +24,4 @@ pub mod sync;
 pub mod util;
 
 pub use cachepadded::CachePadded;
-pub use util::can_transmute;
+pub use mem::can_transmute;
diff --git a/src/mem.rs b/src/mem.rs
new file mode 100644
index 0000000..a8da271
--- /dev/null
+++ b/src/mem.rs
@@ -0,0 +1,22 @@
+pub const fn can_transmute<A, B>() -> bool {
+    use core::mem::{align_of, size_of};
+    // We can transmute `A` to `B` iff `A` and `B` have the same size and the
+    // alignment of `A` is greater than or equal to the alignment of `B`.
+    (size_of::<A>() == size_of::<B>()) & (align_of::<A>() >= align_of::<B>())
+}
+
+pub const fn is_same_size<A, B>() -> bool {
+    use core::mem::size_of;
+
+    size_of::<A>() == size_of::<B>()
+}
+
+/// Checks if `A` is aligned at least as well as `B`, e.g. `is_aligned::<u32, u8>()`
+/// returns `true`, but `is_aligned::<u8, u32>()` returns `false`. This is useful for
+/// ensuring that a value of type `A` can be safely cast to a value of type `B`
+/// without violating alignment requirements.
+pub const fn is_aligned<A, B>() -> bool {
+    use core::mem::align_of;
+
+    align_of::<A>() >= align_of::<B>()
+}
diff --git a/src/ptr.rs b/src/ptr.rs
index 568d0a6..411d636 100644
--- a/src/ptr.rs
+++ b/src/ptr.rs
@@ -439,6 +439,18 @@ impl<T, const BITS: u8> TaggedAtomicPtr<T, BITS> {
         let ptr = unsafe { NonNull::new_unchecked(ptr.cast()) };
         (ptr, tag)
     }
+
+    pub fn copy_from(
+        &self,
+        other: &Self,
+        load: atomic::Ordering,
+        store: atomic::Ordering,
+    ) -> (*mut T, usize) {
+        let old = self.ptr.swap(other.ptr.load(load), store);
+
+        let mask = Self::mask();
+        (old.map_addr(|addr| addr & !mask).cast(), old.addr() & mask)
+    }
 }
 
 #[cfg(test)]
diff --git a/src/sync.rs b/src/sync.rs
index 94a91c1..b57eb92 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -358,7 +358,7 @@ pub mod channel {
 
         /// Takes the value from the channel, if it is present.
         /// this function must only ever return `Some` once.
-        unsafe fn take(&mut self) -> Option<T> {
+        pub unsafe fn take(&mut self) -> Option<T> {
             // unset the OCCUPIED_BIT to indicate that we are taking the value, if any is present.
             if self
                 .0
@@ -415,3 +415,672 @@ pub mod channel {
         }
     }
 }
+
+#[cfg(feature = "alloc")]
+pub mod queue {
+    //! A queue with multiple receivers and multiple producers, where a producer can
+    //! send a message either to any one of the receivers (any-cast) or to one
+    //! specific receiver (uni-cast).
+    //! After being woken up from waiting on a message, the receiver will look up
+    //! the message in the queue and return it.
+
+    use alloc::{boxed::Box, sync::Arc, vec::Vec};
+    use core::{
+        cell::UnsafeCell,
+        marker::{PhantomData, PhantomPinned},
+        mem::{self, MaybeUninit},
+        pin::Pin,
+        ptr::{self, NonNull},
+        sync::atomic::{AtomicU32, Ordering},
+    };
+
+    use hashbrown::HashMap;
+
+    use crate::{CachePadded, ptr::TaggedAtomicPtr};
+
+    use super::Parker;
+
+    struct QueueInner<T> {
+        receivers: HashMap<ReceiverToken, CachePadded<(Slot<T>, bool)>>,
+        messages: Vec<T>,
+        _phantom: core::marker::PhantomData<T>,
+    }
+
+    pub struct Queue<T> {
+        inner: UnsafeCell<QueueInner<T>>,
+        lock: AtomicU32,
+    }
+
+    unsafe impl<T> Send for Queue<T> {}
+    unsafe impl<T> Sync for Queue<T> where T: Send {}
+
+    pub struct Receiver<T> {
+        queue: Arc<Queue<T>>,
+        lock: Pin<Box<(Parker, PhantomPinned)>>,
+    }
+
+    #[repr(transparent)]
+    pub struct Sender<T> {
+        queue: Arc<Queue<T>>,
+    }
+
+    // TODO: make this a linked list of slots so we can queue multiple messages for
+    // a single receiver
+    const SLOT_ALIGN: u8 = core::mem::align_of::<usize>().ilog2() as u8;
+    struct Slot<T> {
+        value: UnsafeCell<MaybeUninit<T>>,
+        next_and_state: TaggedAtomicPtr<Self, SLOT_ALIGN>,
+        _phantom: PhantomData<T>,
+    }
+
+    impl<T> Slot<T> {
+        fn new() -> Self {
+            Self {
+                value: UnsafeCell::new(MaybeUninit::uninit()),
+                next_and_state: TaggedAtomicPtr::new(ptr::null_mut(), 0), // 0 means empty
+                _phantom: PhantomData,
+            }
+        }
+
+        fn is_set(&self) -> bool {
+            self.next_and_state.tag(Ordering::Acquire) == 1
+        }
+
+        unsafe fn pop(&self) -> Option<T> {
+            NonNull::new(self.next_and_state.ptr(Ordering::Acquire))
+                .and_then(|next| {
+                    // SAFETY: The next slot is a valid pointer to a Slot that was allocated by us.
+                    unsafe { next.as_ref().pop() }
+                })
+                .or_else(|| {
+                    if self
+                        .next_and_state
+                        .swap_tag(0, Ordering::AcqRel, Ordering::Relaxed)
+                        == 1
+                    {
+                        // SAFETY: The value is only initialized when the state is set to 1.
+                        Some(unsafe { (&mut *self.value.get()).assume_init_read() })
+                    } else {
+                        None
+                    }
+                })
+        }
+
+        /// This operation isn't atomic.
+        #[allow(dead_code)]
+        unsafe fn pop_front(&self) -> Option<T> {
+            // swap the slot at `next` with self, and return the value of self.
+
+            // get next ptr, if it is non-null.
+            if let Some(next) = NonNull::new(self.next_and_state.ptr(Ordering::Acquire)) {
+                unsafe {
+                    // copy the next slot's next_and_state into self's next_and_state
+                    let (_, old) = self.next_and_state.copy_from(
+                        &next.as_ref().next_and_state,
+                        Ordering::Acquire,
+                        Ordering::Release,
+                    );
+
+                    // copy the next slot's value into self's value
+                    mem::swap(&mut *self.value.get(), &mut *next.as_ref().value.get());
+
+                    if old == 1 {
+                        // SAFETY: The value is only initialized when the state is set to 1.
+                        Some(next.as_ref().value.get().read().assume_init())
+                    } else {
+                        // next was empty, so we return None.
+                        None
+                    }
+                }
+            } else {
+                // next is null, so popping from the back or front is the same.
+                unsafe { self.pop() }
+            }
+        }
+
+        /// The caller must ensure that they have exclusive access to the slot.
+        unsafe fn push(&self, value: T) {
+            if self.is_set() {
+                let next = self.next_ptr();
+                unsafe {
+                    (next.as_ref()).push(value);
+                }
+            } else {
+                // SAFETY: The value is only initialized when the state is set to 1.
+ unsafe { (&mut *self.value.get()).write(value) }; + self.next_and_state + .set_tag(1, Ordering::Release, Ordering::Relaxed); + } + } + + fn next_ptr(&self) -> NonNull> { + if let Some(next) = NonNull::new(self.next_and_state.ptr(Ordering::Acquire)) { + next.cast() + } else { + self.alloc_next() + } + } + + fn alloc_next(&self) -> NonNull> { + let next = Box::into_raw(Box::new(Slot::new())); + + let next = loop { + match self.next_and_state.compare_exchange_weak_ptr( + ptr::null_mut(), + next, + Ordering::Release, + Ordering::Acquire, + ) { + Ok(_) => break next, + Err(other) => { + if other.is_null() { + continue; + } + // next was allocated under us, so we need to drop the slot we just allocated again. + _ = unsafe { Box::from_raw(next) }; + break other; + } + } + }; + + unsafe { + // SAFETY: The next slot is a valid pointer to a Slot that was allocated by us. + NonNull::new_unchecked(next) + } + } + } + + impl Drop for Slot { + fn drop(&mut self) { + // drop next chain + if let Some(next) = NonNull::new(self.next_and_state.swap_ptr( + ptr::null_mut(), + Ordering::Release, + Ordering::Relaxed, + )) { + // SAFETY: The next slot is a valid pointer to a Slot that was allocated by us. + // We drop this in place because idk.. + unsafe { + next.drop_in_place(); + _ = Box::>::from_raw(next.cast().as_ptr()); + } + } + + // SAFETY: The value is only initialized when the state is set to 1. + if mem::needs_drop::() && self.next_and_state.tag(Ordering::Acquire) == 1 { + unsafe { (&mut *self.value.get()).assume_init_drop() }; + } + } + } + + // const BLOCK_SIZE: usize = 8; + // struct Block { + // next: AtomicPtr>, + // slots: [CachePadded>; BLOCK_SIZE], + // } + + /// A token that can be used to identify a specific receiver in a queue. + #[repr(transparent)] + #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] + pub struct ReceiverToken(crate::util::Send>); + + impl ReceiverToken { + pub fn as_ptr(&self) -> *mut u32 { + self.0.into_inner().as_ptr() + } + + pub unsafe fn as_parker(&self) -> &Parker { + // SAFETY: The pointer is guaranteed to be valid and aligned, as it comes from a pinned Parker. + unsafe { Parker::from_ptr(self.as_ptr()) } + } + + pub unsafe fn from_parker(parker: &Parker) -> Self { + // SAFETY: The pointer is guaranteed to be valid and aligned, as it comes from a pinned Parker. 
+ let ptr = NonNull::from(parker).cast::(); + ReceiverToken(crate::util::Send(ptr)) + } + } + + impl Queue { + pub fn new() -> Arc { + Arc::new(Self { + inner: UnsafeCell::new(QueueInner { + messages: Vec::new(), + receivers: HashMap::new(), + _phantom: PhantomData, + }), + lock: AtomicU32::new(0), + }) + } + + pub fn new_sender(self: &Arc) -> Sender { + Sender { + queue: self.clone(), + } + } + + pub fn num_receivers(self: &Arc) -> usize { + let _guard = self.lock(); + self.inner().receivers.len() + } + + pub fn as_sender(self: &Arc) -> &Sender { + unsafe { mem::transmute::<&Arc, &Sender>(self) } + } + + pub fn new_receiver(self: &Arc) -> Receiver { + let recv = Receiver { + queue: self.clone(), + lock: Box::pin((Parker::new(), PhantomPinned)), + }; + + // allocate slot for the receiver + let token = recv.get_token(); + let _guard = recv.queue.lock(); + recv.queue + .inner() + .receivers + .insert(token, CachePadded::new((Slot::::new(), false))); + + drop(_guard); + recv + } + + fn lock(&self) -> impl Drop { + unsafe { + let lock = crate::sync::Lock::from_ptr(&self.lock as *const _ as _); + lock.lock(); + crate::drop_guard::DropGuard::new(|| lock.unlock()) + } + } + + fn inner(&self) -> &mut QueueInner { + // SAFETY: The inner is only accessed while the queue is locked. + unsafe { &mut *self.inner.get() } + } + } + + impl QueueInner { + fn poll(&mut self, token: ReceiverToken) -> Option { + // check if someone has sent a message to this receiver + let CachePadded((slot, _)) = self.receivers.get(&token)?; + + unsafe { slot.pop() }.or_else(|| { + // if the slot is empty, we can check the indexed messages + + self.messages.pop() + }) + } + } + + impl Receiver { + pub fn get_token(&self) -> ReceiverToken { + // the token is just the pointer to the lock of this receiver. + // the lock is pinned, so it's address is stable across calls to `receive`. + + ReceiverToken(crate::util::Send(NonNull::from(&self.lock.0).cast())) + } + } + + impl Drop for Receiver { + fn drop(&mut self) { + if mem::needs_drop::() { + // lock the queue + let _guard = self.queue.lock(); + let queue = self.queue.inner(); + + // remove the receiver from the queue + _ = queue.receivers.remove(&self.get_token()); + } + } + } + + impl Receiver { + pub fn recv(&self) -> T { + let token = self.get_token(); + + loop { + // lock the queue + let _guard = self.queue.lock(); + let queue = self.queue.inner(); + + // check if someone has sent a message to this receiver + if let Some(t) = queue.poll(token) { + queue.receivers.get_mut(&token).unwrap().1 = false; // mark the slot as not parked + return t; + } + + // there was no message for this receiver, so we need to park it + queue.receivers.get_mut(&token).unwrap().1 = true; // mark the slot as parked + + self.lock.0.park_with_callback(move || { + // drop the lock guard after having set the lock state to waiting. + // this avoids a deadlock if the sender tries to send a message + // while the receiver is in the process of parking (I think..) + drop(_guard); + }); + } + } + + pub fn try_recv(&self) -> Option { + let token = self.get_token(); + + // lock the queue + let _guard = self.queue.lock(); + let queue = self.queue.inner(); + + // check if someone has sent a message to this receiver + queue.poll(token) + } + } + + impl Sender { + /// Sends a message to one of the receivers in the queue, or makes it + /// available to any receiver that will park in the future. 
+ pub fn anycast(&self, value: T) { + let _guard = self.queue.lock(); + + // SAFETY: The queue is locked, so we can safely access the inner queue. + match unsafe { self.try_anycast_inner(value) } { + Ok(_) => {} + Err(value) => { + // no parked receiver found, so we want to add the message to the indexed slots + let queue = self.queue.inner(); + queue.messages.push(value); + + // waking up a parked receiver is not necessary here, as any + // receivers that don't have a free slot are currently waking up. + } + } + } + + pub fn try_anycast(&self, value: T) -> Result<(), T> { + // lock the queue + let _guard = self.queue.lock(); + + // SAFETY: The queue is locked, so we can safely access the inner queue. + unsafe { self.try_anycast_inner(value) } + } + + /// The caller must hold the lock on the queue for the duration of this function. + unsafe fn try_anycast_inner(&self, value: T) -> Result<(), T> { + // look for a receiver that is parked + let queue = self.queue.inner(); + if let Some((token, slot)) = + queue + .receivers + .iter() + .find_map(|(token, CachePadded((slot, is_parked)))| { + // ensure the slot is available + if *is_parked && !slot.is_set() { + Some((*token, slot)) + } else { + None + } + }) + { + // we found a receiver that is parked, so we can send the message to it + unsafe { + (&mut *slot.value.get()).write(value); + slot.next_and_state + .set_tag(1, Ordering::Release, Ordering::Relaxed); + Parker::from_ptr(token.0.into_inner().as_ptr()).unpark(); + } + + return Ok(()); + } else { + return Err(value); + } + } + + /// Sends a message to a specific receiver, waking it if it is parked. + pub fn unicast(&self, value: T, receiver: ReceiverToken) -> Result<(), T> { + // lock the queue + let _guard = self.queue.lock(); + let queue = self.queue.inner(); + + let Some(CachePadded((slot, _))) = queue.receivers.get_mut(&receiver) else { + return Err(value); + }; + + unsafe { + slot.push(value); + } + + // wake the receiver + unsafe { + Parker::from_ptr(receiver.0.into_inner().as_ptr()).unpark(); + } + + Ok(()) + } + + pub fn broadcast(&self, value: T) + where + T: Clone, + { + // lock the queue + let _guard = self.queue.lock(); + let queue = self.queue.inner(); + + // send the message to all receivers + for (token, CachePadded((slot, _))) in queue.receivers.iter() { + // SAFETY: The slot is owned by this receiver. + + unsafe { slot.push(value.clone()) }; + + // wake the receiver + unsafe { + Parker::from_ptr(token.0.into_inner().as_ptr()).unpark(); + } + } + } + + pub fn broadcast_with(&self, mut f: F) + where + F: FnMut() -> T, + { + // lock the queue + let _guard = self.queue.lock(); + let queue = self.queue.inner(); + + // send the message to all receivers + for (token, CachePadded((slot, _))) in queue.receivers.iter() { + // SAFETY: The slot is owned by this receiver. 
+ + unsafe { slot.push(f()) }; + + // check if the receiver is parked + // wake the receiver + unsafe { + Parker::from_ptr(token.0.into_inner().as_ptr()).unpark(); + } + } + } + } + + #[cfg(test)] + mod tests { + use std::println; + + use super::*; + + #[test] + fn test_queue() { + let queue = Queue::::new(); + + let sender = queue.new_sender(); + let receiver1 = queue.new_receiver(); + let receiver2 = queue.new_receiver(); + + let token2 = receiver2.get_token(); + + sender.anycast(42); + + assert_eq!(receiver1.recv(), 42); + + sender.unicast(100, token2).unwrap(); + assert_eq!(receiver1.try_recv(), None); + assert_eq!(receiver2.recv(), 100); + } + + #[test] + fn queue_broadcast() { + let queue = Queue::::new(); + + let sender = queue.new_sender(); + let receiver1 = queue.new_receiver(); + let receiver2 = queue.new_receiver(); + + sender.broadcast(42); + + assert_eq!(receiver1.recv(), 42); + assert_eq!(receiver2.recv(), 42); + } + + #[test] + fn queue_multiple_messages() { + let queue = Queue::::new(); + + let sender = queue.new_sender(); + let receiver = queue.new_receiver(); + + sender.anycast(1); + sender.unicast(2, receiver.get_token()).unwrap(); + + assert_eq!(receiver.recv(), 2); + assert_eq!(receiver.recv(), 1); + } + + #[test] + fn queue_threaded() { + #[derive(Debug, Clone, Copy)] + enum Message { + Send(i32), + Exit, + } + + let queue = Queue::::new(); + + let sender = queue.new_sender(); + + let threads = (0..5) + .map(|_| { + let queue_clone = queue.clone(); + let receiver = queue_clone.new_receiver(); + + std::thread::spawn(move || { + loop { + match receiver.recv() { + Message::Send(value) => { + println!( + "Receiver {:?} Received: {}", + receiver.get_token(), + value + ); + } + Message::Exit => { + println!("Exiting thread"); + break; + } + } + } + }) + }) + .collect::>(); + + // Send messages to the receivers + for i in 0..10 { + sender.anycast(Message::Send(i)); + } + + // Send exit messages to all receivers + sender.broadcast(Message::Exit); + for thread in threads { + thread.join().unwrap(); + } + println!("All threads have exited."); + } + + #[test] + fn drop_slot() { + // Test that dropping a slot does not cause a double free or panic + let slot = Slot::::new(); + unsafe { + slot.push(42); + drop(slot); + } + } + + #[test] + fn drop_slot_chain() { + struct DropCheck<'a>(&'a AtomicU32); + impl Drop for DropCheck<'_> { + fn drop(&mut self) { + self.0.fetch_sub(1, Ordering::SeqCst); + } + } + + impl<'a> DropCheck<'a> { + fn new(counter: &'a AtomicU32) -> Self { + counter.fetch_add(1, Ordering::SeqCst); + Self(counter) + } + } + let counter = AtomicU32::new(0); + let slot = Slot::::new(); + for _ in 0..10 { + unsafe { + slot.push(DropCheck::new(&counter)); + } + } + assert_eq!(counter.load(Ordering::SeqCst), 10); + drop(slot); + assert_eq!( + counter.load(Ordering::SeqCst), + 0, + "All DropCheck instances should have been dropped" + ); + } + + #[test] + fn send_self() { + // Test that sending a message to self works + let queue = Queue::::new(); + let sender = queue.new_sender(); + let receiver = queue.new_receiver(); + + sender.unicast(42, receiver.get_token()).unwrap(); + assert_eq!(receiver.recv(), 42); + } + + #[test] + fn send_self_many() { + // Test that sending multiple messages to self works + let queue = Queue::::new(); + let sender = queue.new_sender(); + let receiver = queue.new_receiver(); + + for i in 0..10 { + sender.unicast(i, receiver.get_token()).unwrap(); + } + + for i in (0..10).rev() { + assert_eq!(receiver.recv(), i); + } + } + + #[test] + fn 
slot_pop_front() {
+            // Test that popping from the front of a slot works correctly
+            let slot = Slot::<i32>::new();
+            unsafe {
+                slot.push(1);
+                slot.push(2);
+                slot.push(3);
+            }
+
+            assert_eq!(unsafe { slot.pop_front() }, Some(1));
+            assert_eq!(unsafe { slot.pop_front() }, Some(2));
+            assert_eq!(unsafe { slot.pop_front() }, Some(3));
+            assert_eq!(unsafe { slot.pop_front() }, None);
+        }
+    }
+}
diff --git a/src/util.rs b/src/util.rs
index c759785..f1ef7a0 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -44,9 +44,40 @@ pub fn unwrap_or_panic<T>(result: std::thread::Result<T>) -> T {
     }
 }
 
-pub const fn can_transmute<A, B>() -> bool {
-    use core::mem::{align_of, size_of};
-    // We can transmute `A` to `B` iff `A` and `B` have the same size and the
-    // alignment of `A` is greater than or equal to the alignment of `B`.
-    (size_of::<A>() == size_of::<B>()) & (align_of::<A>() >= align_of::<B>())
+#[deprecated(
+    since = "0.1.0",
+    note = "use `can_transmute` from the `mem` module instead"
+)]
+pub use super::mem::can_transmute;
+
+/// True if `c` is considered whitespace according to the Rust language definition.
+/// See the [Rust language reference](https://doc.rust-lang.org/reference/whitespace.html)
+/// for definitions of these classes.
+pub fn is_whitespace(c: char) -> bool {
+    // This is Pattern_White_Space.
+    //
+    // Note that this set is stable (ie, it doesn't change with different
+    // Unicode versions), so it's ok to just hard-code the values.
+
+    matches!(
+        c,
+        // Usual ASCII suspects
+        '\u{0009}'   // \t
+        | '\u{000A}' // \n
+        | '\u{000B}' // vertical tab
+        | '\u{000C}' // form feed
+        | '\u{000D}' // \r
+        | '\u{0020}' // space
+
+        // NEXT LINE from latin1
+        | '\u{0085}'
+
+        // Bidi markers
+        | '\u{200E}' // LEFT-TO-RIGHT MARK
+        | '\u{200F}' // RIGHT-TO-LEFT MARK
+
+        // Dedicated whitespace characters from Unicode
+        | '\u{2028}' // LINE SEPARATOR
+        | '\u{2029}' // PARAGRAPH SEPARATOR
+    )
 }
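
Below are a few usage sketches for the new APIs, written against the signatures introduced in this patch and assuming the crate is consumed as `werkzeug` with the default `alloc` feature. First, the `bytes` conversions: each pair of functions is an inverse of the other, so splitting and re-joining a value round-trips; the literal values are arbitrary.

```rust
use werkzeug::bytes::{u16s_from_u32, u32_from_u16s, u32s_from_u64, u64_from_u32s};

fn main() {
    // Splitting a u64 and re-joining it round-trips, since the functions are inverses.
    let value: u64 = 0x0123_4567_89ab_cdef;
    let halves: [u32; 2] = u32s_from_u64(value);
    assert_eq!(u64_from_u32s(halves), value);

    // The same holds for the u32 <-> [u16; 2] pair.
    let small: u32 = 0xdead_beef;
    assert_eq!(u32_from_u16s(u16s_from_u32(small)), small);

    // As the docs note, the layout of `halves` depends on the host's endianness;
    // on little-endian targets the low half comes first.
    if cfg!(target_endian = "little") {
        assert_eq!(halves[0], 0x89ab_cdef);
    }
}
```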
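The `NextIf` trait is blanket-implemented for every clonable iterator, so it can drive simple lookahead-style scanning without `Peekable`; a small sketch, with an input string and closures made up for illustration:

```rust
use werkzeug::iter::NextIf;

fn main() {
    let mut chars = "123abc".chars();

    // Consume leading digits; the iterator only advances while the predicate holds.
    let mut number = String::new();
    while let Some(c) = chars.next_if(|c| c.is_ascii_digit()) {
        number.push(c);
    }
    assert_eq!(number, "123");
    // The first non-digit was not consumed by the failed `next_if` call.
    assert_eq!(chars.next(), Some('a'));

    // `next_if_map` only advances when the closure returns `Some`.
    let mut it = [1, 2, 3].into_iter();
    assert_eq!(it.next_if_map(|n| (n % 2 == 1).then_some(n * 10)), Some(10));
    assert_eq!(it.next_if_map(|n| (n % 2 == 1).then_some(n * 10)), None);
    assert_eq!(it.next(), Some(2));
}
```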
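The `mem` helpers are `const fn`s intended as guards before pointer casts; a sketch of what they report for the `u64`/`[u32; 2]` pair used by `bytes` (the negative case assumes a typical 64-bit target, where `[u32; 2]` is only 4-byte aligned):

```rust
use werkzeug::mem::{can_transmute, is_aligned, is_same_size};

fn main() {
    // Same size is necessary but not sufficient for a safe cast...
    assert!(is_same_size::<[u32; 2], u64>());
    // ...the source must also be at least as aligned as the destination.
    assert!(is_aligned::<u64, u32>());
    assert!(can_transmute::<u64, [u32; 2]>());

    // The reverse direction is rejected on 64-bit targets because `[u32; 2]` is
    // less aligned than `u64`; this is why `bytes::u64_from_u32s` copies bytes
    // instead of reading the array as a `u64` directly.
    #[cfg(target_pointer_width = "64")]
    assert!(!can_transmute::<[u32; 2], u64>());
}
```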
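Finally, a minimal single-threaded sketch of the `sync::queue` API, mirroring what the unit tests above exercise (`anycast`, `unicast`, and `broadcast`); the payload type `i32` is arbitrary:

```rust
use werkzeug::sync::queue::Queue;

fn main() {
    // The queue is created behind an Arc; senders and receivers are handed out from it.
    let queue = Queue::<i32>::new();
    let sender = queue.new_sender();

    let receiver = queue.new_receiver();
    let token = receiver.get_token();

    // anycast makes the message available to whichever receiver polls or parks next.
    sender.anycast(1);
    assert_eq!(receiver.recv(), 1);

    // unicast targets one specific receiver via its token.
    sender.unicast(2, token).unwrap();
    assert_eq!(receiver.try_recv(), Some(2));

    // broadcast clones the message into every receiver's slot.
    let other = queue.new_receiver();
    sender.broadcast(3);
    assert_eq!(receiver.recv(), 3);
    assert_eq!(other.recv(), 3);
}
```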