removed lots of inline hints because they probably SUCK + benchmark

This commit is contained in:
Janis 2025-07-01 13:05:19 +02:00
parent f384f61f81
commit 69d3794ff1
8 changed files with 144 additions and 21 deletions

View file

@ -3,6 +3,12 @@ name = "distaff"
version = "0.1.0" version = "0.1.0"
edition = "2024" edition = "2024"
[profile.bench]
debug = true
[profile.release]
debug = true
[features] [features]
default = ["metrics"] default = ["metrics"]
tracing = ["dep:tracing"] tracing = ["dep:tracing"]
@ -23,3 +29,11 @@ async-task = "4.7.1"
tracing-test = {version = "0.2"} tracing-test = {version = "0.2"}
tracing-tracy = {version = "0.11"} tracing-tracy = {version = "0.11"}
futures = "0.3" futures = "0.3"
divan = "0.1.14"
rayon = "1.10.0"
chili = {path = "../../chili"}
[[bench]]
name = "overhead"
harness = false

119
distaff/benches/overhead.rs Normal file
View file

@ -0,0 +1,119 @@
use distaff::{Scope, ThreadPool};
use divan::Bencher;
struct Node {
val: u64,
left: Option<Box<Node>>,
right: Option<Box<Node>>,
}
impl Node {
pub fn tree(layers: usize) -> Self {
Self {
val: 1,
left: (layers != 1).then(|| Box::new(Self::tree(layers - 1))),
right: (layers != 1).then(|| Box::new(Self::tree(layers - 1))),
}
}
}
const LAYERS: &[usize] = &[10, 24];
fn nodes() -> impl Iterator<Item = (usize, usize)> {
LAYERS.iter().map(|&l| (l, (1 << l) - 1))
}
#[divan::bench(args = nodes())]
fn no_overhead(bencher: Bencher, nodes: (usize, usize)) {
fn join_no_overhead<A, B, RA, RB>(scope: &Scope<'_, '_>, a: A, b: B) -> (RA, RB)
where
A: FnOnce(&Scope<'_, '_>) -> RA + Send,
B: FnOnce(&Scope<'_, '_>) -> RB + Send,
RA: Send,
RB: Send,
{
(a(scope), b(scope))
}
#[inline]
fn sum(node: &Node, scope: &Scope<'_, '_>) -> u64 {
let (left, right) = join_no_overhead(
scope,
|s| node.left.as_deref().map(|n| sum(n, s)).unwrap_or_default(),
|s| node.right.as_deref().map(|n| sum(n, s)).unwrap_or_default(),
);
node.val + left + right
}
let tree = Node::tree(nodes.0);
let pool = ThreadPool::global();
bencher.bench_local(move || {
pool.scope(|scope| {
assert_eq!(sum(&tree, scope), nodes.1 as u64);
});
});
}
#[divan::bench(args = nodes())]
fn distaff_overhead(bencher: Bencher, nodes: (usize, usize)) {
fn sum<'scope, 'env>(node: &Node, scope: &'scope Scope<'scope, 'env>) -> u64 {
let (left, right) = scope.join(
|s| node.left.as_deref().map(|n| sum(n, s)).unwrap_or_default(),
|s| node.right.as_deref().map(|n| sum(n, s)).unwrap_or_default(),
);
node.val + left + right
}
let tree = Node::tree(nodes.0);
let pool = ThreadPool::global();
bencher.bench_local(move || {
pool.scope(|scope| {
assert_eq!(sum(&tree, scope), nodes.1 as u64);
});
});
}
#[divan::bench(args = nodes())]
fn rayon_overhead(bencher: Bencher, nodes: (usize, usize)) {
fn sum(node: &Node) -> u64 {
let (left, right) = rayon::join(
|| node.left.as_deref().map(sum).unwrap_or_default(),
|| node.right.as_deref().map(sum).unwrap_or_default(),
);
node.val + left + right
}
let tree = Node::tree(nodes.0);
bencher.bench_local(move || {
assert_eq!(sum(&tree), nodes.1 as u64);
});
}
#[divan::bench(args = nodes())]
fn chili_overhead(bencher: Bencher, nodes: (usize, usize)) {
use chili::Scope;
fn sum(node: &Node, scope: &mut Scope<'_>) -> u64 {
let (left, right) = scope.join(
|s| node.left.as_deref().map(|n| sum(n, s)).unwrap_or_default(),
|s| node.right.as_deref().map(|n| sum(n, s)).unwrap_or_default(),
);
node.val + left + right
}
let tree = Node::tree(nodes.0);
let mut scope = Scope::global();
bencher.bench_local(move || {
assert_eq!(sum(&tree, &mut scope), nodes.1 as u64);
});
}
fn main() {
divan::main();
}

View file

