Compare commits

...

12 commits

Author SHA1 Message Date
Janis a2112b9ef5 move join stuff to context, but should be moved to workerthread 2025-06-20 13:03:02 +02:00
Janis f6f8095440 remove with_in/with functions from WorkerThread (replaced by run_in_*) 2025-06-20 12:24:13 +02:00
Janis 940c681222 threadlocal count for join/heatbeat 2025-06-20 12:16:51 +02:00
Janis 9b0cc41834 does stuff, doesn't deadlock, faster than rayon (maybe?), 2025-06-19 18:00:06 +02:00
Janis d611535994 it compiles... 2025-06-19 14:25:15 +02:00
Janis 9f776183c4 tests updated to no more pin innit 2025-06-19 14:24:58 +02:00
Janis 04496aa7e2 cfg-if, idk why? 2025-06-19 14:24:32 +02:00
Janis 0af62712ea idk why but cool nightly features 2025-06-17 14:49:04 +02:00
Janis 1baf870d9c lots
+ removed pin in joblist and jobs
+ running closures in worker functions
2025-06-17 14:47:17 +02:00
Janis dc820fc64a TaggedAtomicPtr to named struct 2025-06-17 14:45:50 +02:00
Janis e590dc4509 proper copy and clone for sendptr 2025-06-17 14:45:22 +02:00
Janis 46504f64f2 latchref and nopref types 2025-06-17 14:44:55 +02:00
6 changed files with 980 additions and 319 deletions

View file

@ -39,6 +39,7 @@ thiserror = "2.0"
bitflags = "2.6"
core_affinity = "0.8.1"
parking_lot_core = "0.9.10"
cfg-if = "1.0.0"
# derive_more = "1.0.0"
[dev-dependencies]

View file

