diff --git a/Cargo.toml b/Cargo.toml
index 16f0313..23c44e7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,13 +14,14 @@
 never-local = []

 [profile.bench]
 debug = true
+# opt-level = 0

 [dependencies]
 futures = "0.3"
 rayon = "1.10"
 bevy_tasks = "0.15.1"
-parking_lot = {version = "0.12.3", features = ["deadlock_detection"]}
+parking_lot = {version = "0.12.3"}
 thread_local = "1.1.8"
 crossbeam = "0.8.4"
 st3 = "0.4"
diff --git a/src/praetor/mod.rs b/src/praetor/mod.rs
index 84f3e83..e8fd850 100644
--- a/src/praetor/mod.rs
+++ b/src/praetor/mod.rs
@@ -101,7 +101,7 @@ mod util {
             let mask = Self::mask();
             loop {
                 let ptr = self.0.load(failure);
-                let new = ptr.with_addr(ptr.addr() | (tag & mask));
+                let new = ptr.with_addr((ptr.addr() & !mask) | (tag & mask));
                 if self
                     .0
                     .compare_exchange_weak(ptr, new, success, failure)
@@ -433,7 +433,7 @@
         val_or_this: UnsafeCell>,
         /// (prev,next) before execute(), Box<...> after
         err_or_link: UnsafeCell>,
-        phantom: PhantomPinned,
+        _phantom: PhantomPinned,
     }

     impl Debug for Job {
@@ -501,7 +501,7 @@
                         next: None,
                     },
                 }),
-                phantom: PhantomPinned,
+                _phantom: PhantomPinned,
             }
         }
         pub fn empty() -> Job {
@@ -519,7 +519,7 @@
                         next: None,
                     },
                 }),
-                phantom: PhantomPinned,
+                _phantom: PhantomPinned,
             }
         }

@@ -562,13 +562,11 @@
                             Ordering::Release,
                             Ordering::Relaxed,
                         );
-                        eprintln!("wait({:?}): eepy", self as *const _);
+                        std::thread::park();
                         spin.reset();
-                        eprintln!("wait({:?}): woken", self as *const _);

                         // after sleeping, state should be `Finished`

-                        continue;
                     }
                     Err(state) => {
                         assert_ne!(state, JobState::Pending as usize);
@@ -589,11 +587,11 @@
                             return result;
                         } else {
                             // spin until lock is released.
-                            eprintln!(
-                                "wait({:?}): spinning ({:?})",
-                                self as *const _,
-                                JobState::from_u8(state as u8).unwrap()
-                            );
+                            // eprintln!(
+                            //     "wait({:?}): spinning ({:?})",
+                            //     self as *const _,
+                            //     JobState::from_u8(state as u8).unwrap()
+                            // );
                             spin.spin();
                         }
                     }
@@ -638,13 +636,11 @@
                 let harness: unsafe fn(*const (), *const Self) =
                     mem::transmute(ptr.as_ptr());
                 let this = (*self.val_or_this.get()).this;
-                eprintln!("{harness:?}({this:?}, {:?})", self as *const Self);
                 harness(this.as_ptr().cast(), (self as *const Self).cast());
             }
         }

         fn complete(&self, result: std::thread::Result) {
-            eprintln!("complete({:?}) {:#?}", self as *const _, self);
             let mut spin = SpinWait::new();
             loop {
                 match self.harness_and_state.compare_exchange_weak_tag(
@@ -659,11 +655,6 @@
                 }
                 Err(state) => {
                     assert_ne!(state, JobState::Pending as usize);
-                    // eprintln!(
-                    //     "complete(): spin waiting for lock to complete with {:?}: ({:?})",
-                    //     result.as_ref().map(|_| ()),
-                    //     JobState::from_u8(state as u8).unwrap()
-                    // );
                     spin.spin();
                 }
             }
@@ -691,7 +682,6 @@
                 Ordering::Release,
                 Ordering::Relaxed,
             );
-            eprintln!("complete({:?}): finished", self as *const _);
         }
     }

@@ -773,7 +763,7 @@
 }

 use std::{
-    cell::UnsafeCell,
+    cell::{Cell, UnsafeCell},
     collections::BTreeMap,
     mem,
     pin::{pin, Pin},
@@ -790,6 +780,7 @@ use parking_lot::{Condvar, Mutex};
 use util::DropGuard;

 pub struct Scope {
+    join_count: Cell<usize>,
     context: Arc,
     index: usize,
     heartbeat: Arc,
@@ -815,6 +806,7 @@ impl Scope {
             context,
             index,
             heartbeat,
+            join_count: Cell::new(0),
             queue: UnsafeCell::new(JobList::new()),
         }
     }
@@ -896,6 +888,43 @@
     }

     pub fn join<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
+    where
+        RA: Send,
+        RB: Send,
+        A: FnOnce() -> RA + Send,
+        B: FnOnce() -> RB + Send,
+    {
+        self.join_heartbeat_every::<_, _, _, _, 64>(a, b)
+    }
+
+    pub fn join_seq<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
+    where
+        RA: Send,
+        RB: Send,
+        A: FnOnce() -> RA + Send,
+        B: FnOnce() -> RB + Send,
+    {
+        (a(), b())
+    }
+
+    pub fn join_heartbeat_every<A, B, RA, RB, const TIMES: usize>(&self, a: A, b: B) -> (RA, RB)
+    where
+        RA: Send,
+        RB: Send,
+        A: FnOnce() -> RA + Send,
+        B: FnOnce() -> RB + Send,
+    {
+        let count = self.join_count.get();
+        self.join_count.set(count.wrapping_add(1) % TIMES);
+
+        if count == 1 {
+            self.join_heartbeat(a, b)
+        } else {
+            self.join_seq(a, b)
+        }
+    }
+
+    pub fn join_heartbeat<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
     where
         RA: Send,
         RB: Send,
         A: FnOnce() -> RA + Send,
         B: FnOnce() -> RB + Send,
@@ -926,6 +955,7 @@
             }
         };

+        drop(b);
         (ra, rb)
     }
@@ -938,21 +968,18 @@
     #[inline]
     fn execute(&self, job: &Job) {
-        eprintln!("execute()");
         self.tick();
         job.execute();
     }

     #[cold]
     fn heartbeat_cold(&self) {
-        eprintln!("heartbeat_cold()");
         let mut guard = self.context.shared.lock();
         if !guard.jobs.contains_key(&self.index) {
             if let Some(job) = self.pop_back() {
                 unsafe {
                     job.as_ref().set_pending();
-                    eprintln!("sharing {job:?} {:#?}", job.as_ref());
                 }
                 guard.jobs.insert(self.index, job);
                 self.context.shared_job.notify_one();
             }
         }
@@ -964,37 +991,32 @@

     #[cold]
     pub fn wait_until(&self, job: Pin<&Job>) -> Option> {
-        // let shared_job = self.context.shared.lock().jobs.remove(&self.index);
+        let shared_job = self.context.shared.lock().jobs.remove(&self.index);

-        // if let Some(ptr) = shared_job {
-        //     if ptr.as_ptr() == job as *const _ as *mut _ {
-        //         eprintln!("reclaimed shared job");
-        //         return None;
-        //     } else {
-        //         unsafe {
-        //             self.execute(ptr.as_ref());
-        //         }
-        //     }
-        // }
+        if let Some(ptr) = shared_job {
+            if ptr.as_ptr() == &*job as *const _ as *mut _ {
+                return None;
+            } else {
+                unsafe {
+                    self.execute(ptr.as_ref());
+                }
+            }
+        }

         while job.state() != JobState::Finished as u8 {
-            let Some(job) =
-                // self
-                //     .pop_front()
-                //     .inspect(|job| unsafe {
-                //         job.as_ref().set_pending();
-                //     })
-                None
+            let Some(job) = self
+                .context
+                .shared
+                .lock()
+                .jobs
+                .pop_first()
+                .map(|(_, job)| job)
                 .or_else(|| {
-                    self.context
-                        .shared
-                        .lock()
-                        .jobs
-                        .pop_first()
-                        .map(|(_, job)| job)
+                    self.pop_front().inspect(|job| unsafe {
+                        job.as_ref().set_pending();
+                    })
                 })
             else {
-                eprintln!("no more jobs, sleep instead");
                 break;
             };

@@ -1014,7 +1036,7 @@
 where
     A: FnOnce() -> RA + Send,
     B: FnOnce() -> RB + Send,
 {
-    Scope::with(|scope| scope.join(a, b))
+    Scope::with(|scope| scope.join_heartbeat(a, b))
 }

 struct Heartbeat {
@@ -1090,6 +1112,7 @@
         eprintln!("created threadpool {:?}", Arc::as_ptr(&this));

         let num_threads = available_parallelism();
+        // let num_threads = 2;

         let barrier = Arc::new(std::sync::Barrier::new(num_threads + 1));
         for _ in 0..num_threads {
@@ -1101,32 +1124,6 @@
         let ctx = this.clone();
         std::thread::spawn(|| heartbeat_worker(ctx));

-        // {
-        //     // only for #[cfg]
-        //     use parking_lot::deadlock;
-        //     use std::thread;
-        //     use std::time::Duration;
-
-        //     // Create a background thread which checks for deadlocks every 10s
-        //     thread::spawn(move || loop {
-        //         thread::sleep(Duration::from_secs(1));
-        //         let deadlocks = deadlock::check_deadlock();
-        //         if deadlocks.is_empty() {
-        //             println!("no deadlocks detected");
-        //             continue;
-        //         }
-
-        //         println!("{} deadlocks detected", deadlocks.len());
-        //         for (i, threads) in deadlocks.iter().enumerate() {
-        //             println!("Deadlock #{}", i);
-        //             for t in threads {
-        //                 println!("Thread Id {:#?}", t.thread_id());
-        //                 println!("{:#?}", t.backtrace());
-        //             }
-        //         }
-        //     });
-        // } // only for #[cfg]
-
         barrier.wait();

         this
@@ -1160,7 +1157,6 @@ fn worker(ctx: Arc, barrier: Arc) {
     let mut job = ctx.shared.lock().jobs.pop_first();
     loop {
         if let Some((_, job)) = job {
-            eprintln!("worker(): found job {job:?}");
             unsafe {
                 scope.execute(job.as_ref());
             }
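
Not part of the patch above — a minimal, self-contained sketch of the amortized join logic that the new `join_heartbeat_every` introduces: a per-scope counter sends only one call out of every `TIMES` (64 via `Scope::join`) down the heartbeat path, while the rest run both closures inline. `DemoScope` and the trivial bodies of `join_seq`/`join_heartbeat` are stand-ins invented for illustration; in the patched code `join_heartbeat` is the path that can actually hand work to other workers via the shared job map.

```rust
use std::cell::Cell;

struct DemoScope {
    join_count: Cell<usize>,
}

impl DemoScope {
    // Cheap path: run both closures sequentially on the current thread.
    fn join_seq<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
    where
        A: FnOnce() -> RA,
        B: FnOnce() -> RB,
    {
        (a(), b())
    }

    // Stand-in for the heartbeat path (trivial here; the real method involves the job queue).
    fn join_heartbeat<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
    where
        A: FnOnce() -> RA,
        B: FnOnce() -> RB,
    {
        (a(), b())
    }

    // Same counter trick as the patch: a wrapping counter modulo TIMES picks
    // the heartbeat path for one call out of every TIMES.
    fn join_heartbeat_every<A, B, RA, RB, const TIMES: usize>(&self, a: A, b: B) -> (RA, RB)
    where
        A: FnOnce() -> RA,
        B: FnOnce() -> RB,
    {
        let count = self.join_count.get();
        self.join_count.set(count.wrapping_add(1) % TIMES);
        if count == 1 {
            self.join_heartbeat(a, b)
        } else {
            self.join_seq(a, b)
        }
    }
}

fn main() {
    let scope = DemoScope {
        join_count: Cell::new(0),
    };
    for i in 0..128u32 {
        // Mirrors the patched `Scope::join`, which forwards with TIMES = 64.
        let (x, y) = scope.join_heartbeat_every::<_, _, _, _, 64>(|| i, || i + 1);
        assert_eq!((x, y), (i, i + 1));
    }
}
```

The point of the counter is to keep the common case as cheap as two plain function calls, paying the cost of the work-sharing path only occasionally.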