From 9d132066d3395e597f0fb263ed49abf7beef8494 Mon Sep 17 00:00:00 2001
From: Janis
Date: Sat, 21 Dec 2024 04:15:18 +0100
Subject: [PATCH] fix single-use command buffer; wait for device idle on
 swapchain drop

---
 crates/renderer/src/commands.rs |  56 ++++++---
 crates/renderer/src/lib.rs      | 209 +++++++++++++++++++++-----------
 crates/renderer/src/sync.rs     |  31 ++++-
 3 files changed, 201 insertions(+), 95 deletions(-)

diff --git a/crates/renderer/src/commands.rs b/crates/renderer/src/commands.rs
index 433f7c1..a828988 100644
--- a/crates/renderer/src/commands.rs
+++ b/crates/renderer/src/commands.rs
@@ -8,6 +8,7 @@ use ash::{prelude::*, vk};
 pub struct SingleUseCommandPool {
     device: Device,
     pool: vk::CommandPool,
+    queue: Queue,
 }
 
 impl Drop for SingleUseCommandPool {
@@ -19,15 +20,27 @@ impl Drop for SingleUseCommandPool {
 }
 
 impl SingleUseCommandPool {
-    pub fn new(device: Device, family_index: u32) -> VkResult<Self> {
+    pub fn new(device: Device, queue: Queue) -> VkResult<Arc<Self>> {
         let pool_info = vk::CommandPoolCreateInfo::default()
-            .queue_family_index(family_index)
+            .queue_family_index(queue.family())
             .flags(vk::CommandPoolCreateFlags::TRANSIENT);
         let pool =
             unsafe { device.dev().create_command_pool(&pool_info, None)? };
-        Ok(Self { device, pool })
+        Ok(Arc::new(Self {
+            device,
+            pool,
+            queue,
+        }))
+    }
+
+    pub fn alloc(self: &Arc<Self>) -> VkResult<SingleUseCommand> {
+        SingleUseCommand::new(self.device.clone(), self.clone())
+    }
+
+    pub fn queue(&self) -> &Queue {
+        &self.queue
     }
 
     pub fn pool(&self) -> vk::CommandPool {
@@ -37,7 +50,7 @@ impl SingleUseCommandPool {
 
 pub struct SingleUseCommand {
     device: Device,
-    pool: vk::CommandPool,
+    pool: Arc<SingleUseCommandPool>,
     buffer: vk::CommandBuffer,
 }
 
@@ -46,23 +59,28 @@ impl Drop for SingleUseCommand {
         unsafe {
             self.device
                 .dev()
-                .free_command_buffers(self.pool, &[self.buffer])
+                .free_command_buffers(self.pool.pool(), &[self.buffer])
         };
     }
 }
 
 impl SingleUseCommand {
-    pub fn new(device: Device, pool: vk::CommandPool) -> VkResult<Self> {
+    pub fn new(
+        device: Device,
+        pool: Arc<SingleUseCommandPool>,
+    ) -> VkResult<Self> {
         let buffer = unsafe {
             let alloc_info = vk::CommandBufferAllocateInfo::default()
                 .command_buffer_count(1)
-                .command_pool(pool)
+                .command_pool(pool.pool())
                 .level(vk::CommandBufferLevel::PRIMARY);
             let buffer = device.dev().allocate_command_buffers(&alloc_info)?[0];
-            let begin_info = vk::CommandBufferBeginInfo::default()
-                .flags(vk::CommandBufferUsageFlags::ONE_TIME_SUBMIT);
-            device.dev().begin_command_buffer(buffer, &begin_info)?;
+            device.dev().begin_command_buffer(
+                buffer,
+                &vk::CommandBufferBeginInfo::default()
+                    .flags(vk::CommandBufferUsageFlags::ONE_TIME_SUBMIT),
+            )?;
 
             buffer
         };
@@ -73,13 +91,12 @@ impl SingleUseCommand {
         })
     }
 
-    pub fn command_buffer(&self) -> vk::CommandBuffer {
+    pub fn buffer(&self) -> vk::CommandBuffer {
         self.buffer
     }
 
     pub fn submit_fence(
         &self,
-        queue: Queue,
         wait: Option<(vk::Semaphore, vk::PipelineStageFlags)>,
         signal: Option<vk::Semaphore>,
         fence: Option<vk::Fence>,
@@ -105,35 +122,37 @@ impl SingleUseCommand {
         }
 
         let fence = fence.unwrap_or(vk::Fence::null());
-        queue.with_locked(|queue| unsafe {
+        self.pool.queue().with_locked(|queue| unsafe {
             self.device.dev().queue_submit(queue, &[submit_info], fence)
         })?;
-        tracing::info!("submitted queue {:?} and fence {:?}", queue, fence);
+        tracing::info!(
+            "submitted queue {:?} and fence {:?}",
+            self.pool.queue(),
+            fence
+        );
 
         Ok(())
     }
 
     pub fn submit_async<'a>(
         &'a self,
-        queue: Queue,
         wait: Option<(vk::Semaphore, vk::PipelineStageFlags)>,
         signal: Option<vk::Semaphore>,
         fence: Arc<sync::Fence>,
     ) -> VkResult<FenceFuture<'a>> {
         let device = self.device.clone();
-        self.submit_fence(queue, wait, signal, Some(fence.fence()))?;
+        self.submit_fence(wait, signal, Some(fence.fence()))?;
 
         Ok(unsafe { FenceFuture::new(fence) })
     }
 
     pub fn submit_blocking(
         self,
-        queue: Queue,
         wait: Option<(vk::Semaphore, vk::PipelineStageFlags)>,
         signal: Option<vk::Semaphore>,
     ) -> VkResult<()> {
         let fence = Arc::new(sync::Fence::create(self.device.clone())?);
-        let future = self.submit_async(queue, wait, signal, fence)?;
+        let future = self.submit_async(wait, signal, fence)?;
         future.block();
         Ok(())
     }
@@ -145,7 +164,6 @@ mod tests {
 
     async fn async_submit(cmd: SingleUseCommand, queue: Queue) {
         cmd.submit_async(
-            queue,
            None,
            None,
            Arc::new(sync::Fence::create(cmd.device.clone()).unwrap()),
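With this change a SingleUseCommandPool owns the queue it submits on, so call
sites no longer have to thread a Queue through every submit. A minimal usage
sketch of the reworked API (assumes a `dev: Device` exposing
`graphics_queue()`, as the Renderer code further down does; error handling
elided):

    // usage sketch, not part of the patch
    let pool = commands::SingleUseCommandPool::new(
        dev.clone(),
        dev.graphics_queue().clone(),
    )?;
    let cmd = pool.alloc()?; // allocates and begins a ONE_TIME_SUBMIT primary buffer
    // record commands into cmd.buffer() here ...
    cmd.submit_blocking(None, None)?; // submits on the pool's queue, waits on an internal fence
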
diff --git a/crates/renderer/src/lib.rs b/crates/renderer/src/lib.rs
index 9373d96..0171025 100644
--- a/crates/renderer/src/lib.rs
+++ b/crates/renderer/src/lib.rs
@@ -1,4 +1,10 @@
-#![feature(c_str_module, closure_lifetime_binder, let_chains, negative_impls)]
+#![feature(
+    c_str_module,
+    closure_lifetime_binder,
+    let_chains,
+    negative_impls,
+    map_try_insert
+)]
 #![allow(unused)]
 use std::{
     borrow::Borrow,
@@ -104,13 +110,20 @@ fn compatible_extension_properties(
 }
 
 #[derive(Clone, Debug)]
-struct Queue(Arc<Mutex<vk::Queue>>);
+struct Queue(Arc<Mutex<vk::Queue>>, u32);
 
 impl Queue {
     fn new(device: &ash::Device, family: u32, index: u32) -> Self {
-        Self(Arc::new(Mutex::new(unsafe {
-            device.get_device_queue(family, index)
-        })))
+        Self(
+            Arc::new(Mutex::new(unsafe {
+                device.get_device_queue(family, index)
+            })),
+            family,
+        )
+    }
+
+    pub fn family(&self) -> u32 {
+        self.1
     }
 
     pub fn with_locked<T, F: FnOnce(vk::Queue) -> T>(&self, map: F) -> T {
@@ -432,20 +445,21 @@ impl AsRef<ash::Instance> for Instance {
 
 #[derive(Debug, Default)]
 struct DeviceQueueFamilies {
-    graphics: u32,
-    present: Option<u32>,
-    async_compute: Option<u32>,
-    transfer: Option<u32>,
+    families: Vec<(u32, u32)>,
+    graphics: (u32, u32),
+    present: Option<(u32, u32)>,
+    async_compute: Option<(u32, u32)>,
+    transfer: Option<(u32, u32)>,
 }
 
 impl DeviceQueueFamilies {
     fn swapchain_family_indices(&self) -> ArrayVec<[u32; 2]> {
-        let mut indices = array_vec!([u32; 2] => self.graphics);
+        let mut indices = array_vec!([u32; 2] => self.graphics.0);
         if let Some(present) = self.present
-            && present != self.graphics
+            && present.0 != self.graphics.0
         {
-            indices.push(present);
+            indices.push(present.0);
         }
 
         indices
@@ -458,6 +472,9 @@ struct DeviceInner {
     device: ash::Device,
     swapchain: khr::swapchain::Device,
     debug_utils: ash::ext::debug_utils::Device,
+    allocated_queues: BTreeMap<(u32, u32), Queue>,
+    // these are resident in allocated_queues, and may in fact be clones of each
+    // other, for ease of access
     main_queue: Queue,
     compute_queue: Queue,
     transfer_queue: Queue,
@@ -465,7 +482,15 @@ struct DeviceInner {
     sync_threadpool: sync::SyncThreadpool,
 }
 
-#[derive(Clone)]
+impl core::fmt::Debug for DeviceInner {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("DeviceInner")
+            .field("device", &self.device.handle())
+            .finish()
+    }
+}
+
+#[derive(Clone, Debug)]
 pub struct Device(Arc<DeviceInner>);
 
 pub type WeakDevice = std::sync::Weak<DeviceInner>;
@@ -500,6 +525,32 @@ impl Device {
     fn present_queue(&self) -> &Queue {
         &self.0.present_queue
     }
+
+    unsafe fn lock_queues(&self) {
+        // this is obviously awful, allocating for this
+        self.0
+            .allocated_queues
+            .values()
+            .for_each(|q| core::mem::forget(q.lock()));
+    }
+
+    unsafe fn unlock_queues(&self) {
+        self.0
+            .allocated_queues
+            .values()
+            .for_each(|q| unsafe { q.0.force_unlock() });
+    }
+
+    fn wait_idle(&self) -> VkResult<()> {
+        tracing::warn!("locking all queues and waiting for device to idle");
+        unsafe {
+            self.lock_queues();
+            self.dev().device_wait_idle()?;
+            self.unlock_queues();
+        }
+        tracing::warn!("finished waiting: unlocking all queues.");
+        Ok(())
+    }
 }
 
 impl AsRef<ash::Device> for Device {
@@ -594,6 +645,7 @@ pub struct Swapchain {
 impl Drop for Swapchain {
     fn drop(&mut self) {
        unsafe {
+            self.device.wait_idle();
            info!("dropping swapchain {:?}", self.swapchain);
            for view in &self.image_views {
                self.device.dev().destroy_image_view(*view, None);
@@ -1497,18 +1549,7 @@ impl Vulkan {
            queue_families.find_first(|family| family.is_present)
        } else {
            None
-        };
-
-        let async_compute =
-            queue_families.find_first(|family| family.is_compute);
-        let transfer = queue_families.find_first(|family| family.is_transfer);
-
-        // family of each queue, of which one is allocated for each queue, with graphics being the fallback queue for compute and transfer, and present possibly being `None`, in which case it is Graphics
-        let queues = DeviceQueueFamilies {
-            graphics,
-            async_compute,
-            transfer,
-            present: present.or({
+        }.or({
            if display_handle.is_none() {
                // in this case the graphics queue will be used by default
                tracing::info!("no present queue available, using graphics queue as fallback for headless_surface");
                Some(graphics)
            } else {
                tracing::warn!("no present queue available, this is unexpected!");
                None}
-            }),
-        };
+        });
 
-        queues
-    }
+        let async_compute =
+            queue_families.find_first(|family| family.is_compute);
+        let transfer = queue_families.find_first(|family| family.is_transfer);
 
-    fn create_device(
-        instance: Arc<Instance>,
-        pdev: PhysicalDevice,
-        features: &mut PhysicalDeviceFeatures,
-    ) -> Result<Device> {
        let mut unique_families = BTreeMap::<u32, u32>::new();
 
        let mut helper = |family: u32| {
@@ -1546,24 +1582,43 @@
            (family, index)
        };
 
-        let graphics_family_and_index = helper(pdev.queue_families.graphics);
-        let compute_family_and_index =
-            pdev.queue_families.async_compute.map(|f| helper(f));
-        let transfer_family_and_index =
-            pdev.queue_families.transfer.map(|f| helper(f));
-        let present_family_and_index =
-            pdev.queue_families.present.map(|f| helper(f));
+        let graphics = helper(graphics);
+        let async_compute = async_compute.map(|f| helper(f));
+        let transfer = transfer.map(|f| helper(f));
+        let present = present.map(|f| helper(f));
 
-        let priorities = vec![
-            1.0f32;
-            unique_families.iter().fold(0, |acc, (_, num)| acc + *num)
-                as usize
-        ];
-
-        let queue_infos = unique_families
+        let families = unique_families
            .into_iter()
-            .filter(|&(_, count)| count > 0)
-            .map(|(family, queues)| {
+            .filter(|&(_family, count)| count > 0)
+            .collect::<Vec<_>>();
+
+        // family of each queue, of which one is allocated for each queue, with
+        // graphics being the fallback queue for compute and transfer, and
+        // present possibly being `None`, in which case it is Graphics
+        let queues = DeviceQueueFamilies {
+            families,
+            graphics,
+            async_compute,
+            transfer,
+            present,
+        };
+
+        queues
+    }
+
+    fn create_device(
+        instance: Arc<Instance>,
+        pdev: PhysicalDevice,
+        features: &mut PhysicalDeviceFeatures,
+    ) -> Result<Device> {
+        // we have 4 queues at most: graphics, compute, transfer, present
+        let priorities = [1.0f32; 4];
+
+        let queue_infos = pdev
+            .queue_families
+            .families
+            .iter()
+            .map(|&(family, queues)| {
                vk::DeviceQueueCreateInfo::default()
                    .queue_family_index(family)
                    .queue_priorities(&priorities[..queues as usize])
@@ -1588,23 +1643,38 @@
            &device_info,
            None,
        )?;
-        let main_queue = Queue::new(
-            &device,
-            graphics_family_and_index.0,
-            graphics_family_and_index.1,
-        );
-        device.get_device_queue(
-            graphics_family_and_index.0,
-            graphics_family_and_index.1,
-        );
-        let present_queue = present_family_and_index
-            .map(|(f, i)| Queue::new(&device, f, i))
+
+        let allocated_queues = queue_infos
+            .iter()
+            .flat_map(|info| {
+                (0..info.queue_count).map(|i| {
+                    (
+                        (info.queue_family_index, i),
+                        Queue::new(&device, info.queue_family_index, i),
+                    )
+                })
+            })
+            .collect::<BTreeMap<_, _>>();
+
+        let get_queue = |(family, index)| {
+            allocated_queues.get(&(family, index)).cloned().unwrap()
+        };
+
+        let main_queue = get_queue(pdev.queue_families.graphics);
+        let present_queue = pdev
+            .queue_families
+            .present
+            .map(get_queue)
             .unwrap_or(main_queue.clone());
-        let compute_queue = compute_family_and_index
-            .map(|(f, i)| Queue::new(&device, f, i))
+        let compute_queue = pdev
+            .queue_families
+            .async_compute
+            .map(get_queue)
             .unwrap_or(main_queue.clone());
-        let transfer_queue = transfer_family_and_index
-            .map(|(f, i)| Queue::new(&device, f, i))
+        let transfer_queue = pdev
+            .queue_families
+            .transfer
+            .map(get_queue)
             .unwrap_or(compute_queue.clone());
 
         Device::new(DeviceInner {
@@ -1619,6 +1689,7 @@
                 &device,
             ),
             instance,
+            allocated_queues,
             main_queue,
             present_queue,
             compute_queue,
@@ -1944,13 +2015,12 @@ impl Renderer {
 
             let pool = commands::SingleUseCommandPool::new(
                 dev.clone(),
-                dev.queue_families().graphics,
+                dev.graphics_queue().clone(),
             )?;
 
             for ctx in self.window_contexts.values() {
-                let cmd =
-                    commands::SingleUseCommand::new(dev.clone(), pool.pool())?;
-                let buffer = cmd.command_buffer();
+                let cmd = pool.alloc()?;
+                let buffer = cmd.buffer();
 
                 let (frame, suboptimal) = smol::block_on(
                     ctx.current_swapchain.read().clone().acquire_image(),
@@ -2035,7 +2105,6 @@ impl Renderer {
                 dev.dev().cmd_pipeline_barrier2(buffer, &dependency_info);
 
                 let future = cmd.submit_async(
-                    dev.graphics_queue().clone(),
                     Some((frame.acquire, vk::PipelineStageFlags::ALL_COMMANDS)),
                     Some(frame.release),
                     Arc::new(sync::Fence::create(dev.clone())?),
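Device::wait_idle above pairs a leaked lock guard with a forced unlock so that
no other thread can submit to any queue while vkDeviceWaitIdle runs. A
standalone sketch of that pattern, assuming the queue mutexes are
parking_lot::Mutex (whose unsafe force_unlock this matches); the
with_all_locked helper is illustrative, not part of the patch:

    use parking_lot::Mutex;

    fn with_all_locked<T>(locks: &[Mutex<T>], then: impl FnOnce()) {
        for m in locks {
            // hold each lock past the guard's scope by leaking the guard
            core::mem::forget(m.lock());
        }
        then(); // e.g. device_wait_idle: nothing else can lock a queue now
        for m in locks {
            // release the locks deliberately leaked above
            unsafe { m.force_unlock() };
        }
    }
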
diff --git a/crates/renderer/src/sync.rs b/crates/renderer/src/sync.rs
index af71301..89d65e2 100644
--- a/crates/renderer/src/sync.rs
+++ b/crates/renderer/src/sync.rs
@@ -9,7 +9,7 @@ use super::Device;
 use ash::{prelude::*, vk};
 use crossbeam::channel::{Receiver, Sender};
 
-type Message = (Arc<Fence>, std::task::Waker);
+type Message = (SyncPrimitive, std::task::Waker);
 
 pub struct SyncThreadpool {
     channel: (Sender<Message>, Receiver<Message>),
@@ -19,6 +19,14 @@ pub struct SyncThreadpool {
     num_threads: Arc<AtomicU32>,
 }
 
+#[derive(Debug)]
+enum SyncPrimitive {
+    Fence(Arc<Fence>),
+    // actually, I think this is an awful idea because I would have to hold a
+    // lock on all queues.
+    DeviceIdle(Device),
+}
+
 impl SyncThreadpool {
     pub fn new() -> SyncThreadpool {
         Self::with_max_threads(512)
@@ -60,16 +68,25 @@
         fn run(self, barrier: Arc<Barrier>) {
             tracing::info!("spawned new sync thread");
             barrier.wait();
-            while let Ok((fence, waker)) =
+            while let Ok((sync, waker)) =
                 self.rx.recv_timeout(self.thread_dies_after)
             {
                 tracing::info!(
                     "received ({:?}, {:?})",
-                    fence,
+                    sync,
                     waker
                 );
                 loop {
-                    match fence.wait_on(Some(self.timeout)) {
+                    let wait_result = match &sync {
+                        SyncPrimitive::Fence(fence) => {
+                            fence.wait_on(Some(self.timeout))
+                        }
+                        SyncPrimitive::DeviceIdle(device) => {
+                            device.wait_idle()
+                        }
+                    };
+
+                    match wait_result {
                         Ok(_) => {
                             waker.wake();
                             break;
                         }
                         Err(vk::Result::TIMEOUT) => {}
                         Err(err) => {
                             tracing::error!(
-                                "failed to wait on fence in waiter thread: {err}"
+                                "failed to wait on {sync:?} in waiter thread: {err}"
                             );
                             break;
                         }
                     }
                 }
             }
+
+            // decrement now so a replacement thread can spawn as soon as this one exits
             self.num_threads.fetch_sub(1, Ordering::AcqRel);
         }
     }
@@ -120,7 +139,7 @@ impl SyncThreadpool {
     fn spawn_waiter(&self, fence: Arc<Fence>, waker: std::task::Waker) {
         use std::sync::atomic::Ordering;
 
-        let mut msg = (fence, waker);
+        let mut msg = (SyncPrimitive::Fence(fence), waker);
         while let Err(err) = self.channel.0.try_send(msg) {
             match err {
                 crossbeam::channel::TrySendError::Full(msg2) => {
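Only the Fence variant is wired into spawn_waiter here; a device-idle waiter
would presumably be enqueued over the same channel. A hypothetical sketch
(this helper does not exist in the patch):

    fn spawn_device_idle_waiter(&self, device: Device, waker: std::task::Waker) {
        // reuse the waiter channel with the new enum variant; a real version
        // would follow the same try_send/spawn-thread retry loop as spawn_waiter
        let msg = (SyncPrimitive::DeviceIdle(device), waker);
        let _ = self.channel.0.try_send(msg);
    }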