This commit is contained in:
Janis 2025-02-20 15:20:37 +01:00
parent 5c7f1345c4
commit 5547bf7df7
5 changed files with 316 additions and 270 deletions

View file

@ -12,6 +12,9 @@ prefer-local = []
never-local = []
[profile.bench]
debug = true
[dependencies]
futures = "0.3"

View file

@ -5,7 +5,9 @@ use std::{
time::Duration,
};
use bevy_tasks::available_parallelism;
fn available_parallelism() -> usize {
bevy_tasks::available_parallelism().max(4)
}
use executor::{self};
use test::Bencher;
use tree::Node;
@ -71,7 +73,7 @@ const PRIMES: &'static [usize] = &[
];
const REPEAT: usize = 0x800;
const TREE_SIZE: usize = 14;
const TREE_SIZE: usize = 16;
#[bench]
fn join_melange(b: &mut Bencher) {

View file

@ -232,6 +232,12 @@ impl<T, S> crate::latch::Probe for &Job<T, S> {
}
}
impl<T, S> crate::latch::Probe for Job<T, S> {
fn probe(&self) -> bool {
self.state() == JobState::Finished as u8
}
}
pub struct StackJob<F> {
f: UnsafeCell<ManuallyDrop<F>>,
}

View file

@ -1,4 +1,9 @@
#![feature(vec_deque_pop_if)]
#![feature(
vec_deque_pop_if,
unsafe_cell_access,
debug_closure_helpers,
let_chains
)]
use std::{
cell::{Cell, UnsafeCell},
@ -1554,294 +1559,294 @@ mod scope {
unsafe impl<T> Send for SendPtr<T> {}
}
#[cfg(test)]
mod tests {
use std::{cell::Cell, hint::black_box, time::Instant};
// #[cfg(test)]
// mod tests {
// use std::{cell::Cell, hint::black_box, time::Instant};
use tracing::info;
// use tracing::info;
use super::*;
// use super::*;
mod tree {
// mod tree {
pub struct Tree<T> {
nodes: Box<[Node<T>]>,
root: Option<usize>,
}
pub struct Node<T> {
pub leaf: T,
pub left: Option<usize>,
pub right: Option<usize>,
}
// pub struct Tree<T> {
// nodes: Box<[Node<T>]>,
// root: Option<usize>,
// }
// pub struct Node<T> {
// pub leaf: T,
// pub left: Option<usize>,
// pub right: Option<usize>,
// }
impl<T> Tree<T> {
pub fn new(depth: usize, t: T) -> Tree<T>
where
T: Copy,
{
let mut nodes = Vec::with_capacity((0..depth).sum());
let root = Self::build_node(&mut nodes, depth, t);
Self {
nodes: nodes.into_boxed_slice(),
root: Some(root),
}
}
// impl<T> Tree<T> {
// pub fn new(depth: usize, t: T) -> Tree<T>
// where
// T: Copy,
// {
// let mut nodes = Vec::with_capacity((0..depth).sum());
// let root = Self::build_node(&mut nodes, depth, t);
// Self {
// nodes: nodes.into_boxed_slice(),
// root: Some(root),
// }
// }
pub fn root(&self) -> Option<usize> {
self.root
}
// pub fn root(&self) -> Option<usize> {
// self.root
// }
pub fn get(&self, index: usize) -> &Node<T> {
&self.nodes[index]
}
// pub fn get(&self, index: usize) -> &Node<T> {
// &self.nodes[index]
// }
pub fn build_node(nodes: &mut Vec<Node<T>>, depth: usize, t: T) -> usize
where
T: Copy,
{
let node = Node {
leaf: t,
left: (depth != 0).then(|| Self::build_node(nodes, depth - 1, t)),
right: (depth != 0).then(|| Self::build_node(nodes, depth - 1, t)),
};
nodes.push(node);
nodes.len() - 1
}
}
}
// pub fn build_node(nodes: &mut Vec<Node<T>>, depth: usize, t: T) -> usize
// where
// T: Copy,
// {
// let node = Node {
// leaf: t,
// left: (depth != 0).then(|| Self::build_node(nodes, depth - 1, t)),
// right: (depth != 0).then(|| Self::build_node(nodes, depth - 1, t)),
// };
// nodes.push(node);
// nodes.len() - 1
// }
// }
// }
const PRIMES: &'static [usize] = &[
1181, 1187, 1193, 1201, 1213, 1217, 1223, 1229, 1231, 1237, 1249, 1259, 1277, 1279, 1283,
1289, 1291, 1297, 1301, 1303, 1307, 1319, 1321, 1327, 1361, 1367, 1373, 1381, 1399, 1409,
1423, 1427, 1429, 1433, 1439, 1447, 1451, 1453, 1459, 1471, 1481, 1483, 1487, 1489, 1493,
1499, 1511, 1523, 1531, 1543, 1549, 1553, 1559, 1567, 1571, 1579, 1583, 1597, 1601, 1607,
1609, 1613, 1619, 1621, 1627, 1637, 1657, 1663, 1667, 1669, 1693, 1697, 1699, 1709, 1721,
1723, 1733, 1741, 1747, 1753, 1759, 1777, 1783, 1787, 1789, 1801, 1811, 1823, 1831, 1847,
1861, 1867, 1871, 1873, 1877, 1879, 1889, 1901, 1907,
];
// const PRIMES: &'static [usize] = &[
// 1181, 1187, 1193, 1201, 1213, 1217, 1223, 1229, 1231, 1237, 1249, 1259, 1277, 1279, 1283,
// 1289, 1291, 1297, 1301, 1303, 1307, 1319, 1321, 1327, 1361, 1367, 1373, 1381, 1399, 1409,
// 1423, 1427, 1429, 1433, 1439, 1447, 1451, 1453, 1459, 1471, 1481, 1483, 1487, 1489, 1493,
// 1499, 1511, 1523, 1531, 1543, 1549, 1553, 1559, 1567, 1571, 1579, 1583, 1597, 1601, 1607,
// 1609, 1613, 1619, 1621, 1627, 1637, 1657, 1663, 1667, 1669, 1693, 1697, 1699, 1709, 1721,
// 1723, 1733, 1741, 1747, 1753, 1759, 1777, 1783, 1787, 1789, 1801, 1811, 1823, 1831, 1847,
// 1861, 1867, 1871, 1873, 1877, 1879, 1889, 1901, 1907,
// ];
#[cfg(feature = "spin-slow")]
const REPEAT: usize = 0x800;
#[cfg(not(feature = "spin-slow"))]
const REPEAT: usize = 0x8000;
// #[cfg(feature = "spin-slow")]
// const REPEAT: usize = 0x800;
// #[cfg(not(feature = "spin-slow"))]
// const REPEAT: usize = 0x8000;
const TREE_SIZE: usize = 10;
// const TREE_SIZE: usize = 10;
fn run_in_scope<T: Send>(pool: ThreadPool, f: impl FnOnce(&Scope<'_>) -> T + Send) -> T {
let pool = Box::new(pool);
let ptr = Box::into_raw(pool);
// fn run_in_scope<T: Send>(pool: ThreadPool, f: impl FnOnce(&Scope<'_>) -> T + Send) -> T {
// let pool = Box::new(pool);
// let ptr = Box::into_raw(pool);
let result = {
let pool: &'static ThreadPool = unsafe { &*ptr };
// pool.ensure_one_worker();
pool.resize_to_available();
let now = std::time::Instant::now();
let result = pool.scope(f);
let elapsed = now.elapsed().as_micros();
info!("(mine) total time: {}ms", elapsed as f32 / 1e3);
pool.resize_to(0);
assert!(pool.global_queue.is_empty());
result
};
// let result = {
// let pool: &'static ThreadPool = unsafe { &*ptr };
// // pool.ensure_one_worker();
// pool.resize_to_available();
// let now = std::time::Instant::now();
// let result = pool.scope(f);
// let elapsed = now.elapsed().as_micros();
// info!("(mine) total time: {}ms", elapsed as f32 / 1e3);
// pool.resize_to(0);
// assert!(pool.global_queue.is_empty());
// result
// };
let _pool = unsafe { Box::from_raw(ptr) };
result
}
// let _pool = unsafe { Box::from_raw(ptr) };
// result
// }
#[test]
#[tracing_test::traced_test]
fn rayon() {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(bevy_tasks::available_parallelism())
.build()
.unwrap();
// #[test]
// #[tracing_test::traced_test]
// fn rayon() {
// let pool = rayon::ThreadPoolBuilder::new()
// .num_threads(bevy_tasks::available_parallelism())
// .build()
// .unwrap();
let now = std::time::Instant::now();
pool.scope(|s| {
for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() {
s.spawn(move |_| {
black_box(spinning(p));
});
}
});
let elapsed = now.elapsed().as_micros();
// let now = std::time::Instant::now();
// pool.scope(|s| {
// for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() {
// s.spawn(move |_| {
// black_box(spinning(p));
// });
// }
// });
// let elapsed = now.elapsed().as_micros();
info!("(rayon) total time: {}ms", elapsed as f32 / 1e3);
}
// info!("(rayon) total time: {}ms", elapsed as f32 / 1e3);
// }
#[test]
#[tracing_test::traced_test]
fn rayon_join() {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(bevy_tasks::available_parallelism())
.build()
.unwrap();
// #[test]
// #[tracing_test::traced_test]
// fn rayon_join() {
// let pool = rayon::ThreadPoolBuilder::new()
// .num_threads(bevy_tasks::available_parallelism())
// .build()
// .unwrap();
let tree = tree::Tree::new(TREE_SIZE, 1u32);
// let tree = tree::Tree::new(TREE_SIZE, 1u32);
fn sum(tree: &tree::Tree<u32>, node: usize) -> u32 {
let node = tree.get(node);
let (l, r) = rayon::join(
|| node.left.map(|node| sum(tree, node)).unwrap_or_default(),
|| node.right.map(|node| sum(tree, node)).unwrap_or_default(),
);
// fn sum(tree: &tree::Tree<u32>, node: usize) -> u32 {
// let node = tree.get(node);
// let (l, r) = rayon::join(
// || node.left.map(|node| sum(tree, node)).unwrap_or_default(),
// || node.right.map(|node| sum(tree, node)).unwrap_or_default(),
// );
node.leaf + l + r
}
// node.leaf + l + r
// }
let now = std::time::Instant::now();
let sum = pool.scope(move |s| {
let root = tree.root().unwrap();
sum(&tree, root)
});
// let now = std::time::Instant::now();
// let sum = pool.scope(move |s| {
// let root = tree.root().unwrap();
// sum(&tree, root)
// });
let elapsed = now.elapsed().as_micros();
// let elapsed = now.elapsed().as_micros();
info!("(rayon) {sum} total time: {}ms", elapsed as f32 / 1e3);
}
// info!("(rayon) {sum} total time: {}ms", elapsed as f32 / 1e3);
// }
#[test]
#[tracing_test::traced_test]
fn bevy_tasks() {
let pool = bevy_tasks::ComputeTaskPool::get_or_init(|| {
bevy_tasks::TaskPoolBuilder::new()
.num_threads(bevy_tasks::available_parallelism())
.build()
});
// #[test]
// #[tracing_test::traced_test]
// fn bevy_tasks() {
// let pool = bevy_tasks::ComputeTaskPool::get_or_init(|| {
// bevy_tasks::TaskPoolBuilder::new()
// .num_threads(bevy_tasks::available_parallelism())
// .build()
// });
let now = std::time::Instant::now();
pool.scope(|s| {
for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() {
s.spawn(async move {
black_box(spinning(p));
});
}
});
let elapsed = now.elapsed().as_micros();
// let now = std::time::Instant::now();
// pool.scope(|s| {
// for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() {
// s.spawn(async move {
// black_box(spinning(p));
// });
// }
// });
// let elapsed = now.elapsed().as_micros();
info!("(bevy_tasks) total time: {}ms", elapsed as f32 / 1e3);
}
// info!("(bevy_tasks) total time: {}ms", elapsed as f32 / 1e3);
// }
#[test]
#[tracing_test::traced_test]
fn mine() {
std::thread_local! {
static WAIT_COUNT: Cell<usize> = const {Cell::new(0)};
}
let counter = Arc::new(AtomicUsize::new(0));
{
let pool = ThreadPool::new();
// #[test]
// #[tracing_test::traced_test]
// fn mine() {
// std::thread_local! {
// static WAIT_COUNT: Cell<usize> = const {Cell::new(0)};
// }
// let counter = Arc::new(AtomicUsize::new(0));
// {
// let pool = ThreadPool::new();
run_in_scope(pool, |s| {
for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() {
s.spawn(move |_| {
black_box(spinning(p));
});
}
});
};
// run_in_scope(pool, |s| {
// for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() {
// s.spawn(move |_| {
// black_box(spinning(p));
// });
// }
// });
// };
// eprintln!("total wait count: {}", counter.load(Ordering::Acquire));
}
// // eprintln!("total wait count: {}", counter.load(Ordering::Acquire));
// }
#[test]
#[tracing_test::traced_test]
fn mine_join() {
let pool = ThreadPool::new();
// #[test]
// #[tracing_test::traced_test]
// fn mine_join() {
// let pool = ThreadPool::new();
let tree = tree::Tree::new(TREE_SIZE, 1u32);
// let tree = tree::Tree::new(TREE_SIZE, 1u32);
fn sum(tree: &tree::Tree<u32>, node: usize, scope: &Scope<'_>) -> u32 {
let node = tree.get(node);
let (l, r) = scope.join(
|s| node.left.map(|node| sum(tree, node, s)).unwrap_or_default(),
|s| {
node.right
.map(|node| sum(tree, node, s))
.unwrap_or_default()
},
);
// fn sum(tree: &tree::Tree<u32>, node: usize, scope: &Scope<'_>) -> u32 {
// let node = tree.get(node);
// let (l, r) = scope.join(
// |s| node.left.map(|node| sum(tree, node, s)).unwrap_or_default(),
// |s| {
// node.right
// .map(|node| sum(tree, node, s))
// .unwrap_or_default()
// },
// );
node.leaf + l + r
}
// node.leaf + l + r
// }
let sum = run_in_scope(pool, move |s| {
let root = tree.root().unwrap();
sum(&tree, root, s)
});
}
// let sum = run_in_scope(pool, move |s| {
// let root = tree.root().unwrap();
// sum(&tree, root, s)
// });
// }
#[test]
#[tracing_test::traced_test]
fn melange_join() {
let pool = melange::ThreadPool::new(bevy_tasks::available_parallelism());
// #[test]
// #[tracing_test::traced_test]
// fn melange_join() {
// let pool = melange::ThreadPool::new(bevy_tasks::available_parallelism());
let mut scope = pool.new_worker();
// let mut scope = pool.new_worker();
let tree = tree::Tree::new(TREE_SIZE, 1u32);
// let tree = tree::Tree::new(TREE_SIZE, 1u32);
fn sum(tree: &tree::Tree<u32>, node: usize, scope: &mut melange::WorkerThread) -> u32 {
let node = tree.get(node);
let (l, r) = scope.join(
|s| node.left.map(|node| sum(tree, node, s)).unwrap_or_default(),
|s| {
node.right
.map(|node| sum(tree, node, s))
.unwrap_or_default()
},
);
// fn sum(tree: &tree::Tree<u32>, node: usize, scope: &mut melange::WorkerThread) -> u32 {
// let node = tree.get(node);
// let (l, r) = scope.join(
// |s| node.left.map(|node| sum(tree, node, s)).unwrap_or_default(),
// |s| {
// node.right
// .map(|node| sum(tree, node, s))
// .unwrap_or_default()
// },
// );
node.leaf + l + r
}
let now = Instant::now();
let res = sum(&tree, tree.root().unwrap(), &mut scope);
eprintln!(
"res: {res} took {}ms",
now.elapsed().as_micros() as f32 / 1e3
);
assert_ne!(res, 0);
}
// node.leaf + l + r
// }
// let now = Instant::now();
// let res = sum(&tree, tree.root().unwrap(), &mut scope);
// eprintln!(
// "res: {res} took {}ms",
// now.elapsed().as_micros() as f32 / 1e3
// );
// assert_ne!(res, 0);
// }
#[test]
#[tracing_test::traced_test]
fn sync() {
let now = std::time::Instant::now();
for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() {
black_box(spinning(p));
}
let elapsed = now.elapsed().as_micros();
// #[test]
// #[tracing_test::traced_test]
// fn sync() {
// let now = std::time::Instant::now();
// for &p in core::iter::repeat_n(PRIMES, REPEAT).flatten() {
// black_box(spinning(p));
// }
// let elapsed = now.elapsed().as_micros();
info!("(sync) total time: {}ms", elapsed as f32 / 1e3);
}
// info!("(sync) total time: {}ms", elapsed as f32 / 1e3);
// }
#[inline]
fn spinning(i: usize) {
#[cfg(feature = "spin-slow")]
spinning_slow(i);
#[cfg(not(feature = "spin-slow"))]
spinning_fast(i);
}
// #[inline]
// fn spinning(i: usize) {
// #[cfg(feature = "spin-slow")]
// spinning_slow(i);
// #[cfg(not(feature = "spin-slow"))]
// spinning_fast(i);
// }
#[inline]
fn spinning_slow(i: usize) {
let rng = rng::XorShift64Star::new(i as u64);
(0..i).reduce(|a, b| {
black_box({
let a = rng.next_usize(a.max(1));
((b as f32).exp() * (a as f32).sin().cbrt()).to_bits() as usize
})
});
}
// #[inline]
// fn spinning_slow(i: usize) {
// let rng = rng::XorShift64Star::new(i as u64);
// (0..i).reduce(|a, b| {
// black_box({
// let a = rng.next_usize(a.max(1));
// ((b as f32).exp() * (a as f32).sin().cbrt()).to_bits() as usize
// })
// });
// }
#[inline]
fn spinning_fast(i: usize) {
let rng = rng::XorShift64Star::new(i as u64);
//(0..rng.next_usize(i)).reduce(|a, b| {
(0..20).reduce(|a, b| {
black_box({
let a = rng.next_usize(a.max(1));
a ^ b
})
});
}
}
// #[inline]
// fn spinning_fast(i: usize) {
// let rng = rng::XorShift64Star::new(i as u64);
// //(0..rng.next_usize(i)).reduce(|a, b| {
// (0..20).reduce(|a, b| {
// black_box({
// let a = rng.next_usize(a.max(1));
// a ^ b
// })
// });
// }
// }

View file

@ -14,6 +14,7 @@ use std::{
use crossbeam::utils::CachePadded;
use parking_lot::{Condvar, Mutex};
use parking_lot_core::SpinWait;
use crate::{latch::*, ThreadControl};
mod job {
@ -417,17 +418,28 @@ impl WorkerThread {
fn worker(mut self) {
self.control().notify_running();
'outer: loop {
// inner look runs until no shared tasks exist.
loop {
if self.control().should_terminate.probe() {
break 'outer;
}
let task = { self.shared().lock().pop_first_task() };
if let Some(task) = task {
self.execute_job(task);
}
if self.control().should_terminate.probe() {
} else {
break;
}
}
// signal heartbeat thread that we would really like another task
//self.ctx().heartbeat_control.wake();
// spin here maybe?
// wait to be signaled since no more shared tasks exist.
let mut guard = self.shared().lock();
self.ctx().task_shared.wait(&mut guard);
}
@ -436,17 +448,28 @@ impl WorkerThread {
}
fn execute_job(&mut self, job: NonNull<Job>) {
self.heartbeat();
unsafe {
job.as_ref().execute(self);
}
}
#[inline]
fn heartbeat(&mut self) {
if self.heartbeat.load(Ordering::Relaxed) {
self.heartbeat_cold();
}
}
#[cold]
fn heartbeat_cold(&mut self) {
let mut guard = self.context.shared.lock();
if guard.shared_tasks[self.index].is_none() {
if let Some(task) = self.queue.pop_front() {
unsafe {
task.as_ref().set_pending();
}
guard.shared_tasks[self.index] = Some(task);
self.context.task_shared.notify_one();
}
@ -494,7 +517,7 @@ impl WorkerThread {
(ra, rb)
}
fn join_heartbeat<A, B, RA, RB>(&mut self, a: A, b: B) -> (RA, RB)
pub fn join_heartbeat<A, B, RA, RB>(&mut self, a: A, b: B) -> (RA, RB)
where
A: FnOnce(&mut WorkerThread) -> RA + Send,
B: FnOnce(&mut WorkerThread) -> RB + Send,
@ -503,27 +526,31 @@ impl WorkerThread {
{
let b = StackJob::new(b);
let job = Box::new(b.as_job());
let job = Box::into_raw(Box::new(b.as_job()));
let job_ref = unsafe { &*job };
// let job = Box::new(b.as_job());
self.queue
.push_back(unsafe { NonNull::new_unchecked(&job as *const _ as *mut _) });
let job = unsafe { job.cast_box::<RB, Self>() };
.push_back(unsafe { NonNull::new_unchecked(job as *mut _) });
let ra = a(self);
let rb = if job.state() == JobState::Empty as u8 {
self.pop_job_id(unsafe { job.as_ref().cast() });
let rb =
if job_ref.state() == JobState::Empty as u8 && self.pop_job_ptr(job.cast()).is_some() {
unsafe { b.unwrap()(self) }
} else {
self.run_until(&job.as_ref());
job.wait()
self.run_until(job_ref);
job_ref.wait()
};
let _job = unsafe { Box::from_raw(job) };
(ra, rb)
}
fn pop_job_id(&mut self, id: &Job) -> Option<&Job<()>> {
fn pop_job_ptr(&mut self, id: *const Job) -> Option<&Job<()>> {
self.queue
.pop_back_if(|job| unsafe { (&*job).as_ref().id() == id.id() })
.iter()
.rposition(|job| job.as_ptr() == id.cast_mut())
.and_then(|i| self.queue.remove(i))
.map(|job| unsafe { job.as_ref() })
}
@ -536,6 +563,7 @@ impl WorkerThread {
#[cold]
fn run_until_cold<L: Probe>(&mut self, latch: &L) {
let job = self.shared().lock().shared_tasks[self.index].take();
if let Some(job) = job {
self.execute_job(job);
}
@ -617,7 +645,9 @@ impl Context {
};
if let Some(duration) = sleep_for {
thread::sleep(duration);
self.heartbeat_control
.wait_for_should_wake_timeout(duration);
// thread::sleep(duration);
}
}
}