Make AudioUnitStream::stopped handling race-safe

This helps avoid some extraneous state callbacks in some cases.
This commit is contained in:
Andreas Pehrson 2024-05-22 11:50:13 +02:00 коммит произвёл Andreas Pehrson
Родитель c6ce6c20d5
Коммит 0989726a1b
1 изменённых файлов: 50 добавлений и 49 удалений

Просмотреть файл

@ -628,14 +628,14 @@ extern "C" fn audiounit_input_callback(
0,
);
if outframes < 0 {
stm.stopped.store(true, Ordering::SeqCst);
stm.notify_state_changed(State::Error);
let queue = stm.queue.clone();
// Use a new thread, through the queue, to avoid deadlock when calling
// AudioOutputUnitStop method from inside render callback
queue.run_async(move || {
stm.core_stream_data.stop_audiounits();
});
if !stm.stopped.swap(true, Ordering::SeqCst) {
stm.notify_state_changed(State::Error);
// Use a new thread, through the queue, to avoid deadlock when calling
// AudioOutputUnitStop method from inside render callback
stm.queue.clone().run_async(move || {
stm.core_stream_data.stop_audiounits();
});
}
return ErrorHandle::Return(status);
}
if outframes < total_input_frames {
@ -654,15 +654,16 @@ extern "C" fn audiounit_input_callback(
// If the input (input-only stream) is drained, cancel this callback. Whenever an output
// is involved, the output callback handles stopping all units and notifying of state.
if stm.core_stream_data.output_unit.is_null() && stm.draining.load(Ordering::SeqCst) {
stm.stopped.store(true, Ordering::SeqCst);
if stm.core_stream_data.output_unit.is_null()
&& stm.draining.load(Ordering::SeqCst)
&& !stm.stopped.swap(true, Ordering::SeqCst)
{
cubeb_alog!("({:p}) Input-only drained.", stm as *const AudioUnitStream);
stm.notify_state_changed(State::Drained);
let queue = stm.queue.clone();
// Use a new thread, through the queue, to avoid deadlock when calling
// AudioOutputUnitStop method from inside render callback
let stm_ptr = user_ptr as usize;
queue.run_async(move || {
stm.queue.clone().run_async(move || {
let stm = unsafe { &mut *(stm_ptr as *mut AudioUnitStream) };
stm.core_stream_data.stop_audiounits();
});
@ -749,10 +750,6 @@ extern "C" fn audiounit_output_callback(
if stm.draining.load(Ordering::SeqCst) {
// Cancel all callbacks. For input-only streams, the input callback handles
// cancelling itself.
stm.stopped.store(true, Ordering::SeqCst);
cubeb_alog!("({:p}) output drained.", stm as *const AudioUnitStream);
stm.notify_state_changed(State::Drained);
let queue = stm.queue.clone();
audiounit_make_silent(&buffers[0]);
#[cfg(feature = "audio-dump")]
{
@ -762,11 +759,15 @@ extern "C" fn audiounit_output_callback(
output_frames * stm.core_stream_data.output_dev_desc.mChannelsPerFrame,
);
}
// Use a new thread, through the queue, to avoid deadlock when calling
// AudioOutputUnitStop method from inside render callback
queue.run_async(move || {
stm.core_stream_data.stop_audiounits();
});
if !stm.stopped.swap(true, Ordering::SeqCst) {
cubeb_alog!("({:p}) output drained.", stm as *const AudioUnitStream);
stm.notify_state_changed(State::Drained);
// Use a new thread, through the queue, to avoid deadlock when calling
// AudioOutputUnitStop method from inside render callback
stm.queue.clone().run_async(move || {
stm.core_stream_data.stop_audiounits();
});
}
return NO_ERR;
}
@ -886,9 +887,6 @@ extern "C" fn audiounit_output_callback(
);
if outframes < 0 || outframes > i64::from(output_frames) {
stm.stopped.store(true, Ordering::SeqCst);
stm.notify_state_changed(State::Error);
let queue = stm.queue.clone();
audiounit_make_silent(&buffers[0]);
#[cfg(feature = "audio-dump")]
@ -899,11 +897,14 @@ extern "C" fn audiounit_output_callback(
output_frames * stm.core_stream_data.output_dev_desc.mChannelsPerFrame,
);
}
// Use a new thread, through the queue, to avoid deadlock when calling
// AudioOutputUnitStop method from inside render callback
queue.run_async(move || {
stm.core_stream_data.stop_audiounits();
});
if !stm.stopped.swap(true, Ordering::SeqCst) {
stm.notify_state_changed(State::Error);
// Use a new thread, through the queue, to avoid deadlock when calling
// AudioOutputUnitStop method from inside render callback
stm.queue.clone().run_async(move || {
stm.core_stream_data.stop_audiounits();
});
}
return NO_ERR;
}
@ -1001,15 +1002,16 @@ extern "C" fn audiounit_property_listener_callback(
// Handle the events
if explicit_device_dead {
cubeb_log!("The user-selected input or output device is dead, entering error state");
stm.stopped.store(true, Ordering::SeqCst);
if !stm.stopped.swap(true, Ordering::SeqCst) {
cubeb_log!("The user-selected input or output device is dead, entering error state");
// Use a different thread, through the queue, to avoid deadlock when calling
// Get/SetProperties method from inside notify callback
stm.queue.clone().run_async(move || {
stm.core_stream_data.stop_audiounits();
stm.close_on_error();
});
// Use a different thread, through the queue, to avoid deadlock when calling
// Get/SetProperties method from inside notify callback
stm.queue.clone().run_async(move || {
stm.core_stream_data.stop_audiounits();
stm.close_on_error();
});
}
return NO_ERR;
}
{
@ -4863,9 +4865,8 @@ impl<'ctx> AudioUnitStream<'ctx> {
// which locks a mutex inside CoreAudio framework, then this call will block the current
// thread until the callback is finished since this call asks to lock a mutex inside
// CoreAudio framework that is used by the data callback.
if !self.stopped.load(Ordering::SeqCst) {
if !self.stopped.swap(true, Ordering::SeqCst) {
self.core_stream_data.stop_audiounits();
self.stopped.store(true, Ordering::SeqCst);
}
self.destroy_internal();
@ -4909,18 +4910,18 @@ impl<'ctx> StreamOps for AudioUnitStream<'ctx> {
Ok(())
}
fn stop(&mut self) -> Result<()> {
self.stopped.store(true, Ordering::SeqCst);
if !self.stopped.swap(true, Ordering::SeqCst) {
// Execute stop in serial queue to avoid racing with destroy or reinit.
self.queue
.run_sync(|| self.core_stream_data.stop_audiounits());
// Execute stop in serial queue to avoid racing with destroy or reinit.
self.queue
.run_sync(|| self.core_stream_data.stop_audiounits());
self.notify_state_changed(State::Stopped);
self.notify_state_changed(State::Stopped);
cubeb_log!(
"Cubeb stream ({:p}) stopped successfully.",
self as *const AudioUnitStream
);
cubeb_log!(
"Cubeb stream ({:p}) stopped successfully.",
self as *const AudioUnitStream
);
}
Ok(())
}
fn position(&mut self) -> Result<u64> {