heatbeat module
This commit is contained in:
parent
eb8fd314f5
commit
bdbe207e7e
176
distaff/src/heartbeat.rs
Normal file
176
distaff/src/heartbeat.rs
Normal file
|
@ -0,0 +1,176 @@
|
|||
use std::{
|
||||
collections::BTreeMap,
|
||||
mem::ManuallyDrop,
|
||||
ops::Deref,
|
||||
ptr::NonNull,
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicBool, Ordering},
|
||||
},
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
use parking_lot::Mutex;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct HeartbeatList {
|
||||
inner: Arc<Mutex<HeartbeatListInner>>,
|
||||
}
|
||||
|
||||
impl HeartbeatList {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(HeartbeatListInner::new())),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_heartbeat(&self) -> OwnedHeartbeatReceiver {
|
||||
let (recv, _) = self.inner.lock().new_heartbeat();
|
||||
OwnedHeartbeatReceiver {
|
||||
list: self.clone(),
|
||||
receiver: ManuallyDrop::new(recv),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct HeartbeatListInner {
|
||||
heartbeats: BTreeMap<u64, HeartbeatSender>,
|
||||
heartbeat_index: u64,
|
||||
}
|
||||
|
||||
impl HeartbeatListInner {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
heartbeats: BTreeMap::new(),
|
||||
heartbeat_index: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn new_heartbeat(&mut self) -> (HeartbeatReceiver, u64) {
|
||||
let heartbeat = Heartbeat::new(self.heartbeat_index);
|
||||
let (recv, send, i) = heartbeat.into_recv_send();
|
||||
self.heartbeats.insert(i, send);
|
||||
self.heartbeat_index += 1;
|
||||
(recv, i)
|
||||
}
|
||||
|
||||
fn remove_heartbeat(&mut self, receiver: HeartbeatReceiver) {
|
||||
if let Some(send) = self.heartbeats.remove(&receiver.i) {
|
||||
_ = Heartbeat::from_recv_send(receiver, send);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OwnedHeartbeatReceiver {
|
||||
list: HeartbeatList,
|
||||
receiver: ManuallyDrop<HeartbeatReceiver>,
|
||||
}
|
||||
|
||||
impl Deref for OwnedHeartbeatReceiver {
|
||||
type Target = HeartbeatReceiver;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.receiver
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for OwnedHeartbeatReceiver {
|
||||
fn drop(&mut self) {
|
||||
// SAFETY:
|
||||
// `AtomicBool` is `Sync` and `Send`, so it can be safely shared between threads.
|
||||
unsafe {
|
||||
let receiver = ManuallyDrop::take(&mut self.receiver);
|
||||
self.list.inner.lock().remove_heartbeat(receiver);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Heartbeat {
|
||||
ptr: NonNull<AtomicBool>,
|
||||
i: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct HeartbeatReceiver {
|
||||
ptr: NonNull<AtomicBool>,
|
||||
i: u64,
|
||||
}
|
||||
|
||||
unsafe impl Send for HeartbeatReceiver {}
|
||||
|
||||
impl Drop for Heartbeat {
|
||||
fn drop(&mut self) {
|
||||
// SAFETY:
|
||||
// `AtomicBool` is `Sync` and `Send`, so it can be safely shared between threads.
|
||||
unsafe {
|
||||
let _ = Box::from_raw(self.ptr.as_ptr());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct HeartbeatSender {
|
||||
ptr: NonNull<AtomicBool>,
|
||||
pub last_heartbeat: Instant,
|
||||
}
|
||||
|
||||
unsafe impl Send for HeartbeatSender {}
|
||||
|
||||
impl Heartbeat {
|
||||
pub fn new(i: u64) -> Heartbeat {
|
||||
// SAFETY:
|
||||
// `AtomicBool` is `Sync` and `Send`, so it can be safely shared between threads.
|
||||
let ptr = NonNull::new(Box::into_raw(Box::new(AtomicBool::new(true)))).unwrap();
|
||||
Self { ptr, i }
|
||||
}
|
||||
|
||||
pub fn from_recv_send(recv: HeartbeatReceiver, send: HeartbeatSender) -> Heartbeat {
|
||||
// SAFETY:
|
||||
// `AtomicBool` is `Sync` and `Send`, so it can be safely shared between threads.
|
||||
_ = send;
|
||||
let ptr = recv.ptr;
|
||||
let i = recv.i;
|
||||
Heartbeat { ptr, i }
|
||||
}
|
||||
|
||||
pub fn into_recv_send(self) -> (HeartbeatReceiver, HeartbeatSender, u64) {
|
||||
let Self { ptr, i } = self;
|
||||
(
|
||||
HeartbeatReceiver { ptr, i },
|
||||
HeartbeatSender {
|
||||
ptr,
|
||||
last_heartbeat: Instant::now(),
|
||||
},
|
||||
i,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl HeartbeatReceiver {
|
||||
pub fn take(&self) -> bool {
|
||||
unsafe {
|
||||
// SAFETY:
|
||||
// `AtomicBool` is `Sync` and `Send`, so it can be safely shared between threads.
|
||||
self.ptr.as_ref().swap(false, Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn id(&self) -> usize {
|
||||
self.ptr.as_ptr() as usize
|
||||
}
|
||||
|
||||
pub fn index(&self) -> u64 {
|
||||
self.i
|
||||
}
|
||||
}
|
||||
|
||||
impl HeartbeatSender {
|
||||
pub fn set(&mut self) {
|
||||
// SAFETY:
|
||||
// `AtomicBool` is `Sync` and `Send`, so it can be safely shared between threads.
|
||||
unsafe { self.ptr.as_ref().store(true, Ordering::Relaxed) };
|
||||
self.last_heartbeat = Instant::now();
|
||||
}
|
||||
}
|
|
@ -13,6 +13,7 @@
|
|||
extern crate alloc;
|
||||
|
||||
mod context;
|
||||
mod heartbeat;
|
||||
mod job;
|
||||
mod join;
|
||||
mod latch;
|
||||
|
|
Loading…
Reference in a new issue