This commit is contained in:
Janis 2025-02-20 15:54:05 +01:00
parent 5cd7f43f45
commit a4992e7dc7
2 changed files with 112 additions and 85 deletions

View file

@ -190,6 +190,7 @@ mod job {
any::Any, any::Any,
cell::{Cell, UnsafeCell}, cell::{Cell, UnsafeCell},
fmt::Debug, fmt::Debug,
marker::PhantomPinned,
mem::{self, ManuallyDrop, MaybeUninit}, mem::{self, ManuallyDrop, MaybeUninit},
pin::Pin, pin::Pin,
ptr::{self, NonNull}, ptr::{self, NonNull},
@ -284,6 +285,7 @@ mod job {
} }
} }
#[derive(Debug)]
pub struct JobList { pub struct JobList {
head: Pin<Box<Job>>, head: Pin<Box<Job>>,
tail: Pin<Box<Job>>, tail: Pin<Box<Job>>,
@ -291,50 +293,68 @@ mod job {
impl JobList { impl JobList {
pub fn new() -> JobList { pub fn new() -> JobList {
let mut head = Box::pin(Job::empty()); let head = Box::pin(Job::empty());
let mut tail = Box::pin(Job::empty()); let tail = Box::pin(Job::empty());
// head and tail point at themselves // head and tail point at themselves
unsafe { unsafe {
(&mut *head.err_or_link.get()).link.next = NonNull::new_unchecked(&mut *head); (&mut *head.err_or_link.get()).link.next =
(&mut *head.err_or_link.get()).link.prev = NonNull::new_unchecked(&mut *tail); NonNull::new_unchecked((&raw const *head).cast_mut());
(&mut *head.err_or_link.get()).link.prev =
NonNull::new_unchecked((&raw const *tail).cast_mut());
(&mut *tail.err_or_link.get()).link.next = NonNull::new_unchecked(&mut *head); (&mut *tail.err_or_link.get()).link.next =
(&mut *tail.err_or_link.get()).link.prev = NonNull::new_unchecked(&mut *tail); NonNull::new_unchecked((&raw const *head).cast_mut());
(&mut *tail.err_or_link.get()).link.prev =
NonNull::new_unchecked((&raw const *tail).cast_mut());
} }
Self { head, tail } Self { head, tail }
} }
fn head_ptr(&self) -> *const Job {
&raw const *self.head
}
fn tail_ptr(&self) -> *const Job {
&raw const *self.tail
}
fn head(&self) -> NonNull<Job> {
unsafe { NonNull::new_unchecked(self.head_ptr().cast_mut()) }
}
fn tail(&self) -> NonNull<Job> {
unsafe { NonNull::new_unchecked(self.tail_ptr().cast_mut()) }
}
/// elem must be valid until it is popped. /// elem must be valid until it is popped.
pub unsafe fn push_front<T>(&mut self, elem: &Job<T>) { pub unsafe fn push_front<T>(&mut self, elem: Pin<&Job<T>>) {
let head_link = unsafe { self.head.link_mut() }; let head_link = unsafe { self.head.link_mut() };
let prev = head_link.prev; let prev = head_link.prev;
let prev_link = unsafe { prev.as_ref().link_mut() }; let prev_link = unsafe { prev.as_ref().link_mut() };
let elem_ptr = unsafe { NonNull::new_unchecked(elem as *const Job<T> as *mut Job) }; let elem_ptr = unsafe { NonNull::new_unchecked(&*elem as *const Job<T> as *mut Job) };
head_link.prev = elem_ptr; head_link.prev = elem_ptr;
prev_link.next = elem_ptr; prev_link.next = elem_ptr;
let elem_link = unsafe { elem.link_mut() }; let elem_link = unsafe { elem.link_mut() };
elem_link.prev = prev; elem_link.prev = prev;
elem_link.next = unsafe { NonNull::new_unchecked(&mut *self.head) }; elem_link.next = self.head();
} }
/// elem must be valid until it is popped. /// elem must be valid until it is popped.
pub unsafe fn push_back<T>(&mut self, elem: &Job<T>) { pub unsafe fn push_back<T>(&mut self, elem: Pin<&Job<T>>) {
let tail_link = unsafe { self.tail.link_mut() }; let tail_link = unsafe { self.tail.link_mut() };
let next = tail_link.next; let next = tail_link.next;
let next_link = unsafe { next.as_ref().link_mut() }; let next_link = unsafe { next.as_ref().link_mut() };
let elem_ptr = unsafe { NonNull::new_unchecked(elem as *const Job<T> as *mut Job) }; let elem_ptr = unsafe { NonNull::new_unchecked(&*elem as *const Job<T> as *mut Job) };
tail_link.next = elem_ptr; tail_link.next = elem_ptr;
next_link.prev = elem_ptr; next_link.prev = elem_ptr;
let elem_link = unsafe { elem.link_mut() }; let elem_link = unsafe { elem.link_mut() };
elem_link.next = next; elem_link.next = next;
elem_link.prev = unsafe { NonNull::new_unchecked(&mut *self.tail) }; elem_link.prev = self.tail();
} }
pub fn pop_front(&mut self) -> Option<NonNull<Job>> { pub fn pop_front(&mut self) -> Option<NonNull<Job>> {
@ -348,9 +368,9 @@ mod job {
head_link.prev = unsafe { NonNull::new_unchecked(prev) }; head_link.prev = unsafe { NonNull::new_unchecked(prev) };
let prev_link = unsafe { (&*prev).link_mut() }; let prev_link = unsafe { (&*prev).link_mut() };
prev_link.next = unsafe { NonNull::new_unchecked(&mut *self.head) }; prev_link.next = self.head();
if elem.as_ptr() == ptr::from_ref(&*self.tail).cast_mut() { if elem == self.tail() {
None None
} else { } else {
Some(elem) Some(elem)
@ -369,9 +389,9 @@ mod job {
tail_link.next = unsafe { NonNull::new_unchecked(next) }; tail_link.next = unsafe { NonNull::new_unchecked(next) };
let next_link = unsafe { (&*next).link_mut() }; let next_link = unsafe { (&*next).link_mut() };
next_link.prev = unsafe { NonNull::new_unchecked(&mut *self.tail) }; next_link.prev = self.tail();
if elem.as_ptr() == ptr::from_ref(&*self.head).cast_mut() { if elem == self.head() {
None None
} else { } else {
Some(elem) Some(elem)
@ -416,6 +436,7 @@ mod job {
val_or_this: UnsafeCell<ValueOrThis<T>>, val_or_this: UnsafeCell<ValueOrThis<T>>,
/// (prev,next) before execute(), Box<...> after /// (prev,next) before execute(), Box<...> after
err_or_link: UnsafeCell<LinkOrError<Job>>, err_or_link: UnsafeCell<LinkOrError<Job>>,
phantom: PhantomPinned,
} }
impl<T> Debug for Job<T> { impl<T> Debug for Job<T> {
@ -483,6 +504,7 @@ mod job {
next: NonNull::dangling(), next: NonNull::dangling(),
}, },
}), }),
phantom: PhantomPinned,
} }
} }
pub fn empty() -> Job<T> { pub fn empty() -> Job<T> {
@ -497,6 +519,7 @@ mod job {
next: NonNull::dangling(), next: NonNull::dangling(),
}, },
}), }),
phantom: PhantomPinned,
} }
} }
@ -536,8 +559,10 @@ mod job {
Ordering::Release, Ordering::Release,
Ordering::Relaxed, Ordering::Relaxed,
); );
eprintln!("wait({:?}): eepy", self as *const _);
std::thread::park(); std::thread::park();
spin.reset(); spin.reset();
eprintln!("wait({:?}): woken", self as *const _);
// after sleeping, state should be `Finished` // after sleeping, state should be `Finished`
continue; continue;
@ -559,6 +584,11 @@ mod job {
return result; return result;
} else { } else {
// spin until lock is released. // spin until lock is released.
eprintln!(
"wait({:?}): spinning ({:?})",
self as *const _,
JobState::from_u8(state as u8).unwrap()
);
spin.spin(); spin.spin();
} }
} }
@ -617,10 +647,10 @@ mod job {
break; break;
} }
Err(tag) => { Err(tag) => {
// eprintln!( eprintln!(
// "complete(): spin waiting for lock to complete: ({:?})", "complete(): spin waiting for lock to complete: ({:?})",
// JobState::from_u8(tag as u8).unwrap() JobState::from_u8(tag as u8).unwrap()
// ); );
spin.spin(); spin.spin();
} }
} }
@ -733,7 +763,7 @@ use std::{
cell::UnsafeCell, cell::UnsafeCell,
collections::BTreeMap, collections::BTreeMap,
mem, mem,
pin::pin, pin::{pin, Pin},
ptr::NonNull, ptr::NonNull,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
@ -849,12 +879,12 @@ impl Scope {
SCOPE.with(|ptr| unsafe { (&*ptr.get()).map(|ptr| ptr.as_ref()) }) SCOPE.with(|ptr| unsafe { (&*ptr.get()).map(|ptr| ptr.as_ref()) })
} }
fn push_front<T>(&self, job: &Job<T>) { fn push_front<T>(&self, job: Pin<&Job<T>>) {
unsafe { unsafe {
self.queue.as_mut_unchecked().push_front(job); self.queue.as_mut_unchecked().push_front(job);
} }
} }
fn push_back<T>(&self, job: &Job<T>) { fn push_back<T>(&self, job: Pin<&Job<T>>) {
unsafe { unsafe {
self.queue.as_mut_unchecked().push_back(job); self.queue.as_mut_unchecked().push_back(job);
} }
@ -876,7 +906,7 @@ impl Scope {
let b = StackJob::new(b); let b = StackJob::new(b);
let job = pin!(b.as_job()); let job = pin!(b.as_job());
self.push_front(&job); self.push_front(job.as_ref());
let ra = a(); let ra = a();
@ -888,7 +918,9 @@ impl Scope {
self.tick(); self.tick();
unsafe { b.unwrap()() } unsafe { b.unwrap()() }
} else { } else {
match self.wait_until::<RB>(unsafe { mem::transmute(&job) }) { match self.wait_until::<RB>(unsafe {
mem::transmute::<Pin<&Job<()>>, Pin<&Job<RB>>>(job.as_ref())
}) {
Some(Ok(t)) => t, Some(Ok(t)) => t,
Some(Err(payload)) => std::panic::resume_unwind(payload), Some(Err(payload)) => std::panic::resume_unwind(payload),
None => unsafe { b.unwrap()() }, None => unsafe { b.unwrap()() },
@ -932,7 +964,7 @@ impl Scope {
} }
#[cold] #[cold]
pub fn wait_until<T>(&self, job: &Job<T>) -> Option<std::thread::Result<T>> { pub fn wait_until<T>(&self, job: Pin<&Job<T>>) -> Option<std::thread::Result<T>> {
// let shared_job = self.context.shared.lock().jobs.remove(&self.index); // let shared_job = self.context.shared.lock().jobs.remove(&self.index);
// if let Some(ptr) = shared_job { // if let Some(ptr) = shared_job {
@ -955,6 +987,7 @@ impl Scope {
.pop_first() .pop_first()
.map(|(_, job)| job) .map(|(_, job)| job)
}) else { }) else {
// no more jobs, sleep instead
break; break;
}; };

View file

@ -2,26 +2,31 @@ use std::pin::Pin;
use super::{util::TaggedAtomicPtr, *}; use super::{util::TaggedAtomicPtr, *};
fn pin_ptr<T>(pin: &Pin<&mut T>) -> NonNull<T> {
let a = &raw const **pin;
unsafe { NonNull::new_unchecked(a.cast_mut()) }
}
#[test] #[test]
fn job_list_pop_back() { fn job_list_pop_back() {
let mut list = JobList::new(); let mut list = JobList::new();
let mut a = Job::empty(); let mut a = pin!(Job::empty());
let mut b = Job::empty(); let mut b = pin!(Job::empty());
let mut c = Job::empty(); let mut c = pin!(Job::empty());
unsafe { unsafe {
list.push_front(&a); list.push_front(a.as_ref());
list.push_front(&b); list.push_front(b.as_ref());
list.push_back(&c); list.push_back(c.as_ref());
} }
assert_eq!(list.pop_back(), NonNull::new(&mut c)); assert_eq!(list.pop_back(), Some(pin_ptr(&c)));
unsafe { unsafe {
list.push_front(&c); list.push_front(c.as_ref());
} }
assert_eq!(list.pop_back(), NonNull::new(&mut a)); assert_eq!(list.pop_back(), Some(pin_ptr(&a)));
assert_eq!(list.pop_back(), NonNull::new(&mut b)); assert_eq!(list.pop_back(), Some(pin_ptr(&b)));
assert_eq!(list.pop_back(), NonNull::new(&mut c)); assert_eq!(list.pop_back(), Some(pin_ptr(&c)));
assert_eq!(list.pop_back(), None); assert_eq!(list.pop_back(), None);
assert_eq!(list.pop_front(), None); assert_eq!(list.pop_front(), None);
} }
@ -29,23 +34,23 @@ fn job_list_pop_back() {
#[test] #[test]
fn job_list_pop_front() { fn job_list_pop_front() {
let mut list = JobList::new(); let mut list = JobList::new();
let mut a = Job::empty(); let mut a = pin!(Job::<()>::empty());
let mut b = Job::empty(); let mut b = pin!(Job::<()>::empty());
let mut c = Job::empty(); let mut c = pin!(Job::<()>::empty());
unsafe { unsafe {
list.push_front(&a); list.push_front(a.as_ref());
list.push_front(&b); list.push_front(b.as_ref());
list.push_back(&c); list.push_back(c.as_ref());
} }
assert_eq!(list.pop_front(), NonNull::new(&mut b)); assert_eq!(list.pop_front(), Some(pin_ptr(&b)));
unsafe { unsafe {
list.push_back(&b); list.push_back(b.as_ref());
} }
assert_eq!(list.pop_front(), NonNull::new(&mut a)); assert_eq!(list.pop_front(), Some(pin_ptr(&a)));
assert_eq!(list.pop_front(), NonNull::new(&mut c)); assert_eq!(list.pop_front(), Some(pin_ptr(&c)));
assert_eq!(list.pop_front(), NonNull::new(&mut b)); assert_eq!(list.pop_front(), Some(pin_ptr(&b)));
assert_eq!(list.pop_front(), None); assert_eq!(list.pop_front(), None);
assert_eq!(list.pop_back(), None); assert_eq!(list.pop_back(), None);
} }
@ -53,22 +58,22 @@ fn job_list_pop_front() {
#[test] #[test]
fn unlink_job_middle() { fn unlink_job_middle() {
let mut list = JobList::new(); let mut list = JobList::new();
let mut a = Job::empty(); let mut a = pin!(Job::<()>::empty());
let b = Job::<()>::empty(); let mut b = pin!(Job::<()>::empty());
let mut c = Job::empty(); let mut c = pin!(Job::<()>::empty());
unsafe { unsafe {
list.push_front(&a); list.push_front(a.as_ref());
list.push_front(&b); list.push_front(b.as_ref());
list.push_front(&c); list.push_front(c.as_ref());
} }
unsafe { unsafe {
b.unlink(); b.unlink();
} }
assert_eq!(list.pop_front(), NonNull::new(&mut c)); assert_eq!(list.pop_front(), Some(pin_ptr(&c)));
assert_eq!(list.pop_front(), NonNull::new(&mut a)); assert_eq!(list.pop_front(), Some(pin_ptr(&a)));
assert_eq!(list.pop_front(), None); assert_eq!(list.pop_front(), None);
assert_eq!(list.pop_back(), None); assert_eq!(list.pop_back(), None);
} }
@ -76,22 +81,22 @@ fn unlink_job_middle() {
#[test] #[test]
fn unlink_job_front() { fn unlink_job_front() {
let mut list = JobList::new(); let mut list = JobList::new();
let a = Job::<()>::empty(); let mut a = pin!(Job::<()>::empty());
let mut b = Job::<()>::empty(); let mut b = pin!(Job::<()>::empty());
let mut c = Job::<()>::empty(); let mut c = pin!(Job::<()>::empty());
unsafe { unsafe {
list.push_front(&a); list.push_front(a.as_ref());
list.push_front(&b); list.push_front(b.as_ref());
list.push_front(&c); list.push_front(c.as_ref());
} }
unsafe { unsafe {
a.unlink(); a.unlink();
} }
assert_eq!(list.pop_front(), NonNull::new(&mut c)); assert_eq!(list.pop_front(), Some(pin_ptr(&c)));
assert_eq!(list.pop_front(), NonNull::new(&mut b)); assert_eq!(list.pop_front(), Some(pin_ptr(&b)));
assert_eq!(list.pop_front(), None); assert_eq!(list.pop_front(), None);
assert_eq!(list.pop_back(), None); assert_eq!(list.pop_back(), None);
} }
@ -99,22 +104,22 @@ fn unlink_job_front() {
#[test] #[test]
fn unlink_job_back() { fn unlink_job_back() {
let mut list = JobList::new(); let mut list = JobList::new();
let mut a = Job::<()>::empty(); let mut a = pin!(Job::<()>::empty());
let mut b = Job::<()>::empty(); let mut b = pin!(Job::<()>::empty());
let c = Job::<()>::empty(); let mut c = pin!(Job::<()>::empty());
unsafe { unsafe {
list.push_front(&a); list.push_front(a.as_ref());
list.push_front(&b); list.push_front(b.as_ref());
list.push_front(&c); list.push_front(c.as_ref());
} }
unsafe { unsafe {
c.unlink(); c.unlink();
} }
assert_eq!(list.pop_front(), NonNull::new(&mut b)); assert_eq!(list.pop_front(), Some(pin_ptr(&b)));
assert_eq!(list.pop_front(), NonNull::new(&mut a)); assert_eq!(list.pop_front(), Some(pin_ptr(&a)));
assert_eq!(list.pop_front(), None); assert_eq!(list.pop_front(), None);
assert_eq!(list.pop_back(), None); assert_eq!(list.pop_back(), None);
} }
@ -122,10 +127,10 @@ fn unlink_job_back() {
#[test] #[test]
fn unlink_job_single() { fn unlink_job_single() {
let mut list = JobList::new(); let mut list = JobList::new();
let a = Job::<()>::empty(); let a = pin!(Job::<()>::empty());
unsafe { unsafe {
list.push_front(&a); list.push_front(a.as_ref());
} }
unsafe { unsafe {
@ -165,14 +170,3 @@ fn tagged_ptr_exchange_failure() {
assert_eq!(ptr.tag(Ordering::Relaxed), 1); assert_eq!(ptr.tag(Ordering::Relaxed), 1);
assert_eq!(ptr.ptr(Ordering::Relaxed).as_ptr(), boxed); assert_eq!(ptr.ptr(Ordering::Relaxed).as_ptr(), boxed);
} }
fn pinstuff() {
let pinned = pin!(5);
let b = pinned.as_ref();
let a = takes_pinned_ref(pinned.as_ref());
}
fn takes_pinned_ref(r: Pin<&i32>) -> i32 {
*r
}