@ -53,7 +53,6 @@ impl Shared {
} }
impl Context { impl Context {
#[inline]
pub(crate) fn shared(&self) -> parking_lot::MutexGuard<'_, Shared> { pub(crate) fn shared(&self) -> parking_lot::MutexGuard<'_, Shared> {
self.shared.lock() self.shared.lock()
} }

View file

@ -11,7 +11,6 @@ use crate::{
}; };
impl WorkerThread { impl WorkerThread {
#[inline]
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn join_seq<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB) fn join_seq<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
where where
@ -24,7 +23,6 @@ impl WorkerThread {
(ra, rb) (ra, rb)
} }
#[inline]
pub(crate) fn join_heartbeat_every<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB) pub(crate) fn join_heartbeat_every<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
where where
A: FnOnce() -> RA + Send, A: FnOnce() -> RA + Send,
@ -61,7 +59,6 @@ impl WorkerThread {
} }
/// This function must be called from a worker thread. /// This function must be called from a worker thread.
#[inline]
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))] #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn join_heartbeat<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB) fn join_heartbeat<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
where where
@ -131,7 +128,6 @@ impl WorkerThread {
} }
impl Context { impl Context {
#[inline]
pub fn join<A, B, RA, RB>(self: &Arc<Self>, a: A, b: B) -> (RA, RB) pub fn join<A, B, RA, RB>(self: &Arc<Self>, a: A, b: B) -> (RA, RB)
where where
A: FnOnce() -> RA + Send, A: FnOnce() -> RA + Send,

View file

@ -257,7 +257,6 @@ impl<'scope, 'env> Scope<'scope, 'env> {
task task
} }
#[inline]
pub fn join<A, B, RA, RB>(&'scope self, a: A, b: B) -> (RA, RB) pub fn join<A, B, RA, RB>(&'scope self, a: A, b: B) -> (RA, RB)
where where
RA: Send, RA: Send,

View file

@ -8,8 +8,8 @@ pub struct ThreadPool {
impl Drop for ThreadPool { impl Drop for ThreadPool {
fn drop(&mut self) { fn drop(&mut self) {
// Ensure that the context is properly cleaned up when the thread pool is dropped. // TODO: Ensure that the context is properly cleaned up when the thread pool is dropped.
self.context.set_should_exit(); // self.context.set_should_exit();
} }
} }
@ -25,6 +25,11 @@ impl ThreadPool {
Self { context } Self { context }
} }
pub fn global() -> Self {
let context = Context::global_context().clone();
Self { context }
}
pub fn scope<'env, F, R>(&self, f: F) -> R pub fn scope<'env, F, R>(&self, f: F) -> R
where where
F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R + Send, F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R + Send,

View file

@ -182,7 +182,6 @@ impl<T, const BITS: u8> TaggedAtomicPtr<T, BITS> {
} }
/// returns tag /// returns tag
#[inline]
#[allow(dead_code)] #[allow(dead_code)]
pub fn compare_exchange_tag( pub fn compare_exchange_tag(
&self, &self,
@ -201,7 +200,6 @@ impl<T, const BITS: u8> TaggedAtomicPtr<T, BITS> {
} }
/// returns tag /// returns tag
#[inline]
pub fn compare_exchange_weak_tag( pub fn compare_exchange_weak_tag(
&self, &self,
old: usize, old: usize,

View file

@ -147,7 +147,6 @@ impl WorkerThread {
None None
} }
#[inline]
fn find_work(&self) -> Option<SharedJob> { fn find_work(&self) -> Option<SharedJob> {
let mut guard = self.context.shared(); let mut guard = self.context.shared();
@ -162,7 +161,6 @@ impl WorkerThread {
None None
} }
#[inline(always)]
pub(crate) fn tick(&self) { pub(crate) fn tick(&self) {
if self.heartbeat.take() { if self.heartbeat.take() {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
@ -176,7 +174,6 @@ impl WorkerThread {
} }
} }
#[inline]
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))] #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
fn execute(&self, job: SharedJob) { fn execute(&self, job: SharedJob) {
unsafe { SharedJob::execute(job, self) }; unsafe { SharedJob::execute(job, self) };
@ -209,28 +206,23 @@ impl WorkerThread {
} }
impl WorkerThread { impl WorkerThread {
#[inline]
pub fn pop_back(&self) -> Option<NonNull<Job>> { pub fn pop_back(&self) -> Option<NonNull<Job>> {
unsafe { self.queue.as_mut_unchecked().pop_back() } unsafe { self.queue.as_mut_unchecked().pop_back() }
} }
#[inline]
pub fn push_back<T>(&self, job: *const Job<T>) { pub fn push_back<T>(&self, job: *const Job<T>) {
unsafe { self.queue.as_mut_unchecked().push_back(job.cast()) } unsafe { self.queue.as_mut_unchecked().push_back(job.cast()) }
} }
#[inline]
pub fn push_front<T>(&self, job: *const Job<T>) { pub fn push_front<T>(&self, job: *const Job<T>) {
unsafe { self.queue.as_mut_unchecked().push_front(job.cast()) } unsafe { self.queue.as_mut_unchecked().push_front(job.cast()) }
} }
#[inline]
pub fn pop_front(&self) -> Option<NonNull<Job>> { pub fn pop_front(&self) -> Option<NonNull<Job>> {
unsafe { self.queue.as_mut_unchecked().pop_front() } unsafe { self.queue.as_mut_unchecked().pop_front() }
} }
} }
impl WorkerThread { impl WorkerThread {
#[inline]
pub fn current_ref<'a>() -> Option<&'a Self> { pub fn current_ref<'a>() -> Option<&'a Self> {
unsafe { (*WORKER.with(UnsafeCell::get)).map(|ptr| ptr.as_ref()) } unsafe { (*WORKER.with(UnsafeCell::get)).map(|ptr| ptr.as_ref()) }
} }
@ -352,9 +344,10 @@ impl WorkerThread {
unsafe { unsafe {
SharedJob::execute(job, self); SharedJob::execute(job, self);
} }
} else { continue;
break;
} }
recv.wait();
} }
Some(recv.recv()) Some(recv.recv())