Compare commits

...

5 commits

10 changed files with 838 additions and 22 deletions

1
.gitignore vendored
View file

@@ -1 +1,2 @@
/target
Cargo.lock

76
Cargo.lock generated
View file

@@ -2,6 +2,82 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "atomic-wait"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a55b94919229f2c42292fd71ffa4b75e83193bffdd77b1e858cd55fd2d0b0ea8"
dependencies = [
"libc",
"windows-sys",
]
[[package]]
name = "libc"
version = "0.2.174"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776"
[[package]]
name = "werkzeug"
version = "0.1.0"
dependencies = [
"atomic-wait",
]
[[package]]
name = "windows-sys"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
[[package]]
name = "windows_i686_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
[[package]]
name = "windows_i686_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"

Cargo.toml
View file

@@ -6,7 +6,12 @@ edition = "2024"
[features]
default = ["alloc"]
alloc = []
std = []
std = ["alloc"]
nightly = []
[dependencies]
# libc = "0.2"
# While I could use libc / windows for this, why not just use this tiny crate
# which does exactly and only a futex
atomic-wait = "1.1.0"

1
rust-toolchain Normal file
View file

@@ -0,0 +1 @@
nightly

375
src/atomic.rs Normal file
View file

@@ -0,0 +1,375 @@
use core::{
cell::UnsafeCell,
mem::{self, ManuallyDrop, MaybeUninit},
sync::atomic::{AtomicU8, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, Ordering},
};
use crate::sync::SpinWait;
macro_rules! atomic {
(@check, $t:ty, $atomic:ty, $a:ident, $op:expr) => {
if crate::can_transmute::<$t, $atomic>() {
let $a: &$atomic;
break $op;
}
};
($t:ty, $a:ident, $op:expr, $fallback:expr) => {
loop {
atomic!(@check, $t, AtomicU8, $a, $op);
atomic!(@check, $t, AtomicU16, $a, $op);
atomic!(@check, $t, AtomicU32, $a, $op);
atomic!(@check, $t, AtomicU64, $a, $op);
atomic!(@check, $t, AtomicUsize, $a, $op);
// Fallback to the provided expression if no atomic type is found.
break $fallback;
}
};
}
pub struct AtomicOption<T> {
inner: AtomicCellInner<T>,
_phantom: core::marker::PhantomData<T>,
}
impl<T> AtomicOption<T> {
pub const fn new() -> Self {
Self {
inner: AtomicCellInner::none(),
_phantom: core::marker::PhantomData,
}
}
pub fn from_option(value: Option<T>) -> Self {
Self {
inner: AtomicCellInner::from_option(value),
_phantom: core::marker::PhantomData,
}
}
pub fn into_option(self) -> Option<T> {
self.inner.into_option()
}
pub fn set(&self, value: T) {
self.inner.set(value);
}
/// Set the value if the cell is empty, returning `Some(value)` if the cell
/// was already occupied.
pub fn try_set(&self, value: T) -> Option<T> {
self.inner.try_set(value)
}
pub fn take(&self) -> Option<T> {
self.inner.take()
}
pub fn get(&self) -> Option<T>
where
T: Copy,
{
self.inner.get()
}
pub fn swap(&self, value: Option<T>) -> Option<T> {
self.inner.swap(value)
}
}
struct AtomicCellInner<T> {
value: UnsafeCell<ManuallyDrop<MaybeUninit<T>>>,
state: AtomicU8,
}
impl<T> AtomicCellInner<T> {
const EMPTY: u8 = 0;
const FULL: u8 = 1;
const LOCKED: u8 = 2;
const fn none() -> Self {
Self {
value: UnsafeCell::new(ManuallyDrop::new(MaybeUninit::uninit())),
state: AtomicU8::new(Self::EMPTY),
}
}
fn into_option(self) -> Option<T> {
if self.state.load(Ordering::Relaxed) == Self::FULL {
Some(unsafe { ManuallyDrop::into_inner(self.value.into_inner()).assume_init() })
} else {
None
}
}
fn from_option(value: Option<T>) -> Self {
match value {
Some(v) => Self {
value: UnsafeCell::new(ManuallyDrop::new(MaybeUninit::new(v))),
state: AtomicU8::new(Self::FULL),
},
None => Self {
value: UnsafeCell::new(ManuallyDrop::new(MaybeUninit::uninit())),
state: AtomicU8::new(Self::EMPTY),
},
}
}
unsafe fn copy_from(&self, other: &Self, load: Ordering, store: Ordering) {
unsafe {
self.value.get().write(other.value.get().read());
self.state.store(other.state.load(load), store);
}
}
fn set(&self, value: T) {
self.swap(Some(value));
}
fn take(&self) -> Option<T> {
self.swap(None)
}
fn get(&self) -> Option<T>
where
T: Copy,
{
let this: Self;
atomic! {
Self, a,
{
unsafe {
a = &*(self as *const Self as *const _);
let old = a.load(Ordering::Acquire);
this = mem::transmute_copy(&old);
}
},
{
let mut state = self.state.load(Ordering::Acquire);
if state == Self::EMPTY {
this = Self::none();
} else {
// if the state is `FULL`, we have to lock
let mut spin_wait = SpinWait::new();
let old = loop {
// if the state is `LOCKED`, we need to wait
if state == Self::LOCKED {
spin_wait.spin();
continue;
}
// otherwise, try to lock the cell so we can copy the value out
if self.state.compare_exchange_weak(
state,
Self::LOCKED,
Ordering::Acquire,
Ordering::Relaxed,
).is_ok() {
break state;
} else {
// the state changed, we need to check again
state = self.state.load(Ordering::Relaxed);
continue;
}
};
let empty = Self::none();
if old == Self::FULL {
// copy the value out of the cell and mark the local copy as full
unsafe {
empty.value.get().write(self.value.get().read());
}
empty.state.store(Self::FULL, Ordering::Relaxed);
}
// release the lock by restoring the previous state
self.state.store(old, Ordering::Release);
this = empty;
}
}
}
match this.state.load(Ordering::Relaxed) {
Self::FULL => {
// SAFETY: We are returning the value only if it was previously full.
unsafe { Some(ManuallyDrop::into_inner(this.value.get().read()).assume_init()) }
}
_ => None,
}
}
fn swap(&self, value: Option<T>) -> Option<T> {
let mut this = Self::from_option(value);
atomic! {
Self, a,
{
// SAFETY: this block is only executed if `Self` can be transmuted into an atomic type.
// self.state cannot be `LOCKED` here, so we can safely swap the value.
unsafe {
// turn `self` into an atomic pointer
a = &*(self as *const Self as *const _);
// swap the value atomically
let old = a.swap(mem::transmute_copy(&this), Ordering::Release);
this = mem::transmute_copy(&old);
if this.state.load(Ordering::Relaxed) == Self::FULL {
// SAFETY: We are returning the value only if it was previously full.
Some(ManuallyDrop::into_inner(this.value.into_inner()).assume_init())
} else {
None
}
}
},
{
// Fallback if no atomic type is found.
// we need to lock the cell to swap the value.
// attempt to lock optimistically
match self.state.compare_exchange_weak(
Self::EMPTY,
Self::LOCKED,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => {
// SAFETY: We are the only thread that can access this cell now.
unsafe {
self.copy_from(&this, Ordering::Relaxed, Ordering::Release);
}
None
}
Err(mut state) => {
let mut spin_wait = SpinWait::new();
let old = loop {
// if the state is `LOCKED`, we need to wait
if state == Self::LOCKED {
spin_wait.spin();
continue;
}
// if the state is not `LOCKED`, we can try locking and swapping the value
if self.state.compare_exchange_weak(
state,
Self::LOCKED,
Ordering::Acquire,
Ordering::Relaxed,
).is_ok() {
break state;
} else {
// the state changed, we need to check again
state = self.state.load(Ordering::Relaxed);
continue;
}
};
let old = if old == Self::FULL {
// SAFETY: the cell is locked, and is initialised.
unsafe {
Some(ManuallyDrop::into_inner(self.value.get().read()).assume_init())
}
} else { None };
// SAFETY: the cell is locked, so we can safely copy the value
unsafe {
self.copy_from(&this, Ordering::Relaxed, Ordering::Release);
}
old
}
}
}
}
}
// Try to set the value, returning `value` if the cell already contains a value.
fn try_set(&self, value: T) -> Option<T> {
let this = Self::from_option(Some(value));
atomic! {
Self, a,
{
// SAFETY: this block is only executed if `Self` can be transmuted into an atomic type.
// self.state cannot be `LOCKED` here, so we can safely swap the value.
unsafe {
// turn `self` into an atomic pointer
a = &*(self as *const Self as *const _);
let empty = Self::none();
// compare-exchange the value with an unset `Self` atomically
if a.compare_exchange(mem::transmute_copy(&empty), mem::transmute_copy(&this), Ordering::Release, Ordering::Relaxed).is_ok() {
None
} else {
this.into_option()
}
}
},
{
// Fallback if no atomic type is found.
// we need to lock the cell to swap the value.
// attempt to lock optimistically
match self.state.compare_exchange_weak(
Self::EMPTY,
Self::LOCKED,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => {
// SAFETY: We are the only thread that can access this cell now.
unsafe {
self.copy_from(&this, Ordering::Relaxed, Ordering::Release);
}
None
}
Err(mut state) => {
let mut spin_wait = SpinWait::new();
let old = loop {
// if the state is `FULL`, we short-circuit to
// return the provided value.
if state == Self::FULL {
break state;
}
// if the state is `LOCKED`, we need to wait
if state == Self::LOCKED {
spin_wait.spin();
continue;
}
// if the state is not `LOCKED`, we can try locking
// and swapping the value
if self.state.compare_exchange_weak(
state,
Self::LOCKED,
Ordering::Acquire,
Ordering::Relaxed,
).is_ok() {
break state;
} else {
// the state changed, we need to check again
state = self.state.load(Ordering::Relaxed);
continue;
}
};
if old == Self::EMPTY {
// SAFETY: the cell is locked, so we can safely copy the value
unsafe {
self.copy_from(&this, Ordering::Relaxed, Ordering::Release);
None
}
} else {
this.into_option()
}
}
}
}
}
}
}
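
A rough usage sketch for the `AtomicOption<T>` added above (illustrative only, not code from this diff; the `crate::can_transmute` helper used by the `atomic!` macro lives elsewhere in the crate and is not shown in this comparison):

use werkzeug::atomic::AtomicOption;

fn main() {
    let slot: AtomicOption<u32> = AtomicOption::new();
    assert!(slot.try_set(1).is_none());   // empty: the value is stored
    assert_eq!(slot.try_set(2), Some(2)); // occupied: the new value is handed back
    assert_eq!(slot.take(), Some(1));     // take() empties the cell
    assert_eq!(slot.swap(Some(3)), None); // swap() returns the previous contents
    assert_eq!(slot.get(), Some(3));      // get() copies out the value (T: Copy)
}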

src/drop_guard.rs
View file

@@ -1,4 +1,8 @@
use core::{cell::UnsafeCell, mem::ManuallyDrop};
use core::{
cell::UnsafeCell,
mem::ManuallyDrop,
ops::{Deref, DerefMut},
};
/// A guard that runs a closure when it is dropped.
pub struct DropGuard<F: FnOnce()>(UnsafeCell<ManuallyDrop<F>>);
@@ -10,6 +14,10 @@ where
pub fn new(f: F) -> DropGuard<F> {
Self(UnsafeCell::new(ManuallyDrop::new(f)))
}
pub fn guard<T>(self, t: T) -> DropGuarded<T, F> {
DropGuarded(t, self)
}
}
impl<F> Drop for DropGuard<F>
@@ -32,3 +40,45 @@ where
DropGuard::new(f)
}
}
pub struct DropGuarded<T, F: FnOnce()>(T, DropGuard<F>);
impl<T, F> DropGuarded<T, F>
where
F: FnOnce(),
{
pub fn new(value: T, f: F) -> Self {
Self(value, DropGuard::new(f))
}
pub fn into_inner(self) -> T {
self.0
}
pub fn map<U, G>(self, f: G) -> DropGuarded<U, F>
where
G: FnOnce(T) -> U,
{
DropGuarded(f(self.0), self.1)
}
}
impl<T, F> Deref for DropGuarded<T, F>
where
F: FnOnce(),
{
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T, F> DerefMut for DropGuarded<T, F>
where
F: FnOnce(),
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
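
A brief illustration of how the new `guard` method and `DropGuarded` wrapper compose (illustrative sketch, not part of this diff):

use werkzeug::drop_guard::DropGuard;

fn main() {
    // The closure runs when the guard (and any value it wraps) is dropped.
    let guarded = DropGuard::new(|| println!("cleaned up"))
        .guard(vec![1, 2, 3]) // tie a value to the guard's lifetime
        .map(|v| v.len());    // transform the value, keeping the same guard
    println!("len = {}", *guarded); // Deref exposes the wrapped value
} // prints "cleaned up" here, when `guarded` is dropped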

src/lib.rs
View file

@@ -7,12 +7,14 @@ extern crate alloc;
#[cfg(any(test, feature = "std"))]
extern crate std;
pub mod atomic;
pub mod cachepadded;
pub mod drop_guard;
pub mod ptr;
pub mod rand;
#[cfg(feature = "alloc")]
pub mod smallbox;
pub mod sync;
pub mod util;
pub use cachepadded::CachePadded;

src/ptr.rs
View file

@@ -183,17 +183,15 @@ impl<T, const BITS: u8> TaggedAtomicPtr<T, BITS> {
}
}
pub fn ptr(&self, order: atomic::Ordering) -> NonNull<T> {
unsafe {
NonNull::new_unchecked(
self.ptr
.load(order)
.map_addr(|addr| addr & !Self::mask())
.cast(),
)
}
#[doc(alias = "load_ptr")]
pub fn ptr(&self, order: atomic::Ordering) -> *mut T {
self.ptr
.load(order)
.map_addr(|addr| addr & !Self::mask())
.cast()
}
#[doc(alias = "load_tag")]
pub fn tag(&self, order: atomic::Ordering) -> usize {
self.ptr.load(order).addr() & Self::mask()
}
@@ -238,6 +236,68 @@ impl<T, const BITS: u8> TaggedAtomicPtr<T, BITS> {
}
}
/// compare-exchanges the pointer while preserving the current tag; returns the previous pointer
#[inline(always)]
fn compare_exchange_ptr_inner(
&self,
old: *mut T,
new: *mut T,
success: atomic::Ordering,
failure: atomic::Ordering,
cmpxchg: fn(
&AtomicPtr<()>,
*mut (),
*mut (),
atomic::Ordering,
atomic::Ordering,
) -> Result<*mut (), *mut ()>,
) -> Result<*mut T, *mut T> {
let mask = Self::mask();
let old_tag = self.ptr.load(failure).addr() & mask;
// `old` and `new` must be well-aligned, so their low (tag) bits are already zero and need no masking.
let old = old.map_addr(|addr| addr | old_tag).cast();
let new = new.map_addr(|addr| addr | old_tag).cast();
let result = cmpxchg(&self.ptr, old, new, success, failure);
result
.map(|ptr| ptr.map_addr(|addr| addr & !mask).cast())
.map_err(|ptr| ptr.map_addr(|addr| addr & !mask).cast())
}
pub fn compare_exchange_ptr(
&self,
old: *mut T,
new: *mut T,
success: atomic::Ordering,
failure: atomic::Ordering,
) -> Result<*mut T, *mut T> {
self.compare_exchange_ptr_inner(
old,
new,
success,
failure,
AtomicPtr::<()>::compare_exchange,
)
}
pub fn compare_exchange_weak_ptr(
&self,
old: *mut T,
new: *mut T,
success: atomic::Ordering,
failure: atomic::Ordering,
) -> Result<*mut T, *mut T> {
self.compare_exchange_ptr_inner(
old,
new,
success,
failure,
AtomicPtr::<()>::compare_exchange_weak,
)
}
/// returns tag
#[inline(always)]
fn compare_exchange_tag_inner(
@@ -268,7 +328,6 @@ impl<T, const BITS: u8> TaggedAtomicPtr<T, BITS> {
}
/// returns tag
#[allow(dead_code)]
pub fn compare_exchange_tag(
&self,
old: usize,
@@ -302,7 +361,7 @@ impl<T, const BITS: u8> TaggedAtomicPtr<T, BITS> {
)
}
#[allow(dead_code)]
#[doc(alias = "store_ptr")]
pub fn set_ptr(&self, ptr: *mut T, success: atomic::Ordering, failure: atomic::Ordering) {
let mask = Self::mask();
let ptr = ptr.cast::<()>();
@@ -319,6 +378,7 @@ impl<T, const BITS: u8> TaggedAtomicPtr<T, BITS> {
}
}
#[doc(alias = "store_tag")]
pub fn set_tag(&self, tag: usize, success: atomic::Ordering, failure: atomic::Ordering) {
let mask = Self::mask();
loop {
@@ -335,6 +395,42 @@ impl<T, const BITS: u8> TaggedAtomicPtr<T, BITS> {
}
}
pub fn swap_tag(
&self,
new: usize,
success: atomic::Ordering,
failure: atomic::Ordering,
) -> usize {
let mask = Self::mask();
loop {
let ptr = self.ptr.load(failure);
let new = ptr.map_addr(|addr| (addr & !mask) | (new & mask));
if let Ok(old) = self.ptr.compare_exchange_weak(ptr, new, success, failure) {
break old.addr() & mask;
}
}
}
pub fn swap_ptr(
&self,
new: *mut T,
success: atomic::Ordering,
failure: atomic::Ordering,
) -> *mut T {
let mask = Self::mask();
let new = new.cast::<()>();
loop {
let old = self.ptr.load(failure);
let new = new.map_addr(|addr| (addr & !mask) | (old.addr() & mask));
if let Ok(old) = self.ptr.compare_exchange_weak(old, new, success, failure) {
break old.map_addr(|addr| addr & !mask).cast();
}
}
}
pub fn ptr_and_tag(&self, order: atomic::Ordering) -> (NonNull<T>, usize) {
let mask = Self::mask();
let ptr = self.ptr.load(order);
@@ -357,7 +453,7 @@ mod tests {
let ptr = Box::into_raw(Box::new(42u32));
let tagged_ptr = TaggedAtomicPtr::<u32, 2>::new(ptr, 0);
assert_eq!(tagged_ptr.tag(Ordering::Relaxed), 0);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed).as_ptr(), ptr);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed), ptr);
unsafe {
_ = Box::from_raw(ptr);
@@ -369,11 +465,11 @@
let ptr = Box::into_raw(Box::new(42u32));
let tagged_ptr = TaggedAtomicPtr::<u32, 2>::new(ptr, 0b11);
assert_eq!(tagged_ptr.tag(Ordering::Relaxed), 0b11);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed).as_ptr(), ptr);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed), ptr);
assert_eq!(tagged_ptr.take_tag(Ordering::Relaxed), 0b11);
assert_eq!(tagged_ptr.tag(Ordering::Relaxed), 0);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed).as_ptr(), ptr);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed), ptr);
unsafe {
_ = Box::from_raw(ptr);
@@ -385,11 +481,11 @@
let ptr = Box::into_raw(Box::new(42u32));
let tagged_ptr = TaggedAtomicPtr::<u32, 2>::new(ptr, 0b11);
assert_eq!(tagged_ptr.tag(Ordering::Relaxed), 0b11);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed).as_ptr(), ptr);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed), ptr);
assert_eq!(tagged_ptr.fetch_or_tag(0b10, Ordering::Relaxed), 0b11);
assert_eq!(tagged_ptr.tag(Ordering::Relaxed), 0b11 | 0b10);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed).as_ptr(), ptr);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed), ptr);
unsafe {
_ = Box::from_raw(ptr);
@@ -397,11 +493,11 @@
}
#[test]
fn tagged_ptr_exchange() {
fn tagged_ptr_exchange_tag() {
let ptr = Box::into_raw(Box::new(42u32));
let tagged_ptr = TaggedAtomicPtr::<u32, 2>::new(ptr, 0b11);
assert_eq!(tagged_ptr.tag(Ordering::Relaxed), 0b11);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed).as_ptr(), ptr);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed), ptr);
assert_eq!(
tagged_ptr
@@ -411,10 +507,34 @@
);
assert_eq!(tagged_ptr.tag(Ordering::Relaxed), 0b10);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed).as_ptr(), ptr);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed), ptr);
unsafe {
_ = Box::from_raw(ptr);
}
}
#[test]
fn tagged_ptr_exchange_ptr() {
let ptr = Box::into_raw(Box::new(42u32));
let tagged_ptr = TaggedAtomicPtr::<u32, 2>::new(ptr, 0b11);
assert_eq!(tagged_ptr.tag(Ordering::Relaxed), 0b11);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed), ptr);
let new_ptr = Box::into_raw(Box::new(43u32));
assert_eq!(
tagged_ptr
.compare_exchange_ptr(ptr, new_ptr, Ordering::Relaxed, Ordering::Relaxed)
.unwrap(),
ptr
);
assert_eq!(tagged_ptr.tag(Ordering::Relaxed), 0b11);
assert_eq!(tagged_ptr.ptr(Ordering::Relaxed), new_ptr);
unsafe {
_ = Box::from_raw(ptr);
_ = Box::from_raw(new_ptr);
}
}
}
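
For context on the packing these new pointer operations rely on: the tag occupies the low `BITS` bits of the stored address, which a sufficiently aligned `*mut T` leaves as zero (the `mask()` helper itself is outside this diff). A standalone sketch of the scheme, with names chosen here for illustration:

const BITS: u8 = 2;
const MASK: usize = (1 << BITS) - 1; // low bits available for the tag

// Assumes align_of::<u32>() >= (1 << BITS), so a valid pointer's low bits are zero.
fn pack(ptr: *mut u32, tag: usize) -> *mut () {
    ptr.map_addr(|addr| (addr & !MASK) | (tag & MASK)).cast()
}

fn unpack(packed: *mut ()) -> (*mut u32, usize) {
    (packed.map_addr(|addr| addr & !MASK).cast(), packed.addr() & MASK)
}

fn main() {
    let raw = Box::into_raw(Box::new(7u32));
    let packed = pack(raw, 0b11);
    assert_eq!(unpack(packed), (raw, 0b11));
    unsafe { drop(Box::from_raw(raw)) };
}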

182
src/sync.rs Normal file
View file

@@ -0,0 +1,182 @@
use core::{
mem,
sync::atomic::{AtomicU32, Ordering},
};
const LOCKED_BIT: u32 = 0b001;
const EMPTY: u32 = 0;
/// A simple lock implementation using an atomic u32.
#[repr(transparent)]
pub struct Lock {
inner: AtomicU32,
}
impl Lock {
/// Creates a new lock in the unlocked state.
pub const fn new() -> Self {
Self {
inner: AtomicU32::new(0),
}
}
pub fn as_ptr(&self) -> *mut u32 {
self.inner.as_ptr()
}
pub unsafe fn from_ptr<'a>(ptr: *mut u32) -> &'a Self {
// SAFETY: The caller must ensure that `ptr` is not aliased, and lasts
// for the lifetime of the `Lock`.
unsafe { mem::transmute(AtomicU32::from_ptr(ptr)) }
}
/// Acquires the lock, blocking until it is available.
pub fn lock(&self) {
// attempt acquiring the lock with no contention.
if self
.inner
.compare_exchange_weak(EMPTY, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
// We successfully acquired the lock.
return;
} else {
self.lock_slow();
}
}
pub fn unlock(&self) {
// use release semantics to ensure that all previous writes are
// available to other threads.
self.inner.fetch_and(!LOCKED_BIT, Ordering::Release);
// wake one parked waiter so it can retry acquiring the lock; without this,
// threads parked in `lock_slow` via `atomic_wait::wait` are never woken.
atomic_wait::wake_one(&self.inner);
}
fn lock_slow(&self) {
// The lock is either locked, or someone is waiting for it:
let mut spin_wait = SpinWait::new();
let mut state = self.inner.load(Ordering::Acquire);
loop {
// If the lock isn't locked, we can try to acquire it.
if state & LOCKED_BIT == 0 {
// Try to acquire the lock.
match self.inner.compare_exchange_weak(
state,
state | LOCKED_BIT,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => {
// We successfully acquired the lock.
return;
}
Err(new_state) => {
// The state changed, we need to check again.
state = new_state;
continue;
}
}
}
if {
let spun: bool;
#[cfg(feature = "std")]
{
spun = spin_wait.spin_yield();
}
#[cfg(not(feature = "std"))]
{
spun = spin_wait.spin();
}
spun
} {
// We can spin for a little while and see if it becomes available.
state = self.inner.load(Ordering::Relaxed);
continue;
}
// If we reach here, we need to park the thread.
atomic_wait::wait(&self.inner, LOCKED_BIT);
if self
.inner
.compare_exchange_weak(
state,
state | LOCKED_BIT,
Ordering::Acquire,
Ordering::Relaxed,
)
.is_ok()
{
// We successfully acquired the lock after being woken up.
return;
}
spin_wait.reset();
state = self.inner.load(Ordering::Relaxed);
}
}
pub fn wait(&self) {
let state = self.inner.load(Ordering::Acquire);
atomic_wait::wait(&self.inner, state);
}
pub fn wake_one(&self) {
// Notify one thread waiting on this lock.
atomic_wait::wake_one(&self.inner);
}
}
pub struct SpinWait {
counter: u32,
}
impl SpinWait {
/// Creates a new `SpinWait` with its counter set to zero.
pub const fn new() -> Self {
Self { counter: 0 }
}
/// Resets the counter to zero.
pub fn reset(&mut self) {
self.counter = 0;
}
pub fn spin(&mut self) -> bool {
if self.counter >= 10 {
// If the counter is too high, we signal the caller to potentially park.
return false;
}
self.counter += 1;
// spin for a small number of iterations based on the counter value.
for _ in 0..(1 << self.counter) {
core::hint::spin_loop();
}
true
}
#[cfg(feature = "std")]
pub fn spin_yield(&mut self) -> bool {
if self.counter >= 10 {
// If the counter is too high, we signal the caller to potentially park.
return false;
}
self.counter += 1;
if self.counter >= 3 {
// spin for a small number of iterations based on the counter value.
for _ in 0..(1 << self.counter) {
core::hint::spin_loop();
}
} else {
// yield the thread and wait for the OS to reschedule us.
std::thread::yield_now();
}
true
}
}
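
A sketch of how the futex-backed `Lock` above is meant to be used across threads (illustrative; this diff adds no guard type, so locking and unlocking are explicit):

use std::sync::Arc;
use werkzeug::sync::Lock;

fn main() {
    let lock = Arc::new(Lock::new());
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let lock = Arc::clone(&lock);
            std::thread::spawn(move || {
                lock.lock();
                // critical section: contended threads spin briefly via SpinWait,
                // then park on the futex instead of burning CPU
                lock.unlock();
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}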

src/util.rs
View file

@@ -1,7 +1,8 @@
use core::ops::{Deref, DerefMut};
#[repr(transparent)]
pub struct Send<T>(pub(self) T);
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, Copy)]
pub struct Send<T>(pub T);
unsafe impl<T> core::marker::Send for Send<T> {}
@@ -22,6 +23,9 @@ impl<T> Send<T> {
pub unsafe fn new(value: T) -> Self {
Self(value)
}
pub fn into_inner(self) -> T {
self.0
}
}
/// returns the number of available hardware threads, or 1 if it cannot be determined.
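
The function this comment documents is not shown in this comparison; a typical shape for such a helper (purely illustrative, with the name and the use of `std::thread::available_parallelism` being assumptions rather than the crate's actual code):

#[cfg(feature = "std")]
fn hardware_threads() -> usize {
    // Falls back to 1 when the platform cannot report its parallelism.
    std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
}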