This commit is contained in:
Janis 2025-02-20 17:44:10 +01:00
parent 36c32c4dd6
commit bfa4a34f54
3 changed files with 48 additions and 24 deletions

View file

@ -20,7 +20,7 @@ debug = true
futures = "0.3" futures = "0.3"
rayon = "1.10" rayon = "1.10"
bevy_tasks = "0.15.1" bevy_tasks = "0.15.1"
parking_lot = {version = "0.12.3"} parking_lot = {version = "0.12.3", features = ["deadlock_detection"]}
thread_local = "1.1.8" thread_local = "1.1.8"
crossbeam = "0.8.4" crossbeam = "0.8.4"
st3 = "0.4" st3 = "0.4"

View file

@ -111,7 +111,7 @@ fn join_melange(b: &mut Bencher) {
#[bench] #[bench]
fn join_praetor(b: &mut Bencher) { fn join_praetor(b: &mut Bencher) {
use executor::praetor::Scope; use executor::praetor::Scope;
let pool = executor::praetor::ThreadPool::new(); let pool = executor::praetor::ThreadPool::global();
let tree = tree::Tree::new(TREE_SIZE, 1u32); let tree = tree::Tree::new(TREE_SIZE, 1u32);

View file

@ -43,7 +43,7 @@ mod util {
assert!(core::mem::align_of::<T>().ilog2() as usize >= BITS); assert!(core::mem::align_of::<T>().ilog2() as usize >= BITS);
let mask = Self::mask(); let mask = Self::mask();
Self( Self(
AtomicPtr::new(ptr.with_addr(ptr.addr() | tag & mask).cast()), AtomicPtr::new(ptr.with_addr((ptr.addr() & !mask) | (tag & mask)).cast()),
PhantomData, PhantomData,
) )
} }
@ -801,11 +801,13 @@ thread_local! {
} }
impl Scope { impl Scope {
/// locks shared context
fn new() -> Self { fn new() -> Self {
let context = Context::global().clone(); let context = Context::global().clone();
Self::new_in(context) Self::new_in(context)
} }
/// locks shared context
fn new_in(context: Arc<Context>) -> Self { fn new_in(context: Arc<Context>) -> Self {
let (heartbeat, index) = context.shared.lock().new_heartbeat(); let (heartbeat, index) = context.shared.lock().new_heartbeat();
@ -828,14 +830,14 @@ impl Scope {
_ = Box::from_raw(Self::unset_current().unwrap().as_ptr()); _ = Box::from_raw(Self::unset_current().unwrap().as_ptr());
Self::set_current(old.cast_const()); Self::set_current(old.cast_const());
}))); })));
let current = Box::into_raw(Box::new(Self::new())); let current = Box::into_raw(Box::new(Self::new_in(ctx)));
unsafe { unsafe {
Self::set_current(current.cast_const()); Self::set_current(current.cast_const());
&*current &*current
} }
} }
None => { None => {
let current = Box::into_raw(Box::new(Self::new())); let current = Box::into_raw(Box::new(Self::new_in(ctx)));
guard = Some(DropGuard::new(Box::new(|| unsafe { guard = Some(DropGuard::new(Box::new(|| unsafe {
_ = Box::from_raw(Self::unset_current().unwrap().as_ptr()); _ = Box::from_raw(Self::unset_current().unwrap().as_ptr());
@ -855,23 +857,7 @@ impl Scope {
} }
pub fn with<T, F: FnOnce(&Scope) -> T>(f: F) -> T { pub fn with<T, F: FnOnce(&Scope) -> T>(f: F) -> T {
let mut guard = None; Self::with_in(Context::global().clone(), f)
let current = Self::current_ref().unwrap_or_else(|| {
let current = Box::into_raw(Box::new(Self::new()));
guard = Some(DropGuard::new(|| unsafe {
_ = Box::from_raw(Self::unset_current().unwrap().as_ptr());
}));
unsafe {
Self::set_current(current.cast_const());
&*current
}
});
f(current)
} }
unsafe fn set_current(scope: *const Scope) { unsafe fn set_current(scope: *const Scope) {
@ -1008,7 +994,7 @@ impl Scope {
.map(|(_, job)| job) .map(|(_, job)| job)
}) })
else { else {
// no more jobs, sleep instead eprintln!("no more jobs, sleep instead");
break; break;
}; };
@ -1047,6 +1033,12 @@ impl ThreadPool {
} }
} }
pub fn global() -> ThreadPool {
ThreadPool {
context: Context::global().clone(),
}
}
pub fn scope<T, F: FnOnce(&Scope) -> T>(&self, f: F) -> T { pub fn scope<T, F: FnOnce(&Scope) -> T>(&self, f: F) -> T {
Scope::with_in(self.context.clone(), f) Scope::with_in(self.context.clone(), f)
} }
@ -1095,6 +1087,8 @@ impl Context {
shared_job: Condvar::new(), shared_job: Condvar::new(),
}); });
eprintln!("created threadpool {:?}", Arc::as_ptr(&this));
let num_threads = available_parallelism(); let num_threads = available_parallelism();
let barrier = Arc::new(std::sync::Barrier::new(num_threads + 1)); let barrier = Arc::new(std::sync::Barrier::new(num_threads + 1));
@ -1107,6 +1101,32 @@ impl Context {
let ctx = this.clone(); let ctx = this.clone();
std::thread::spawn(|| heartbeat_worker(ctx)); std::thread::spawn(|| heartbeat_worker(ctx));
// {
// // only for #[cfg]
// use parking_lot::deadlock;
// use std::thread;
// use std::time::Duration;
// // Create a background thread which checks for deadlocks every 10s
// thread::spawn(move || loop {
// thread::sleep(Duration::from_secs(1));
// let deadlocks = deadlock::check_deadlock();
// if deadlocks.is_empty() {
// println!("no deadlocks detected");
// continue;
// }
// println!("{} deadlocks detected", deadlocks.len());
// for (i, threads) in deadlocks.iter().enumerate() {
// println!("Deadlock #{}", i);
// for t in threads {
// println!("Thread Id {:#?}", t.thread_id());
// println!("{:#?}", t.backtrace());
// }
// }
// });
// } // only for #[cfg]
barrier.wait(); barrier.wait();
this this
@ -1137,8 +1157,8 @@ fn worker(ctx: Arc<Context>, barrier: Arc<std::sync::Barrier>) {
barrier.wait(); barrier.wait();
let mut job = ctx.shared.lock().jobs.pop_first();
loop { loop {
let job = ctx.shared.lock().jobs.pop_first();
if let Some((_, job)) = job { if let Some((_, job)) = job {
eprintln!("worker(): found job {job:?}"); eprintln!("worker(): found job {job:?}");
unsafe { unsafe {
@ -1150,7 +1170,9 @@ fn worker(ctx: Arc<Context>, barrier: Arc<std::sync::Barrier>) {
if guard.should_stop { if guard.should_stop {
break; break;
} }
ctx.shared_job.wait(&mut guard); ctx.shared_job.wait(&mut guard);
job = guard.jobs.pop_first();
} }
} }
@ -1176,6 +1198,8 @@ fn heartbeat_worker(ctx: Arc<Context>) {
}); });
let num_heartbeats = guard.heartbeats.len(); let num_heartbeats = guard.heartbeats.len();
drop(guard);
if i >= num_heartbeats { if i >= num_heartbeats {
i = 0; i = 0;
} else { } else {