Dispose of shared vpio units once they've not been used for 10 seconds

This commit is contained in:
Andreas Pehrson 2024-04-05 16:08:48 +02:00 коммит произвёл Andreas Pehrson
Родитель a31e0ddbf2
Коммит f3464c6ac2
3 изменённых файлов: 257 добавлений и 49 удалений

Просмотреть файл

@ -64,6 +64,8 @@ const APPLE_STUDIO_DISPLAY_USB_ID: &str = "05AC:1114";
const SAFE_MIN_LATENCY_FRAMES: u32 = 128;
const SAFE_MAX_LATENCY_FRAMES: u32 = 512;
const VPIO_IDLE_TIMEOUT: Duration = Duration::from_secs(10);
bitflags! {
#[allow(non_camel_case_types)]
#[derive(Clone, Debug, PartialEq, Copy)]
@ -2155,18 +2157,29 @@ impl LatencyController {
struct SharedStorageInternal<T> {
// Storage for shared elements.
elements: Vec<T>,
// Number of elements in use, i.e. all elements created/taken and not recycled.
outstanding_element_count: usize,
// Used for invalidation of in-flight tasks to clear elements.
// Incremented when something takes a shared element.
generation: usize,
}
#[derive(Debug)]
struct SharedStorage<T> {
queue: Queue,
idle_timeout: Duration,
storage: Mutex<SharedStorageInternal<T>>,
}
impl<T> SharedStorage<T> {
fn new() -> Self {
impl<T: Send> SharedStorage<T> {
fn with_idle_timeout(queue: Queue, idle_timeout: Duration) -> Self {
Self {
queue,
idle_timeout,
storage: Mutex::new(SharedStorageInternal::<T> {
elements: Vec::default(),
outstanding_element_count: 0,
generation: 0,
}),
}
}
@ -2174,12 +2187,40 @@ impl<T> SharedStorage<T> {
fn take_locked(guard: &mut MutexGuard<'_, SharedStorageInternal<T>>) -> Result<T> {
if let Some(e) = guard.elements.pop() {
cubeb_log!("Taking shared element #{}.", guard.elements.len());
guard.outstanding_element_count += 1;
guard.generation += 1;
return Ok(e);
}
Err(Error::not_supported())
}
fn create_with_locked<F>(
guard: &mut MutexGuard<'_, SharedStorageInternal<T>>,
f: F,
) -> Result<T>
where
F: FnOnce() -> Result<T>,
{
let start = Instant::now();
match f() {
Ok(obj) => {
cubeb_log!(
"Just created shared element #{}. Took {}s.",
guard.outstanding_element_count,
(Instant::now() - start).as_secs_f32()
);
guard.outstanding_element_count += 1;
guard.generation += 1;
Ok(obj)
}
Err(_) => {
cubeb_log!("Creating shared element failed");
Err(Error::error())
}
}
}
#[cfg(test)]
fn take(&self) -> Result<T> {
let mut guard = self.storage.lock().unwrap();
@ -2191,38 +2232,87 @@ impl<T> SharedStorage<T> {
F: FnOnce() -> Result<T>,
{
let mut guard = self.storage.lock().unwrap();
SharedStorage::take_locked(&mut guard).or_else(|_| {
let start = Instant::now();
match f() {
Ok(obj) => {
cubeb_log!(
"Just created shared element. Took {}s.",
(Instant::now() - start).as_secs_f32()
);
Ok(obj)
}
Err(_) => {
cubeb_log!("Creating shared element failed");
Err(Error::error())
}
}
})
SharedStorage::take_locked(&mut guard)
.or_else(|_| SharedStorage::create_with_locked(&mut guard, f))
}
fn recycle(&self, obj: T) {
let mut guard = self.storage.lock().unwrap();
cubeb_log!("Recycling shared element #{}.", guard.elements.len());
guard.outstanding_element_count -= 1;
cubeb_log!(
"Recycling shared element #{}. Nr of live elements now {}.",
guard.elements.len(),
guard.outstanding_element_count
);
guard.elements.push(obj);
}
fn clear_locked(guard: &mut MutexGuard<'_, SharedStorageInternal<T>>) {
let count = guard.elements.len();
let start = Instant::now();
guard.elements.clear();
cubeb_log!(
"Cleared {} shared element{}. Took {}s.",
count,
if count == 1 { "" } else { "s" },
(Instant::now() - start).as_secs_f32()
);
}
fn clear(&self) {
debug_assert_running_serially();
let mut guard = self.storage.lock().unwrap();
SharedStorage::clear_locked(&mut guard);
}
fn clear_if_all_idle_async(storage: &Arc<SharedStorage<T>>) {
let (queue, outstanding_element_count, generation) = {
let guard = storage.storage.lock().unwrap();
(
storage.queue.clone(),
guard.outstanding_element_count,
guard.generation,
)
};
if outstanding_element_count > 0 {
cubeb_log!(
"Not clearing shared voiceprocessing unit storage because {} elements are in use. Generation={}.",
outstanding_element_count,
generation
);
return;
}
cubeb_log!(
"Clearing shared voiceprocessing unit storage in {}s if still at generation {}.",
storage.idle_timeout.as_secs_f32(),
generation
);
let storage = storage.clone();
queue.run_after(Instant::now() + storage.idle_timeout, move || {
let mut guard = storage.storage.lock().unwrap();
if generation != guard.generation {
cubeb_log!(
"Not clearing shared voiceprocessing unit storage for generation {} as we're now at {}.",
generation,
guard.generation
);
return;
}
SharedStorage::clear_locked(&mut guard);
});
}
}
#[derive(Debug)]
struct OwningHandle<T> {
struct OwningHandle<T>
where
T: Send,
{
storage: Weak<SharedStorage<T>>,
obj: Option<T>,
}
impl<T> OwningHandle<T> {
impl<T: Send> OwningHandle<T> {
fn new(storage: Weak<SharedStorage<T>>, obj: T) -> Self {
Self {
storage,
@ -2231,19 +2321,19 @@ impl<T> OwningHandle<T> {
}
}
impl<T> AsRef<T> for OwningHandle<T> {
impl<T: Send> AsRef<T> for OwningHandle<T> {
fn as_ref(&self) -> &T {
self.obj.as_ref().unwrap()
}
}
impl<T> AsMut<T> for OwningHandle<T> {
impl<T: Send> AsMut<T> for OwningHandle<T> {
fn as_mut(&mut self) -> &mut T {
self.obj.as_mut().unwrap()
}
}
impl<T> Drop for OwningHandle<T> {
impl<T: Send> Drop for OwningHandle<T> {
fn drop(&mut self) {
let storage = self.storage.upgrade();
assert!(
@ -2256,6 +2346,7 @@ impl<T> Drop for OwningHandle<T> {
}
let obj = self.obj.take().unwrap();
storage.recycle(obj);
SharedStorage::clear_if_all_idle_async(&storage);
}
}
@ -2277,16 +2368,22 @@ unsafe impl Send for VoiceProcessingUnit {}
struct SharedVoiceProcessingUnitManager {
sync_storage: Mutex<Option<Arc<SharedStorage<VoiceProcessingUnit>>>>,
queue: Queue,
idle_timeout: Duration,
}
impl SharedVoiceProcessingUnitManager {
fn new(queue: Queue) -> Self {
fn with_idle_timeout(queue: Queue, idle_timeout: Duration) -> Self {
Self {
sync_storage: Mutex::new(None),
queue,
idle_timeout,
}
}
fn new(queue: Queue) -> Self {
SharedVoiceProcessingUnitManager::with_idle_timeout(queue, VPIO_IDLE_TIMEOUT)
}
fn ensure_storage_locked(
&self,
guard: &mut MutexGuard<Option<Arc<SharedStorage<VoiceProcessingUnit>>>>,
@ -2295,7 +2392,10 @@ impl SharedVoiceProcessingUnitManager {
return;
}
cubeb_log!("Creating shared voiceprocessing storage.");
let storage = SharedStorage::<VoiceProcessingUnit>::new();
let storage = SharedStorage::<VoiceProcessingUnit>::with_idle_timeout(
self.queue.clone(),
self.idle_timeout,
);
let old_storage = guard.replace(Arc::from(storage));
assert!(old_storage.is_none());
}
@ -2333,10 +2433,7 @@ impl Drop for SharedVoiceProcessingUnitManager {
if guard.is_none() {
return;
}
let _last_ref_storage: Option<SharedStorage<VoiceProcessingUnit>> =
Arc::into_inner(guard.take().unwrap());
// Don't assert that all recyclable units have been returned here since the
// assert in OwningHandle's drop is more likely to be actionable.
guard.as_mut().unwrap().clear();
});
}
}
@ -2786,6 +2883,9 @@ impl Drop for AudioUnitContext {
devices.input.changed_callback.is_none() && devices.output.changed_callback.is_none()
});
self.shared_voice_processing_unit =
SharedVoiceProcessingUnitManager::new(self.serial_queue.clone());
{
let controller = self.latency_controller.lock().unwrap();
// Disabling this assert for bug 1083664 -- we seem to leak a stream

Просмотреть файл

@ -1808,3 +1808,74 @@ fn test_shared_voice_processing_multiple_units() {
let r3 = queue.run_sync(|| shared.take()).unwrap();
assert!(r3.is_err());
}
#[test]
fn test_shared_voice_processing_release_on_idle() {
let queue = Queue::new_with_target(
"test_shared_voice_processing_release_on_idle",
get_serial_queue_singleton(),
);
let mut shared = SharedVoiceProcessingUnitManager::with_idle_timeout(
queue.clone(),
Duration::from_millis(0),
);
let r = queue.run_sync(|| shared.take_or_create()).unwrap();
assert!(r.is_ok());
{
let _handle = r.unwrap();
}
queue.run_sync(|| {});
let r = queue.run_sync(|| shared.take()).unwrap();
assert!(r.is_err());
}
#[test]
fn test_shared_voice_processing_no_release_on_outstanding() {
let queue = Queue::new_with_target(
"test_shared_voice_processing_no_release_on_outstanding",
get_serial_queue_singleton(),
);
let mut shared = SharedVoiceProcessingUnitManager::with_idle_timeout(
queue.clone(),
Duration::from_millis(0),
);
let r1 = queue.run_sync(|| shared.take_or_create()).unwrap();
assert!(r1.is_ok());
let r2 = queue.run_sync(|| shared.take_or_create()).unwrap();
assert!(r2.is_ok());
{
let _handle1 = r1.unwrap();
}
queue.run_sync(|| {});
let r1 = queue.run_sync(|| shared.take()).unwrap();
assert!(r1.is_ok());
}
#[test]
fn test_shared_voice_processing_release_on_idle_cancel_on_take() {
let queue = Queue::new_with_target(
"test_shared_voice_processing_release_on_idle_cancel_on_take",
get_serial_queue_singleton(),
);
let mut shared = SharedVoiceProcessingUnitManager::with_idle_timeout(
queue.clone(),
Duration::from_millis(0),
);
let r1 = queue.run_sync(|| shared.take_or_create()).unwrap();
assert!(r1.is_ok());
let r2 = queue.run_sync(|| shared.take_or_create()).unwrap();
assert!(r2.is_ok());
let r1 = queue
.run_sync(|| {
{
let _handle1 = r1.unwrap();
let _handle2 = r2.unwrap();
}
shared.take()
})
.unwrap();
assert!(r1.is_ok());
queue.run_sync(|| {});
let r2 = queue.run_sync(|| shared.take()).unwrap();
assert!(r2.is_ok());
}

Просмотреть файл

@ -1306,6 +1306,9 @@ fn test_ops_timing_sensitive_multiple_voice_stream_init_and_destroy() {
let mut t5 = start;
let mut t6 = start;
let mut t7 = start;
let mut t8 = start;
let mut t9 = start;
let mut t10 = start;
test_ops_context_operation("multiple duplex voice streams", |context_ptr| {
// First stream uses vpio, creates the shared vpio unit.
test_default_duplex_voice_stream_operation_on_context(
@ -1323,7 +1326,7 @@ fn test_ops_timing_sensitive_multiple_voice_stream_init_and_destroy() {
let stm = unsafe { &mut *(stream as *mut AudioUnitStream) };
assert!(stm.core_stream_data.using_voice_processing_unit());
// Three concurrent vpio streams are supported but only two use recycling.
// Three concurrent vpio streams are supported.
test_default_duplex_voice_stream_operation_on_context(
"multiple voice streams: stream 3, duplex",
context_ptr,
@ -1339,7 +1342,7 @@ fn test_ops_timing_sensitive_multiple_voice_stream_init_and_destroy() {
t1 = Instant::now();
// Fourth stream uses vpio, allows reuse of one already created.
test_default_duplex_voice_stream_operation_on_context(
"multiple voice streams: stream 3, duplex",
"multiple voice streams: stream 4, duplex",
context_ptr,
|stream| {
let stm = unsafe { &mut *(stream as *mut AudioUnitStream) };
@ -1348,35 +1351,49 @@ fn test_ops_timing_sensitive_multiple_voice_stream_init_and_destroy() {
// Fifth stream uses vpio, allows reuse of one already created.
test_default_duplex_voice_stream_operation_on_context(
"multiple voice streams: stream 2, duplex",
"multiple voice streams: stream 5, duplex",
context_ptr,
|stream| {
let stm = unsafe { &mut *(stream as *mut AudioUnitStream) };
assert!(stm.core_stream_data.using_voice_processing_unit());
t3 = Instant::now();
// Sixth stream uses vpio, but is created anew.
// Sixth stream uses vpio, allows reuse of one already created.
test_default_input_voice_stream_operation_on_context(
"multiple voice streams: stream 3, input-only",
"multiple voice streams: stream 6, input-only",
context_ptr,
|stream| {
let stm = unsafe { &mut *(stream as *mut AudioUnitStream) };
assert!(stm.core_stream_data.using_voice_processing_unit());
t4 = Instant::now();
// Seventh stream uses vpio, but is created anew.
test_default_input_voice_stream_operation_on_context(
"multiple voice streams: stream 7, input-only",
context_ptr,
|stream| {
let stm = unsafe { &mut *(stream as *mut AudioUnitStream) };
assert!(stm.core_stream_data.using_voice_processing_unit());
t5 = Instant::now();
},
);
t6 = Instant::now();
},
);
t5 = Instant::now();
t7 = Instant::now();
},
);
t6 = Instant::now();
t8 = Instant::now();
},
);
t9 = Instant::now();
});
t7 = Instant::now();
t10 = Instant::now();
let reuse_vpio_1 = t2 - t1;
let reuse_vpio_2 = t3 - t2;
let create_standalone_vpio = t4 - t3;
let reuse_vpio_3 = t4 - t3;
let create_standalone_vpio = t5 - t4;
assert!(
create_standalone_vpio > reuse_vpio_1 * 2,
"Failed create_standalone_vpio={}s > reuse_vpio_1={}s * 2",
@ -1389,22 +1406,42 @@ fn test_ops_timing_sensitive_multiple_voice_stream_init_and_destroy() {
create_standalone_vpio.as_secs_f32(),
reuse_vpio_2.as_secs_f32()
);
let recycle_vpio_1 = t5 - t4;
let recycle_vpio_2 = t6 - t5;
let dispose_standalone_vpio = t7 - t6;
assert!(
dispose_standalone_vpio > recycle_vpio_1 * 2,
"Failed dispose_standalone_vpio ={}s > recycle_vpio_1 ={}s * 2",
dispose_standalone_vpio.as_secs_f32(),
create_standalone_vpio > reuse_vpio_3 * 2,
"Failed create_standalone_vpio={}s > reuse_vpio_3={}s * 2",
create_standalone_vpio.as_secs_f32(),
reuse_vpio_3.as_secs_f32()
);
let recycle_vpio_1 = t6 - t5;
let recycle_vpio_2 = t7 - t6;
let recycle_vpio_3 = t8 - t7;
let recycle_vpio_4 = t9 - t8;
let dispose_vpios = t10 - t9;
assert!(
dispose_vpios > recycle_vpio_1 * 2,
"Failed dispose_vpios={}s > recycle_vpio_1 ={}s * 2",
dispose_vpios.as_secs_f32(),
recycle_vpio_1.as_secs_f32()
);
assert!(
dispose_standalone_vpio > recycle_vpio_2 * 2,
"Failed dispose_standalone_vpio ={}s > recycle_vpio_2 ={}s * 2",
dispose_standalone_vpio.as_secs_f32(),
dispose_vpios > recycle_vpio_2 * 2,
"Failed dispose_vpios={}s > recycle_vpio_2 ={}s * 2",
dispose_vpios.as_secs_f32(),
recycle_vpio_2.as_secs_f32()
);
assert!(
dispose_vpios > recycle_vpio_3 * 2,
"Failed dispose_vpios={}s > recycle_vpio_3 ={}s * 2",
dispose_vpios.as_secs_f32(),
recycle_vpio_3.as_secs_f32()
);
assert!(
dispose_vpios > recycle_vpio_4 * 2,
"Failed dispose_vpios={}s > recycle_vpio_4 ={}s * 2",
dispose_vpios.as_secs_f32(),
recycle_vpio_4.as_secs_f32()
);
}
#[test]