From ebec679875e8b6b3749123570d686c512344a1ef Mon Sep 17 00:00:00 2001 From: Janis Date: Thu, 20 Feb 2025 19:25:31 +0100 Subject: [PATCH] warnings --- benches/join.rs | 17 ---------- src/job/mod.rs | 4 +-- src/job/spice.rs | 2 +- src/lib.rs | 50 +++++++++++++++++++--------- src/melange.rs | 20 +++++++---- src/praetor/mod.rs | 79 ++++++++++++++------------------------------ src/praetor/tests.rs | 36 +++++++++++--------- 7 files changed, 97 insertions(+), 111 deletions(-) diff --git a/benches/join.rs b/benches/join.rs index 94b31ab..ccdaa7d 100644 --- a/benches/join.rs +++ b/benches/join.rs @@ -1,16 +1,10 @@ #![feature(test)] -use std::{ - sync::{atomic::AtomicUsize, Arc}, - thread, - time::Duration, -}; fn available_parallelism() -> usize { bevy_tasks::available_parallelism().max(4) } use executor::{self}; use test::Bencher; -use tree::Node; extern crate test; @@ -62,17 +56,6 @@ mod tree { } } -const PRIMES: &'static [usize] = &[ - 1181, 1187, 1193, 1201, 1213, 1217, 1223, 1229, 1231, 1237, 1249, 1259, 1277, 1279, 1283, 1289, - 1291, 1297, 1301, 1303, 1307, 1319, 1321, 1327, 1361, 1367, 1373, 1381, 1399, 1409, 1423, 1427, - 1429, 1433, 1439, 1447, 1451, 1453, 1459, 1471, 1481, 1483, 1487, 1489, 1493, 1499, 1511, 1523, - 1531, 1543, 1549, 1553, 1559, 1567, 1571, 1579, 1583, 1597, 1601, 1607, 1609, 1613, 1619, 1621, - 1627, 1637, 1657, 1663, 1667, 1669, 1693, 1697, 1699, 1709, 1721, 1723, 1733, 1741, 1747, 1753, - 1759, 1777, 1783, 1787, 1789, 1801, 1811, 1823, 1831, 1847, 1861, 1867, 1871, 1873, 1877, 1879, - 1889, 1901, 1907, -]; - -const REPEAT: usize = 0x800; const TREE_SIZE: usize = 16; #[bench] diff --git a/src/job/mod.rs b/src/job/mod.rs index 6d09c1d..34d4eac 100644 --- a/src/job/mod.rs +++ b/src/job/mod.rs @@ -1,7 +1,5 @@ ///! Rayon's job logic -use std::{cell::UnsafeCell, marker::PhantomPinned, sync::atomic::AtomicBool}; - -use crate::latch::Latch; +use std::{cell::UnsafeCell, marker::PhantomPinned}; pub mod spice; pub mod v2; diff --git a/src/job/spice.rs b/src/job/spice.rs index 9cb6cba..8b13789 100644 --- a/src/job/spice.rs +++ b/src/job/spice.rs @@ -1 +1 @@ -use std::{cell::UnsafeCell, sync::atomic::AtomicU8}; + diff --git a/src/lib.rs b/src/lib.rs index e0f9611..9f7a42d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -594,6 +594,7 @@ impl ThreadPool { core::ptr::from_ref(self) as usize } + #[allow(dead_code)] fn push_local_or_inject_balanced(&self, task: TaskRef) { let global_len = self.global_queue.len(); WorkerThread::with(|worker| match worker { @@ -622,6 +623,7 @@ impl ThreadPool { }) } + #[allow(dead_code)] fn inject_many(&self, tasks: I) where I: Iterator, @@ -634,6 +636,7 @@ impl ThreadPool { self.wake_any(n); } + #[allow(unused_variables)] fn inject_maybe_local(&self, task: TaskRef) { #[cfg(all(not(feature = "never-local"), feature = "prefer-local"))] self.push_local_or_inject(task); @@ -649,6 +652,7 @@ impl ThreadPool { self.wake_any(1); } + #[allow(dead_code)] fn resize usize>(&'static self, size: F) -> usize { if WorkerThread::is_worker_thread() { // acquire required here? @@ -726,22 +730,32 @@ impl ThreadPool { new_size } + #[allow(dead_code)] fn ensure_one_worker(&'static self) -> usize { self.resize(|current| current.max(1)) } + #[allow(dead_code)] fn resize_to_available(&'static self) { self.resize_to(available_parallelism().map(NonZero::get).unwrap_or(1)); } + + #[allow(dead_code)] fn resize_to(&'static self, new_size: usize) -> usize { self.resize(|_| new_size) } + + #[allow(dead_code)] fn grow_by(&'static self, num_threads: usize) -> usize { self.resize(|current| current.saturating_add(num_threads)) } + + #[allow(dead_code)] fn shrink_by(&'static self, num_threads: usize) -> usize { self.resize(|current| current.saturating_sub(num_threads)) } + + #[allow(dead_code)] fn shrink_to(&'static self, num_threads: usize) -> usize { self.resize(|_| num_threads) } @@ -827,7 +841,7 @@ impl ThreadPool { } impl ThreadPool { - fn spawn(&'static self, f: Fn) + pub fn spawn(&'static self, f: Fn) where Fn: FnOnce() + Send + 'static, { @@ -837,7 +851,7 @@ impl ThreadPool { self.inject_maybe_local(taskref); } - fn spawn_future(&'static self, future: Fut) -> Task + pub fn spawn_future(&'static self, future: Fut) -> Task where Fut: Future + Send + 'static, T: Send + 'static, @@ -861,7 +875,7 @@ impl ThreadPool { task } - fn spawn_async(&'static self, f: Fn) -> Task + pub fn spawn_async(&'static self, f: Fn) -> Task where Fn: FnOnce() -> Fut + Send + 'static, Fut: Future + Send + 'static, @@ -870,7 +884,7 @@ impl ThreadPool { self.spawn_future(async move { f().await }) } - fn block_on(&'static self, mut future: Fut) + pub fn block_on(&'static self, mut future: Fut) where Fut: Future + Send + 'static, T: Send + 'static, @@ -895,7 +909,7 @@ impl ThreadPool { }); } - fn join(&'static self, f: F, g: G) -> (T, U) + pub fn join(&'static self, f: F, g: G) -> (T, U) where F: FnOnce() -> T + Send, G: FnOnce() -> U + Send, @@ -905,6 +919,7 @@ impl ThreadPool { self.join_threaded(f, g) } + #[allow(dead_code)] fn join_seq(&'static self, f: F, g: G) -> (T, U) where F: FnOnce() -> T + Send, @@ -969,7 +984,7 @@ impl ThreadPool { }) } - fn scope<'scope, Fn, T>(&'static self, f: Fn) -> T + pub fn scope<'scope, Fn, T>(&'static self, f: Fn) -> T where Fn: FnOnce(&Scope<'scope>) -> T + Send, T: Send, @@ -989,7 +1004,6 @@ pub struct WorkerThread { pool: &'static ThreadPool, index: usize, rng: rng::XorShift64Star, - last_heartbeat: UnsafeCell, } const HEARTBEAT_INTERVAL: core::time::Duration = const { core::time::Duration::from_micros(100) }; @@ -1008,6 +1022,7 @@ impl WorkerThread { self.pool } #[inline] + #[allow(dead_code)] fn index(&self) -> usize { self.index } @@ -1111,6 +1126,7 @@ impl WorkerThread { #[inline] fn find_any_task(&self) -> Option { // TODO: attempt stealing work here, too. + #[allow(unused_mut)] let mut task = self .pop_task() .or_else(|| self.claim_shoved_task()) @@ -1170,7 +1186,6 @@ impl WorkerThread { pool, index, rng: rng::XorShift64Star::new(pool as *const _ as u64 + index as u64), - last_heartbeat: UnsafeCell::new(std::time::Instant::now()), }); WORKER_THREAD_STATE.with(|cell| { @@ -1233,8 +1248,6 @@ fn heartbeat_loop(pool: &'static ThreadPool) { state.notify_termination(); } -use vec_queue::TaskQueue; - mod vec_queue { use std::{cell::UnsafeCell, collections::VecDeque}; @@ -1243,35 +1256,43 @@ mod vec_queue { impl TaskQueue { /// Creates a new [`TaskQueue`]. #[inline] + #[allow(dead_code)] pub const fn new() -> Self { Self(UnsafeCell::new(VecDeque::new())) } #[inline] + #[allow(dead_code)] pub fn get_mut(&self) -> &mut VecDeque { unsafe { &mut *self.0.get() } } #[inline] + #[allow(dead_code)] pub fn pop_front(&self) -> Option { self.get_mut().pop_front() } #[inline] + #[allow(dead_code)] pub fn pop_back(&self) -> Option { self.get_mut().pop_back() } #[inline] + #[allow(dead_code)] pub fn push_back(&self, t: T) { self.get_mut().push_back(t); } #[inline] + #[allow(dead_code)] pub fn push_front(&self, t: T) { self.get_mut().push_front(t); } #[inline] + #[allow(dead_code)] pub fn take(&self) -> VecDeque { let this = core::mem::replace(self.get_mut(), VecDeque::new()); this } #[inline] + #[allow(dead_code)] pub fn drain(&self) -> impl Iterator { self.take().into_iter() } @@ -1444,19 +1465,18 @@ mod rng { } } -mod scope { +pub mod scope { use std::{ future::Future, - marker::{PhantomData, PhantomPinned}, - pin::pin, + marker::PhantomData, ptr::{self, NonNull}, }; use async_task::{Runnable, Task}; use crate::{ - latch::{CountWakeLatch, Latch, Probe, ThreadWakeLatch}, - task::{HeapTask, StackTask, TaskRef}, + latch::{CountWakeLatch, Latch}, + task::{HeapTask, TaskRef}, ThreadPool, WorkerThread, }; diff --git a/src/melange.rs b/src/melange.rs index 129e7a6..d042ac0 100644 --- a/src/melange.rs +++ b/src/melange.rs @@ -1,9 +1,7 @@ use std::{ - cell::Cell, collections::VecDeque, marker::PhantomPinned, - ops::{Deref, DerefMut}, - ptr::{self, NonNull}, + ptr::NonNull, sync::{ atomic::{AtomicBool, Ordering}, Arc, Weak, @@ -14,7 +12,6 @@ use std::{ use crossbeam::utils::CachePadded; use parking_lot::{Condvar, Mutex}; -use parking_lot_core::SpinWait; use crate::{latch::*, ThreadControl}; mod job { @@ -156,6 +153,7 @@ mod job { unsafe { (self.harness)(self.this.as_ptr().cast(), (self as *const Self).cast(), s) }; } + #[allow(dead_code)] fn complete(&self, result: T) { let mut state = self.state.load(Ordering::Relaxed); let mask = JobState::Inline as u8; @@ -199,14 +197,18 @@ mod job { impl Job {} + #[allow(dead_code)] pub struct HeapJob { f: F, } impl HeapJob { + #[allow(dead_code)] pub fn new(f: F) -> Box { Box::new(Self { f }) } + + #[allow(dead_code)] pub fn into_boxed_job(self: Box) -> Box> where F: FnOnce(&mut WorkerThread) -> T + Send, @@ -251,21 +253,25 @@ mod job { } } + #[allow(dead_code)] pub struct StackJob { f: UnsafeCell>, } impl StackJob { + #[allow(dead_code)] pub fn new(f: F) -> Self { Self { f: UnsafeCell::new(ManuallyDrop::new(f)), } } + #[allow(dead_code)] pub unsafe fn unwrap(&self) -> F { unsafe { ManuallyDrop::take(&mut *self.f.get()) } } + #[allow(dead_code)] pub fn as_job(&self) -> Job where F: FnOnce(&mut WorkerThread) -> T + Send, @@ -368,6 +374,7 @@ impl SharedContext { .next() } + #[allow(dead_code)] fn pop_random_task(&mut self) -> Option> { let i = self.rng.next_usize(self.shared_tasks.len()); let (a, b) = self.shared_tasks.split_at_mut(i); @@ -381,7 +388,6 @@ pub struct WorkerThread { queue: VecDeque>, heartbeat: Arc, join_count: u8, - sleep_count: usize, _marker: PhantomPinned, } @@ -396,10 +402,10 @@ impl WorkerThread { queue: VecDeque::default(), join_count: 0, heartbeat, - sleep_count: 0, _marker: PhantomPinned, } } + #[allow(dead_code)] fn state(&self) -> &CachePadded { &self.context.threads[self.index] } @@ -578,6 +584,7 @@ impl WorkerThread { } impl Context { + #[allow(dead_code)] fn heartbeat(self: Arc, interaval: Duration) { let mut n = 0; loop { @@ -615,6 +622,7 @@ impl Context { } } + #[allow(dead_code)] fn heartbeat2(self: Arc, interval: Duration) { let mut i = 0; loop { diff --git a/src/praetor/mod.rs b/src/praetor/mod.rs index e8fd850..f209a3d 100644 --- a/src/praetor/mod.rs +++ b/src/praetor/mod.rs @@ -1,6 +1,6 @@ mod util { use std::{ - cell::{Cell, UnsafeCell}, + cell::UnsafeCell, marker::PhantomData, mem::ManuallyDrop, num::NonZero, @@ -81,6 +81,7 @@ mod util { .map_err(|ptr| ptr.addr() & mask) } + #[allow(dead_code)] pub fn set_ptr(&self, ptr: *mut T, success: Ordering, failure: Ordering) { let mask = Self::mask(); let ptr = ptr.cast::<()>(); @@ -127,15 +128,19 @@ mod util { pub struct SendPtr(NonNull); impl SendPtr { + #[allow(dead_code)] pub fn as_ptr(&self) -> *mut T { self.0.as_ptr() } + #[allow(dead_code)] pub unsafe fn new_unchecked(t: *const T) -> Self { unsafe { Self(NonNull::new_unchecked(t.cast_mut())) } } + #[allow(dead_code)] pub fn new(t: *const T) -> Option { NonNull::new(t.cast_mut()).map(Self) } + #[allow(dead_code)] pub fn cast(self) -> SendPtr { SendPtr(self.0.cast::()) } @@ -154,58 +159,24 @@ mod util { unsafe { &mut *self.0.as_ptr() } } } - - pub struct XorShift64Star { - state: Cell, - } - - impl XorShift64Star { - /// Initializes the prng with a seed. Provided seed must be nonzero. - pub fn new(seed: u64) -> Self { - XorShift64Star { - state: Cell::new(seed), - } - } - - /// Returns a pseudorandom number. - pub fn next(&self) -> u64 { - let mut x = self.state.get(); - debug_assert_ne!(x, 0); - x ^= x >> 12; - x ^= x << 25; - x ^= x >> 27; - self.state.set(x); - x.wrapping_mul(0x2545_f491_4f6c_dd1d) - } - - /// Return a pseudorandom number from `0..n`. - pub fn next_usize(&self, n: usize) -> usize { - (self.next() % n as u64) as usize - } - } } mod job { use std::{ any::Any, - cell::{Cell, UnsafeCell}, + cell::UnsafeCell, fmt::Debug, marker::PhantomPinned, mem::{self, ManuallyDrop, MaybeUninit}, pin::Pin, ptr::{self, NonNull}, - sync::atomic::{AtomicPtr, AtomicU8, Ordering}, + sync::atomic::Ordering, thread::Thread, }; use parking_lot_core::SpinWait; - use super::util::{SendPtr, TaggedAtomicPtr}; - - #[cfg_attr(target_pointer_width = "64", repr(align(16)))] - #[cfg_attr(target_pointer_width = "32", repr(align(8)))] - #[derive(Debug, Default, Clone, Copy)] - struct Size2([usize; 2]); + use super::util::TaggedAtomicPtr; pub struct Value(pub MaybeUninit>>); @@ -236,7 +207,7 @@ mod job { // the value can be stored inline iff the size of T is equal or // smaller than the size of the boxed type and the alignment of the // boxed type is an integer multiple of the alignment of T - mem::size_of::() < mem::size_of::>>() + mem::size_of::() <= mem::size_of::>>() && mem::align_of::>>() % mem::align_of::() == 0 } @@ -278,6 +249,7 @@ mod job { } impl JobState { + #[allow(dead_code)] const MASK: u8 = 0; // Self::Inline as u8 | Self::IsError as u8; fn from_u8(v: u8) -> Option { match v { @@ -687,14 +659,17 @@ mod job { impl Job {} + #[allow(dead_code)] pub struct HeapJob { f: F, } impl HeapJob { + #[allow(dead_code)] pub fn new(f: F) -> Box { Box::new(Self { f }) } + #[allow(dead_code)] pub fn into_boxed_job(self: Box) -> Box> where F: FnOnce() -> T + Send, @@ -793,6 +768,7 @@ thread_local! { impl Scope { /// locks shared context + #[allow(dead_code)] fn new() -> Self { let context = Context::global().clone(); Self::new_in(context) @@ -862,6 +838,7 @@ impl Scope { SCOPE.with(|ptr| unsafe { (&mut *ptr.get()).take() }) } + #[allow(dead_code)] fn current() -> Option> { SCOPE.with(|ptr| unsafe { *ptr.get() }) } @@ -875,6 +852,7 @@ impl Scope { self.queue.as_mut_unchecked().push_front(job); } } + #[allow(dead_code)] fn push_back(&self, job: Pin<&Job>) { unsafe { self.queue.as_mut_unchecked().push_back(job); @@ -989,7 +967,6 @@ impl Scope { self.heartbeat.store(false, Ordering::Relaxed); } - #[cold] pub fn wait_until(&self, job: Pin<&Job>) -> Option> { let shared_job = self.context.shared.lock().jobs.remove(&self.index); @@ -1011,11 +988,11 @@ impl Scope { .jobs .pop_first() .map(|(_, job)| job) - .or_else(|| { - self.pop_front().inspect(|job| unsafe { - job.as_ref().set_pending(); - }) - }) + // .or_else(|| { + // self.pop_front().inspect(|job| unsafe { + // job.as_ref().set_pending(); + // }) + // }) else { break; }; @@ -1029,19 +1006,15 @@ impl Scope { } } -fn join(a: A, b: B) -> (RA, RB) +#[allow(dead_code)] +pub fn join(a: A, b: B) -> (RA, RB) where RA: Send, RB: Send, A: FnOnce() -> RA + Send, B: FnOnce() -> RB + Send, { - Scope::with(|scope| scope.join_heartbeat(a, b)) -} - -struct Heartbeat { - weak: Arc, - index: usize, + Scope::with(|scope| scope.join(a, b)) } pub struct ThreadPool { @@ -1077,7 +1050,6 @@ struct SharedContext { // monotonic increasing id heartbeats_id: usize, should_stop: bool, - rng: util::XorShift64Star, } unsafe impl Send for SharedContext {} @@ -1104,7 +1076,6 @@ impl Context { heartbeats: BTreeMap::new(), heartbeats_id: 0, should_stop: false, - rng: util::XorShift64Star::new(37), }), shared_job: Condvar::new(), }); diff --git a/src/praetor/tests.rs b/src/praetor/tests.rs index 22ade42..f41cf03 100644 --- a/src/praetor/tests.rs +++ b/src/praetor/tests.rs @@ -10,9 +10,9 @@ fn pin_ptr(pin: &Pin<&mut T>) -> NonNull { #[test] fn job_list_pop_back() { let mut list = JobList::new(); - let mut a = pin!(Job::empty()); - let mut b = pin!(Job::empty()); - let mut c = pin!(Job::empty()); + let a = pin!(Job::empty()); + let b = pin!(Job::empty()); + let c = pin!(Job::empty()); unsafe { list.push_front(a.as_ref()); @@ -34,9 +34,9 @@ fn job_list_pop_back() { #[test] fn job_list_pop_front() { let mut list = JobList::new(); - let mut a = pin!(Job::<()>::empty()); - let mut b = pin!(Job::<()>::empty()); - let mut c = pin!(Job::<()>::empty()); + let a = pin!(Job::<()>::empty()); + let b = pin!(Job::<()>::empty()); + let c = pin!(Job::<()>::empty()); unsafe { list.push_front(a.as_ref()); @@ -58,9 +58,9 @@ fn job_list_pop_front() { #[test] fn unlink_job_middle() { let mut list = JobList::new(); - let mut a = pin!(Job::<()>::empty()); - let mut b = pin!(Job::<()>::empty()); - let mut c = pin!(Job::<()>::empty()); + let a = pin!(Job::<()>::empty()); + let b = pin!(Job::<()>::empty()); + let c = pin!(Job::<()>::empty()); unsafe { list.push_front(a.as_ref()); @@ -81,9 +81,9 @@ fn unlink_job_middle() { #[test] fn unlink_job_front() { let mut list = JobList::new(); - let mut a = pin!(Job::<()>::empty()); - let mut b = pin!(Job::<()>::empty()); - let mut c = pin!(Job::<()>::empty()); + let a = pin!(Job::<()>::empty()); + let b = pin!(Job::<()>::empty()); + let c = pin!(Job::<()>::empty()); unsafe { list.push_front(a.as_ref()); @@ -104,9 +104,9 @@ fn unlink_job_front() { #[test] fn unlink_job_back() { let mut list = JobList::new(); - let mut a = pin!(Job::<()>::empty()); - let mut b = pin!(Job::<()>::empty()); - let mut c = pin!(Job::<()>::empty()); + let a = pin!(Job::<()>::empty()); + let b = pin!(Job::<()>::empty()); + let c = pin!(Job::<()>::empty()); unsafe { list.push_front(a.as_ref()); @@ -191,6 +191,11 @@ fn value_inline_struct() { assert_eq!(inner.c, 3.2); } +#[test] +fn value_inline_ptr_sized() { + assert!(Value::::is_inline()); +} + #[test] fn value_boxed() { #[derive(Default, PartialEq, Debug)] @@ -228,6 +233,7 @@ fn value_inline_drop() { *self.inner += 1; } } + assert!(Value::>::is_inline()); let mut dropped = 0; { let inner = {