tests run

This commit is contained in:
Janis 2025-07-01 21:24:51 +02:00
parent 6e4f6a1285
commit edf25e407f
5 changed files with 272 additions and 96 deletions

View file

@ -135,11 +135,11 @@ impl Context {
/// caller should hold the shared lock while calling this
pub unsafe fn notify_job_shared(&self) {
if let Some((i, sender)) = self
.heartbeats
.inner()
let heartbeats = self.heartbeats.inner();
if let Some((i, sender)) = heartbeats
.iter()
.find(|(_, heartbeat)| heartbeat.is_waiting())
.or_else(|| heartbeats.iter().next())
{
_ = i;
#[cfg(feature = "tracing")]

View file

@ -13,6 +13,7 @@ use crate::{
channel::{Parker, Sender},
};
#[repr(transparent)]
pub struct StackJob<F> {
f: UnsafeCell<ManuallyDrop<F>>,
}
@ -196,6 +197,9 @@ impl SharedJob {
unsafe {
(harness)(worker, this, sender);
}
#[cfg(feature = "tracing")]
tracing::trace!("finished executing shared job: {:?}", this);
}
}
@ -249,3 +253,69 @@ mod queuedjobqueue {
}
}
}
pub mod traits {
use std::{cell::UnsafeCell, mem::ManuallyDrop};
use crate::WorkerThread;
use super::{HeapJob, Job2, StackJob};
pub trait IntoJob<T> {
fn into_job(self) -> Job2<T>;
}
pub trait InlineJob<T>: IntoJob<T> {
fn run_inline(self, worker: &WorkerThread) -> T;
}
impl<F, T> IntoJob<T> for F
where
F: FnOnce(&WorkerThread) -> T + Send,
T: Send,
{
fn into_job(self) -> Job2<T> {
Job2::from_heapjob(HeapJob::new(self))
}
}
impl<F, T> IntoJob<T> for &UnsafeCell<ManuallyDrop<F>>
where
F: FnOnce(&WorkerThread) -> T + Send,
T: Send,
{
fn into_job(self) -> Job2<T> {
Job2::from_stackjob(unsafe { std::mem::transmute::<Self, &StackJob<F>>(self) })
}
}
impl<F, T> InlineJob<T> for &UnsafeCell<ManuallyDrop<F>>
where
F: FnOnce(&WorkerThread) -> T + Send,
T: Send,
{
fn run_inline(self, worker: &WorkerThread) -> T {
unsafe { ManuallyDrop::take(&mut *self.get())(worker) }
}
}
impl<F, T> IntoJob<T> for &StackJob<F>
where
F: FnOnce(&WorkerThread) -> T + Send,
T: Send,
{
fn into_job(self) -> Job2<T> {
Job2::from_stackjob(self)
}
}
impl<F, T> InlineJob<T> for &StackJob<F>
where
F: FnOnce(&WorkerThread) -> T + Send,
T: Send,
{
fn run_inline(self, worker: &WorkerThread) -> T {
unsafe { self.unwrap()(worker) }
}
}
}

View file

@ -5,7 +5,10 @@ use std::{hint::cold_path, sync::Arc};
use crate::{
context::Context,
job::{Job2 as Job, StackJob},
job::{
Job2 as Job, StackJob,
traits::{InlineJob, IntoJob},
},
workerthread::WorkerThread,
};
@ -58,6 +61,99 @@ impl WorkerThread {
}
}
/// This function must be called from a worker thread.
#[allow(dead_code)]
pub(crate) fn join_heartbeat2_every<A, B, RA, RB, const TIMES: usize>(
&self,
a: A,
b: B,
) -> (RA, RB)
where
RA: Send,
A: InlineJob<RA> + Copy,
B: FnOnce(&WorkerThread) -> RB,
{
// SAFETY: each worker is only ever used by one thread, so this is safe.
let count = self.join_count.get();
let queue_len = unsafe { self.queue.as_ref_unchecked().len() };
self.join_count.set(count.wrapping_add(1) % TIMES as u8);
// TODO: add counter to job queue, check for low job count to decide whether to use heartbeat or seq.
// see: chili
// SAFETY: this function runs in a worker thread, so we can access the queue safely.
if count == 0 || queue_len < 3 {
cold_path();
self.join_heartbeat2(a, b)
} else {
(a.run_inline(self), b(self))
}
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
pub(crate) fn join_heartbeat2<RA, A, B, RB>(&self, a: A, b: B) -> (RA, RB)
where
B: FnOnce(&WorkerThread) -> RB,
A: InlineJob<RA> + Copy,
RA: Send,
{
use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
#[cfg(feature = "metrics")]
self.metrics.num_joins.fetch_add(1, Ordering::Relaxed);
let job = a.into_job();
self.push_back(&job);
self.tick();
let rb = match catch_unwind(AssertUnwindSafe(|| b(self))) {
Ok(val) => val,
Err(payload) => {
#[cfg(feature = "tracing")]
tracing::debug!("join_heartbeat: b panicked, waiting for a to finish");
cold_path();
// if b panicked, we need to wait for a to finish
let mut receiver = job.take_receiver();
self.wait_until_pred(|| match &receiver {
Some(recv) => recv.poll().is_some(),
None => {
receiver = job.take_receiver();
false
}
});
resume_unwind(payload);
}
};
let ra = if let Some(recv) = job.take_receiver() {
match self.wait_until_recv(recv) {
Some(t) => crate::util::unwrap_or_panic(t),
None => {
#[cfg(feature = "tracing")]
tracing::trace!(
"join_heartbeat: job was shared, but reclaimed, running a() inline"
);
// the job was shared, but not yet stolen, so we get to run the
// job inline
a.run_inline(self)
}
}
} else {
self.pop_back();
// SAFETY: we just popped the job from the queue, so it is safe to unwrap.
#[cfg(feature = "tracing")]
tracing::trace!("join_heartbeat: job was not shared, running a() inline");
a.run_inline(self)
};
(ra, rb)
}
/// This function must be called from a worker thread.
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn join_heartbeat<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)

