diff --git a/src/sync.rs b/src/sync.rs index e1b90cc..b75f4a4 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -129,6 +129,7 @@ impl Lock { } } +// from parking_lot_core pub struct SpinWait { counter: u32, } @@ -180,3 +181,68 @@ impl SpinWait { true } } + +// taken from `std` +#[derive(Debug)] +#[repr(transparent)] +pub struct Parker { + mutex: AtomicU32, +} + +impl Parker { + const PARKED: u32 = u32::MAX; + const EMPTY: u32 = 0; + const NOTIFIED: u32 = 1; + pub fn new() -> Self { + Self { + mutex: AtomicU32::new(Self::EMPTY), + } + } + + pub fn as_ptr(&self) -> *mut u32 { + self.mutex.as_ptr() + } + + pub unsafe fn from_ptr<'a>(ptr: *mut u32) -> &'a Self { + // SAFETY: The caller must ensure that `ptr` is not aliased, and lasts + // for the lifetime of the `Parker`. + unsafe { mem::transmute(AtomicU32::from_ptr(ptr)) } + } + + pub fn is_parked(&self) -> bool { + self.mutex.load(Ordering::Acquire) == Self::PARKED + } + + pub fn park(&self) { + if self.mutex.fetch_sub(1, Ordering::Acquire) == Self::NOTIFIED { + // The thread was notified, so we can return immediately. + return; + } + + loop { + atomic_wait::wait(&self.mutex, Self::PARKED); + + // We check whether we were notified or woke up spuriously with + // acquire ordering in order to make-visible any writes made by the + // thread that notified us. + if self.mutex.swap(Self::EMPTY, Ordering::Acquire) == Self::NOTIFIED { + // The thread was notified, so we can return immediately. + return; + } else { + // spurious wakeup, so we need to re-park. + continue; + } + } + } + + pub fn unpark(&self) { + // write with Release ordering to ensure that any writes made by this + // thread are made-available to the unparked thread. + if self.mutex.swap(Self::NOTIFIED, Ordering::Release) == Self::PARKED { + // The thread was parked, so we need to notify it. + atomic_wait::wake_one(&self.mutex); + } else { + // The thread was not parked, so we don't need to do anything. + } + } +}