diff --git a/Cargo.toml b/Cargo.toml index 212548b..16f0313 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ debug = true futures = "0.3" rayon = "1.10" bevy_tasks = "0.15.1" -parking_lot = {version = "0.12.3"} +parking_lot = {version = "0.12.3", features = ["deadlock_detection"]} thread_local = "1.1.8" crossbeam = "0.8.4" st3 = "0.4" diff --git a/benches/join.rs b/benches/join.rs index 4d2a63b..94b31ab 100644 --- a/benches/join.rs +++ b/benches/join.rs @@ -111,7 +111,7 @@ fn join_melange(b: &mut Bencher) { #[bench] fn join_praetor(b: &mut Bencher) { use executor::praetor::Scope; - let pool = executor::praetor::ThreadPool::new(); + let pool = executor::praetor::ThreadPool::global(); let tree = tree::Tree::new(TREE_SIZE, 1u32); diff --git a/src/praetor/mod.rs b/src/praetor/mod.rs index 064c567..84f3e83 100644 --- a/src/praetor/mod.rs +++ b/src/praetor/mod.rs @@ -43,7 +43,7 @@ mod util { assert!(core::mem::align_of::().ilog2() as usize >= BITS); let mask = Self::mask(); Self( - AtomicPtr::new(ptr.with_addr(ptr.addr() | tag & mask).cast()), + AtomicPtr::new(ptr.with_addr((ptr.addr() & !mask) | (tag & mask)).cast()), PhantomData, ) } @@ -801,11 +801,13 @@ thread_local! { } impl Scope { + /// locks shared context fn new() -> Self { let context = Context::global().clone(); Self::new_in(context) } + /// locks shared context fn new_in(context: Arc) -> Self { let (heartbeat, index) = context.shared.lock().new_heartbeat(); @@ -828,14 +830,14 @@ impl Scope { _ = Box::from_raw(Self::unset_current().unwrap().as_ptr()); Self::set_current(old.cast_const()); }))); - let current = Box::into_raw(Box::new(Self::new())); + let current = Box::into_raw(Box::new(Self::new_in(ctx))); unsafe { Self::set_current(current.cast_const()); &*current } } None => { - let current = Box::into_raw(Box::new(Self::new())); + let current = Box::into_raw(Box::new(Self::new_in(ctx))); guard = Some(DropGuard::new(Box::new(|| unsafe { _ = Box::from_raw(Self::unset_current().unwrap().as_ptr()); @@ -855,23 +857,7 @@ impl Scope { } pub fn with T>(f: F) -> T { - let mut guard = None; - - let current = Self::current_ref().unwrap_or_else(|| { - let current = Box::into_raw(Box::new(Self::new())); - - guard = Some(DropGuard::new(|| unsafe { - _ = Box::from_raw(Self::unset_current().unwrap().as_ptr()); - })); - - unsafe { - Self::set_current(current.cast_const()); - - &*current - } - }); - - f(current) + Self::with_in(Context::global().clone(), f) } unsafe fn set_current(scope: *const Scope) { @@ -1008,7 +994,7 @@ impl Scope { .map(|(_, job)| job) }) else { - // no more jobs, sleep instead + eprintln!("no more jobs, sleep instead"); break; }; @@ -1047,6 +1033,12 @@ impl ThreadPool { } } + pub fn global() -> ThreadPool { + ThreadPool { + context: Context::global().clone(), + } + } + pub fn scope T>(&self, f: F) -> T { Scope::with_in(self.context.clone(), f) } @@ -1095,6 +1087,8 @@ impl Context { shared_job: Condvar::new(), }); + eprintln!("created threadpool {:?}", Arc::as_ptr(&this)); + let num_threads = available_parallelism(); let barrier = Arc::new(std::sync::Barrier::new(num_threads + 1)); @@ -1107,6 +1101,32 @@ impl Context { let ctx = this.clone(); std::thread::spawn(|| heartbeat_worker(ctx)); + // { + // // only for #[cfg] + // use parking_lot::deadlock; + // use std::thread; + // use std::time::Duration; + + // // Create a background thread which checks for deadlocks every 10s + // thread::spawn(move || loop { + // thread::sleep(Duration::from_secs(1)); + // let deadlocks = deadlock::check_deadlock(); + // if deadlocks.is_empty() { + // println!("no deadlocks detected"); + // continue; + // } + + // println!("{} deadlocks detected", deadlocks.len()); + // for (i, threads) in deadlocks.iter().enumerate() { + // println!("Deadlock #{}", i); + // for t in threads { + // println!("Thread Id {:#?}", t.thread_id()); + // println!("{:#?}", t.backtrace()); + // } + // } + // }); + // } // only for #[cfg] + barrier.wait(); this @@ -1137,8 +1157,8 @@ fn worker(ctx: Arc, barrier: Arc) { barrier.wait(); + let mut job = ctx.shared.lock().jobs.pop_first(); loop { - let job = ctx.shared.lock().jobs.pop_first(); if let Some((_, job)) = job { eprintln!("worker(): found job {job:?}"); unsafe { @@ -1150,7 +1170,9 @@ fn worker(ctx: Arc, barrier: Arc) { if guard.should_stop { break; } + ctx.shared_job.wait(&mut guard); + job = guard.jobs.pop_first(); } } @@ -1176,6 +1198,8 @@ fn heartbeat_worker(ctx: Arc) { }); let num_heartbeats = guard.heartbeats.len(); + drop(guard); + if i >= num_heartbeats { i = 0; } else {