diff --git a/distaff/src/heartbeat.rs b/distaff/src/heartbeat.rs new file mode 100644 index 0000000..29f5516 --- /dev/null +++ b/distaff/src/heartbeat.rs @@ -0,0 +1,176 @@ +use std::{ + collections::BTreeMap, + mem::ManuallyDrop, + ops::Deref, + ptr::NonNull, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, + time::Instant, +}; + +use parking_lot::Mutex; + +#[derive(Debug, Clone)] +pub struct HeartbeatList { + inner: Arc>, +} + +impl HeartbeatList { + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(HeartbeatListInner::new())), + } + } + + pub fn new_heartbeat(&self) -> OwnedHeartbeatReceiver { + let (recv, _) = self.inner.lock().new_heartbeat(); + OwnedHeartbeatReceiver { + list: self.clone(), + receiver: ManuallyDrop::new(recv), + } + } +} + +#[derive(Debug)] +struct HeartbeatListInner { + heartbeats: BTreeMap, + heartbeat_index: u64, +} + +impl HeartbeatListInner { + fn new() -> Self { + Self { + heartbeats: BTreeMap::new(), + heartbeat_index: 0, + } + } + + fn new_heartbeat(&mut self) -> (HeartbeatReceiver, u64) { + let heartbeat = Heartbeat::new(self.heartbeat_index); + let (recv, send, i) = heartbeat.into_recv_send(); + self.heartbeats.insert(i, send); + self.heartbeat_index += 1; + (recv, i) + } + + fn remove_heartbeat(&mut self, receiver: HeartbeatReceiver) { + if let Some(send) = self.heartbeats.remove(&receiver.i) { + _ = Heartbeat::from_recv_send(receiver, send); + } + } +} + +pub struct OwnedHeartbeatReceiver { + list: HeartbeatList, + receiver: ManuallyDrop, +} + +impl Deref for OwnedHeartbeatReceiver { + type Target = HeartbeatReceiver; + + fn deref(&self) -> &Self::Target { + &self.receiver + } +} + +impl Drop for OwnedHeartbeatReceiver { + fn drop(&mut self) { + // SAFETY: + // `AtomicBool` is `Sync` and `Send`, so it can be safely shared between threads. + unsafe { + let receiver = ManuallyDrop::take(&mut self.receiver); + self.list.inner.lock().remove_heartbeat(receiver); + } + } +} + +#[derive(Debug)] +pub struct Heartbeat { + ptr: NonNull, + i: u64, +} + +#[derive(Debug)] +pub struct HeartbeatReceiver { + ptr: NonNull, + i: u64, +} + +unsafe impl Send for HeartbeatReceiver {} + +impl Drop for Heartbeat { + fn drop(&mut self) { + // SAFETY: + // `AtomicBool` is `Sync` and `Send`, so it can be safely shared between threads. + unsafe { + let _ = Box::from_raw(self.ptr.as_ptr()); + } + } +} + +#[derive(Debug)] +pub struct HeartbeatSender { + ptr: NonNull, + pub last_heartbeat: Instant, +} + +unsafe impl Send for HeartbeatSender {} + +impl Heartbeat { + pub fn new(i: u64) -> Heartbeat { + // SAFETY: + // `AtomicBool` is `Sync` and `Send`, so it can be safely shared between threads. + let ptr = NonNull::new(Box::into_raw(Box::new(AtomicBool::new(true)))).unwrap(); + Self { ptr, i } + } + + pub fn from_recv_send(recv: HeartbeatReceiver, send: HeartbeatSender) -> Heartbeat { + // SAFETY: + // `AtomicBool` is `Sync` and `Send`, so it can be safely shared between threads. + _ = send; + let ptr = recv.ptr; + let i = recv.i; + Heartbeat { ptr, i } + } + + pub fn into_recv_send(self) -> (HeartbeatReceiver, HeartbeatSender, u64) { + let Self { ptr, i } = self; + ( + HeartbeatReceiver { ptr, i }, + HeartbeatSender { + ptr, + last_heartbeat: Instant::now(), + }, + i, + ) + } +} + +impl HeartbeatReceiver { + pub fn take(&self) -> bool { + unsafe { + // SAFETY: + // `AtomicBool` is `Sync` and `Send`, so it can be safely shared between threads. + self.ptr.as_ref().swap(false, Ordering::Relaxed) + } + } + + pub fn id(&self) -> usize { + self.ptr.as_ptr() as usize + } + + pub fn index(&self) -> u64 { + self.i + } +} + +impl HeartbeatSender { + pub fn set(&mut self) { + // SAFETY: + // `AtomicBool` is `Sync` and `Send`, so it can be safely shared between threads. + unsafe { self.ptr.as_ref().store(true, Ordering::Relaxed) }; + self.last_heartbeat = Instant::now(); + } +} diff --git a/distaff/src/lib.rs b/distaff/src/lib.rs index f064e21..cd56bca 100644 --- a/distaff/src/lib.rs +++ b/distaff/src/lib.rs @@ -13,6 +13,7 @@ extern crate alloc; mod context; +mod heartbeat; mod job; mod join; mod latch;