View file

@ -15,7 +15,10 @@ use async_task::Runnable;
use crate::{
channel::Sender,
context::Context,
job::{HeapJob, Job2 as Job},
job::{
HeapJob, Job2 as Job,
traits::{InlineJob, IntoJob},
},
latch::{CountLatch, Probe},
util::{DropGuard, SendPtr},
workerthread::WorkerThread,
@ -107,25 +110,6 @@ impl ScopeInner {
}
}
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn wait_for_jobs(&self) {
loop {
let count = self.outstanding_jobs.load(Ordering::Relaxed);
if count == 0 {
break;
}
#[cfg(feature = "tracing")]
tracing::trace!("waiting for {} jobs to finish.", count);
// wait until the parker is unparked
unsafe {
self.parker.as_ref().park();
}
// parking gives us AcqRel semantics.
}
}
}
// find below a sketch of an unbalanced tree:
@ -202,14 +186,31 @@ impl<'scope, 'env> Scope<'scope, 'env> {
}
};
self.wait_for_jobs();
let inner = self.inner();
inner.wait_for_jobs();
inner.maybe_propagate_panic();
// SAFETY: if result panicked, we would have propagated the panic above.
result.unwrap()
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn wait_for_jobs(&self) {
#[cfg(feature = "tracing")]
tracing::trace!(
"waiting for {} jobs to finish.",
self.inner().outstanding_jobs.load(Ordering::Relaxed)
);
self.worker().wait_until_pred(|| {
// SAFETY: we are in a worker thread, so the inner is valid.
let count = self.inner().outstanding_jobs.load(Ordering::Relaxed);
#[cfg(feature = "tracing")]
tracing::trace!("waiting for {} jobs to finish.", count);
count == 0
});
}
fn inner(&self) -> &ScopeInner {
unsafe { self.inner.as_ref() }
}
@ -224,12 +225,6 @@ impl<'scope, 'env> Scope<'scope, 'env> {
where
F: FnOnce(Self) + Send,
{
let inner = self.inner;
unsafe {
inner.as_ref().increment();
}
struct SpawnedJob<F> {
f: F,
inner: SendPtr<ScopeInner>,
@ -263,11 +258,10 @@ impl<'scope, 'env> Scope<'scope, 'env> {
// SAFETY: we are in a worker thread, so the inner is valid.
(f)(scope);
unsafe { inner.as_ref().decrement() };
}
}
self.inner().increment();
let job = SpawnedJob::new(
move |scope| {
if let Err(payload) = catch_unwind(AssertUnwindSafe(|| f(scope))) {
@ -280,9 +274,6 @@ impl<'scope, 'env> Scope<'scope, 'env> {
);
self.context().inject_job(job.share(None));
// WorkerThread::current_ref()
// .expect("spawn is run in workerthread.")
// .push_front(job.as_ptr());
}
pub fn spawn_future<T, F>(&self, future: F) -> async_task::Task<T>
@ -312,23 +303,17 @@ impl<'scope, 'env> Scope<'scope, 'env> {
{
self.inner().increment();
let this = SendPtr::new_const(self).unwrap();
// let this = SendPtr::new_const(&self.job_counter).unwrap();
// TODO: make sure this worker lasts long enough for the
// reference to remain valid for the duration of the future.
let scope = unsafe { Self::new_unchecked(self.worker.as_ref(), self.inner) };
let future = async move {
// SAFETY: this is valid until we decrement the job counter.
unsafe {
let _guard = DropGuard::new(move || {
this.as_ref().inner().decrement();
});
let _guard = DropGuard::new(move || {
scope.inner().decrement();
});
// TODO: handle panics here
f(scope).await
}
// TODO: handle panics here
f(scope).await
};
let schedule = move |runnable: Runnable| {
@ -415,59 +400,84 @@ impl<'scope, 'env> Scope<'scope, 'env> {
}
}
let stack = ScopeJob::new(a, self.inner);
let job = ScopeJob::into_job(&stack);
worker.push_back(&job);
worker.tick();
let rb = match catch_unwind(AssertUnwindSafe(|| b(*self))) {
Ok(val) => val,
Err(payload) => {
#[cfg(feature = "tracing")]
tracing::debug!("join_heartbeat: b panicked, waiting for a to finish");
std::hint::cold_path();
// if b panicked, we need to wait for a to finish
let mut receiver = job.take_receiver();
worker.wait_until_pred(|| match &receiver {
Some(recv) => recv.poll().is_some(),
None => {
receiver = job.take_receiver();
false
}
});
resume_unwind(payload);
impl<'scope, 'env, F, T> IntoJob<T> for &ScopeJob<F>
where
F: FnOnce(Scope<'scope, 'env>) -> T + Send,
'env: 'scope,
T: Send,
{
fn into_job(self) -> Job<T> {
self.into_job()
}
};
}
let ra = if let Some(recv) = job.take_receiver() {
match worker.wait_until_recv(recv) {
Some(t) => crate::util::unwrap_or_panic(t),
None => {
#[cfg(feature = "tracing")]
tracing::trace!(
"join_heartbeat: job was shared, but reclaimed, running a() inline"
);
// the job was shared, but not yet stolen, so we get to run the
// job inline
unsafe { stack.unwrap()(*self) }
}
impl<'scope, 'env, F, T> InlineJob<T> for &ScopeJob<F>
where
F: FnOnce(Scope<'scope, 'env>) -> T + Send,
'env: 'scope,
T: Send,
{
fn run_inline(self, worker: &WorkerThread) -> T {
unsafe { self.unwrap()(Scope::<'scope, 'env>::new_unchecked(worker, self.inner)) }
}
} else {
worker.pop_back();
}
unsafe {
// SAFETY: we just popped the job from the queue, so it is safe to unwrap.
#[cfg(feature = "tracing")]
tracing::trace!("join_heartbeat: job was not shared, running a() inline");
stack.unwrap()(*self)
}
};
return worker
.join_heartbeat2_every::<_, _, _, _, 64>(&ScopeJob::new(a, self.inner), |_| b(*self));
(ra, rb)
// let stack = ScopeJob::new(a, self.inner);
// let job = ScopeJob::into_job(&stack);
// worker.push_back(&job);
// worker.tick();
// let rb = match catch_unwind(AssertUnwindSafe(|| b(*self))) {
// Ok(val) => val,
// Err(payload) => {
// #[cfg(feature = "tracing")]
// tracing::debug!("join_heartbeat: b panicked, waiting for a to finish");
// std::hint::cold_path();
// // if b panicked, we need to wait for a to finish
// let mut receiver = job.take_receiver();
// worker.wait_until_pred(|| match &receiver {
// Some(recv) => recv.poll().is_some(),
// None => {
// receiver = job.take_receiver();
// false
// }
// });
// resume_unwind(payload);
// }
// };
// let ra = if let Some(recv) = job.take_receiver() {
// match worker.wait_until_recv(recv) {
// Some(t) => crate::util::unwrap_or_panic(t),
// None => {
// #[cfg(feature = "tracing")]
// tracing::trace!(
// "join_heartbeat: job was shared, but reclaimed, running a() inline"
// );
// // the job was shared, but not yet stolen, so we get to run the
// // job inline
// unsafe { stack.unwrap()(*self) }
// }
// }
// } else {
// worker.pop_back();
// unsafe {
// // SAFETY: we just popped the job from the queue, so it is safe to unwrap.
// #[cfg(feature = "tracing")]
// tracing::trace!("join_heartbeat: job was not shared, running a() inline");
// stack.unwrap()(*self)
// }
// };
// (ra, rb)
}
fn new(worker: &WorkerThread, inner: Pin<&'scope ScopeInner>) -> Self {

View file

@ -174,7 +174,7 @@ impl WorkerThread {
}
}
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
#[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
fn execute(&self, job: SharedJob) {
unsafe { SharedJob::execute(job, self) };
self.tick();