@ -56,7 +56,7 @@ mod tree {
}
}
const TREE_SIZE: usize = 16;
const TREE_SIZE: usize = 13;
#[bench]
fn join_melange(b: &mut Bencher) {
@ -93,6 +93,7 @@ fn join_melange(b: &mut Bencher) {
#[bench]
fn join_praetor(b: &mut Bencher) {
tracing_subscriber::fmt().with_test_writer().init();
use executor::praetor::Scope;
let pool = executor::praetor::ThreadPool::global();
@ -121,6 +122,7 @@ fn join_praetor(b: &mut Bencher) {
#[bench]
fn join_sync(b: &mut Bencher) {
tracing_subscriber::fmt().with_test_writer().init();
let tree = tree::Tree::new(TREE_SIZE, 1u32);
fn sum(tree: &tree::Tree<u32>, node: usize) -> u32 {

View file

@ -5,6 +5,8 @@
cell_update,
cold_path,
fn_align,
box_vec_non_null,
atomic_try_update,
let_chains
)]
@ -163,6 +165,7 @@ pub mod task {
}
pub mod latch {
use core::marker::PhantomData;
use std::{
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
@ -283,6 +286,48 @@ pub mod latch {
}
}
pub struct LatchRef<'a, L: Latch> {
inner: *const L,
_marker: PhantomData<&'a L>,
}
impl<'a, L: Latch> LatchRef<'a, L> {
#[inline]
pub const fn new(latch: &'a L) -> Self {
Self {
inner: latch,
_marker: PhantomData,
}
}
}
impl<'a, L: Latch> Latch for LatchRef<'a, L> {
#[inline]
unsafe fn set_raw(this: *const Self) {
let this = &*this;
Latch::set_raw(this.inner);
}
}
impl<'a, L: Latch + Probe> Probe for LatchRef<'a, L> {
#[inline]
fn probe(&self) -> bool {
unsafe {
let this = &*self.inner;
Probe::probe(this)
}
}
}
pub struct NopLatch;
impl Latch for NopLatch {
#[inline]
unsafe fn set_raw(_this: *const Self) {
// do nothing
}
}
pub struct MutexLatch {
mutex: Mutex<bool>,
signal: Condvar,

File diff suppressed because it is too large Load diff

View file

@ -1,4 +1,9 @@
use std::{mem::MaybeUninit, pin::Pin};
use std::{
mem::MaybeUninit,
pin::{pin, Pin},
};
use tracing_test::traced_test;
use super::{util::TaggedAtomicPtr, *};
@ -11,11 +16,11 @@ fn pin_ptr<T>(pin: &Pin<&mut T>) -> NonNull<T> {
fn new_job() {
let mut list = JobList::new();
let stack = pin!(StackJob::new(|_: &Scope| 3 + 3));
let stack = StackJob::new(|| 3 + 3, crate::latch::MutexLatch::new());
let job = pin!(stack.as_ref().as_job());
let job = stack.as_job();
unsafe {
list.push_front(job.as_ref());
list.push_front(&job);
}
unsafe {
@ -24,7 +29,7 @@ fn new_job() {
_ = stack.unwrap();
job_ref.complete(Ok(6));
let result = job_ref.wait();
let result = job_ref.wait().into_inner();
assert_eq!(result.ok(), Some(6));
}
}
@ -37,14 +42,14 @@ fn job_list_pop_back() {
let c = pin!(Job::empty());
unsafe {
list.push_front(a.as_ref());
list.push_front(b.as_ref());
list.push_back(c.as_ref());
list.push_front(&a);
list.push_front(&b);
list.push_back(&c);
}
assert_eq!(list.pop_back(), Some(pin_ptr(&c)));
unsafe {
list.push_front(c.as_ref());
list.push_front(&c);
}
assert_eq!(list.pop_back(), Some(pin_ptr(&a)));
assert_eq!(list.pop_back(), Some(pin_ptr(&b)));
@ -61,14 +66,14 @@ fn job_list_pop_front() {
let c = pin!(Job::<()>::empty());
unsafe {
list.push_front(a.as_ref());
list.push_front(b.as_ref());
list.push_back(c.as_ref());
list.push_front(&a);
list.push_front(&b);
list.push_back(&c);
}
assert_eq!(list.pop_front(), Some(pin_ptr(&b)));
unsafe {
list.push_back(b.as_ref());
list.push_back(&b);
}
assert_eq!(list.pop_front(), Some(pin_ptr(&a)));
assert_eq!(list.pop_front(), Some(pin_ptr(&c)));
@ -85,9 +90,9 @@ fn unlink_job_middle() {
let c = pin!(Job::<()>::empty());
unsafe {
list.push_front(a.as_ref());
list.push_front(b.as_ref());
list.push_front(c.as_ref());
list.push_front(&a);
list.push_front(&b);
list.push_front(&c);
}
unsafe {
@ -108,9 +113,9 @@ fn unlink_job_front() {
let c = pin!(Job::<()>::empty());
unsafe {
list.push_front(a.as_ref());
list.push_front(b.as_ref());
list.push_front(c.as_ref());
list.push_front(&a);
list.push_front(&b);
list.push_front(&c);
}
unsafe {
@ -131,9 +136,9 @@ fn unlink_job_back() {
let c = pin!(Job::<()>::empty());
unsafe {
list.push_front(a.as_ref());
list.push_front(b.as_ref());
list.push_front(c.as_ref());
list.push_front(&a);
list.push_front(&b);
list.push_front(&c);
}
unsafe {
@ -152,7 +157,7 @@ fn unlink_job_single() {
let a = pin!(Job::<()>::empty());
unsafe {
list.push_front(a.as_ref());
list.push_front(&a);
}
unsafe {
@ -342,7 +347,7 @@ fn job_list_pop_back_emptied() {
let a = pin!(Job::<()>::empty());
unsafe {
list.push_front(a.as_ref());
list.push_front(&a);
}
assert_eq!(list.pop_back(), Some(pin_ptr(&a)));
@ -357,7 +362,7 @@ fn job_list_pop_front_emptied() {
let a = pin!(Job::<()>::empty());
unsafe {
list.push_front(a.as_ref());
list.push_front(&a);
}
assert_eq!(list.pop_front(), Some(pin_ptr(&a)));
@ -367,9 +372,27 @@ fn job_list_pop_front_emptied() {
}
#[test]
#[tracing_test::traced_test]
fn spawn() {
let pool = ThreadPool::new();
let mut x = 0;
pool.scope(|s| {
tracing::info!("scope");
s.spawn(|_| {
tracing::info!("spawn");
x += 1;
tracing::info!("x: {x}");
});
tracing::info!("~scope");
});
eprintln!("x: {x}");
}
#[test]
fn rayon_spawn() {
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
let mut x = 0;
pool.scope(|s| {
s.spawn(|_| {
@ -425,6 +448,35 @@ fn join() {
eprintln!("x: {x}");
}
#[test]
#[traced_test]
fn join_many() {
use crate::util::tree::{Tree, TREE_SIZE};
let pool = ThreadPool::new();
let tree = Tree::new(16, 1u32);
fn sum(tree: &Tree<u32>, node: usize, scope: &Scope) -> u32 {
let node = tree.get(node);
let (l, r) = scope.join_heartbeat(
|s| node.left.map(|node| sum(tree, node, s)).unwrap_or_default(),
|s| {
node.right
.map(|node| sum(tree, node, s))
.unwrap_or_default()
},
);
// eprintln!("node: {node:?}, l: {l}, r: {r}");
node.leaf + l + r
}
let sum = pool.scope(|s| sum(&tree, tree.root().unwrap(), s));
eprintln!("{sum}");
}
#[test]
fn rebox() {
struct A(u32);

View file

@ -65,3 +65,71 @@ impl XorShift64Star {
(self.next() % n as u64) as usize
}
}
#[macro_export]
macro_rules! cfg_miri {
(
@miri => { $($tokens:tt)* }$(,)?
_ => { $($tokens2:tt)* }
) => {
#[cfg(miri)]
{
$($tokens)*
}
#[cfg(not(miri))]
{
$($tokens2)*
}
};
}
pub mod tree {
pub struct Tree<T> {
nodes: Box<[Node<T>]>,
root: Option<usize>,
}
#[derive(Debug, Clone)]
pub struct Node<T> {
pub leaf: T,
pub left: Option<usize>,
pub right: Option<usize>,
}
impl<T> Tree<T> {
pub fn new(depth: usize, t: T) -> Tree<T>
where
T: Copy,
{
let mut nodes = Vec::with_capacity((0..depth).sum());
let root = Self::build_node(&mut nodes, depth, t);
Self {
nodes: nodes.into_boxed_slice(),
root: Some(root),
}
}
pub fn root(&self) -> Option<usize> {
self.root
}
pub fn get(&self, index: usize) -> &Node<T> {
&self.nodes[index]
}
pub fn build_node(nodes: &mut Vec<Node<T>>, depth: usize, t: T) -> usize
where
T: Copy,
{
let node = Node {
leaf: t,
left: (depth != 0).then(|| Self::build_node(nodes, depth - 1, t)),
right: (depth != 0).then(|| Self::build_node(nodes, depth - 1, t)),
};
nodes.push(node);
nodes.len() - 1
}
}
pub const TREE_SIZE: usize = 16;
}