Compare commits
10 commits
5881f8e26a
...
3458a900ee
Author | SHA1 | Date | |
---|---|---|---|
|
3458a900ee | ||
|
b069f0cc87 | ||
|
44acdd7873 | ||
|
60942daca5 | ||
|
bc57d221bc | ||
|
8f753108ec | ||
|
11514efd30 | ||
|
c25b62ee3e | ||
|
cd4c5467ba | ||
|
52fea2e306 |
0
.cargo/config.toml
Normal file
0
.cargo/config.toml
Normal file
|
@ -15,6 +15,9 @@ never-local = []
|
||||||
[profile.bench]
|
[profile.bench]
|
||||||
debug = true
|
debug = true
|
||||||
|
|
||||||
|
[profile.release]
|
||||||
|
debug = true
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
|
@ -39,4 +42,5 @@ parking_lot_core = "0.9.10"
|
||||||
# derive_more = "1.0.0"
|
# derive_more = "1.0.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
async-std = "1.13.0"
|
||||||
tracing-test = "0.2.5"
|
tracing-test = "0.2.5"
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
debug_closure_helpers,
|
debug_closure_helpers,
|
||||||
cell_update,
|
cell_update,
|
||||||
cold_path,
|
cold_path,
|
||||||
|
fn_align,
|
||||||
let_chains
|
let_chains
|
||||||
)]
|
)]
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,7 @@ mod util {
|
||||||
cell::UnsafeCell,
|
cell::UnsafeCell,
|
||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
mem::ManuallyDrop,
|
mem::ManuallyDrop,
|
||||||
num::NonZero,
|
ops::{Deref, DerefMut},
|
||||||
ptr::NonNull,
|
ptr::NonNull,
|
||||||
sync::atomic::{AtomicPtr, Ordering},
|
sync::atomic::{AtomicPtr, Ordering},
|
||||||
};
|
};
|
||||||
|
@ -30,6 +30,56 @@ mod util {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[repr(transparent)]
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
|
||||||
|
pub struct SendPtr<T>(NonNull<T>);
|
||||||
|
|
||||||
|
impl<T> std::fmt::Pointer for SendPtr<T> {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
<NonNull<T> as core::fmt::Pointer>::fmt(&self.0, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl<T> Send for SendPtr<T> {}
|
||||||
|
|
||||||
|
impl<T> Deref for SendPtr<T> {
|
||||||
|
type Target = NonNull<T>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> DerefMut for SendPtr<T> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> SendPtr<T> {
|
||||||
|
pub const fn new(ptr: *mut T) -> Option<Self> {
|
||||||
|
match NonNull::new(ptr) {
|
||||||
|
Some(ptr) => Some(Self(ptr)),
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub const unsafe fn new_unchecked(ptr: *mut T) -> Self {
|
||||||
|
unsafe { Self(NonNull::new_unchecked(ptr)) }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub const fn new_const(ptr: *const T) -> Option<Self> {
|
||||||
|
Self::new(ptr.cast_mut())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub const unsafe fn new_const_unchecked(ptr: *const T) -> Self {
|
||||||
|
Self::new_unchecked(ptr.cast_mut())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Miri doesn't like tagging pointers that it doesn't know the alignment of.
|
||||||
|
// This includes function pointers, which aren't guaranteed to be aligned to
|
||||||
|
// anything, but generally have an alignment of 8, and can be specified to
|
||||||
|
// be aligned to `n` with `#[repr(align(n))]`.
|
||||||
#[repr(transparent)]
|
#[repr(transparent)]
|
||||||
pub struct TaggedAtomicPtr<T, const BITS: usize>(AtomicPtr<()>, PhantomData<T>);
|
pub struct TaggedAtomicPtr<T, const BITS: usize>(AtomicPtr<()>, PhantomData<T>);
|
||||||
|
|
||||||
|
@ -49,8 +99,12 @@ mod util {
|
||||||
|
|
||||||
pub fn ptr(&self, order: Ordering) -> NonNull<T> {
|
pub fn ptr(&self, order: Ordering) -> NonNull<T> {
|
||||||
unsafe {
|
unsafe {
|
||||||
NonNull::new_unchecked(self.0.load(order) as _)
|
NonNull::new_unchecked(
|
||||||
.map_addr(|addr| NonZero::new_unchecked(addr.get() & !Self::mask()))
|
self.0
|
||||||
|
.load(order)
|
||||||
|
.map_addr(|addr| addr & !Self::mask())
|
||||||
|
.cast(),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,8 +131,8 @@ mod util {
|
||||||
let mask = Self::mask();
|
let mask = Self::mask();
|
||||||
let old_ptr = self.0.load(failure);
|
let old_ptr = self.0.load(failure);
|
||||||
|
|
||||||
let old = old_ptr.with_addr((old_ptr.addr() & !mask) | (old & mask));
|
let old = old_ptr.map_addr(|addr| (addr & !mask) | (old & mask));
|
||||||
let new = old_ptr.with_addr((old_ptr.addr() & !mask) | (new & mask));
|
let new = old_ptr.map_addr(|addr| (addr & !mask) | (new & mask));
|
||||||
|
|
||||||
let result = cmpxchg(&self.0, old, new, success, failure);
|
let result = cmpxchg(&self.0, old, new, success, failure);
|
||||||
|
|
||||||
|
@ -130,7 +184,7 @@ mod util {
|
||||||
let ptr = ptr.cast::<()>();
|
let ptr = ptr.cast::<()>();
|
||||||
loop {
|
loop {
|
||||||
let old = self.0.load(failure);
|
let old = self.0.load(failure);
|
||||||
let new = ptr.with_addr((ptr.addr() & !mask) | (old.addr() & mask));
|
let new = ptr.map_addr(|addr| (addr & !mask) | (old.addr() & mask));
|
||||||
if self
|
if self
|
||||||
.0
|
.0
|
||||||
.compare_exchange_weak(old, new, success, failure)
|
.compare_exchange_weak(old, new, success, failure)
|
||||||
|
@ -145,7 +199,8 @@ mod util {
|
||||||
let mask = Self::mask();
|
let mask = Self::mask();
|
||||||
loop {
|
loop {
|
||||||
let ptr = self.0.load(failure);
|
let ptr = self.0.load(failure);
|
||||||
let new = ptr.with_addr((ptr.addr() & !mask) | (tag & mask));
|
let new = ptr.map_addr(|addr| (addr & !mask) | (tag & mask));
|
||||||
|
|
||||||
if self
|
if self
|
||||||
.0
|
.0
|
||||||
.compare_exchange_weak(ptr, new, success, failure)
|
.compare_exchange_weak(ptr, new, success, failure)
|
||||||
|
@ -160,8 +215,8 @@ mod util {
|
||||||
let mask = Self::mask();
|
let mask = Self::mask();
|
||||||
let ptr = self.0.load(order);
|
let ptr = self.0.load(order);
|
||||||
let tag = ptr.addr() & mask;
|
let tag = ptr.addr() & mask;
|
||||||
let addr = ptr.addr() & !mask;
|
let ptr = ptr.map_addr(|addr| addr & !mask);
|
||||||
let ptr = unsafe { NonNull::new_unchecked(ptr.with_addr(addr).cast()) };
|
let ptr = unsafe { NonNull::new_unchecked(ptr.cast()) };
|
||||||
(ptr, tag)
|
(ptr, tag)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -188,6 +243,7 @@ mod job {
|
||||||
use super::util::TaggedAtomicPtr;
|
use super::util::TaggedAtomicPtr;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
#[repr(transparent)]
|
||||||
pub struct SmallBox<T>(pub MaybeUninit<Box<T>>);
|
pub struct SmallBox<T>(pub MaybeUninit<Box<T>>);
|
||||||
|
|
||||||
impl<T: Display> Display for SmallBox<T> {
|
impl<T: Display> Display for SmallBox<T> {
|
||||||
|
@ -557,10 +613,7 @@ mod job {
|
||||||
unsafe impl<T> Send for Job<T> {}
|
unsafe impl<T> Send for Job<T> {}
|
||||||
|
|
||||||
impl<T> Job<T> {
|
impl<T> Job<T> {
|
||||||
pub fn new(
|
pub fn new(harness: unsafe fn(*const (), *const Job<T>), this: NonNull<()>) -> Job<T> {
|
||||||
harness: unsafe fn(*const (), *const Job<T>, &super::Scope),
|
|
||||||
this: NonNull<()>,
|
|
||||||
) -> Job<T> {
|
|
||||||
Self {
|
Self {
|
||||||
harness_and_state: TaggedAtomicPtr::new(
|
harness_and_state: TaggedAtomicPtr::new(
|
||||||
unsafe { mem::transmute(harness) },
|
unsafe { mem::transmute(harness) },
|
||||||
|
@ -702,17 +755,18 @@ mod job {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn execute(&self, scope: &super::Scope) {
|
pub fn execute(job: NonNull<Self>) {
|
||||||
// SAFETY: self is non-null
|
// SAFETY: self is non-null
|
||||||
unsafe {
|
unsafe {
|
||||||
let (ptr, state) = self.harness_and_state.ptr_and_tag(Ordering::Relaxed);
|
let this = job.as_ref();
|
||||||
|
let (ptr, state) = this.harness_and_state.ptr_and_tag(Ordering::Relaxed);
|
||||||
|
|
||||||
debug_assert_eq!(state, JobState::Pending as usize);
|
debug_assert_eq!(state, JobState::Pending as usize);
|
||||||
|
let harness: unsafe fn(*const (), *const Self) = mem::transmute(ptr.as_ptr());
|
||||||
|
|
||||||
let harness: unsafe fn(*const (), *const Self, scope: &super::Scope) =
|
let this = (*this.val_or_this.get()).this;
|
||||||
mem::transmute(ptr.as_ptr());
|
|
||||||
let this = (*self.val_or_this.get()).this;
|
|
||||||
|
|
||||||
harness(this.as_ptr().cast(), (self as *const Self).cast(), scope);
|
harness(this.as_ptr().cast(), job.as_ptr());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -764,34 +818,41 @@ mod job {
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub struct HeapJob<F> {
|
pub struct HeapJob<F> {
|
||||||
f: F,
|
f: F,
|
||||||
|
_phantom: PhantomPinned,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F> HeapJob<F> {
|
impl<F> HeapJob<F> {
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn new(f: F) -> Box<Self> {
|
pub fn new(f: F) -> Box<Self> {
|
||||||
Box::new(Self { f })
|
Box::new(Self {
|
||||||
|
f,
|
||||||
|
_phantom: PhantomPinned,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
pub fn into_inner(self) -> F {
|
||||||
|
self.f
|
||||||
}
|
}
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn into_boxed_job<T>(self: Box<Self>) -> Box<Job<()>>
|
pub fn into_boxed_job<T>(self: Box<Self>) -> Pin<Box<Job<()>>>
|
||||||
where
|
where
|
||||||
F: FnOnce(&super::Scope) -> T + Send,
|
F: FnOnce() -> T + Send,
|
||||||
T: Send,
|
T: Send,
|
||||||
{
|
{
|
||||||
unsafe fn harness<F, T>(this: *const (), job: *const Job<()>, scope: &super::Scope)
|
#[repr(align(8))]
|
||||||
|
unsafe fn harness<F, T>(this: *const (), job: *const Job<()>)
|
||||||
where
|
where
|
||||||
F: FnOnce(&super::Scope) -> T + Send,
|
F: FnOnce() -> T + Send,
|
||||||
T: Sized + Send,
|
T: Sized + Send,
|
||||||
{
|
{
|
||||||
let this = unsafe { Box::from_raw(this.cast::<HeapJob<F>>().cast_mut()) };
|
let this = unsafe { Box::from_raw(this.cast::<HeapJob<F>>().cast_mut()) };
|
||||||
let f = this.f;
|
let f = this.f;
|
||||||
|
|
||||||
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(scope)));
|
_ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f()));
|
||||||
|
|
||||||
let job = unsafe { &*job.cast::<Job<T>>() };
|
_ = unsafe { Box::from_raw(job.cast_mut()) };
|
||||||
job.complete(result);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Box::new(Job::new(harness::<F, T>, unsafe {
|
Box::pin(Job::new(harness::<F, T>, unsafe {
|
||||||
NonNull::new_unchecked(Box::into_raw(self)).cast()
|
NonNull::new_unchecked(Box::into_raw(self)).cast()
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
@ -816,18 +877,19 @@ mod job {
|
||||||
|
|
||||||
pub fn as_job<T>(self: Pin<&Self>) -> Job<()>
|
pub fn as_job<T>(self: Pin<&Self>) -> Job<()>
|
||||||
where
|
where
|
||||||
F: FnOnce(&super::Scope) -> T + Send,
|
F: FnOnce() -> T + Send,
|
||||||
T: Send,
|
T: Send,
|
||||||
{
|
{
|
||||||
unsafe fn harness<F, T>(this: *const (), job: *const Job<()>, scope: &super::Scope)
|
#[repr(align(8))]
|
||||||
|
unsafe fn harness<F, T>(this: *const (), job: *const Job<()>)
|
||||||
where
|
where
|
||||||
F: FnOnce(&super::Scope) -> T + Send,
|
F: FnOnce() -> T + Send,
|
||||||
T: Sized + Send,
|
T: Sized + Send,
|
||||||
{
|
{
|
||||||
let this = unsafe { &*this.cast::<StackJob<F>>() };
|
let this = unsafe { &*this.cast::<StackJob<F>>() };
|
||||||
let f = unsafe { this.unwrap() };
|
let f = unsafe { this.unwrap() };
|
||||||
|
|
||||||
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(scope)));
|
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f()));
|
||||||
|
|
||||||
let job_ref = unsafe { &*job.cast::<Job<T>>() };
|
let job_ref = unsafe { &*job.cast::<Job<T>>() };
|
||||||
job_ref.complete(result);
|
job_ref.complete(result);
|
||||||
|
@ -843,34 +905,77 @@ mod job {
|
||||||
use std::{
|
use std::{
|
||||||
cell::{Cell, UnsafeCell},
|
cell::{Cell, UnsafeCell},
|
||||||
collections::BTreeMap,
|
collections::BTreeMap,
|
||||||
mem,
|
future::Future,
|
||||||
|
marker::PhantomData,
|
||||||
|
mem::{self, MaybeUninit},
|
||||||
pin::{pin, Pin},
|
pin::{pin, Pin},
|
||||||
ptr::NonNull,
|
ptr::NonNull,
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicBool, Ordering},
|
atomic::{AtomicBool, AtomicUsize, Ordering},
|
||||||
Arc, OnceLock, Weak,
|
Arc, OnceLock, Weak,
|
||||||
},
|
},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use async_task::Runnable;
|
||||||
use crossbeam::utils::CachePadded;
|
use crossbeam::utils::CachePadded;
|
||||||
use job::*;
|
use job::*;
|
||||||
use parking_lot::{Condvar, Mutex};
|
use parking_lot::{Condvar, Mutex};
|
||||||
use util::DropGuard;
|
use util::{DropGuard, SendPtr};
|
||||||
|
|
||||||
pub struct Scope {
|
#[derive(Debug, Default)]
|
||||||
join_count: Cell<usize>,
|
pub struct JobCounter {
|
||||||
|
jobs_pending: AtomicUsize,
|
||||||
|
waker: Mutex<Option<std::thread::Thread>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JobCounter {
|
||||||
|
pub fn increment(&self) {
|
||||||
|
self.jobs_pending.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decrement(&self) {
|
||||||
|
if self.jobs_pending.fetch_sub(1, Ordering::SeqCst) == 1 {
|
||||||
|
if let Some(thread) = self.waker.lock().take() {
|
||||||
|
thread.unpark();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// must only be called once
|
||||||
|
pub unsafe fn wait(&self) {
|
||||||
|
_ = self.waker.lock().insert(std::thread::current());
|
||||||
|
|
||||||
|
let count = self.jobs_pending.load(Ordering::SeqCst);
|
||||||
|
if count > 0 {
|
||||||
|
std::thread::park();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct WorkerThread {
|
||||||
context: Arc<Context>,
|
context: Arc<Context>,
|
||||||
index: usize,
|
index: usize,
|
||||||
heartbeat: Arc<CachePadded<AtomicBool>>,
|
heartbeat: Arc<CachePadded<AtomicBool>>,
|
||||||
queue: UnsafeCell<JobList>,
|
queue: UnsafeCell<JobList>,
|
||||||
}
|
}
|
||||||
|
|
||||||
thread_local! {
|
pub struct Scope<'scope> {
|
||||||
static SCOPE: UnsafeCell<Option<NonNull<Scope>>> = const { UnsafeCell::new(None) };
|
join_count: Cell<usize>,
|
||||||
|
context: Arc<Context>,
|
||||||
|
index: usize,
|
||||||
|
heartbeat: Arc<CachePadded<AtomicBool>>,
|
||||||
|
queue: UnsafeCell<JobList>,
|
||||||
|
|
||||||
|
job_counter: JobCounter,
|
||||||
|
_pd: PhantomData<&'scope ()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Scope {
|
thread_local! {
|
||||||
|
static SCOPE: UnsafeCell<Option<NonNull<Scope<'static>>>> = const { UnsafeCell::new(None) };
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'scope> Scope<'scope> {
|
||||||
/// locks shared context
|
/// locks shared context
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
fn new() -> Self {
|
fn new() -> Self {
|
||||||
|
@ -888,20 +993,33 @@ impl Scope {
|
||||||
heartbeat,
|
heartbeat,
|
||||||
join_count: Cell::new(0),
|
join_count: Cell::new(0),
|
||||||
queue: UnsafeCell::new(JobList::new()),
|
queue: UnsafeCell::new(JobList::new()),
|
||||||
|
job_counter: JobCounter::default(),
|
||||||
|
_pd: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe fn drop_in_place_and_dealloc(this: NonNull<Scope>) {
|
||||||
|
unsafe {
|
||||||
|
let ptr = this.as_ptr();
|
||||||
|
ptr.drop_in_place();
|
||||||
|
|
||||||
|
_ = Box::<MaybeUninit<Self>>::from_raw(ptr.cast());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn with_in<T, F: FnOnce(&Scope) -> T>(ctx: &Arc<Context>, f: F) -> T {
|
fn with_in<T, F: FnOnce(&Scope) -> T>(ctx: &Arc<Context>, f: F) -> T {
|
||||||
let mut guard = Option::<DropGuard<Box<dyn FnOnce()>>>::None;
|
let mut _guard = Option::<DropGuard<Box<dyn FnOnce()>>>::None;
|
||||||
|
|
||||||
let scope = match Self::current_ref() {
|
let scope = match Self::current_ref() {
|
||||||
Some(scope) if Arc::ptr_eq(&scope.context, ctx) => scope,
|
Some(scope) if Arc::ptr_eq(&scope.context, ctx) => scope,
|
||||||
Some(_) => {
|
Some(_) => {
|
||||||
let old = unsafe { Self::unset_current().unwrap().as_ptr() };
|
let old = unsafe { Self::unset_current().unwrap().as_ptr() };
|
||||||
guard = Some(DropGuard::new(Box::new(move || unsafe {
|
_guard = Some(DropGuard::new(Box::new(move || unsafe {
|
||||||
_ = Box::from_raw(Self::unset_current().unwrap().as_ptr());
|
Self::drop_in_place_and_dealloc(Self::unset_current().unwrap());
|
||||||
|
|
||||||
Self::set_current(old.cast_const());
|
Self::set_current(old.cast_const());
|
||||||
})));
|
})));
|
||||||
|
|
||||||
let current = Box::into_raw(Box::new(Self::new_in(ctx.clone())));
|
let current = Box::into_raw(Box::new(Self::new_in(ctx.clone())));
|
||||||
unsafe {
|
unsafe {
|
||||||
Self::set_current(current.cast_const());
|
Self::set_current(current.cast_const());
|
||||||
|
@ -911,8 +1029,8 @@ impl Scope {
|
||||||
None => {
|
None => {
|
||||||
let current = Box::into_raw(Box::new(Self::new_in(ctx.clone())));
|
let current = Box::into_raw(Box::new(Self::new_in(ctx.clone())));
|
||||||
|
|
||||||
guard = Some(DropGuard::new(Box::new(|| unsafe {
|
_guard = Some(DropGuard::new(Box::new(|| unsafe {
|
||||||
_ = Box::from_raw(Self::unset_current().unwrap().as_ptr());
|
Self::drop_in_place_and_dealloc(Self::unset_current().unwrap());
|
||||||
})));
|
})));
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
|
@ -924,7 +1042,6 @@ impl Scope {
|
||||||
};
|
};
|
||||||
|
|
||||||
let t = f(scope);
|
let t = f(scope);
|
||||||
drop(guard);
|
|
||||||
t
|
t
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -932,22 +1049,22 @@ impl Scope {
|
||||||
Self::with_in(Context::global(), f)
|
Self::with_in(Context::global(), f)
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe fn set_current(scope: *const Scope) {
|
unsafe fn set_current(scope: *const Scope<'static>) {
|
||||||
SCOPE.with(|ptr| unsafe {
|
SCOPE.with(|ptr| unsafe {
|
||||||
_ = (&mut *ptr.get()).insert(NonNull::new_unchecked(scope.cast_mut()));
|
_ = (&mut *ptr.get()).insert(NonNull::new_unchecked(scope.cast_mut()));
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe fn unset_current() -> Option<NonNull<Scope>> {
|
unsafe fn unset_current() -> Option<NonNull<Scope<'static>>> {
|
||||||
SCOPE.with(|ptr| unsafe { (&mut *ptr.get()).take() })
|
SCOPE.with(|ptr| unsafe { (&mut *ptr.get()).take() })
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
fn current() -> Option<NonNull<Scope>> {
|
fn current() -> Option<NonNull<Scope<'scope>>> {
|
||||||
SCOPE.with(|ptr| unsafe { *ptr.get() })
|
SCOPE.with(|ptr| unsafe { *ptr.get() })
|
||||||
}
|
}
|
||||||
|
|
||||||
fn current_ref<'a>() -> Option<&'a Scope> {
|
fn current_ref<'a>() -> Option<&'a Scope<'scope>> {
|
||||||
SCOPE.with(|ptr| unsafe { (&*ptr.get()).map(|ptr| ptr.as_ref()) })
|
SCOPE.with(|ptr| unsafe { (&*ptr.get()).map(|ptr| ptr.as_ref()) })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -970,6 +1087,91 @@ impl Scope {
|
||||||
unsafe { self.queue.as_mut_unchecked().pop_front() }
|
unsafe { self.queue.as_mut_unchecked().pop_front() }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn complete_jobs(&self) {
|
||||||
|
while let Some(job) = self.pop_front() {
|
||||||
|
unsafe {
|
||||||
|
job.as_ref().set_pending();
|
||||||
|
}
|
||||||
|
self.execute(job);
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe {
|
||||||
|
self.job_counter.wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn spawn<F>(&self, f: F)
|
||||||
|
where
|
||||||
|
F: FnOnce(&Scope<'scope>) + Send + 'scope,
|
||||||
|
{
|
||||||
|
self.job_counter.increment();
|
||||||
|
|
||||||
|
let this = SendPtr::new_const(self).unwrap();
|
||||||
|
|
||||||
|
let job = HeapJob::new(move || unsafe {
|
||||||
|
f(this.as_ref());
|
||||||
|
this.as_ref().job_counter.decrement();
|
||||||
|
})
|
||||||
|
.into_boxed_job();
|
||||||
|
|
||||||
|
self.push_front(Pin::as_ref(&job));
|
||||||
|
mem::forget(job);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn spawn_future<T, F>(&self, future: F) -> async_task::Task<T>
|
||||||
|
where
|
||||||
|
F: Future<Output = T> + Send + 'scope,
|
||||||
|
T: Send + 'scope,
|
||||||
|
{
|
||||||
|
self.job_counter.increment();
|
||||||
|
|
||||||
|
let this = SendPtr::new_const(&self.job_counter).unwrap();
|
||||||
|
|
||||||
|
let future = async move {
|
||||||
|
let _guard = DropGuard::new(move || unsafe {
|
||||||
|
this.as_ref().decrement();
|
||||||
|
});
|
||||||
|
future.await
|
||||||
|
};
|
||||||
|
|
||||||
|
let this = SendPtr::new_const(self).unwrap();
|
||||||
|
let schedule = move |runnable: Runnable| {
|
||||||
|
#[repr(align(8))]
|
||||||
|
unsafe fn harness<T>(this: *const (), job: *const Job<T>) {
|
||||||
|
let runnable = Runnable::<()>::from_raw(NonNull::new_unchecked(this.cast_mut()));
|
||||||
|
runnable.run();
|
||||||
|
|
||||||
|
drop(Box::from_raw(job.cast_mut()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let job = Box::pin(Job::<T>::new(harness::<T>, runnable.into_raw()));
|
||||||
|
|
||||||
|
unsafe {
|
||||||
|
this.as_ref().push_front(job.as_ref());
|
||||||
|
}
|
||||||
|
mem::forget(job);
|
||||||
|
};
|
||||||
|
|
||||||
|
let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
|
||||||
|
|
||||||
|
runnable.schedule();
|
||||||
|
|
||||||
|
task
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn spawn_async<'a, T, Fut, Fn>(&'a self, f: Fn) -> async_task::Task<T>
|
||||||
|
where
|
||||||
|
Fn: FnOnce(&Scope) -> Fut + Send + 'static,
|
||||||
|
Fut: Future<Output = T> + Send + 'static,
|
||||||
|
T: Send + 'static,
|
||||||
|
{
|
||||||
|
let this = SendPtr::new_const(self).unwrap();
|
||||||
|
let future = async move { f(unsafe { this.as_ref() }).await };
|
||||||
|
|
||||||
|
self.spawn_future(future)
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn join<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
|
pub fn join<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
|
||||||
where
|
where
|
||||||
|
@ -1020,7 +1222,9 @@ impl Scope {
|
||||||
A: FnOnce(&Self) -> RA + Send,
|
A: FnOnce(&Self) -> RA + Send,
|
||||||
B: FnOnce(&Self) -> RB + Send,
|
B: FnOnce(&Self) -> RB + Send,
|
||||||
{
|
{
|
||||||
let a = pin!(StackJob::new(move |scope: &Scope| {
|
let this = SendPtr::new_const(self).unwrap();
|
||||||
|
let a = pin!(StackJob::new(move || unsafe {
|
||||||
|
let scope = this.as_ref();
|
||||||
scope.tick();
|
scope.tick();
|
||||||
|
|
||||||
a(scope)
|
a(scope)
|
||||||
|
@ -1036,14 +1240,14 @@ impl Scope {
|
||||||
job.unlink();
|
job.unlink();
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe { a.unwrap()(self) }
|
unsafe { a.unwrap()() }
|
||||||
} else {
|
} else {
|
||||||
match self.wait_until::<RA>(unsafe {
|
match self.wait_until::<RA>(unsafe {
|
||||||
mem::transmute::<Pin<&Job<()>>, Pin<&Job<RA>>>(job.as_ref())
|
mem::transmute::<Pin<&Job<()>>, Pin<&Job<RA>>>(job.as_ref())
|
||||||
}) {
|
}) {
|
||||||
Some(Ok(t)) => t,
|
Some(Ok(t)) => t,
|
||||||
Some(Err(payload)) => std::panic::resume_unwind(payload),
|
Some(Err(payload)) => std::panic::resume_unwind(payload),
|
||||||
None => unsafe { a.unwrap()(self) },
|
None => unsafe { a.unwrap()() },
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1059,9 +1263,9 @@ impl Scope {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn execute(&self, job: &Job) {
|
fn execute(&self, job: NonNull<Job>) {
|
||||||
self.tick();
|
self.tick();
|
||||||
job.execute(self);
|
Job::execute(job);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cold]
|
#[cold]
|
||||||
|
@ -1088,9 +1292,7 @@ impl Scope {
|
||||||
if ptr.as_ptr() == &*job as *const _ as *mut _ {
|
if ptr.as_ptr() == &*job as *const _ as *mut _ {
|
||||||
return None;
|
return None;
|
||||||
} else {
|
} else {
|
||||||
unsafe {
|
self.execute(ptr);
|
||||||
self.execute(ptr.as_ref());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1111,9 +1313,7 @@ impl Scope {
|
||||||
break;
|
break;
|
||||||
};
|
};
|
||||||
|
|
||||||
unsafe {
|
self.execute(job);
|
||||||
self.execute(job.as_ref());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// while job isn't done, run other jobs.
|
// while job isn't done, run other jobs.
|
||||||
Some(job.wait())
|
Some(job.wait())
|
||||||
|
@ -1232,8 +1432,10 @@ fn worker(ctx: Arc<Context>, barrier: Arc<std::sync::Barrier>) {
|
||||||
unsafe {
|
unsafe {
|
||||||
Scope::set_current(Box::into_raw(Box::new(Scope::new_in(ctx.clone()))).cast_const());
|
Scope::set_current(Box::into_raw(Box::new(Scope::new_in(ctx.clone()))).cast_const());
|
||||||
}
|
}
|
||||||
let _guard =
|
|
||||||
DropGuard::new(|| unsafe { drop(Box::from_raw(Scope::unset_current().unwrap().as_ptr())) });
|
let _guard = DropGuard::new(|| unsafe {
|
||||||
|
Scope::drop_in_place_and_dealloc(Scope::unset_current().unwrap());
|
||||||
|
});
|
||||||
|
|
||||||
let scope = Scope::current_ref().unwrap();
|
let scope = Scope::current_ref().unwrap();
|
||||||
|
|
||||||
|
@ -1242,9 +1444,7 @@ fn worker(ctx: Arc<Context>, barrier: Arc<std::sync::Barrier>) {
|
||||||
let mut job = ctx.shared.lock().jobs.pop_first();
|
let mut job = ctx.shared.lock().jobs.pop_first();
|
||||||
loop {
|
loop {
|
||||||
if let Some((_, job)) = job {
|
if let Some((_, job)) = job {
|
||||||
unsafe {
|
scope.execute(job);
|
||||||
scope.execute(job.as_ref());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut guard = ctx.shared.lock();
|
let mut guard = ctx.shared.lock();
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use std::pin::Pin;
|
use std::{mem::MaybeUninit, pin::Pin};
|
||||||
|
|
||||||
use super::{util::TaggedAtomicPtr, *};
|
use super::{util::TaggedAtomicPtr, *};
|
||||||
|
|
||||||
|
@ -315,3 +315,134 @@ fn value_boxed_drop() {
|
||||||
}
|
}
|
||||||
assert_eq!(dropped, 2);
|
assert_eq!(dropped, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn job_list_pop_front_empty() {
|
||||||
|
let mut list = JobList::new();
|
||||||
|
|
||||||
|
assert_eq!(list.pop_front(), None);
|
||||||
|
assert_eq!(list.pop_front(), None);
|
||||||
|
assert_eq!(list.pop_front(), None);
|
||||||
|
assert_eq!(list.pop_front(), None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn job_list_pop_back_empty() {
|
||||||
|
let mut list = JobList::new();
|
||||||
|
|
||||||
|
assert_eq!(list.pop_back(), None);
|
||||||
|
assert_eq!(list.pop_back(), None);
|
||||||
|
assert_eq!(list.pop_back(), None);
|
||||||
|
assert_eq!(list.pop_back(), None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn job_list_pop_back_emptied() {
|
||||||
|
let mut list = JobList::new();
|
||||||
|
let a = pin!(Job::<()>::empty());
|
||||||
|
|
||||||
|
unsafe {
|
||||||
|
list.push_front(a.as_ref());
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(list.pop_back(), Some(pin_ptr(&a)));
|
||||||
|
assert_eq!(list.pop_back(), None);
|
||||||
|
assert_eq!(list.pop_back(), None);
|
||||||
|
assert_eq!(list.pop_back(), None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn job_list_pop_front_emptied() {
|
||||||
|
let mut list = JobList::new();
|
||||||
|
let a = pin!(Job::<()>::empty());
|
||||||
|
|
||||||
|
unsafe {
|
||||||
|
list.push_front(a.as_ref());
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(list.pop_front(), Some(pin_ptr(&a)));
|
||||||
|
assert_eq!(list.pop_front(), None);
|
||||||
|
assert_eq!(list.pop_front(), None);
|
||||||
|
assert_eq!(list.pop_front(), None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn spawn() {
|
||||||
|
let pool = ThreadPool::new();
|
||||||
|
|
||||||
|
let mut x = 0;
|
||||||
|
pool.scope(|s| {
|
||||||
|
s.spawn(|_| {
|
||||||
|
x += 1;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
eprintln!("x: {x}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn spawn_borrow() {
|
||||||
|
let pool = ThreadPool::new();
|
||||||
|
|
||||||
|
pool.scope(|s| {
|
||||||
|
let mut x = 0;
|
||||||
|
s.spawn(|_| {
|
||||||
|
x += 1;
|
||||||
|
assert_eq!(x, 1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn spawn_future() {
|
||||||
|
let pool = ThreadPool::new();
|
||||||
|
|
||||||
|
let task = pool.scope(|s| {
|
||||||
|
let task = s.spawn_future(async {
|
||||||
|
eprintln!("executing future");
|
||||||
|
3 + 1
|
||||||
|
});
|
||||||
|
|
||||||
|
// let task = s.spawn_future(async {
|
||||||
|
// eprintln!("task: {}", task.await);
|
||||||
|
// });
|
||||||
|
task
|
||||||
|
});
|
||||||
|
let x = async_std::task::block_on(task);
|
||||||
|
eprintln!("x: {x}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn join() {
|
||||||
|
let pool = ThreadPool::new();
|
||||||
|
|
||||||
|
let x = pool.scope(|s| {
|
||||||
|
let (a, b) = s.join(|_| 3, |_| 4);
|
||||||
|
|
||||||
|
a + b
|
||||||
|
});
|
||||||
|
|
||||||
|
eprintln!("x: {x}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn rebox() {
|
||||||
|
struct A(u32);
|
||||||
|
|
||||||
|
impl Drop for A {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
eprintln!("drop");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let ptr = Box::into_raw(Box::new(A(5)));
|
||||||
|
|
||||||
|
let x = unsafe { &*ptr.offset(mem::offset_of!(A, 0) as isize).cast::<u32>() };
|
||||||
|
let y = *x;
|
||||||
|
|
||||||
|
unsafe {
|
||||||
|
ptr.drop_in_place();
|
||||||
|
_ = Box::<MaybeUninit<A>>::from_raw(ptr.cast());
|
||||||
|
}
|
||||||
|
assert_eq!(y, 5);
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue