Compare commits
5 commits
448d2d02b4
...
edaa32591e
Author | SHA1 | Date | |
---|---|---|---|
|
edaa32591e | ||
|
3d32569e2f | ||
|
8b3ecb1455 | ||
|
3eec242097 | ||
|
3730952cad |
|
@ -5,6 +5,7 @@
|
||||||
cold_path,
|
cold_path,
|
||||||
fn_align,
|
fn_align,
|
||||||
box_vec_non_null,
|
box_vec_non_null,
|
||||||
|
box_as_ptr,
|
||||||
atomic_try_update,
|
atomic_try_update,
|
||||||
let_chains
|
let_chains
|
||||||
)]
|
)]
|
||||||
|
|
|
@ -71,6 +71,8 @@ mod util {
|
||||||
None => None,
|
None => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
pub const unsafe fn new_unchecked(ptr: *mut T) -> Self {
|
pub const unsafe fn new_unchecked(ptr: *mut T) -> Self {
|
||||||
unsafe { Self(NonNull::new_unchecked(ptr)) }
|
unsafe { Self(NonNull::new_unchecked(ptr)) }
|
||||||
}
|
}
|
||||||
|
@ -79,6 +81,7 @@ mod util {
|
||||||
Self::new(ptr.cast_mut())
|
Self::new(ptr.cast_mut())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
pub const unsafe fn new_const_unchecked(ptr: *const T) -> Self {
|
pub const unsafe fn new_const_unchecked(ptr: *const T) -> Self {
|
||||||
Self::new_unchecked(ptr.cast_mut())
|
Self::new_unchecked(ptr.cast_mut())
|
||||||
}
|
}
|
||||||
|
@ -429,8 +432,9 @@ mod job {
|
||||||
// for some reason I confused head and tail here and the list is something like this:
|
// for some reason I confused head and tail here and the list is something like this:
|
||||||
// tail <-> job1 <-> job2 <-> ... <-> head
|
// tail <-> job1 <-> job2 <-> ... <-> head
|
||||||
pub struct JobList {
|
pub struct JobList {
|
||||||
head: Box<Job>,
|
// these cannot be boxes because boxes are noalias.
|
||||||
tail: Box<Job>,
|
head: NonNull<Job>,
|
||||||
|
tail: NonNull<Job>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Debug for JobList {
|
impl Debug for JobList {
|
||||||
|
@ -468,73 +472,70 @@ mod job {
|
||||||
|
|
||||||
impl JobList {
|
impl JobList {
|
||||||
pub fn new() -> JobList {
|
pub fn new() -> JobList {
|
||||||
let head = Box::new(Job::empty());
|
let head = Box::into_raw(Box::new(Job::empty()));
|
||||||
let tail = Box::new(Job::empty());
|
let tail = Box::into_raw(Box::new(Job::empty()));
|
||||||
|
|
||||||
// head and tail point at themselves
|
// head and tail point at themselves
|
||||||
unsafe {
|
unsafe {
|
||||||
(&mut *head.err_or_link.get()).link.next = None;
|
(&mut *(&mut *head).err_or_link.get()).link.next = None;
|
||||||
(&mut *head.err_or_link.get()).link.prev =
|
(&mut *(&mut *head).err_or_link.get()).link.prev =
|
||||||
Some(NonNull::new_unchecked((&raw const *tail).cast_mut()));
|
Some(NonNull::new_unchecked(tail));
|
||||||
|
|
||||||
(&mut *tail.err_or_link.get()).link.next =
|
(&mut *(&mut *tail).err_or_link.get()).link.prev = None;
|
||||||
Some(NonNull::new_unchecked((&raw const *head).cast_mut()));
|
(&mut *(&mut *tail).err_or_link.get()).link.next =
|
||||||
(&mut *tail.err_or_link.get()).link.prev = None;
|
Some(NonNull::new_unchecked(head));
|
||||||
|
|
||||||
|
Self {
|
||||||
|
head: NonNull::new_unchecked(head),
|
||||||
|
tail: NonNull::new_unchecked(tail),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Self { head, tail }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn head_ptr(&self) -> *const Job {
|
|
||||||
&raw const *self.head
|
|
||||||
}
|
|
||||||
fn tail_ptr(&self) -> *const Job {
|
|
||||||
&raw const *self.tail
|
|
||||||
}
|
|
||||||
fn head(&self) -> NonNull<Job> {
|
fn head(&self) -> NonNull<Job> {
|
||||||
unsafe { NonNull::new_unchecked(self.head_ptr().cast_mut()) }
|
self.head
|
||||||
}
|
}
|
||||||
fn tail(&self) -> NonNull<Job> {
|
fn tail(&self) -> NonNull<Job> {
|
||||||
unsafe { NonNull::new_unchecked(self.tail_ptr().cast_mut()) }
|
self.tail
|
||||||
}
|
}
|
||||||
|
|
||||||
/// elem must be valid until it is popped.
|
/// elem must be valid until it is popped.
|
||||||
pub unsafe fn push_front<T>(&mut self, elem: &Job<T>) {
|
pub unsafe fn push_front<T>(&mut self, elem: *const Job<T>) {
|
||||||
let head_link = unsafe { self.head.link_mut() };
|
let head_link = unsafe { self.head.as_ref().link_mut() };
|
||||||
|
|
||||||
// SAFETY: head will always have a previous element.
|
// SAFETY: head will always have a previous element.
|
||||||
let prev = head_link.prev.unwrap();
|
let prev = head_link.prev.unwrap();
|
||||||
let prev_link = unsafe { prev.as_ref().link_mut() };
|
let prev_link = unsafe { prev.as_ref().link_mut() };
|
||||||
|
|
||||||
let elem_ptr = unsafe { NonNull::new_unchecked(&*elem as *const Job<T> as *mut Job) };
|
let elem_ptr = unsafe { NonNull::new_unchecked(elem as _) };
|
||||||
head_link.prev = Some(elem_ptr);
|
head_link.prev = Some(elem_ptr);
|
||||||
prev_link.next = Some(elem_ptr);
|
prev_link.next = Some(elem_ptr);
|
||||||
|
|
||||||
let elem_link = unsafe { elem.link_mut() };
|
let elem_link = unsafe { (*elem).link_mut() };
|
||||||
elem_link.prev = Some(prev);
|
elem_link.prev = Some(prev);
|
||||||
elem_link.next = Some(self.head());
|
elem_link.next = Some(self.head());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// elem must be valid until it is popped.
|
/// elem must be valid until it is popped.
|
||||||
pub unsafe fn push_back<T>(&mut self, elem: &Job<T>) {
|
pub unsafe fn push_back<T>(&mut self, elem: *const Job<T>) {
|
||||||
let tail_link = unsafe { self.tail.link_mut() };
|
let tail_link = unsafe { self.tail.as_ref().link_mut() };
|
||||||
|
|
||||||
// SAFETY: tail will always have a previous element.
|
// SAFETY: tail will always have a previous element.
|
||||||
let next = tail_link.next.unwrap();
|
let next = tail_link.next.unwrap();
|
||||||
let next_link = unsafe { next.as_ref().link_mut() };
|
let next_link = unsafe { next.as_ref().link_mut() };
|
||||||
|
|
||||||
let elem_ptr = unsafe { NonNull::new_unchecked(&*elem as *const Job<T> as *mut Job) };
|
let elem_ptr = unsafe { NonNull::new_unchecked(elem as _) };
|
||||||
tail_link.next = Some(elem_ptr);
|
tail_link.next = Some(elem_ptr);
|
||||||
next_link.prev = Some(elem_ptr);
|
next_link.prev = Some(elem_ptr);
|
||||||
|
|
||||||
let elem_link = unsafe { elem.link_mut() };
|
let elem_link = unsafe { (*elem).link_mut() };
|
||||||
elem_link.next = Some(next);
|
elem_link.next = Some(next);
|
||||||
elem_link.prev = Some(self.tail());
|
elem_link.prev = Some(self.tail());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn pop_front(&mut self) -> Option<NonNull<Job>> {
|
pub fn pop_front(&mut self) -> Option<NonNull<Job>> {
|
||||||
let head_link = unsafe { self.head.link_mut() };
|
let head_link = unsafe { self.head.as_ref().link_mut() };
|
||||||
|
|
||||||
// SAFETY: head will always have a previous element.
|
// SAFETY: head will always have a previous element.
|
||||||
let elem = head_link.prev.unwrap();
|
let elem = head_link.prev.unwrap();
|
||||||
|
@ -551,7 +552,7 @@ mod job {
|
||||||
|
|
||||||
pub fn pop_back(&mut self) -> Option<NonNull<Job>> {
|
pub fn pop_back(&mut self) -> Option<NonNull<Job>> {
|
||||||
// TODO: next and elem might be the same
|
// TODO: next and elem might be the same
|
||||||
let tail_link = unsafe { self.tail.link_mut() };
|
let tail_link = unsafe { self.tail.as_ref().link_mut() };
|
||||||
|
|
||||||
// SAFETY: head will always have a previous element.
|
// SAFETY: head will always have a previous element.
|
||||||
let elem = tail_link.next.unwrap();
|
let elem = tail_link.next.unwrap();
|
||||||
|
@ -567,6 +568,16 @@ mod job {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Drop for JobList {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// Need to drop the head and tail, which were allocated on the heap.
|
||||||
|
// elements of the list are managed externally.
|
||||||
|
unsafe {
|
||||||
|
drop((Box::from_non_null(self.head), Box::from_non_null(self.tail)));
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
union ValueOrThis<T> {
|
union ValueOrThis<T> {
|
||||||
uninit: (),
|
uninit: (),
|
||||||
value: ManuallyDrop<SmallBox<T>>,
|
value: ManuallyDrop<SmallBox<T>>,
|
||||||
|
@ -597,6 +608,7 @@ mod job {
|
||||||
error: ManuallyDrop<Option<Box<dyn Any + Send + 'static>>>,
|
error: ManuallyDrop<Option<Box<dyn Any + Send + 'static>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
pub struct Job<T = ()> {
|
pub struct Job<T = ()> {
|
||||||
/// tagged pointer, 8-aligned
|
/// tagged pointer, 8-aligned
|
||||||
harness_and_state: TaggedAtomicPtr<usize, 3>,
|
harness_and_state: TaggedAtomicPtr<usize, 3>,
|
||||||
|
@ -682,6 +694,8 @@ mod job {
|
||||||
mem::transmute::<&Job<T>, &Job<U>>(self)
|
mem::transmute::<&Job<T>, &Job<U>>(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// unwraps the `this` pointer, which is only valid if the job is in the empty state.
|
||||||
|
#[allow(dead_code)]
|
||||||
pub unsafe fn unwrap_this(&self) -> NonNull<()> {
|
pub unsafe fn unwrap_this(&self) -> NonNull<()> {
|
||||||
assert!(self.state() == JobState::Empty as u8);
|
assert!(self.state() == JobState::Empty as u8);
|
||||||
unsafe { (&*self.val_or_this.get()).this }
|
unsafe { (&*self.val_or_this.get()).this }
|
||||||
|
@ -758,8 +772,7 @@ mod job {
|
||||||
// after sleeping, state should be `Finished`
|
// after sleeping, state should be `Finished`
|
||||||
}
|
}
|
||||||
Err(state) => {
|
Err(state) => {
|
||||||
// debug_assert_ne!(state, JobState::Pending as usize);
|
// job finished under us, check if it was successful
|
||||||
|
|
||||||
if state == JobState::Finished as usize {
|
if state == JobState::Finished as usize {
|
||||||
let err = unsafe { (&mut *self.err_or_link.get()).error.take() };
|
let err = unsafe { (&mut *self.err_or_link.get()).error.take() };
|
||||||
|
|
||||||
|
@ -900,12 +913,14 @@ mod job {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// unwraps the job into it's closure.
|
||||||
|
#[allow(dead_code)]
|
||||||
pub fn into_inner(self) -> F {
|
pub fn into_inner(self) -> F {
|
||||||
self.f
|
self.f
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn into_boxed_job<T>(self: Box<Self>) -> Box<Job<()>>
|
pub fn into_boxed_job<T>(self: Box<Self>) -> *mut Job<()>
|
||||||
where
|
where
|
||||||
F: FnOnce() -> T + Send,
|
F: FnOnce() -> T + Send,
|
||||||
T: Send,
|
T: Send,
|
||||||
|
@ -916,10 +931,15 @@ mod job {
|
||||||
F: FnOnce() -> T + Send,
|
F: FnOnce() -> T + Send,
|
||||||
T: Sized + Send,
|
T: Sized + Send,
|
||||||
{
|
{
|
||||||
eprintln!("heapjob harness");
|
|
||||||
let job = job.cast_mut();
|
let job = job.cast_mut();
|
||||||
|
|
||||||
|
// turn `this`, which was allocated at (2), into box.
|
||||||
|
// miri complains this is a use-after-free, but it isn't? silly miri...
|
||||||
|
// Turns out this is actually correct on miri's end, but because
|
||||||
|
// we ensure that the scope lives as long as any jobs, this is
|
||||||
|
// actually fine, as far as I can tell.
|
||||||
let this = unsafe { Box::from_raw(this.cast::<HeapJob<F>>().cast_mut()) };
|
let this = unsafe { Box::from_raw(this.cast::<HeapJob<F>>().cast_mut()) };
|
||||||
let f = this.f;
|
let f = this.into_inner();
|
||||||
|
|
||||||
_ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f()));
|
_ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f()));
|
||||||
|
|
||||||
|
@ -931,9 +951,10 @@ mod job {
|
||||||
}
|
}
|
||||||
|
|
||||||
// (1) allocate box for job
|
// (1) allocate box for job
|
||||||
Box::new(Job::new(harness::<F, T>, unsafe {
|
Box::into_raw(Box::new(Job::new(harness::<F, T>, {
|
||||||
NonNull::new_unchecked(Box::into_raw(self)).cast()
|
// (2) convert self into a pointer
|
||||||
}))
|
Box::into_non_null(self).cast()
|
||||||
|
})))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -994,6 +1015,8 @@ mod job {
|
||||||
Self { result }
|
Self { result }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// convert JobResult into a thread result.
|
||||||
|
#[allow(dead_code)]
|
||||||
pub fn into_inner(self) -> std::thread::Result<T> {
|
pub fn into_inner(self) -> std::thread::Result<T> {
|
||||||
self.result
|
self.result
|
||||||
}
|
}
|
||||||
|
@ -1018,7 +1041,7 @@ use std::{
|
||||||
future::Future,
|
future::Future,
|
||||||
hint::cold_path,
|
hint::cold_path,
|
||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
mem::{self, MaybeUninit},
|
mem::MaybeUninit,
|
||||||
ptr::{self, NonNull},
|
ptr::{self, NonNull},
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering},
|
atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering},
|
||||||
|
@ -1061,7 +1084,8 @@ impl JobCounter {
|
||||||
|
|
||||||
/// must only be called once
|
/// must only be called once
|
||||||
pub unsafe fn wait(&self) {
|
pub unsafe fn wait(&self) {
|
||||||
_ = self.waker.lock().insert(std::thread::current());
|
// SAFETY: this is only called once, so the waker is guaranteed to be None.
|
||||||
|
assert!(self.waker.lock().replace(std::thread::current()).is_none());
|
||||||
|
|
||||||
let count = self.jobs_pending.load(Ordering::SeqCst);
|
let count = self.jobs_pending.load(Ordering::SeqCst);
|
||||||
if count > 0 {
|
if count > 0 {
|
||||||
|
@ -1121,6 +1145,7 @@ impl WorkerThread {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
fn drop_current_guard(new: Option<NonNull<Self>>) -> DropGuard<impl FnOnce()> {
|
fn drop_current_guard(new: Option<NonNull<Self>>) -> DropGuard<impl FnOnce()> {
|
||||||
DropGuard::new(move || unsafe {
|
DropGuard::new(move || unsafe {
|
||||||
if let Some(old) = Self::unset_current() {
|
if let Some(old) = Self::unset_current() {
|
||||||
|
@ -1158,21 +1183,23 @@ impl WorkerThread {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
|
#[inline(always)]
|
||||||
fn current() -> Option<NonNull<WorkerThread>> {
|
fn current() -> Option<NonNull<WorkerThread>> {
|
||||||
WORKER.with(|ptr| unsafe { *ptr.get() })
|
unsafe { *WORKER.with(UnsafeCell::get) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
fn current_ref<'a>() -> Option<&'a WorkerThread> {
|
fn current_ref<'a>() -> Option<&'a WorkerThread> {
|
||||||
WORKER.with(|ptr| unsafe { (&*ptr.get()).map(|ptr| ptr.as_ref()) })
|
unsafe { (*WORKER.with(UnsafeCell::get)).map(|ptr| ptr.as_ref()) }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn push_front<T>(&self, job: &Job<T>) {
|
fn push_front<T>(&self, job: *const Job<T>) {
|
||||||
unsafe {
|
unsafe {
|
||||||
self.queue.as_mut_unchecked().push_front(job);
|
self.queue.as_mut_unchecked().push_front(job);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
fn push_back<T>(&self, job: &Job<T>) {
|
fn push_back<T>(&self, job: *const Job<T>) {
|
||||||
unsafe {
|
unsafe {
|
||||||
self.queue.as_mut_unchecked().push_back(job);
|
self.queue.as_mut_unchecked().push_back(job);
|
||||||
}
|
}
|
||||||
|
@ -1185,15 +1212,6 @@ impl WorkerThread {
|
||||||
unsafe { self.queue.as_mut_unchecked().pop_front() }
|
unsafe { self.queue.as_mut_unchecked().pop_front() }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn complete_jobs(&self) {
|
|
||||||
while let Some(job) = self.pop_front() {
|
|
||||||
unsafe {
|
|
||||||
job.as_ref().set_pending();
|
|
||||||
}
|
|
||||||
self.execute(job);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn tick(&self) {
|
fn tick(&self) {
|
||||||
if self.heartbeat.load(Ordering::Relaxed) {
|
if self.heartbeat.load(Ordering::Relaxed) {
|
||||||
|
@ -1262,6 +1280,7 @@ impl WorkerThread {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This function must be called from a worker thread.
|
/// This function must be called from a worker thread.
|
||||||
|
#[inline]
|
||||||
fn join_heartbeat<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
|
fn join_heartbeat<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
|
||||||
where
|
where
|
||||||
RA: Send,
|
RA: Send,
|
||||||
|
@ -1360,20 +1379,22 @@ impl WorkerThread {
|
||||||
continue 'outer;
|
continue 'outer;
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
// TODO: spin2win
|
|
||||||
tracing::trace!("waiting for shared job, thread id: {:?}", self.index);
|
tracing::trace!("waiting for shared job, thread id: {:?}", self.index);
|
||||||
|
|
||||||
// TODO: wait on latch? if we have something that can
|
// TODO: wait on latch? if we have something that can
|
||||||
// signal being done, e.g. can be waited on instead of
|
// signal being done, e.g. can be waited on instead of
|
||||||
// shared jobs, we should wait on it instead, but we
|
// shared jobs, we should wait on it instead, but we
|
||||||
// would also want to receive shared jobs still?
|
// would also want to receive shared jobs still?
|
||||||
|
// Spin? probably just wastes CPU time.
|
||||||
// self.context.shared_job.wait(&mut guard);
|
// self.context.shared_job.wait(&mut guard);
|
||||||
// if spin.spin() {
|
// if spin.spin() {
|
||||||
// // wait for more shared jobs.
|
// // wait for more shared jobs.
|
||||||
// // self.context.shared_job.wait(&mut guard);
|
// // self.context.shared_job.wait(&mut guard);
|
||||||
// return;
|
// return;
|
||||||
// }
|
// }
|
||||||
std::thread::yield_now();
|
// Yield? same as spinning, really, so just exit and let the upstream use wait
|
||||||
|
// std::thread::yield_now();
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1402,14 +1423,13 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'scope> Scope<'scope> {
|
impl<'scope> Scope<'scope> {
|
||||||
fn wait_for_jobs(&self) {
|
fn wait_for_jobs(&self, worker: &WorkerThread) {
|
||||||
let thread = WorkerThread::current_ref().unwrap();
|
|
||||||
tracing::trace!("waiting for {} jobs to finish.", self.job_counter.count());
|
tracing::trace!("waiting for {} jobs to finish.", self.job_counter.count());
|
||||||
tracing::trace!("thread id: {:?}, jobs: {:?}", thread.index, unsafe {
|
tracing::trace!("thread id: {:?}, jobs: {:?}", worker.index, unsafe {
|
||||||
thread.queue.as_ref_unchecked()
|
worker.queue.as_ref_unchecked()
|
||||||
});
|
});
|
||||||
|
|
||||||
thread.wait_until_latch(&self.job_counter);
|
worker.wait_until_latch(&self.job_counter);
|
||||||
unsafe { self.job_counter.wait() };
|
unsafe { self.job_counter.wait() };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1418,9 +1438,12 @@ impl<'scope> Scope<'scope> {
|
||||||
F: FnOnce(&Self) -> R + Send,
|
F: FnOnce(&Self) -> R + Send,
|
||||||
R: Send,
|
R: Send,
|
||||||
{
|
{
|
||||||
run_in_worker(|thread| {
|
run_in_worker(|worker| {
|
||||||
let this = Self::from_context(thread.context.clone());
|
// SAFETY: we call complete() after creating this scope, which
|
||||||
this.complete(|| f(&this))
|
// ensures that any jobs spawned from the scope exit before the
|
||||||
|
// scope closes.
|
||||||
|
let this = unsafe { Self::from_context(worker.context.clone()) };
|
||||||
|
this.complete(worker, || f(&this))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1429,14 +1452,17 @@ impl<'scope> Scope<'scope> {
|
||||||
F: FnOnce(&Self) -> R + Send,
|
F: FnOnce(&Self) -> R + Send,
|
||||||
R: Send,
|
R: Send,
|
||||||
{
|
{
|
||||||
context.run_in_worker(|_| {
|
context.run_in_worker(|worker| {
|
||||||
let this = Self::from_context(context.clone());
|
// SAFETY: we call complete() after creating this scope, which
|
||||||
this.complete(|| f(&this))
|
// ensures that any jobs spawned from the scope exit before the
|
||||||
|
// scope closes.
|
||||||
|
let this = unsafe { Self::from_context(context.clone()) };
|
||||||
|
this.complete(worker, || f(&this))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// should be called from within a worker thread.
|
/// should be called from within a worker thread.
|
||||||
fn complete<F, R>(&self, f: F) -> R
|
fn complete<F, R>(&self, worker: &WorkerThread, f: F) -> R
|
||||||
where
|
where
|
||||||
F: FnOnce() -> R + Send,
|
F: FnOnce() -> R + Send,
|
||||||
R: Send,
|
R: Send,
|
||||||
|
@ -1468,7 +1494,7 @@ impl<'scope> Scope<'scope> {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
self.wait_for_jobs();
|
self.wait_for_jobs(worker);
|
||||||
self.maybe_propagate_panic();
|
self.maybe_propagate_panic();
|
||||||
|
|
||||||
// SAFETY: if result panicked, we would have propagated the panic above.
|
// SAFETY: if result panicked, we would have propagated the panic above.
|
||||||
|
@ -1528,8 +1554,8 @@ impl<'scope> Scope<'scope> {
|
||||||
|
|
||||||
tracing::trace!("allocated heapjob");
|
tracing::trace!("allocated heapjob");
|
||||||
|
|
||||||
worker.push_front(&job);
|
worker.push_front(job);
|
||||||
Box::leak(job);
|
|
||||||
tracing::trace!("leaked heapjob");
|
tracing::trace!("leaked heapjob");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -1558,13 +1584,13 @@ impl<'scope> Scope<'scope> {
|
||||||
Runnable::<()>::from_raw(NonNull::new_unchecked(this.cast_mut()));
|
Runnable::<()>::from_raw(NonNull::new_unchecked(this.cast_mut()));
|
||||||
runnable.run();
|
runnable.run();
|
||||||
|
|
||||||
|
// SAFETY: job was turned into raw
|
||||||
drop(Box::from_raw(job.cast_mut()));
|
drop(Box::from_raw(job.cast_mut()));
|
||||||
}
|
}
|
||||||
|
|
||||||
let job = Box::new(Job::<T>::new(harness::<T>, runnable.into_raw()));
|
let job = Box::new(Job::<T>::new(harness::<T>, runnable.into_raw()));
|
||||||
|
|
||||||
worker.push_front(job.as_ref());
|
worker.push_front(Box::into_raw(job));
|
||||||
mem::forget(job);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
|
let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
|
||||||
|
@ -1596,29 +1622,22 @@ impl<'scope> Scope<'scope> {
|
||||||
A: FnOnce(&Self) -> RA + Send,
|
A: FnOnce(&Self) -> RA + Send,
|
||||||
B: FnOnce(&Self) -> RB + Send,
|
B: FnOnce(&Self) -> RB + Send,
|
||||||
{
|
{
|
||||||
#[inline(always)]
|
|
||||||
fn make_scope_closure<'scope, A, RA>(
|
|
||||||
this: SendPtr<Scope<'scope>>,
|
|
||||||
a: A,
|
|
||||||
) -> impl FnOnce() -> RA + use<'scope, RA, A>
|
|
||||||
where
|
|
||||||
A: FnOnce(&Scope<'scope>) -> RA + Send,
|
|
||||||
RA: Send,
|
|
||||||
{
|
|
||||||
let scope = unsafe { this.as_ref() };
|
|
||||||
move || a(scope)
|
|
||||||
}
|
|
||||||
|
|
||||||
let worker = WorkerThread::current_ref().expect("join is run in workerthread.");
|
let worker = WorkerThread::current_ref().expect("join is run in workerthread.");
|
||||||
let this = SendPtr::new_const(self).unwrap();
|
let this = SendPtr::new_const(self).unwrap();
|
||||||
|
|
||||||
worker.join_heartbeat_every::<_, _, _, _, 64>(
|
worker.join_heartbeat_every::<_, _, _, _, 64>(
|
||||||
make_scope_closure(this, a),
|
{
|
||||||
make_scope_closure(this, b),
|
let this = this;
|
||||||
|
move || a(unsafe { this.as_ref() })
|
||||||
|
},
|
||||||
|
{
|
||||||
|
let this = this;
|
||||||
|
move || b(unsafe { this.as_ref() })
|
||||||
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn from_context(ctx: Arc<Context>) -> Self {
|
unsafe fn from_context(ctx: Arc<Context>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
context: ctx,
|
context: ctx,
|
||||||
job_counter: JobCounter::default(),
|
job_counter: JobCounter::default(),
|
||||||
|
@ -1642,7 +1661,7 @@ where
|
||||||
|
|
||||||
/// run two closures potentially in parallel, in the global threadpool.
|
/// run two closures potentially in parallel, in the global threadpool.
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn join_in<A, B, RA, RB>(context: Arc<Context>, a: A, b: B) -> (RA, RB)
|
fn join_in<A, B, RA, RB>(context: Arc<Context>, a: A, b: B) -> (RA, RB)
|
||||||
where
|
where
|
||||||
RA: Send,
|
RA: Send,
|
||||||
RB: Send,
|
RB: Send,
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use std::{
|
use std::{
|
||||||
mem::MaybeUninit,
|
mem::{self, MaybeUninit},
|
||||||
pin::{pin, Pin},
|
pin::{pin, Pin},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -42,14 +42,14 @@ fn job_list_pop_back() {
|
||||||
let c = pin!(Job::empty());
|
let c = pin!(Job::empty());
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
list.push_front(&a);
|
list.push_front(&*a);
|
||||||
list.push_front(&b);
|
list.push_front(&*b);
|
||||||
list.push_back(&c);
|
list.push_back(&*c);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert_eq!(list.pop_back(), Some(pin_ptr(&c)));
|
assert_eq!(list.pop_back(), Some(pin_ptr(&c)));
|
||||||
unsafe {
|
unsafe {
|
||||||
list.push_front(&c);
|
list.push_front(&*c);
|
||||||
}
|
}
|
||||||
assert_eq!(list.pop_back(), Some(pin_ptr(&a)));
|
assert_eq!(list.pop_back(), Some(pin_ptr(&a)));
|
||||||
assert_eq!(list.pop_back(), Some(pin_ptr(&b)));
|
assert_eq!(list.pop_back(), Some(pin_ptr(&b)));
|
||||||
|
@ -66,14 +66,14 @@ fn job_list_pop_front() {
|
||||||
let c = pin!(Job::<()>::empty());
|
let c = pin!(Job::<()>::empty());
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
list.push_front(&a);
|
list.push_front(&*a);
|
||||||
list.push_front(&b);
|
list.push_front(&*b);
|
||||||
list.push_back(&c);
|
list.push_back(&*c);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert_eq!(list.pop_front(), Some(pin_ptr(&b)));
|
assert_eq!(list.pop_front(), Some(pin_ptr(&b)));
|
||||||
unsafe {
|
unsafe {
|
||||||
list.push_back(&b);
|
list.push_back(&*b);
|
||||||
}
|
}
|
||||||
assert_eq!(list.pop_front(), Some(pin_ptr(&a)));
|
assert_eq!(list.pop_front(), Some(pin_ptr(&a)));
|
||||||
assert_eq!(list.pop_front(), Some(pin_ptr(&c)));
|
assert_eq!(list.pop_front(), Some(pin_ptr(&c)));
|
||||||
|
@ -90,9 +90,9 @@ fn unlink_job_middle() {
|
||||||
let c = pin!(Job::<()>::empty());
|
let c = pin!(Job::<()>::empty());
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
list.push_front(&a);
|
list.push_front(&*a);
|
||||||
list.push_front(&b);
|
list.push_front(&*b);
|
||||||
list.push_front(&c);
|
list.push_front(&*c);
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
|
@ -113,9 +113,9 @@ fn unlink_job_front() {
|
||||||
let c = pin!(Job::<()>::empty());
|
let c = pin!(Job::<()>::empty());
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
list.push_front(&a);
|
list.push_front(&*a);
|
||||||
list.push_front(&b);
|
list.push_front(&*b);
|
||||||
list.push_front(&c);
|
list.push_front(&*c);
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
|
@ -136,9 +136,9 @@ fn unlink_job_back() {
|
||||||
let c = pin!(Job::<()>::empty());
|
let c = pin!(Job::<()>::empty());
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
list.push_front(&a);
|
list.push_front(&*a);
|
||||||
list.push_front(&b);
|
list.push_front(&*b);
|
||||||
list.push_front(&c);
|
list.push_front(&*c);
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
|
@ -157,7 +157,7 @@ fn unlink_job_single() {
|
||||||
let a = pin!(Job::<()>::empty());
|
let a = pin!(Job::<()>::empty());
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
list.push_front(&a);
|
list.push_front(&*a);
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
|
@ -347,7 +347,7 @@ fn job_list_pop_back_emptied() {
|
||||||
let a = pin!(Job::<()>::empty());
|
let a = pin!(Job::<()>::empty());
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
list.push_front(&a);
|
list.push_front(&*a);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert_eq!(list.pop_back(), Some(pin_ptr(&a)));
|
assert_eq!(list.pop_back(), Some(pin_ptr(&a)));
|
||||||
|
@ -362,7 +362,7 @@ fn job_list_pop_front_emptied() {
|
||||||
let a = pin!(Job::<()>::empty());
|
let a = pin!(Job::<()>::empty());
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
list.push_front(&a);
|
list.push_front(&*a);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert_eq!(list.pop_front(), Some(pin_ptr(&a)));
|
assert_eq!(list.pop_front(), Some(pin_ptr(&a)));
|
||||||
|
@ -372,7 +372,6 @@ fn job_list_pop_front_emptied() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[tracing_test::traced_test]
|
|
||||||
fn spawn() {
|
fn spawn() {
|
||||||
let pool = ThreadPool::new();
|
let pool = ThreadPool::new();
|
||||||
|
|
||||||
|
@ -390,19 +389,6 @@ fn spawn() {
|
||||||
eprintln!("x: {x}");
|
eprintln!("x: {x}");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn rayon_spawn() {
|
|
||||||
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
|
|
||||||
let mut x = 0;
|
|
||||||
pool.scope(|s| {
|
|
||||||
s.spawn(|_| {
|
|
||||||
x += 1;
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
eprintln!("x: {x}");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn spawn_borrow() {
|
fn spawn_borrow() {
|
||||||
let pool = ThreadPool::new();
|
let pool = ThreadPool::new();
|
||||||
|
@ -449,13 +435,12 @@ fn join() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[traced_test]
|
|
||||||
fn join_many() {
|
fn join_many() {
|
||||||
use crate::util::tree::{Tree, TREE_SIZE};
|
use crate::util::tree::Tree;
|
||||||
|
|
||||||
let pool = ThreadPool::new();
|
let pool = ThreadPool::new();
|
||||||
|
|
||||||
let tree = Tree::new(16, 1u32);
|
let tree = Tree::new(4, 1u32);
|
||||||
|
|
||||||
fn sum(tree: &Tree<u32>, node: usize, scope: &Scope) -> u32 {
|
fn sum(tree: &Tree<u32>, node: usize, scope: &Scope) -> u32 {
|
||||||
let node = tree.get(node);
|
let node = tree.get(node);
|
||||||
|
|
Loading…
Reference in a new issue