From 5c7f1345c4efd2e4f3058d7a93312081020a9a5f Mon Sep 17 00:00:00 2001
From: Janis
Date: Sat, 8 Feb 2025 04:52:18 +0100
Subject: [PATCH] job: thread worker state through job harnesses; port melange
 to job::v2

---
 src/job/v2.rs  |  75 ++++--
 src/lib.rs     |   2 +
 src/melange.rs | 603 +++++++++++++++++++++----------------------
 src/util.rs    |   2 +-
 4 files changed, 316 insertions(+), 366 deletions(-)

diff --git a/src/job/v2.rs b/src/job/v2.rs
index b3c6adb..2f37b75 100644
--- a/src/job/v2.rs
+++ b/src/job/v2.rs
@@ -36,19 +36,43 @@ pub enum JobState {
     Inline = 1 << (u8::BITS - 1),
 }
 
-pub struct Job<T = ()> {
+pub struct Job<T = (), S = ()> {
     state: AtomicU8,
     this: SendPtr<()>,
-    harness: unsafe fn(*const (), *const Job<()>),
+    harness: unsafe fn(*const (), *const Job<()>, &mut S),
     maybe_boxed_val: UnsafeCell<MaybeUninit<Value<T>>>,
    waiting_thread: UnsafeCell<Option<Thread>>,
 }
 
-impl<T> Job<T> {
+impl<T, S> Job<T, S> {
+    pub unsafe fn cast_box<U>(self: Box<Self>) -> Box<Job<U, S>>
+    where
+        T: Sized,
+        U: Sized,
+    {
+        let ptr = Box::into_raw(self);
+
+        Box::from_raw(ptr.cast())
+    }
+
+    pub unsafe fn cast<U>(self: &Self) -> &Job<U, S>
+    where
+        T: Sized,
+        U: Sized,
+    {
+        // SAFETY: both T and U are sized, so Box<T> and Box<U> should be the
+        // same size as well.
+        unsafe { mem::transmute(self) }
+    }
+
+    pub fn id(&self) -> impl Eq {
+        (self.this, self.harness)
+    }
+
     pub fn state(&self) -> u8 {
         self.state.load(Ordering::Relaxed) & !(JobState::Inline as u8)
     }
-    pub fn wait(&self) -> Option<T> {
+    pub fn wait(&self) -> T {
         let mut state = self.state.load(Ordering::Relaxed);
         let mask = JobState::Inline as u8;
 
@@ -70,7 +94,7 @@ impl<T> Job<T> {
                         .store(JobState::Pending as u8 | (state & mask), Ordering::Release);
                     std::thread::park();
                     spin.reset();
-                    break;
+                    continue;
                 }
                 Err(x) => {
                     if x & JobState::Finished as u8 != 0 {
@@ -78,14 +102,13 @@ impl<T> Job<T> {
                         let val = unsafe {
                             let value = (&*self.maybe_boxed_val.get()).assume_init_read();
                             value.get(state & JobState::Inline as u8 != 0)
                         };
-                        return Some(val);
+                        return val;
                     } else {
                         spin.spin();
                     }
                 }
             }
         }
-        return None;
     }
     /// call this when popping value from local queue
     pub fn set_pending(&self) {
@@ -110,9 +133,9 @@ impl<T> Job<T> {
         }
     }
 
-    pub fn execute(&self) {
+    pub fn execute(&self, s: &mut S) {
         // SAFETY: self is non-null
-        unsafe { (self.harness)(self.this.as_ptr().cast(), (self as *const Self).cast()) };
+        unsafe { (self.harness)(self.this.as_ptr().cast(), (self as *const Self).cast(), s) };
     }
 
     fn complete(&self, result: T) {
@@ -166,14 +189,14 @@ impl<F> HeapJob<F> {
     pub fn new(f: F) -> Box<Self> {
         Box::new(Self { f })
     }
-    pub fn into_boxed_job<T>(self: Box<Self>) -> Box<Job<()>>
+    pub fn into_boxed_job<T, S>(self: Box<Self>) -> Box<Job<(), S>>
     where
-        F: FnOnce() -> T + Send,
+        F: FnOnce(&mut S) -> T + Send,
         T: Send,
     {
-        unsafe fn harness<F, T>(this: *const (), job: *const Job<()>)
+        unsafe fn harness<F, T, S>(this: *const (), job: *const Job<()>, s: &mut S)
         where
-            F: FnOnce() -> T + Send,
+            F: FnOnce(&mut S) -> T + Send,
             T: Sized + Send,
         {
             let job = unsafe { &*job.cast::<Job<T, S>>() };
@@ -181,7 +204,7 @@ impl<F> HeapJob<F> {
             let this = unsafe { Box::from_raw(this.cast::<HeapJob<F>>().cast_mut()) };
             let f = this.f;
 
-            job.complete(f());
+            job.complete(f(s));
         }
 
         let size = mem::size_of::<T>();
@@ -197,12 +220,18 @@ impl<F> HeapJob<F> {
             state: AtomicU8::new(new_state),
             this: SendPtr::new(Box::into_raw(self)).unwrap().cast(),
             waiting_thread: UnsafeCell::new(None),
-            harness: harness::<F, T>,
+            harness: harness::<F, T, S>,
             maybe_boxed_val: UnsafeCell::new(MaybeUninit::uninit()),
         })
     }
 }
 
+impl<T, S> crate::latch::Probe for &Job<T, S> {
+    fn probe(&self) -> bool {
+        self.state() == JobState::Finished as u8
+    }
+}
+
 pub struct StackJob<F> {
     f: UnsafeCell<ManuallyDrop<F>>,
 }
@@ -218,14 +247,14 @@ impl<F> StackJob<F> {
         unsafe { ManuallyDrop::take(&mut *self.f.get()) }
     }
 
-    pub fn as_boxed_job<T>(&self) -> Box<Job<()>>
+    pub fn as_job<T, S>(&self) -> Job<T, S>
     where
-        F: FnOnce() -> T + Send,
+        F: FnOnce(&mut S) -> T + Send,
         T: Send,
     {
-        unsafe fn harness<F, T>(this: *const (), job: *const Job<()>)
+        unsafe fn harness<F, T, S>(this: *const (), job: *const Job<()>, s: &mut S)
         where
-            F: FnOnce() -> T + Send,
+            F: FnOnce(&mut S) -> T + Send,
             T: Sized + Send,
         {
             let job = unsafe { &*job.cast::<Job<T, S>>() };
@@ -233,7 +262,7 @@ impl<F> StackJob<F> {
             let this = unsafe { &*this.cast::<StackJob<F>>() };
             let f = unsafe { this.unwrap() };
 
-            job.complete(f());
+            job.complete(f(s));
         }
 
         let size = mem::size_of::<T>();
@@ -245,12 +274,12 @@ impl<F> StackJob<F> {
             JobState::Inline as u8
         };
 
-        Box::new(Job {
+        Job {
             state: AtomicU8::new(new_state),
             this: SendPtr::new(self).unwrap().cast(),
             waiting_thread: UnsafeCell::new(None),
-            harness: harness::<F, T>,
+            harness: harness::<F, T, S>,
             maybe_boxed_val: UnsafeCell::new(MaybeUninit::uninit()),
-        })
+        }
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 0b953ec..fdf68fd 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,5 @@
+#![feature(vec_deque_pop_if)]
+
 use std::{
     cell::{Cell, UnsafeCell},
     future::Future,
diff --git a/src/melange.rs b/src/melange.rs
index da69c0d..b802f43 100644
--- a/src/melange.rs
+++ b/src/melange.rs
@@ -17,295 +17,299 @@ use parking_lot::{Condvar, Mutex};
 use crate::{latch::*, ThreadControl};
 
 mod job {
-    use std::{
-        cell::{Cell, UnsafeCell},
-        collections::VecDeque,
-        mem::ManuallyDrop,
-        panic::{self, AssertUnwindSafe},
-        ptr::NonNull,
+    use core::{
+        cell::UnsafeCell,
+        mem::{self, ManuallyDrop, MaybeUninit},
         sync::atomic::{AtomicU8, Ordering},
-        thread::{self, Thread},
     };
+    use std::thread::Thread;
 
-    use super::WorkerThread as Scope;
+    use parking_lot_core::SpinWait;
 
-    enum Poll {
+    use crate::util::SendPtr;
+
+    use super::WorkerThread;
+
+    #[allow(dead_code)]
+    #[cfg_attr(target_pointer_width = "64", repr(align(16)))]
+    #[cfg_attr(target_pointer_width = "32", repr(align(8)))]
+    #[derive(Debug, Default, Clone, Copy)]
+    struct Size2([usize; 2]);
+
+    struct Value<T>(pub MaybeUninit<Box<MaybeUninit<T>>>);
+
+    impl<T> Value<T> {
+        unsafe fn get(self, inline: bool) -> T {
+            if inline {
+                unsafe { mem::transmute_copy(&self.0) }
+            } else {
+                unsafe { (*self.0.assume_init()).assume_init() }
+            }
+        }
+    }
+
+    #[repr(u8)]
+    pub enum JobState {
+        Empty,
+        Locked = 1,
         Pending,
-        Ready,
-        Locked,
+        Finished,
+        Inline = 1 << (u8::BITS - 1),
     }
 
-    #[derive(Debug, Default)]
-    pub struct Future<T = ()> {
+    pub struct Job<T = ()> {
         state: AtomicU8,
-        /// Can only be accessed if `state` is `Poll::Locked`.
+        this: SendPtr<()>,
+        harness: unsafe fn(*const (), *const Job<()>, &mut WorkerThread),
+        maybe_boxed_val: UnsafeCell<MaybeUninit<Value<T>>>,
         waiting_thread: UnsafeCell<Option<Thread>>,
-        /// Can only be written if `state` is `Poll::Locked` and read if `state` is
-        /// `Poll::Ready`.
-        val: UnsafeCell<Option<Box<thread::Result<T>>>>,
     }
 
-    impl<T> Future<T> {
-        pub fn poll(&self) -> bool {
-            self.state.load(Ordering::Acquire) == Poll::Ready as u8
+    impl<T> Job<T> {
+        pub unsafe fn cast_box<U>(self: Box<Self>) -> Box<Job<U>>
+        where
+            T: Sized,
+            U: Sized,
+        {
+            let ptr = Box::into_raw(self);
+
+            Box::from_raw(ptr.cast())
         }
 
-        pub fn wait(&self) -> Option<thread::Result<T>> {
+        pub unsafe fn cast<U>(self: &Self) -> &Job<U>
+        where
+            T: Sized,
+            U: Sized,
+        {
+            // SAFETY: both T and U are sized, so Box<T> and Box<U> should be the
+            // same size as well.
+            unsafe { mem::transmute(self) }
+        }
+
+        pub fn state(&self) -> u8 {
+            self.state.load(Ordering::Relaxed) & !(JobState::Inline as u8)
+        }
+        pub fn wait(&self) -> T {
+            let mut state = self.state.load(Ordering::Relaxed);
+            let mask = JobState::Inline as u8;
+
+            let mut spin = SpinWait::new();
             loop {
-                let result = self.state.compare_exchange(
-                    Poll::Pending as u8,
-                    Poll::Locked as u8,
-                    Ordering::AcqRel,
+                match self.state.compare_exchange(
+                    JobState::Pending as u8 | (state & mask),
+                    JobState::Locked as u8 | (state & mask),
                     Ordering::Acquire,
-                );
+                    Ordering::Relaxed,
+                ) {
+                    Ok(x) => {
+                        state = x;
+                        unsafe {
+                            *self.waiting_thread.get() = Some(std::thread::current());
+                        }
 
-                match result {
-                    Ok(_) => {
-                        // SAFETY:
-                        // Lock is acquired, only we are accessing `self.waiting_thread`.
-                        unsafe { *self.waiting_thread.get() = Some(thread::current()) };
-
-                        self.state.store(Poll::Pending as u8, Ordering::Release);
-
-                        thread::park();
-
-                        // Skip yielding after being woken up.
+                        self.state
+                            .store(JobState::Pending as u8 | (state & mask), Ordering::Release);
+                        std::thread::park();
+                        spin.reset();
                         continue;
                     }
-                    Err(state) if state == Poll::Ready as u8 => {
-                        // SAFETY:
-                        // `state` is `Poll::Ready` only after `Self::complete`
-                        // releases the lock.
-                        //
-                        // Calling `Self::complete` when `state` is `Poll::Ready`
-                        // cannot mutate `self.val`.
-                        break unsafe { (*self.val.get()).take().map(|b| *b) };
+                    Err(x) => {
+                        if x & JobState::Finished as u8 != 0 {
+                            let val = unsafe {
+                                let value = (&*self.maybe_boxed_val.get()).assume_init_read();
+                                value.get(state & JobState::Inline as u8 != 0)
+                            };
+                            return val;
+                        } else {
+                            spin.spin();
+                        }
                     }
-                    _ => (),
                 }
+            }
+        }
+        /// call this when popping value from local queue
+        pub fn set_pending(&self) {
+            let state = self.state.load(Ordering::Relaxed);
+            let mask = JobState::Inline as u8;
 
-                thread::yield_now();
+            let mut spin = SpinWait::new();
+            loop {
+                match self.state.compare_exchange(
+                    JobState::Empty as u8 | (state & mask),
+                    JobState::Pending as u8 | (state & mask),
+                    Ordering::Acquire,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => {
+                        return;
+                    }
+                    Err(_) => {
+                        spin.spin();
+                    }
+                }
             }
         }
 
-        pub fn complete(&self, val: thread::Result<T>) {
-            let val = Box::new(val);
+        pub fn execute(&self, s: &mut WorkerThread) {
+            // SAFETY: self is non-null
+            unsafe { (self.harness)(self.this.as_ptr().cast(), (self as *const Self).cast(), s) };
+        }
 
+        fn complete(&self, result: T) {
+            let mut state = self.state.load(Ordering::Relaxed);
+            let mask = JobState::Inline as u8;
+
+            let mut spin = SpinWait::new();
             loop {
-                let result = self.state.compare_exchange(
-                    Poll::Pending as u8,
-                    Poll::Locked as u8,
-                    Ordering::AcqRel,
+                match self.state.compare_exchange(
+                    JobState::Pending as u8 | (state & mask),
+                    JobState::Locked as u8 | (state & mask),
                     Ordering::Acquire,
-                );
-
-                match result {
-                    Ok(_) => break,
-                    Err(_) => thread::yield_now(),
+                    Ordering::Relaxed,
+                ) {
+                    Ok(x) => {
+                        state = x;
+                        break;
+                    }
+                    Err(_) => {
+                        spin.spin();
+                    }
                 }
             }
 
-            // SAFETY:
-            // Lock is acquired, only we are accessing `self.val`.
             unsafe {
-                *self.val.get() = Some(val);
+                let value = (&mut *self.maybe_boxed_val.get()).assume_init_mut();
+                // SAFETY: we know the box is allocated if state was `Pending`.
+                if state & JobState::Inline as u8 == 0 {
+                    value.0 = MaybeUninit::new(Box::new(MaybeUninit::new(result)));
+                } else {
+                    *mem::transmute::<_, &mut T>(&mut value.0) = result;
+                }
             }
 
-            // SAFETY:
-            // Lock is acquired, only we are accessing `self.waiting_thread`.
-            if let Some(thread) = unsafe { (*self.waiting_thread.get()).take() } {
+            if let Some(thread) = unsafe { &mut *self.waiting_thread.get() }.take() {
                 thread.unpark();
             }
 
-            self.state.store(Poll::Ready as u8, Ordering::Release);
+            self.state
+                .store(JobState::Finished as u8 | (state & mask), Ordering::Release);
         }
     }
 
-    pub struct JobStack<F = ()> {
-        /// All code paths should call either `Job::execute` or `Self::unwrap` to
-        /// avoid a potential memory leak.
+    impl Job {}
+
+    pub struct HeapJob<F> {
+        f: F,
+    }
+
+    impl<F> HeapJob<F> {
+        pub fn new(f: F) -> Box<Self> {
+            Box::new(Self { f })
+        }
+        pub fn into_boxed_job<T>(self: Box<Self>) -> Box<Job<()>>
+        where
+            F: FnOnce(&mut WorkerThread) -> T + Send,
+            T: Send,
+        {
+            unsafe fn harness<F, T>(this: *const (), job: *const Job<()>, s: &mut WorkerThread)
+            where
+                F: FnOnce(&mut WorkerThread) -> T + Send,
+                T: Sized + Send,
+            {
+                let job = unsafe { &*job.cast::<Job<T>>() };
+
+                let this = unsafe { Box::from_raw(this.cast::<HeapJob<F>>().cast_mut()) };
+                let f = this.f;
+
+                job.complete(f(s));
+            }
+
+            let size = mem::size_of::<T>();
+            let align = mem::align_of::<T>();
+
+            let new_state = if size > mem::size_of::<Box<T>>() || align > mem::align_of::<Box<T>>()
+            {
+                JobState::Empty as u8
+            } else {
+                JobState::Inline as u8
+            };
+
+            Box::new(Job {
+                state: AtomicU8::new(new_state),
+                this: SendPtr::new(Box::into_raw(self)).unwrap().cast(),
+                waiting_thread: UnsafeCell::new(None),
+                harness: harness::<F, T>,
+                maybe_boxed_val: UnsafeCell::new(MaybeUninit::uninit()),
+            })
+        }
+    }
+
+    impl<T> crate::latch::Probe for &Job<T> {
+        fn probe(&self) -> bool {
+            self.state() == JobState::Finished as u8
+        }
+    }
+
+    pub struct StackJob<F> {
         f: UnsafeCell<ManuallyDrop<F>>,
     }
 
-    impl JobStack<F> {
+    impl<F> StackJob<F> {
         pub fn new(f: F) -> Self {
             Self {
                 f: UnsafeCell::new(ManuallyDrop::new(f)),
             }
         }
 
-        /// SAFETY:
-        /// It should only be called once.
-        pub unsafe fn take_once(&self) -> F {
-            // SAFETY:
-            // No `Job` has has been executed, therefore `self.f` has not yet been
-            // `take`n.
+        pub unsafe fn unwrap(&self) -> F {
             unsafe { ManuallyDrop::take(&mut *self.f.get()) }
         }
-    }
 
-    /// `Job` is only sent, not shared between threads.
-    ///
-    /// When popped from the `JobQueue`, it gets copied before sending across
-    /// thread boundaries.
-    #[derive(Clone, Debug)]
-    pub struct Job<T = ()> {
-        stack: NonNull<JobStack>,
-        harness: unsafe fn(&mut Scope, NonNull<JobStack>, NonNull<Future>),
-        fut: Cell<Option<NonNull<Future<T>>>>,
-    }
-
-    impl<T> Job<T> {
-        pub fn new<F>(stack: &JobStack<F>) -> Self
+        pub fn as_job<T>(&self) -> Job<T>
         where
-            F: FnOnce(&mut Scope) -> T + Send,
+            F: FnOnce(&mut WorkerThread) -> T + Send,
             T: Send,
         {
-            /// SAFETY:
-            /// It should only be called while the `stack` is still alive.
-            unsafe fn harness<F, T>(
-                scope: &mut Scope,
-                stack: NonNull<JobStack>,
-                fut: NonNull<Future>,
-            ) where
-                F: FnOnce(&mut Scope) -> T + Send,
+            unsafe fn harness<F, T>(this: *const (), job: *const Job<()>, s: &mut WorkerThread)
+            where
+                F: FnOnce(&mut WorkerThread) -> T + Send,
                 T: Send,
             {
-                // SAFETY:
-                // The `stack` is still alive.
-                let stack: &JobStack<F> = unsafe { stack.cast().as_ref() };
-                // SAFETY:
-                // This is the first call to `take_once` since `Job::execute`
-                // (the only place where this harness is called) is called only
-                // after the job has been popped.
-                let f = unsafe { stack.take_once() };
-                // SAFETY:
-                // Before being popped, the `JobQueue` allocates and stores a
-                // `Future` in `self.fur_or_next` that should get passed here.
-                let fut: &Future<T> = unsafe { fut.cast().as_ref() };
+                let job = unsafe { &*job.cast::<Job<T>>() };
 
-                fut.complete(panic::catch_unwind(AssertUnwindSafe(|| f(scope))));
+                let this = unsafe { &*this.cast::<StackJob<F>>() };
+                let f = unsafe { this.unwrap() };
+
+                job.complete(f(s));
             }
 
-            Self {
-                stack: NonNull::from(stack).cast(),
+            let size = mem::size_of::<T>();
+            let align = mem::align_of::<T>();
+
+            let new_state = if size > mem::size_of::<Box<T>>() || align > mem::align_of::<Box<T>>()
+            {
+                JobState::Empty as u8
+            } else {
+                JobState::Inline as u8
+            };
+
+            Job {
+                state: AtomicU8::new(new_state),
+                this: SendPtr::new(self).unwrap().cast(),
+                waiting_thread: UnsafeCell::new(None),
                 harness: harness::<F, T>,
-                fut: Cell::new(None),
+                maybe_boxed_val: UnsafeCell::new(MaybeUninit::uninit()),
             }
         }
-
-        pub fn is_waiting(&self) -> bool {
-            self.fut.get().is_none()
-        }
-
-        pub fn eq(&self, other: &Job) -> bool {
-            self.stack == other.stack
-        }
-
-        /// SAFETY:
-        /// It should only be called after being popped from a `JobQueue`.
-        pub unsafe fn poll(&self) -> bool {
-            self.fut
-                .get()
-                .map(|fut| {
-                    // SAFETY:
-                    // Before being popped, the `JobQueue` allocates and stores a
-                    // `Future` in `self.fur_or_next` that should get passed here.
-                    let fut = unsafe { fut.as_ref() };
-                    fut.poll()
-                })
-                .unwrap_or_default()
-        }
-
-        /// SAFETY:
-        /// It should only be called after being popped from a `JobQueue`.
-        pub unsafe fn wait(&self) -> Option<thread::Result<T>> {
-            self.fut.get().and_then(|fut| {
-                // SAFETY:
-                // Before being popped, the `JobQueue` allocates and stores a
-                // `Future` in `self.fur_or_next` that should get passed here.
-                let result = unsafe { fut.as_ref().wait() };
-                // SAFETY:
-                // We only can drop the `Box` *after* waiting on the `Future`
-                // in order to ensure unique access.
-                unsafe {
-                    drop(Box::from_raw(fut.as_ptr()));
-                }
-
-                result
-            })
-        }
-
-        /// SAFETY:
-        /// It should only be called in the case where the job has been popped
-        /// from the front and will not be `Job::Wait`ed.
-        pub unsafe fn drop(&self) {
-            if let Some(fut) = self.fut.get() {
-                // SAFETY:
-                // Before being popped, the `JobQueue` allocates and store a
-                // `Future` in `self.fur_or_next` that should get passed here.
-                unsafe {
-                    drop(Box::from_raw(fut.as_ptr()));
-                }
-            }
-        }
-    }
-
-    impl Job {
-        /// SAFETY:
-        /// It should only be called while the `JobStack` it was created with is
-        /// still alive and after being popped from a `JobQueue`.
-        pub unsafe fn execute(&self, scope: &mut Scope) {
-            // SAFETY:
-            // Before being popped, the `JobQueue` allocates and store a
-            // `Future` in `self.fur_or_next` that should get passed here.
-            unsafe {
-                (self.harness)(scope, self.stack, self.fut.get().unwrap());
-            }
-        }
-    }
-
-    // SAFETY:
-    // The job's `stack` will only be accessed after acquiring a lock (in
-    // `Future`), while `prev` and `fut_or_next` are never accessed after being
-    // sent across threads.
-    unsafe impl Send for Job {}
-
-    #[derive(Debug, Default)]
-    pub struct JobQueue(VecDeque<NonNull<Job>>);
-
-    impl JobQueue {
-        pub fn len(&self) -> usize {
-            self.0.len()
-        }
-
-        /// SAFETY:
-        /// Any `Job` pushed onto the queue should alive at least until it gets
-        /// popped.
-        pub unsafe fn push_back<T>(&mut self, job: &Job<T>) {
-            self.0.push_back(NonNull::from(job).cast());
-        }
-
-        pub fn pop_back(&mut self) {
-            self.0.pop_back();
-        }
-
-        pub fn pop_front(&mut self) -> Option<Job> {
-            // SAFETY:
-            // `Job` is still alive as per contract in `push_back`.
-            let job = unsafe { self.0.pop_front()?.as_ref() };
-            job.fut
-                .set(Some(Box::leak(Box::new(Future::default())).into()));
-
-            Some(job.clone())
-        }
-    }
 }
 
 //use job::{Future, Job, JobQueue, JobStack};
-use crate::job::v2::{Job, JobState, StackJob};
+use crate::job::v2::{Job as JobArchetype, JobState, StackJob};
 // use crate::job::{Job, JobRef, StackJob};
 
+type Job<T = ()> = JobArchetype<T, WorkerThread>;
+
 struct ThreadState {
     control: ThreadControl,
 }
@@ -320,6 +324,9 @@ pub struct SharedContext {
     rng: crate::rng::XorShift64Star,
 }
 
+// SAFETY: Job is Send
+unsafe impl Send for SharedContext {}
+
 pub struct Context {
     shared: Mutex<SharedContext>,
     threads: Box<[CachePadded<ThreadState>]>,
@@ -360,17 +367,13 @@ impl SharedContext {
             .next()
     }
 
-    fn pop_random_task(&mut self) -> Option<Job> {
+    fn pop_random_task(&mut self) -> Option<NonNull<Job>> {
         let i = self.rng.next_usize(self.shared_tasks.len());
         let (a, b) = self.shared_tasks.split_at_mut(i);
 
         a.into_iter().chain(b).filter_map(|task| task.take()).next()
     }
 }
 
-std::thread_local! {
-    static WORKER_THREAD_STATE: Cell<*const WorkerThread> = const { Cell::new(ptr::null())};
-}
-
 pub struct WorkerThread {
     context: Arc<Context>,
     index: usize,
@@ -381,6 +384,9 @@ pub struct WorkerThread {
     _marker: PhantomPinned,
 }
 
+// SAFETY: Job is Send
+unsafe impl Send for WorkerThread {}
+
 impl WorkerThread {
     fn new(context: Arc<Context>, heartbeat: Arc<AtomicBool>, index: usize) -> WorkerThread {
         WorkerThread {
@@ -393,23 +399,6 @@ impl WorkerThread {
             _marker: PhantomPinned,
         }
     }
-    unsafe fn set_current(this: *const Self) {
-        WORKER_THREAD_STATE.with(|ptr| {
-            assert!(ptr.get().is_null());
-            ptr.set(this);
-        });
-    }
-    unsafe fn unset_current() {
-        WORKER_THREAD_STATE.with(|ptr| {
-            assert!(!ptr.get().is_null());
-            ptr.set(ptr::null());
-        });
-    }
-    unsafe fn current() -> *const WorkerThread {
-        let ptr = WORKER_THREAD_STATE.with(|ptr| ptr.get());
-
-        ptr
-    }
     fn state(&self) -> &CachePadded<ThreadState> {
         &self.context.threads[self.index]
     }
@@ -422,89 +411,33 @@ impl WorkerThread {
     fn ctx(&self) -> &Arc<Context> {
         &self.context
     }
-
-    fn with<T, F: FnOnce(Option<&WorkerThread>) -> T>(f: F) -> T {
-        WORKER_THREAD_STATE.with(|worker| {
-            f(unsafe { NonNull::new(worker.get().cast_mut()).map(|ptr| ptr.as_ref()) })
-        })
-    }
-
-    fn with_mut<T, F: FnOnce(Option<&mut WorkerThread>) -> T>(f: F) -> T {
-        WORKER_THREAD_STATE.with(|worker| {
-            f(unsafe { NonNull::new(worker.get().cast_mut()).map(|mut ptr| ptr.as_mut()) })
-        })
-    }
 }
 
-struct CurrentWorker;
-
-impl Deref for CurrentWorker {
-    type Target = WorkerThread;
-
-    fn deref(&self) -> &Self::Target {
-        unsafe {
-            NonNull::new(WorkerThread::current().cast_mut())
-                .unwrap()
-                .as_ref()
-        }
-    }
-}
-
-impl DerefMut for CurrentWorker {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        unsafe {
-            NonNull::new(WorkerThread::current().cast_mut())
-                .unwrap()
-                .as_mut()
-        }
-    }
-}
-
-// impl Drop for WorkerThread {
-//     fn drop(&mut self) {
-//         WORKER_THREAD_STATE.with(|ptr| {
-//             assert!(!ptr.get().is_null());
-//             ptr.set(ptr::null());
-//         });
-//     }
-// }
-
 impl WorkerThread {
-    fn worker(self) {
-        {
-            let worker = Box::leak(Box::new(self));
-            unsafe {
-                WorkerThread::set_current(worker);
-            }
-        }
-
-        CurrentWorker.control().notify_running();
+    fn worker(mut self) {
+        self.control().notify_running();
 
         loop {
-            let task = { CurrentWorker.shared().lock().pop_first_task() };
+            let task = { self.shared().lock().pop_first_task() };
             if let Some(task) = task {
-                CurrentWorker.execute_job(task);
+                self.execute_job(task);
             }
 
-            if CurrentWorker.control().should_terminate.probe() {
+            if self.control().should_terminate.probe() {
                 break;
             }
 
-            let mut guard = CurrentWorker.shared().lock();
-            CurrentWorker.ctx().task_shared.wait(&mut guard);
+            let mut guard = self.shared().lock();
+            self.ctx().task_shared.wait(&mut guard);
         }
 
-        CurrentWorker.control().notify_termination();
-
-        unsafe {
-            let worker = Box::from_raw(WorkerThread::current().cast_mut());
-            WorkerThread::unset_current();
-        }
+        self.control().notify_termination();
     }
 
     fn execute_job(&mut self, job: NonNull<Job>) {
         unsafe {
-            job.as_ref().execute();
+            job.as_ref().execute(self);
         }
     }
 
@@ -568,44 +501,30 @@ impl WorkerThread {
         RA: Send,
         RB: Send,
     {
-        let mut ra = None;
-        let latch = AtomicLatch::new();
-        let a = |scope: &mut WorkerThread| {
-            if scope.heartbeat.load(Ordering::Relaxed) {
-                scope.heartbeat_cold();
-            }
+        let b = StackJob::new(b);
 
-            ra = Some(a(scope));
-            unsafe {
-                Latch::set_raw(&latch);
-            }
+        let job = Box::new(b.as_job());
+        self.queue
+            .push_back(unsafe { NonNull::new_unchecked(&*job as *const _ as *mut _) });
+
+        let job = unsafe { job.cast_box::<RB>() };
+
+        let ra = a(self);
+
+        let rb = if job.state() == JobState::Empty as u8 {
+            self.pop_job_id(unsafe { job.as_ref().cast() });
+            unsafe { b.unwrap()(self) }
+        } else {
+            self.run_until(&job.as_ref());
+            job.wait()
         };
 
-        let stack = StackJob::new(a);
-        let task: JobRef =
-            unsafe { core::mem::transmute::<JobRef<'_>, JobRef>(stack.as_task_ref()) };
+        (ra, rb)
+    }
 
-        let id = task.id();
-        self.queue.push_back(task);
-
-        let rb = b(self);
-
-        if !latch.probe() {
-            if let Some(job) = self.queue.pop_back() {
-                if job.id() == id {
-                    unsafe {
-                        (stack.take_once())(self);
-                    }
-                    return (ra.unwrap(), rb);
-                } else {
-                    self.queue.push_back(job);
-                }
-            }
-        }
-
-        self.run_until(&latch);
-
-        (ra.unwrap(), rb)
+    fn pop_job_id(&mut self, id: &Job) -> Option<&Job<()>> {
+        self.queue
+            .pop_back_if(|job| unsafe { (&*job).as_ref().id() == id.id() })
+            .map(|job| unsafe { job.as_ref() })
     }
 
     fn run_until<L: Probe>(&mut self, latch: &L) {
diff --git a/src/util.rs b/src/util.rs
index f903d8f..5678402 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -4,7 +4,7 @@ use core::{
     ptr::NonNull,
 };
 
-#[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
+#[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, Copy)]
 #[repr(transparent)]
 pub struct SendPtr<T>(NonNull<T>);
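
Editor's note (not part of the patch): the `new_state` checks in
`HeapJob::into_boxed_job` and `StackJob::as_job` above decide whether a job's
result is stored inline in the job's one-pointer `Value` slot or behind a
`Box`. A minimal standalone sketch of that rule, assuming the reconstructed
comparison against `Box<T>` is the one the patch intends:

    use core::mem;

    // A result T is stored inline when it fits the pointer-sized slot that
    // would otherwise hold the Box, both in size and in alignment.
    fn stores_inline<T>() -> bool {
        mem::size_of::<T>() <= mem::size_of::<Box<T>>()
            && mem::align_of::<T>() <= mem::align_of::<Box<T>>()
    }

    fn main() {
        assert!(stores_inline::<u32>()); // 4 <= 8: inline, read via transmute_copy
        assert!(!stores_inline::<[u64; 4]>()); // 32 > 8: boxed, read through the Box
    }

When the result is inline, `Job::complete` transmutes it directly into the
`Value` slot and `Value::get` reads it back with `mem::transmute_copy`;
otherwise `complete` allocates `Box::new(MaybeUninit::new(result))` and `get`
dereferences the box. The `JobState::Inline` bit records which case applies,
which is why `wait` and `complete` mask it out of every state transition.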