use std::{ future::Future, marker::PhantomData, sync::{Arc, atomic::AtomicU32}, time::Duration, }; use crate::device::{ DevicePools, Pool, PoolObject, Pooled, asdf::{ DeviceObject, InnerDeviceObject, traits::ExternallyManagedObject as ExternallyManagedObjectTrait, }, }; use crate::{Result, device::DeviceInner}; use super::Device; use ash::{prelude::VkResult, vk}; use crossbeam::channel::{Receiver, Sender}; type Message = (SyncPrimitive, std::task::Waker); pub struct SyncThreadpool { channel: (Sender, Receiver), timeout: u64, thread_dies_after: Duration, max_threads: u32, num_threads: Arc, } impl Default for SyncThreadpool { fn default() -> Self { Self::new() } } #[derive(Debug)] enum SyncPrimitive { Fence(Arc), // actually, I think this is an awful idea because I would have to hold a // lock on all queues. // DeviceIdle(Device), } impl SyncThreadpool { pub fn new() -> SyncThreadpool { Self::with_max_threads(512) } pub fn with_max_threads(max_threads: u32) -> SyncThreadpool { Self { // 0-capacity channel to ensure immediate consumption of fences channel: crossbeam::channel::bounded(0), max_threads, num_threads: Arc::new(AtomicU32::new(0)), timeout: u64::MAX, thread_dies_after: Duration::from_secs(5), } } fn try_spawn_thread(&self) -> Option<()> { use std::sync::atomic::Ordering; match self .num_threads .fetch_update(Ordering::Release, Ordering::Acquire, |i| { if i < self.max_threads { Some(i + 1) } else { None } }) { Ok(tid) => { struct SyncThread { timeout: u64, thread_dies_after: Duration, num_threads: Arc, rx: Receiver, } impl SyncThread { fn run(self, barrier: Arc) { tracing::trace!("spawned new sync thread"); barrier.wait(); while let Ok((sync, waker)) = self.rx.recv_timeout(self.thread_dies_after) { tracing::trace!("received ({:?}, {:?})", sync, waker); loop { let wait_result = match &sync { SyncPrimitive::Fence(fence) => { fence.wait_on(Some(self.timeout)) } // SyncPrimitive::DeviceIdle(device) => device.wait_idle(), }; match wait_result { Ok(_) => { waker.wake(); break; } Err(vk::Result::TIMEOUT) => {} Err(err) => { tracing::error!( "failed to wait on {sync:?} in waiter thread: {err}" ); break; } } } } // because I don't want some thread to not spawn as soon as this one exists self.num_threads.fetch_sub(1, Ordering::AcqRel); } } let thread = SyncThread { timeout: self.timeout, thread_dies_after: self.thread_dies_after, num_threads: self.num_threads.clone(), rx: self.channel.1.clone(), }; let barrier = Arc::new(std::sync::Barrier::new(2)); let _ = std::thread::Builder::new() .name(format!("fence-waiter-{tid}")) .spawn({ let barrier = barrier.clone(); move || { thread.run(barrier); } }) .expect("sync-threadpool waiter thread failed to spawn."); barrier.wait(); Some(()) } Err(_) => { tracing::error!( "sync-threadpool exceeded local thread limit ({})", self.max_threads ); None } } } fn spawn_waiter(&self, fence: Arc, waker: std::task::Waker) { let mut msg = (SyncPrimitive::Fence(fence), waker); while let Err(err) = self.channel.0.try_send(msg) { match err { crossbeam::channel::TrySendError::Full(msg2) => { msg = msg2; self.try_spawn_thread(); } crossbeam::channel::TrySendError::Disconnected(_) => { tracing::error!("sync-threadpool channel disconnected?"); unreachable!() } } } } } pub enum Fence { Dedicated { fence: InnerDeviceObject, }, Pooled { fence: PoolObject>>, }, } impl Pooled for vk::Fence { fn create_from_pool(pool: &Pool) -> Result { let fence = unsafe { pool.device .raw .create_fence(&vk::FenceCreateInfo::default(), None)? }; Ok(fence) } } impl ExternallyManagedObjectTrait>> for vk::Fence { unsafe fn destroy(self, pool: &Arc>) { pool.push(self); } } impl> ExternallyManagedObjectTrait for vk::Fence { unsafe fn destroy(self, device: &T) { unsafe { device.as_ref().raw.destroy_fence(self, None); } } } impl std::fmt::Debug for Fence { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Fence").field("fence", &self.raw()).finish() } } impl Fence { pub fn new_dedicated(device: Device, name: Option<&'static str>) -> Result { let fence = unsafe { device .raw .create_fence(&vk::FenceCreateInfo::default(), None)? }; Ok(Self::Dedicated { fence: DeviceObject::new_debug_named(device.shared.clone(), fence, name), }) } pub fn from_pool(pool: &Arc>, name: Option<&'static str>) -> Result { let fence = pool.get_debug_named(name)?; Ok(Self::Pooled { fence: PoolObject::new(fence, pool.clone()), }) } pub fn raw(&self) -> vk::Fence { match self { Fence::Dedicated { fence } => **fence, Fence::Pooled { fence } => **fence, } } fn device(&self) -> &Arc { match self { Fence::Dedicated { fence } => &fence.device(), Fence::Pooled { fence } => &fence.owner().device, } } pub fn wait_on(&self, timeout: Option) -> VkResult<()> { unsafe { self.device().raw.wait_for_fences( core::slice::from_ref(&self.raw()), true, timeout.unwrap_or(u64::MAX), )? } Ok(()) } pub fn is_signaled(&self) -> bool { unsafe { self.device() .raw .get_fence_status(self.raw()) .unwrap_or(false) } } pub fn reset(&self) -> Result<()> { unsafe { self.device() .raw .reset_fences(core::slice::from_ref(&self.raw()))? } Ok(()) } pub fn into_future<'a>(self) -> FenceFuture<'a> { FenceFuture::new(Arc::new(self)) } } #[derive(Debug, Clone, Copy)] pub enum SemaphoreType { Binary, Timeline(u64), } #[derive(Debug, Clone, Copy)] pub enum SemaphoreInner { Binary(vk::Semaphore), Timeline(vk::Semaphore), } impl ExternallyManagedObjectTrait> for SemaphoreInner { unsafe fn destroy(self, owner: &Arc) { match self { SemaphoreInner::Binary(semaphore) => { owner.binary_semaphores.push(BinarySemaphore(semaphore)) } SemaphoreInner::Timeline(semaphore) => { owner.timeline_semaphores.push(TimelineSemaphore(semaphore)) } } } } impl ExternallyManagedObjectTrait> for SemaphoreInner { unsafe fn destroy(self, owner: &Arc) { match self { SemaphoreInner::Binary(semaphore) | SemaphoreInner::Timeline(semaphore) => { unsafe { owner.raw.destroy_semaphore(semaphore, None) }; } } } } impl crate::device::asdf::traits::DebugNameable for SemaphoreInner { fn debug_name(&self, device: &DeviceInner, name: &str) { unsafe { device.debug_name_object(self.raw(), name); } } } impl SemaphoreInner { pub fn raw(&self) -> vk::Semaphore { match self { SemaphoreInner::Binary(semaphore) | SemaphoreInner::Timeline(semaphore) => *semaphore, } } pub fn semaphore_type(&self) -> SemaphoreType { match self { SemaphoreInner::Binary(_) => SemaphoreType::Binary, SemaphoreInner::Timeline(_) => SemaphoreType::Timeline(!0), } } } impl From for SemaphoreInner { fn from(value: BinarySemaphore) -> Self { SemaphoreInner::Binary(value.0) } } impl From for SemaphoreInner { fn from(value: TimelineSemaphore) -> Self { SemaphoreInner::Timeline(value.0) } } pub enum Semaphore { Dedicated { semaphore: InnerDeviceObject, }, Pooled { #[allow(private_interfaces)] semaphore: PoolObject>, }, } #[derive(Debug, Clone, Copy)] pub(crate) struct BinarySemaphore(pub(crate) vk::Semaphore); #[derive(Debug, Clone, Copy)] pub(crate) struct TimelineSemaphore(pub(crate) vk::Semaphore); // This is just so that ash can name these semaphore newtypes impl vk::Handle for BinarySemaphore { const TYPE: vk::ObjectType = ::TYPE; fn as_raw(self) -> u64 { self.0.as_raw() } fn from_raw(_: u64) -> Self { unimplemented!("BinarySemaphore cannot be created from raw handle") } } impl vk::Handle for TimelineSemaphore { const TYPE: vk::ObjectType = ::TYPE; fn as_raw(self) -> u64 { self.0.as_raw() } fn from_raw(_: u64) -> Self { unimplemented!("TimelineSemaphore cannot be created from raw handle") } } impl Pooled for BinarySemaphore { fn create_from_pool(pool: &Pool) -> Result { let mut type_info = vk::SemaphoreTypeCreateInfo::default().semaphore_type(vk::SemaphoreType::BINARY); let create_info = vk::SemaphoreCreateInfo::default().push_next(&mut type_info); let inner = unsafe { pool.device.raw.create_semaphore(&create_info, None)? }; Ok(Self(inner)) } } impl Pooled for TimelineSemaphore { fn create_from_pool(pool: &Pool) -> Result { let mut type_info = vk::SemaphoreTypeCreateInfo::default() .semaphore_type(vk::SemaphoreType::TIMELINE) .initial_value(0); let create_info = vk::SemaphoreCreateInfo::default().push_next(&mut type_info); let inner = unsafe { pool.device.raw.create_semaphore(&create_info, None)? }; Ok(Self(inner)) } } impl Semaphore { pub fn new_dedicated( device: Device, semaphore_type: SemaphoreType, name: Option<&'static str>, ) -> Result { let mut type_info = vk::SemaphoreTypeCreateInfo::default(); match semaphore_type { SemaphoreType::Binary => { type_info = type_info.semaphore_type(vk::SemaphoreType::BINARY); } SemaphoreType::Timeline(value) => { type_info = type_info .semaphore_type(vk::SemaphoreType::TIMELINE) .initial_value(value); } } let create_info = vk::SemaphoreCreateInfo::default().push_next(&mut type_info); let inner = unsafe { device.dev().create_semaphore(&create_info, None)? }; let inner = match semaphore_type { SemaphoreType::Binary => SemaphoreInner::Binary(inner), SemaphoreType::Timeline(_) => SemaphoreInner::Timeline(inner), }; Ok(Self::Dedicated { semaphore: DeviceObject::new_debug_named(device.shared.clone(), inner, name), }) } pub fn from_pool( device: Device, semaphore_type: SemaphoreType, name: Option<&'static str>, ) -> Result { let semaphore = match semaphore_type { SemaphoreType::Binary => { let semaphore: SemaphoreInner = device.pools.binary_semaphores.get_debug_named(name)?.into(); PoolObject::new(semaphore, device.pools.clone()) } SemaphoreType::Timeline(value) => { let semaphore: SemaphoreInner = device .pools .timeline_semaphores .get_debug_named(name)? .into(); let info = vk::SemaphoreSignalInfo::default() .semaphore(semaphore.raw()) .value(value); unsafe { device.raw.signal_semaphore(&info)?; } PoolObject::new(semaphore, device.pools.clone()) } }; Ok(Self::Pooled { semaphore }) } pub fn semaphore(&self) -> vk::Semaphore { match self { Semaphore::Dedicated { semaphore, .. } => semaphore.raw(), Semaphore::Pooled { semaphore, .. } => semaphore.raw(), } } } pub struct FenceFuture<'a> { fence: Arc, // lifetime parameter to prevent release of resources while future is pendign _pd: PhantomData<&'a ()>, } impl FenceFuture<'_> { pub fn new(fence: Arc) -> Self { Self { fence, _pd: PhantomData, } } pub fn block(&self) -> crate::Result<()> { self.fence.wait_on(None)?; self.fence.reset()?; Ok(()) } } impl Future for FenceFuture<'_> { type Output = (); fn poll( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll { if self.fence.is_signaled() { tracing::trace!("fence ({:?}) is signaled", self.fence); _ = self.fence.reset(); std::task::Poll::Ready(()) } else { self.fence .device() .sync_threadpool() .spawn_waiter(self.fence.clone(), cx.waker().clone()); std::task::Poll::Pending } } }