diff --git a/Cargo.toml b/Cargo.toml index 649e93e..212548b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,9 @@ prefer-local = [] never-local = [] +[profile.bench] +debug = true + [dependencies] futures = "0.3" diff --git a/benches/join.rs b/benches/join.rs index 0b3db14..fc200c6 100644 --- a/benches/join.rs +++ b/benches/join.rs @@ -5,7 +5,9 @@ use std::{ time::Duration, }; -use bevy_tasks::available_parallelism; +fn available_parallelism() -> usize { + bevy_tasks::available_parallelism().max(4) +} use executor::{self}; use test::Bencher; use tree::Node; @@ -71,7 +73,7 @@ const PRIMES: &'static [usize] = &[ ]; const REPEAT: usize = 0x800; -const TREE_SIZE: usize = 14; +const TREE_SIZE: usize = 16; #[bench] fn join_melange(b: &mut Bencher) { diff --git a/src/job/v2.rs b/src/job/v2.rs index 2f37b75..e86cda2 100644 --- a/src/job/v2.rs +++ b/src/job/v2.rs @@ -232,6 +232,12 @@ impl crate::latch::Probe for &Job { } } +impl crate::latch::Probe for Job { + fn probe(&self) -> bool { + self.state() == JobState::Finished as u8 + } +} + pub struct StackJob { f: UnsafeCell>, } diff --git a/src/lib.rs b/src/lib.rs index fdf68fd..c3db87e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,9 @@ -#![feature(vec_deque_pop_if)] +#![feature( + vec_deque_pop_if, + unsafe_cell_access, + debug_closure_helpers, + let_chains +)] use std::{ cell::{Cell, UnsafeCell}, @@ -1554,294 +1559,294 @@ mod scope { unsafe impl Send for SendPtr {} } -#[cfg(test)] -mod tests { - use std::{cell::Cell, hint::black_box, time::Instant}; +// #[cfg(test)] +// mod tests { +// use std::{cell::Cell, hint::black_box, time::Instant}; - use tracing::info; +// use tracing::info; - use super::*; +// use super::*; - mod tree { +// mod tree { - pub struct Tree { - nodes: Box<[Node]>, - root: Option, - } - pub struct Node { - pub leaf: T, - pub left: Option, - pub right: Option, - } +// pub struct Tree { +// nodes: Box<[Node]>, +// root: Option, +// } +// pub struct Node { +// pub leaf: T, +// pub left: Option, 
+// pub right: Option, +// } - impl Tree { - pub fn new(depth: usize, t: T) -> Tree - where - T: Copy, - { - let mut nodes = Vec::with_capacity((0..depth).sum()); - let root = Self::build_node(&mut nodes, depth, t); - Self { - nodes: nodes.into_boxed_slice(), - root: Some(root), - } - } +// impl Tree { +// pub fn new(depth: usize, t: T) -> Tree +// where +// T: Copy, +// { +// let mut nodes = Vec::with_capacity((0..depth).sum()); +// let root = Self::build_node(&mut nodes, depth, t); +// Self { +// nodes: nodes.into_boxed_slice(), +// root: Some(root), +// } +// } - pub fn root(&self) -> Option { - self.root - } +// pub fn root(&self) -> Option { +// self.root +// } - pub fn get(&self, index: usize) -> &Node { - &self.nodes[index] - } +// pub fn get(&self, index: usize) -> &Node { +// &self.nodes[index] +// } - pub fn build_node(nodes: &mut Vec>, depth: usize, t: T) -> usize - where - T: Copy, - { - let node = Node { - leaf: t, - left: (depth != 0).then(|| Self::build_node(nodes, depth - 1, t)), - right: (depth != 0).then(|| Self::build_node(nodes, depth - 1, t)), - }; - nodes.push(node); - nodes.len() - 1 - } - } - } +// pub fn build_node(nodes: &mut Vec>, depth: usize, t: T) -> usize +// where +// T: Copy, +// { +// let node = Node { +// leaf: t, +// left: (depth != 0).then(|| Self::build_node(nodes, depth - 1, t)), +// right: (depth != 0).then(|| Self::build_node(nodes, depth - 1, t)), +// }; +// nodes.push(node); +// nodes.len() - 1 +// } +// } +// } - const PRIMES: &'static [usize] = &[ - 1181, 1187, 1193, 1201, 1213, 1217, 1223, 1229, 1231, 1237, 1249, 1259, 1277, 1279, 1283, - 1289, 1291, 1297, 1301, 1303, 1307, 1319, 1321, 1327, 1361, 1367, 1373, 1381, 1399, 1409, - 1423, 1427, 1429, 1433, 1439, 1447, 1451, 1453, 1459, 1471, 1481, 1483, 1487, 1489, 1493, - 1499, 1511, 1523, 1531, 1543, 1549, 1553, 1559, 1567, 1571, 1579, 1583, 1597, 1601, 1607, - 1609, 1613, 1619, 1621, 1627, 1637, 1657, 1663, 1667, 1669, 1693, 1697, 1699, 1709, 1721, - 1723, 1733, 1741, 
1747, 1753, 1759, 1777, 1783, 1787, 1789, 1801, 1811, 1823, 1831, 1847, - 1861, 1867, 1871, 1873, 1877, 1879, 1889, 1901, 1907, - ]; +// const PRIMES: &'static [usize] = &[ +// 1181, 1187, 1193, 1201, 1213, 1217, 1223, 1229, 1231, 1237, 1249, 1259, 1277, 1279, 1283, +// 1289, 1291, 1297, 1301, 1303, 1307, 1319, 1321, 1327, 1361, 1367, 1373, 1381, 1399, 1409, +// 1423, 1427, 1429, 1433, 1439, 1447, 1451, 1453, 1459, 1471, 1481, 1483, 1487, 1489, 1493, +// 1499, 1511, 1523, 1531, 1543, 1549, 1553, 1559, 1567, 1571, 1579, 1583, 1597, 1601, 1607, +// 1609, 1613, 1619, 1621, 1627, 1637, 1657, 1663, 1667, 1669, 1693, 1697, 1699, 1709, 1721, +// 1723, 1733, 1741, 1747, 1753, 1759, 1777, 1783, 1787, 1789, 1801, 1811, 1823, 1831, 1847, +// 1861, 1867, 1871, 1873, 1877, 1879, 1889, 1901, 1907, +// ]; - #[cfg(feature = "spin-slow")] - const REPEAT: usize = 0x800; - #[cfg(not(feature = "spin-slow"))] - const REPEAT: usize = 0x8000; +// #[cfg(feature = "spin-slow")] +// const REPEAT: usize = 0x800; +// #[cfg(not(feature = "spin-slow"))] +// const REPEAT: usize = 0x8000; - const TREE_SIZE: usize = 10; +// const TREE_SIZE: usize = 10; - fn run_in_scope(pool: ThreadPool, f: impl FnOnce(&Scope<'_>) -> T + Send) -> T { - let pool = Box::new(pool); - let ptr = Box::into_raw(pool); +// fn run_in_scope(pool: ThreadPool, f: impl FnOnce(&Scope<'_>) -> T + Send) -> T { +// let pool = Box::new(pool); +// let ptr = Box::into_raw(pool); - let result = { - let pool: &'static ThreadPool = unsafe { &*ptr }; - // pool.ensure_one_worker(); - pool.resize_to_available(); - let now = std::time::Instant::now(); - let result = pool.scope(f); - let elapsed = now.elapsed().as_micros(); - info!("(mine) total time: {}ms", elapsed as f32 / 1e3); - pool.resize_to(0); - assert!(pool.global_queue.is_empty()); - result - }; +// let result = { +// let pool: &'static ThreadPool = unsafe { &*ptr }; +// // pool.ensure_one_worker(); +// pool.resize_to_available(); +// let now = std::time::Instant::now(); +// let 
result = pool.scope(f); +// let elapsed = now.elapsed().as_micros(); +// info!("(mine) total time: {}ms", elapsed as f32 / 1e3); +// pool.resize_to(0); +// assert!(pool.global_queue.is_empty()); +// result +// }; - let _pool = unsafe { Box::from_raw(ptr) }; - result - } +// let _pool = unsafe { Box::from_raw(ptr) }; +// result +// } - #[test] - #[tracing_test::traced_test] - fn rayon() { - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(bevy_tasks::available_parallelism()) - .build() - .unwrap(); +// #[test] +// #[tracing_test::traced_test] +// fn rayon() { +// let pool = rayon::ThreadPoolBuilder::new() +// .num_threads(bevy_tasks::available_parallelism()) +// .build() +// .unwrap(); - let now = std::time::Instant::now(); - pool.scope(|s| { - for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() { - s.spawn(move |_| { - black_box(spinning(p)); - }); - } - }); - let elapsed = now.elapsed().as_micros(); +// let now = std::time::Instant::now(); +// pool.scope(|s| { +// for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() { +// s.spawn(move |_| { +// black_box(spinning(p)); +// }); +// } +// }); +// let elapsed = now.elapsed().as_micros(); - info!("(rayon) total time: {}ms", elapsed as f32 / 1e3); - } +// info!("(rayon) total time: {}ms", elapsed as f32 / 1e3); +// } - #[test] - #[tracing_test::traced_test] - fn rayon_join() { - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(bevy_tasks::available_parallelism()) - .build() - .unwrap(); +// #[test] +// #[tracing_test::traced_test] +// fn rayon_join() { +// let pool = rayon::ThreadPoolBuilder::new() +// .num_threads(bevy_tasks::available_parallelism()) +// .build() +// .unwrap(); - let tree = tree::Tree::new(TREE_SIZE, 1u32); +// let tree = tree::Tree::new(TREE_SIZE, 1u32); - fn sum(tree: &tree::Tree, node: usize) -> u32 { - let node = tree.get(node); - let (l, r) = rayon::join( - || node.left.map(|node| sum(tree, node)).unwrap_or_default(), - || node.right.map(|node| sum(tree, 
node)).unwrap_or_default(), - ); +// fn sum(tree: &tree::Tree, node: usize) -> u32 { +// let node = tree.get(node); +// let (l, r) = rayon::join( +// || node.left.map(|node| sum(tree, node)).unwrap_or_default(), +// || node.right.map(|node| sum(tree, node)).unwrap_or_default(), +// ); - node.leaf + l + r - } +// node.leaf + l + r +// } - let now = std::time::Instant::now(); - let sum = pool.scope(move |s| { - let root = tree.root().unwrap(); - sum(&tree, root) - }); +// let now = std::time::Instant::now(); +// let sum = pool.scope(move |s| { +// let root = tree.root().unwrap(); +// sum(&tree, root) +// }); - let elapsed = now.elapsed().as_micros(); +// let elapsed = now.elapsed().as_micros(); - info!("(rayon) {sum} total time: {}ms", elapsed as f32 / 1e3); - } +// info!("(rayon) {sum} total time: {}ms", elapsed as f32 / 1e3); +// } - #[test] - #[tracing_test::traced_test] - fn bevy_tasks() { - let pool = bevy_tasks::ComputeTaskPool::get_or_init(|| { - bevy_tasks::TaskPoolBuilder::new() - .num_threads(bevy_tasks::available_parallelism()) - .build() - }); +// #[test] +// #[tracing_test::traced_test] +// fn bevy_tasks() { +// let pool = bevy_tasks::ComputeTaskPool::get_or_init(|| { +// bevy_tasks::TaskPoolBuilder::new() +// .num_threads(bevy_tasks::available_parallelism()) +// .build() +// }); - let now = std::time::Instant::now(); - pool.scope(|s| { - for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() { - s.spawn(async move { - black_box(spinning(p)); - }); - } - }); - let elapsed = now.elapsed().as_micros(); +// let now = std::time::Instant::now(); +// pool.scope(|s| { +// for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() { +// s.spawn(async move { +// black_box(spinning(p)); +// }); +// } +// }); +// let elapsed = now.elapsed().as_micros(); - info!("(bevy_tasks) total time: {}ms", elapsed as f32 / 1e3); - } +// info!("(bevy_tasks) total time: {}ms", elapsed as f32 / 1e3); +// } - #[test] - #[tracing_test::traced_test] - fn mine() { - std::thread_local! 
{ - static WAIT_COUNT: Cell = const {Cell::new(0)}; - } - let counter = Arc::new(AtomicUsize::new(0)); - { - let pool = ThreadPool::new(); +// #[test] +// #[tracing_test::traced_test] +// fn mine() { +// std::thread_local! { +// static WAIT_COUNT: Cell = const {Cell::new(0)}; +// } +// let counter = Arc::new(AtomicUsize::new(0)); +// { +// let pool = ThreadPool::new(); - run_in_scope(pool, |s| { - for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() { - s.spawn(move |_| { - black_box(spinning(p)); - }); - } - }); - }; +// run_in_scope(pool, |s| { +// for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() { +// s.spawn(move |_| { +// black_box(spinning(p)); +// }); +// } +// }); +// }; - // eprintln!("total wait count: {}", counter.load(Ordering::Acquire)); - } +// // eprintln!("total wait count: {}", counter.load(Ordering::Acquire)); +// } - #[test] - #[tracing_test::traced_test] - fn mine_join() { - let pool = ThreadPool::new(); +// #[test] +// #[tracing_test::traced_test] +// fn mine_join() { +// let pool = ThreadPool::new(); - let tree = tree::Tree::new(TREE_SIZE, 1u32); +// let tree = tree::Tree::new(TREE_SIZE, 1u32); - fn sum(tree: &tree::Tree, node: usize, scope: &Scope<'_>) -> u32 { - let node = tree.get(node); - let (l, r) = scope.join( - |s| node.left.map(|node| sum(tree, node, s)).unwrap_or_default(), - |s| { - node.right - .map(|node| sum(tree, node, s)) - .unwrap_or_default() - }, - ); +// fn sum(tree: &tree::Tree, node: usize, scope: &Scope<'_>) -> u32 { +// let node = tree.get(node); +// let (l, r) = scope.join( +// |s| node.left.map(|node| sum(tree, node, s)).unwrap_or_default(), +// |s| { +// node.right +// .map(|node| sum(tree, node, s)) +// .unwrap_or_default() +// }, +// ); - node.leaf + l + r - } +// node.leaf + l + r +// } - let sum = run_in_scope(pool, move |s| { - let root = tree.root().unwrap(); - sum(&tree, root, s) - }); - } +// let sum = run_in_scope(pool, move |s| { +// let root = tree.root().unwrap(); +// sum(&tree, root, s) +// 
}); +// } - #[test] - #[tracing_test::traced_test] - fn melange_join() { - let pool = melange::ThreadPool::new(bevy_tasks::available_parallelism()); +// #[test] +// #[tracing_test::traced_test] +// fn melange_join() { +// let pool = melange::ThreadPool::new(bevy_tasks::available_parallelism()); - let mut scope = pool.new_worker(); +// let mut scope = pool.new_worker(); - let tree = tree::Tree::new(TREE_SIZE, 1u32); +// let tree = tree::Tree::new(TREE_SIZE, 1u32); - fn sum(tree: &tree::Tree, node: usize, scope: &mut melange::WorkerThread) -> u32 { - let node = tree.get(node); - let (l, r) = scope.join( - |s| node.left.map(|node| sum(tree, node, s)).unwrap_or_default(), - |s| { - node.right - .map(|node| sum(tree, node, s)) - .unwrap_or_default() - }, - ); +// fn sum(tree: &tree::Tree, node: usize, scope: &mut melange::WorkerThread) -> u32 { +// let node = tree.get(node); +// let (l, r) = scope.join( +// |s| node.left.map(|node| sum(tree, node, s)).unwrap_or_default(), +// |s| { +// node.right +// .map(|node| sum(tree, node, s)) +// .unwrap_or_default() +// }, +// ); - node.leaf + l + r - } - let now = Instant::now(); - let res = sum(&tree, tree.root().unwrap(), &mut scope); - eprintln!( - "res: {res} took {}ms", - now.elapsed().as_micros() as f32 / 1e3 - ); - assert_ne!(res, 0); - } +// node.leaf + l + r +// } +// let now = Instant::now(); +// let res = sum(&tree, tree.root().unwrap(), &mut scope); +// eprintln!( +// "res: {res} took {}ms", +// now.elapsed().as_micros() as f32 / 1e3 +// ); +// assert_ne!(res, 0); +// } - #[test] - #[tracing_test::traced_test] - fn sync() { - let now = std::time::Instant::now(); - for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() { - black_box(spinning(p)); - } - let elapsed = now.elapsed().as_micros(); +// #[test] +// #[tracing_test::traced_test] +// fn sync() { +// let now = std::time::Instant::now(); +// for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() { +// black_box(spinning(p)); +// } +// let elapsed = 
now.elapsed().as_micros(); - info!("(sync) total time: {}ms", elapsed as f32 / 1e3); - } +// info!("(sync) total time: {}ms", elapsed as f32 / 1e3); +// } - #[inline] - fn spinning(i: usize) { - #[cfg(feature = "spin-slow")] - spinning_slow(i); - #[cfg(not(feature = "spin-slow"))] - spinning_fast(i); - } +// #[inline] +// fn spinning(i: usize) { +// #[cfg(feature = "spin-slow")] +// spinning_slow(i); +// #[cfg(not(feature = "spin-slow"))] +// spinning_fast(i); +// } - #[inline] - fn spinning_slow(i: usize) { - let rng = rng::XorShift64Star::new(i as u64); - (0..i).reduce(|a, b| { - black_box({ - let a = rng.next_usize(a.max(1)); - ((b as f32).exp() * (a as f32).sin().cbrt()).to_bits() as usize - }) - }); - } +// #[inline] +// fn spinning_slow(i: usize) { +// let rng = rng::XorShift64Star::new(i as u64); +// (0..i).reduce(|a, b| { +// black_box({ +// let a = rng.next_usize(a.max(1)); +// ((b as f32).exp() * (a as f32).sin().cbrt()).to_bits() as usize +// }) +// }); +// } - #[inline] - fn spinning_fast(i: usize) { - let rng = rng::XorShift64Star::new(i as u64); - //(0..rng.next_usize(i)).reduce(|a, b| { - (0..20).reduce(|a, b| { - black_box({ - let a = rng.next_usize(a.max(1)); - a ^ b - }) - }); - } -} +// #[inline] +// fn spinning_fast(i: usize) { +// let rng = rng::XorShift64Star::new(i as u64); +// //(0..rng.next_usize(i)).reduce(|a, b| { +// (0..20).reduce(|a, b| { +// black_box({ +// let a = rng.next_usize(a.max(1)); +// a ^ b +// }) +// }); +// } +// } diff --git a/src/melange.rs b/src/melange.rs index b802f43..129e7a6 100644 --- a/src/melange.rs +++ b/src/melange.rs @@ -14,6 +14,7 @@ use std::{ use crossbeam::utils::CachePadded; use parking_lot::{Condvar, Mutex}; +use parking_lot_core::SpinWait; use crate::{latch::*, ThreadControl}; mod job { @@ -417,17 +418,28 @@ impl WorkerThread { fn worker(mut self) { self.control().notify_running(); - loop { - let task = { self.shared().lock().pop_first_task() }; + 'outer: loop { + // inner loop runs until no shared
tasks exist. + loop { + if self.control().should_terminate.probe() { + break 'outer; + } - if let Some(task) = task { - self.execute_job(task); + let task = { self.shared().lock().pop_first_task() }; + + if let Some(task) = task { + self.execute_job(task); + } else { + break; + } } - if self.control().should_terminate.probe() { - break; - } + // signal heartbeat thread that we would really like another task + //self.ctx().heartbeat_control.wake(); + // spin here maybe? + + // wait to be signaled since no more shared tasks exist. let mut guard = self.shared().lock(); self.ctx().task_shared.wait(&mut guard); } @@ -436,17 +448,28 @@ impl WorkerThread { } fn execute_job(&mut self, job: NonNull) { + self.heartbeat(); unsafe { job.as_ref().execute(self); } } + #[inline] + fn heartbeat(&mut self) { + if self.heartbeat.load(Ordering::Relaxed) { + self.heartbeat_cold(); + } + } + #[cold] fn heartbeat_cold(&mut self) { let mut guard = self.context.shared.lock(); if guard.shared_tasks[self.index].is_none() { if let Some(task) = self.queue.pop_front() { + unsafe { + task.as_ref().set_pending(); + } guard.shared_tasks[self.index] = Some(task); self.context.task_shared.notify_one(); } @@ -494,7 +517,7 @@ impl WorkerThread { (ra, rb) } - fn join_heartbeat(&mut self, a: A, b: B) -> (RA, RB) + pub fn join_heartbeat(&mut self, a: A, b: B) -> (RA, RB) where A: FnOnce(&mut WorkerThread) -> RA + Send, B: FnOnce(&mut WorkerThread) -> RB + Send, @@ -503,27 +526,31 @@ impl WorkerThread { { let b = StackJob::new(b); - let job = Box::new(b.as_job()); + let job = Box::into_raw(Box::new(b.as_job())); + let job_ref = unsafe { &*job }; + // let job = Box::new(b.as_job()); self.queue - .push_back(unsafe { NonNull::new_unchecked(&job as *const _ as *mut _) }); - let job = unsafe { job.cast_box::() }; + .push_back(unsafe { NonNull::new_unchecked(job as *mut _) }); let ra = a(self); - let rb = if job.state() == JobState::Empty as u8 { - self.pop_job_id(unsafe { job.as_ref().cast() }); - unsafe { 
b.unwrap()(self) } - } else { - self.run_until(&job.as_ref()); - job.wait() - }; + let rb = + if job_ref.state() == JobState::Empty as u8 && self.pop_job_ptr(job.cast()).is_some() { + unsafe { b.unwrap()(self) } + } else { + self.run_until(job_ref); + job_ref.wait() + }; + let _job = unsafe { Box::from_raw(job) }; (ra, rb) } - fn pop_job_id(&mut self, id: &Job) -> Option<&Job<()>> { + fn pop_job_ptr(&mut self, id: *const Job) -> Option<&Job<()>> { self.queue - .pop_back_if(|job| unsafe { (&*job).as_ref().id() == id.id() }) + .iter() + .rposition(|job| job.as_ptr() == id.cast_mut()) + .and_then(|i| self.queue.remove(i)) .map(|job| unsafe { job.as_ref() }) } @@ -536,6 +563,7 @@ impl WorkerThread { #[cold] fn run_until_cold(&mut self, latch: &L) { let job = self.shared().lock().shared_tasks[self.index].take(); + if let Some(job) = job { self.execute_job(job); } @@ -617,7 +645,9 @@ impl Context { }; if let Some(duration) = sleep_for { - thread::sleep(duration); + self.heartbeat_control + .wait_for_should_wake_timeout(duration); + // thread::sleep(duration); } } }