From f3464c6ac252098e79a2caa2394c69750a808eaf Mon Sep 17 00:00:00 2001 From: Andreas Pehrson Date: Fri, 5 Apr 2024 16:08:48 +0200 Subject: [PATCH] Dispose of shared vpio units once they've not been used for 10 seconds --- src/backend/mod.rs | 160 ++++++++++++++++++++++++++------ src/backend/tests/api.rs | 71 ++++++++++++++ src/backend/tests/interfaces.rs | 75 +++++++++++---- 3 files changed, 257 insertions(+), 49 deletions(-) diff --git a/src/backend/mod.rs b/src/backend/mod.rs index 5392870..80b260e 100644 --- a/src/backend/mod.rs +++ b/src/backend/mod.rs @@ -64,6 +64,8 @@ const APPLE_STUDIO_DISPLAY_USB_ID: &str = "05AC:1114"; const SAFE_MIN_LATENCY_FRAMES: u32 = 128; const SAFE_MAX_LATENCY_FRAMES: u32 = 512; +const VPIO_IDLE_TIMEOUT: Duration = Duration::from_secs(10); + bitflags! { #[allow(non_camel_case_types)] #[derive(Clone, Debug, PartialEq, Copy)] @@ -2155,18 +2157,29 @@ impl LatencyController { struct SharedStorageInternal { // Storage for shared elements. elements: Vec, + // Number of elements in use, i.e. all elements created/taken and not recycled. + outstanding_element_count: usize, + // Used for invalidation of in-flight tasks to clear elements. + // Incremented when something takes a shared element. + generation: usize, } #[derive(Debug)] struct SharedStorage { + queue: Queue, + idle_timeout: Duration, storage: Mutex>, } -impl SharedStorage { - fn new() -> Self { +impl SharedStorage { + fn with_idle_timeout(queue: Queue, idle_timeout: Duration) -> Self { Self { + queue, + idle_timeout, storage: Mutex::new(SharedStorageInternal:: { elements: Vec::default(), + outstanding_element_count: 0, + generation: 0, }), } } @@ -2174,12 +2187,40 @@ impl SharedStorage { fn take_locked(guard: &mut MutexGuard<'_, SharedStorageInternal>) -> Result { if let Some(e) = guard.elements.pop() { cubeb_log!("Taking shared element #{}.", guard.elements.len()); + guard.outstanding_element_count += 1; + guard.generation += 1; return Ok(e); } Err(Error::not_supported()) } + fn create_with_locked( + guard: &mut MutexGuard<'_, SharedStorageInternal>, + f: F, + ) -> Result + where + F: FnOnce() -> Result, + { + let start = Instant::now(); + match f() { + Ok(obj) => { + cubeb_log!( + "Just created shared element #{}. Took {}s.", + guard.outstanding_element_count, + (Instant::now() - start).as_secs_f32() + ); + guard.outstanding_element_count += 1; + guard.generation += 1; + Ok(obj) + } + Err(_) => { + cubeb_log!("Creating shared element failed"); + Err(Error::error()) + } + } + } + #[cfg(test)] fn take(&self) -> Result { let mut guard = self.storage.lock().unwrap(); @@ -2191,38 +2232,87 @@ impl SharedStorage { F: FnOnce() -> Result, { let mut guard = self.storage.lock().unwrap(); - SharedStorage::take_locked(&mut guard).or_else(|_| { - let start = Instant::now(); - match f() { - Ok(obj) => { - cubeb_log!( - "Just created shared element. Took {}s.", - (Instant::now() - start).as_secs_f32() - ); - Ok(obj) - } - Err(_) => { - cubeb_log!("Creating shared element failed"); - Err(Error::error()) - } - } - }) + SharedStorage::take_locked(&mut guard) + .or_else(|_| SharedStorage::create_with_locked(&mut guard, f)) } fn recycle(&self, obj: T) { let mut guard = self.storage.lock().unwrap(); - cubeb_log!("Recycling shared element #{}.", guard.elements.len()); + guard.outstanding_element_count -= 1; + cubeb_log!( + "Recycling shared element #{}. Nr of live elements now {}.", + guard.elements.len(), + guard.outstanding_element_count + ); guard.elements.push(obj); } + + fn clear_locked(guard: &mut MutexGuard<'_, SharedStorageInternal>) { + let count = guard.elements.len(); + let start = Instant::now(); + guard.elements.clear(); + cubeb_log!( + "Cleared {} shared element{}. Took {}s.", + count, + if count == 1 { "" } else { "s" }, + (Instant::now() - start).as_secs_f32() + ); + } + + fn clear(&self) { + debug_assert_running_serially(); + let mut guard = self.storage.lock().unwrap(); + SharedStorage::clear_locked(&mut guard); + } + + fn clear_if_all_idle_async(storage: &Arc>) { + let (queue, outstanding_element_count, generation) = { + let guard = storage.storage.lock().unwrap(); + ( + storage.queue.clone(), + guard.outstanding_element_count, + guard.generation, + ) + }; + if outstanding_element_count > 0 { + cubeb_log!( + "Not clearing shared voiceprocessing unit storage because {} elements are in use. Generation={}.", + outstanding_element_count, + generation + ); + return; + } + cubeb_log!( + "Clearing shared voiceprocessing unit storage in {}s if still at generation {}.", + storage.idle_timeout.as_secs_f32(), + generation + ); + let storage = storage.clone(); + queue.run_after(Instant::now() + storage.idle_timeout, move || { + let mut guard = storage.storage.lock().unwrap(); + if generation != guard.generation { + cubeb_log!( + "Not clearing shared voiceprocessing unit storage for generation {} as we're now at {}.", + generation, + guard.generation + ); + return; + } + SharedStorage::clear_locked(&mut guard); + }); + } } #[derive(Debug)] -struct OwningHandle { +struct OwningHandle +where + T: Send, +{ storage: Weak>, obj: Option, } -impl OwningHandle { +impl OwningHandle { fn new(storage: Weak>, obj: T) -> Self { Self { storage, @@ -2231,19 +2321,19 @@ impl OwningHandle { } } -impl AsRef for OwningHandle { +impl AsRef for OwningHandle { fn as_ref(&self) -> &T { self.obj.as_ref().unwrap() } } -impl AsMut for OwningHandle { +impl AsMut for OwningHandle { fn as_mut(&mut self) -> &mut T { self.obj.as_mut().unwrap() } } -impl Drop for OwningHandle { +impl Drop for OwningHandle { fn drop(&mut self) { let storage = self.storage.upgrade(); assert!( @@ -2256,6 +2346,7 @@ impl Drop for OwningHandle { } let obj = self.obj.take().unwrap(); storage.recycle(obj); + SharedStorage::clear_if_all_idle_async(&storage); } } @@ -2277,16 +2368,22 @@ unsafe impl Send for VoiceProcessingUnit {} struct SharedVoiceProcessingUnitManager { sync_storage: Mutex>>>, queue: Queue, + idle_timeout: Duration, } impl SharedVoiceProcessingUnitManager { - fn new(queue: Queue) -> Self { + fn with_idle_timeout(queue: Queue, idle_timeout: Duration) -> Self { Self { sync_storage: Mutex::new(None), queue, + idle_timeout, } } + fn new(queue: Queue) -> Self { + SharedVoiceProcessingUnitManager::with_idle_timeout(queue, VPIO_IDLE_TIMEOUT) + } + fn ensure_storage_locked( &self, guard: &mut MutexGuard>>>, @@ -2295,7 +2392,10 @@ impl SharedVoiceProcessingUnitManager { return; } cubeb_log!("Creating shared voiceprocessing storage."); - let storage = SharedStorage::::new(); + let storage = SharedStorage::::with_idle_timeout( + self.queue.clone(), + self.idle_timeout, + ); let old_storage = guard.replace(Arc::from(storage)); assert!(old_storage.is_none()); } @@ -2333,10 +2433,7 @@ impl Drop for SharedVoiceProcessingUnitManager { if guard.is_none() { return; } - let _last_ref_storage: Option> = - Arc::into_inner(guard.take().unwrap()); - // Don't assert that all recyclable units have been returned here since the - // assert in OwningHandle's drop is more likely to be actionable. + guard.as_mut().unwrap().clear(); }); } } @@ -2786,6 +2883,9 @@ impl Drop for AudioUnitContext { devices.input.changed_callback.is_none() && devices.output.changed_callback.is_none() }); + self.shared_voice_processing_unit = + SharedVoiceProcessingUnitManager::new(self.serial_queue.clone()); + { let controller = self.latency_controller.lock().unwrap(); // Disabling this assert for bug 1083664 -- we seem to leak a stream diff --git a/src/backend/tests/api.rs b/src/backend/tests/api.rs index 7edec2e..d6a4568 100644 --- a/src/backend/tests/api.rs +++ b/src/backend/tests/api.rs @@ -1808,3 +1808,74 @@ fn test_shared_voice_processing_multiple_units() { let r3 = queue.run_sync(|| shared.take()).unwrap(); assert!(r3.is_err()); } + +#[test] +fn test_shared_voice_processing_release_on_idle() { + let queue = Queue::new_with_target( + "test_shared_voice_processing_release_on_idle", + get_serial_queue_singleton(), + ); + let mut shared = SharedVoiceProcessingUnitManager::with_idle_timeout( + queue.clone(), + Duration::from_millis(0), + ); + let r = queue.run_sync(|| shared.take_or_create()).unwrap(); + assert!(r.is_ok()); + { + let _handle = r.unwrap(); + } + queue.run_sync(|| {}); + let r = queue.run_sync(|| shared.take()).unwrap(); + assert!(r.is_err()); +} + +#[test] +fn test_shared_voice_processing_no_release_on_outstanding() { + let queue = Queue::new_with_target( + "test_shared_voice_processing_no_release_on_outstanding", + get_serial_queue_singleton(), + ); + let mut shared = SharedVoiceProcessingUnitManager::with_idle_timeout( + queue.clone(), + Duration::from_millis(0), + ); + let r1 = queue.run_sync(|| shared.take_or_create()).unwrap(); + assert!(r1.is_ok()); + let r2 = queue.run_sync(|| shared.take_or_create()).unwrap(); + assert!(r2.is_ok()); + { + let _handle1 = r1.unwrap(); + } + queue.run_sync(|| {}); + let r1 = queue.run_sync(|| shared.take()).unwrap(); + assert!(r1.is_ok()); +} + +#[test] +fn test_shared_voice_processing_release_on_idle_cancel_on_take() { + let queue = Queue::new_with_target( + "test_shared_voice_processing_release_on_idle_cancel_on_take", + get_serial_queue_singleton(), + ); + let mut shared = SharedVoiceProcessingUnitManager::with_idle_timeout( + queue.clone(), + Duration::from_millis(0), + ); + let r1 = queue.run_sync(|| shared.take_or_create()).unwrap(); + assert!(r1.is_ok()); + let r2 = queue.run_sync(|| shared.take_or_create()).unwrap(); + assert!(r2.is_ok()); + let r1 = queue + .run_sync(|| { + { + let _handle1 = r1.unwrap(); + let _handle2 = r2.unwrap(); + } + shared.take() + }) + .unwrap(); + assert!(r1.is_ok()); + queue.run_sync(|| {}); + let r2 = queue.run_sync(|| shared.take()).unwrap(); + assert!(r2.is_ok()); +} diff --git a/src/backend/tests/interfaces.rs b/src/backend/tests/interfaces.rs index 17bbda6..afa5b37 100644 --- a/src/backend/tests/interfaces.rs +++ b/src/backend/tests/interfaces.rs @@ -1306,6 +1306,9 @@ fn test_ops_timing_sensitive_multiple_voice_stream_init_and_destroy() { let mut t5 = start; let mut t6 = start; let mut t7 = start; + let mut t8 = start; + let mut t9 = start; + let mut t10 = start; test_ops_context_operation("multiple duplex voice streams", |context_ptr| { // First stream uses vpio, creates the shared vpio unit. test_default_duplex_voice_stream_operation_on_context( @@ -1323,7 +1326,7 @@ fn test_ops_timing_sensitive_multiple_voice_stream_init_and_destroy() { let stm = unsafe { &mut *(stream as *mut AudioUnitStream) }; assert!(stm.core_stream_data.using_voice_processing_unit()); - // Three concurrent vpio streams are supported but only two use recycling. + // Three concurrent vpio streams are supported. test_default_duplex_voice_stream_operation_on_context( "multiple voice streams: stream 3, duplex", context_ptr, @@ -1339,7 +1342,7 @@ fn test_ops_timing_sensitive_multiple_voice_stream_init_and_destroy() { t1 = Instant::now(); // Fourth stream uses vpio, allows reuse of one already created. test_default_duplex_voice_stream_operation_on_context( - "multiple voice streams: stream 3, duplex", + "multiple voice streams: stream 4, duplex", context_ptr, |stream| { let stm = unsafe { &mut *(stream as *mut AudioUnitStream) }; @@ -1348,35 +1351,49 @@ fn test_ops_timing_sensitive_multiple_voice_stream_init_and_destroy() { // Fifth stream uses vpio, allows reuse of one already created. test_default_duplex_voice_stream_operation_on_context( - "multiple voice streams: stream 2, duplex", + "multiple voice streams: stream 5, duplex", context_ptr, |stream| { let stm = unsafe { &mut *(stream as *mut AudioUnitStream) }; assert!(stm.core_stream_data.using_voice_processing_unit()); t3 = Instant::now(); - // Sixth stream uses vpio, but is created anew. + // Sixth stream uses vpio, allows reuse of one already created. test_default_input_voice_stream_operation_on_context( - "multiple voice streams: stream 3, input-only", + "multiple voice streams: stream 6, input-only", context_ptr, |stream| { let stm = unsafe { &mut *(stream as *mut AudioUnitStream) }; assert!(stm.core_stream_data.using_voice_processing_unit()); t4 = Instant::now(); + + // Seventh stream uses vpio, but is created anew. + test_default_input_voice_stream_operation_on_context( + "multiple voice streams: stream 7, input-only", + context_ptr, + |stream| { + let stm = unsafe { &mut *(stream as *mut AudioUnitStream) }; + assert!(stm.core_stream_data.using_voice_processing_unit()); + t5 = Instant::now(); + }, + ); + t6 = Instant::now(); }, ); - t5 = Instant::now(); + t7 = Instant::now(); }, ); - t6 = Instant::now(); + t8 = Instant::now(); }, ); + t9 = Instant::now(); }); - t7 = Instant::now(); + t10 = Instant::now(); let reuse_vpio_1 = t2 - t1; let reuse_vpio_2 = t3 - t2; - let create_standalone_vpio = t4 - t3; + let reuse_vpio_3 = t4 - t3; + let create_standalone_vpio = t5 - t4; assert!( create_standalone_vpio > reuse_vpio_1 * 2, "Failed create_standalone_vpio={}s > reuse_vpio_1={}s * 2", @@ -1389,22 +1406,42 @@ fn test_ops_timing_sensitive_multiple_voice_stream_init_and_destroy() { create_standalone_vpio.as_secs_f32(), reuse_vpio_2.as_secs_f32() ); - - let recycle_vpio_1 = t5 - t4; - let recycle_vpio_2 = t6 - t5; - let dispose_standalone_vpio = t7 - t6; assert!( - dispose_standalone_vpio > recycle_vpio_1 * 2, - "Failed dispose_standalone_vpio ={}s > recycle_vpio_1 ={}s * 2", - dispose_standalone_vpio.as_secs_f32(), + create_standalone_vpio > reuse_vpio_3 * 2, + "Failed create_standalone_vpio={}s > reuse_vpio_3={}s * 2", + create_standalone_vpio.as_secs_f32(), + reuse_vpio_3.as_secs_f32() + ); + + let recycle_vpio_1 = t6 - t5; + let recycle_vpio_2 = t7 - t6; + let recycle_vpio_3 = t8 - t7; + let recycle_vpio_4 = t9 - t8; + let dispose_vpios = t10 - t9; + assert!( + dispose_vpios > recycle_vpio_1 * 2, + "Failed dispose_vpios={}s > recycle_vpio_1 ={}s * 2", + dispose_vpios.as_secs_f32(), recycle_vpio_1.as_secs_f32() ); assert!( - dispose_standalone_vpio > recycle_vpio_2 * 2, - "Failed dispose_standalone_vpio ={}s > recycle_vpio_2 ={}s * 2", - dispose_standalone_vpio.as_secs_f32(), + dispose_vpios > recycle_vpio_2 * 2, + "Failed dispose_vpios={}s > recycle_vpio_2 ={}s * 2", + dispose_vpios.as_secs_f32(), recycle_vpio_2.as_secs_f32() ); + assert!( + dispose_vpios > recycle_vpio_3 * 2, + "Failed dispose_vpios={}s > recycle_vpio_3 ={}s * 2", + dispose_vpios.as_secs_f32(), + recycle_vpio_3.as_secs_f32() + ); + assert!( + dispose_vpios > recycle_vpio_4 * 2, + "Failed dispose_vpios={}s > recycle_vpio_4 ={}s * 2", + dispose_vpios.as_secs_f32(), + recycle_vpio_4.as_secs_f32() + ); } #[test]