yea.. basically deadlocked
This commit is contained in:
parent
9cc125e558
commit
6fe5351e59
|
@ -1140,6 +1140,8 @@ impl<T> JobReceiver<T> {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tracing::trace!("received job ({:?}) result", &raw const *self);
|
||||||
|
|
||||||
// SAFETY: if we received a non-EMPTY tag, the value must be initialized.
|
// SAFETY: if we received a non-EMPTY tag, the value must be initialized.
|
||||||
// because we atomically set the taag to EMPTY, we can be sure that we're the only ones accessing the value.
|
// because we atomically set the taag to EMPTY, we can be sure that we're the only ones accessing the value.
|
||||||
let slot = unsafe { (&mut *self.channel.value.get()).assume_init_mut() };
|
let slot = unsafe { (&mut *self.channel.value.get()).assume_init_mut() };
|
||||||
|
|
|
@ -412,16 +412,17 @@ impl WorkerLatch {
|
||||||
**guard = false; // reset the mutex to false after waking up
|
**guard = false; // reset the mutex to false after waking up
|
||||||
}
|
}
|
||||||
|
|
||||||
fn wait_with_lock_internal<T>(&self, other: &mut parking_lot::MutexGuard<'_, T>) {
|
fn wait_with_lock_internal<T>(
|
||||||
let key = &self.condvar as *const _ as usize;
|
condvar: &AtomicUsize,
|
||||||
let lock_addr = &self.mutex as *const _ as usize;
|
mutex: &mut parking_lot::MutexGuard<'_, bool>,
|
||||||
|
other: &mut parking_lot::MutexGuard<'_, T>,
|
||||||
|
) {
|
||||||
|
**mutex = true;
|
||||||
|
let key = condvar as *const _ as usize;
|
||||||
|
let lock_addr = parking_lot::MutexGuard::mutex(mutex) as *const _ as usize;
|
||||||
let mut requeued = false;
|
let mut requeued = false;
|
||||||
|
|
||||||
let mut guard = self.mutex.lock();
|
let state = condvar;
|
||||||
|
|
||||||
let state = unsafe { AtomicUsize::from_ptr(&self.condvar as *const _ as *mut usize) };
|
|
||||||
|
|
||||||
*guard = true; // set the mutex to true to indicate that the worker is waiting
|
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
let token = parking_lot_core::park(
|
let token = parking_lot_core::park(
|
||||||
|
@ -437,7 +438,7 @@ impl WorkerLatch {
|
||||||
true
|
true
|
||||||
},
|
},
|
||||||
|| {
|
|| {
|
||||||
drop(guard); // drop the guard to release the lock
|
parking_lot::MutexGuard::mutex(&mutex).force_unlock();
|
||||||
parking_lot::MutexGuard::mutex(&other).force_unlock();
|
parking_lot::MutexGuard::mutex(&other).force_unlock();
|
||||||
},
|
},
|
||||||
|k, was_last_thread| {
|
|k, was_last_thread| {
|
||||||
|
@ -455,35 +456,55 @@ impl WorkerLatch {
|
||||||
token
|
token
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
// relock
|
|
||||||
let mut other2 = parking_lot::MutexGuard::mutex(&other).lock();
|
|
||||||
tracing::trace!("WorkerLatch wait_with_lock_internal: relocked other");
|
|
||||||
|
|
||||||
// because `other` is logically unlocked, we swap it with `other2` and then forget `other2`
|
// because `other` is logically unlocked, we swap it with `other2` and then forget `other2`
|
||||||
|
let mut other2 = parking_lot::MutexGuard::mutex(&other).lock();
|
||||||
core::mem::swap(&mut other2, other);
|
core::mem::swap(&mut other2, other);
|
||||||
core::mem::forget(other2);
|
core::mem::forget(other2);
|
||||||
|
|
||||||
let mut guard = self.mutex.lock();
|
// because `other` is logically unlocked, we swap it with `other2` and then forget `other2`
|
||||||
tracing::trace!("WorkerLatch wait_with_lock_internal: relocked self");
|
let mut mutex2 = parking_lot::MutexGuard::mutex(&mutex).lock();
|
||||||
|
core::mem::swap(&mut mutex2, mutex);
|
||||||
|
core::mem::forget(mutex2);
|
||||||
|
|
||||||
*guard = false; // reset the mutex to false after waking up
|
**mutex = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip(other))]
|
#[tracing::instrument(level = "trace", skip_all, fields(
|
||||||
|
this = self as *const Self as usize,
|
||||||
|
))]
|
||||||
pub fn wait_with_lock<T>(&self, other: &mut parking_lot::MutexGuard<'_, T>) {
|
pub fn wait_with_lock<T>(&self, other: &mut parking_lot::MutexGuard<'_, T>) {
|
||||||
self.wait_with_lock_internal(other);
|
Self::wait_with_lock_internal(&self.condvar, &mut self.mutex.lock(), other);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(level = "trace", skip_all, fields(
|
||||||
|
this = self as *const Self as usize,
|
||||||
|
))]
|
||||||
|
pub fn wait_with_lock_unless<F, T>(
|
||||||
|
&self,
|
||||||
|
other: &mut parking_lot::MutexGuard<'_, T>,
|
||||||
|
mut pred: F,
|
||||||
|
) where
|
||||||
|
F: FnMut(&mut T) -> bool,
|
||||||
|
{
|
||||||
|
let mut guard = self.mutex.lock();
|
||||||
|
if !pred(other.deref_mut()) {
|
||||||
|
Self::wait_with_lock_internal(&self.condvar, &mut guard, other);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn wait_with_lock_while<T, F>(&self, other: &mut parking_lot::MutexGuard<'_, T>, mut f: F)
|
pub fn wait_with_lock_while<T, F>(&self, other: &mut parking_lot::MutexGuard<'_, T>, mut f: F)
|
||||||
where
|
where
|
||||||
F: FnMut(&mut T) -> bool,
|
F: FnMut(&mut T) -> bool,
|
||||||
{
|
{
|
||||||
|
let mut guard = self.mutex.lock();
|
||||||
while f(other.deref_mut()) {
|
while f(other.deref_mut()) {
|
||||||
self.wait_with_lock_internal(other);
|
Self::wait_with_lock_internal(&self.condvar, &mut guard, other);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip(f))]
|
#[tracing::instrument(level = "trace", skip_all, fields(
|
||||||
|
this = self as *const Self as usize,
|
||||||
|
))]
|
||||||
pub fn wait_until<F, T>(&self, mut f: F) -> T
|
pub fn wait_until<F, T>(&self, mut f: F) -> T
|
||||||
where
|
where
|
||||||
F: FnMut() -> Option<T>,
|
F: FnMut() -> Option<T>,
|
||||||
|
@ -501,13 +522,32 @@ impl WorkerLatch {
|
||||||
*self.mutex.lock()
|
*self.mutex.lock()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace")]
|
#[tracing::instrument(level = "trace", skip_all, fields(
|
||||||
|
this = self as *const Self as usize,
|
||||||
|
))]
|
||||||
fn notify(&self) {
|
fn notify(&self) {
|
||||||
let key = &self.condvar as *const _ as usize;
|
let from = &self.condvar as *const _ as usize;
|
||||||
|
let to = &self.mutex as *const _ as usize;
|
||||||
|
|
||||||
|
let validate = || {
|
||||||
|
if self.condvar.load(Ordering::Relaxed) != to {
|
||||||
|
return parking_lot_core::RequeueOp::Abort;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.condvar.store(0, Ordering::Relaxed);
|
||||||
|
|
||||||
|
parking_lot_core::RequeueOp::UnparkOneRequeueRest
|
||||||
|
};
|
||||||
|
|
||||||
|
let callback = |_op: parking_lot_core::RequeueOp,
|
||||||
|
_result: parking_lot_core::UnparkResult| {
|
||||||
|
parking_lot_core::DEFAULT_UNPARK_TOKEN
|
||||||
|
};
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
let n = parking_lot_core::unpark_all(key, parking_lot_core::DEFAULT_UNPARK_TOKEN);
|
//let n = parking_lot_core::unpark_requeue(from, to, validate, callback);
|
||||||
tracing::trace!("WorkerLatch notify_one: unparked {} threads", n);
|
let n = parking_lot_core::unpark_all(from, parking_lot_core::DEFAULT_UNPARK_TOKEN);
|
||||||
|
tracing::trace!("WorkerLatch notify_one: unparked {:?}", n);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,9 @@ impl WorkerThread {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WorkerThread {
|
impl WorkerThread {
|
||||||
#[tracing::instrument(level = "trace", skip_all)]
|
#[tracing::instrument(level = "trace", skip_all, fields(
|
||||||
|
worker = self.heartbeat.index(),
|
||||||
|
))]
|
||||||
pub fn run(self: Box<Self>, barrier: Arc<Barrier>) {
|
pub fn run(self: Box<Self>, barrier: Arc<Barrier>) {
|
||||||
let this = Box::into_raw(self);
|
let this = Box::into_raw(self);
|
||||||
unsafe {
|
unsafe {
|
||||||
|
@ -114,7 +116,7 @@ impl WorkerThread {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip_all)]
|
#[tracing::instrument(level = "trace", skip_all)]
|
||||||
pub(crate) fn find_work_or_wait_unless<F>(&self, mut pred: F) -> Option<NonNull<Job>>
|
pub(crate) fn find_work_or_wait_unless<F>(&self, pred: F) -> Option<NonNull<Job>>
|
||||||
where
|
where
|
||||||
F: FnMut(&mut crate::context::Shared) -> bool,
|
F: FnMut(&mut crate::context::Shared) -> bool,
|
||||||
{
|
{
|
||||||
|
@ -128,12 +130,12 @@ impl WorkerThread {
|
||||||
// this is very important, because the lock must be held when
|
// this is very important, because the lock must be held when
|
||||||
// notifying us of the result of a job we scheduled.
|
// notifying us of the result of a job we scheduled.
|
||||||
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||||
if !pred(std::ops::DerefMut::deref_mut(&mut guard)) {
|
|
||||||
// no jobs found, wait for a heartbeat or a new job
|
// no jobs found, wait for a heartbeat or a new job
|
||||||
tracing::trace!("WorkerThread::find_work_or_wait_unless: waiting for new job");
|
// tracing::trace!(worker = self.heartbeat.index(), "waiting for new job");
|
||||||
self.heartbeat.latch().wait_with_lock(&mut guard);
|
self.heartbeat
|
||||||
tracing::trace!("WorkerThread::find_work_or_wait_unless: woken up from wait");
|
.latch()
|
||||||
}
|
.wait_with_lock_unless(&mut guard, pred);
|
||||||
|
// tracing::trace!(worker = self.heartbeat.index(), "woken up from wait");
|
||||||
|
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue