Bug 1530715 - P42: Remove unnecessary mutex. r=padenot

There are three pairs of operations that could race by running at the
same time:
1. Data callback and stream reinitialization
2. Data callback and stream destruction
3. Stream reinitialization and stream destruction

Cases 1 and 2 cannot happen as long as AudioOutputUnitStop is called at
the beginning of stream reinitialization and stream destruction.
AudioOutputUnitStop must acquire a mutex inside the CoreAudio framework
that the data callback also holds while it runs. Therefore, if a data
callback is running when AudioOutputUnitStop is called, the call blocks
the current thread until the callback finishes, because it is waiting
for that mutex. Calling AudioOutputUnitStop at the beginning of stream
reinitialization and stream destruction thus rules out the data races
in cases 1 and 2.
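
A minimal Rust model of this serialization argument (illustrative only:
the mutex and the blocking lock below stand in for CoreAudio's internal
lock and for AudioOutputUnitStop, they are not the real API):

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    // Stands in for the mutex inside the CoreAudio framework.
    let framework_mutex = Arc::new(Mutex::new(()));

    // Stands in for a data callback that is already running when the stream
    // is about to be reinitialized or destroyed: it holds the framework
    // mutex for its whole duration.
    let callback = {
        let framework_mutex = Arc::clone(&framework_mutex);
        thread::spawn(move || {
            let _running_callback = framework_mutex.lock().unwrap();
            thread::sleep(Duration::from_millis(50)); // pretend to render audio
        })
    };
    thread::sleep(Duration::from_millis(10)); // let the callback start first

    // Stands in for AudioOutputUnitStop: it needs the same framework mutex,
    // so it cannot return while the callback above is still running.
    drop(framework_mutex.lock().unwrap());

    // From this point on no data callback can be running, so the stream
    // state can be reinitialized or destroyed without an extra lock.
    callback.join().unwrap();
}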

Case 3 cannot happen either, because stream reinitialization and stream
destruction run on the same serial task queue, and two tasks on the same
serial task queue can never run at the same time. The mutex in
AudioUnitStream only guards against case 3, so it is unnecessary.
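
As an illustration of that last point (a sketch only, not cubeb's actual
task-queue type): a serial task queue behaves like a single worker thread
draining a channel, so two submitted tasks can never overlap in time:

use std::sync::mpsc;
use std::thread;

fn main() {
    // A serial task queue: one worker thread runs submitted tasks in order.
    let (tx, rx) = mpsc::channel::<Box<dyn FnOnce() + Send>>();
    let worker = thread::spawn(move || {
        for task in rx {
            task(); // tasks run strictly one after another, never concurrently
        }
    });

    // "Reinit" and "destroy" are both dispatched to this queue, so they can
    // never touch the stream state at the same time.
    tx.send(Box::new(|| println!("reinitialize stream"))).unwrap();
    tx.send(Box::new(|| println!("destroy stream"))).unwrap();

    drop(tx); // close the queue so the worker exits after draining it
    worker.join().unwrap();
}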

Differential Revision: https://phabricator.services.mozilla.com/D34076

--HG--
extra : moz-landing-system : lando
Chun-Min Chang 2019-07-10 08:06:56 +00:00
Parent c4f1202ebd
Commit ec036851de
2 changed files with 254 additions and 293 deletions

View file

@@ -3,4 +3,4 @@ git repository using the update.sh script.
 The cubeb-coreaudio-rs git repository is: https://github.com/ChunMinChang/cubeb-coreaudio-rs
-The git commit ID used was 6d49464259fa527203d7494e323be54845403b47 (2019-06-25 11:32:23 -0700)
+The git commit ID used was c5b109e0c51bcd24fddd069bcf994aa0005a6497 (2019-06-25 11:37:14 -0700)

View file

@@ -413,140 +413,151 @@ extern "C" fn audiounit_input_callback(
         return NO_ERR;
     }
 
-    let handler =
-        |stm: &mut AudioUnitStream,
-         flags: *mut AudioUnitRenderActionFlags,
-         tstamp: *const AudioTimeStamp,
-         bus: u32,
-         input_frames: u32|
-         -> (ErrorHandle, Option<State>) {
-            let mut core_stream_data = stm.core_stream_data.lock().unwrap();
-            assert_eq!(core_stream_data.stm_ptr, user_ptr as *const AudioUnitStream);
-
-            // Create the AudioBufferList to store input.
-            let mut input_buffer_list = AudioBufferList::default();
-            input_buffer_list.mBuffers[0].mDataByteSize =
-                core_stream_data.input_desc.mBytesPerFrame * input_frames;
-            input_buffer_list.mBuffers[0].mData = ptr::null_mut();
-            input_buffer_list.mBuffers[0].mNumberChannels =
-                core_stream_data.input_desc.mChannelsPerFrame;
-            input_buffer_list.mNumberBuffers = 1;
-
-            assert!(!core_stream_data.input_unit.is_null());
-            let status = audio_unit_render(
-                core_stream_data.input_unit,
-                flags,
-                tstamp,
-                bus,
-                input_frames,
-                &mut input_buffer_list,
-            );
-            if (status != NO_ERR)
-                && (status != kAudioUnitErr_CannotDoInCurrentContext
-                    || core_stream_data.output_unit.is_null())
-            {
-                return (ErrorHandle::Return(status), None);
-            }
-            let handle = if status == kAudioUnitErr_CannotDoInCurrentContext {
-                assert!(!core_stream_data.output_unit.is_null());
-                // kAudioUnitErr_CannotDoInCurrentContext is returned when using a BT
-                // headset and the profile is changed from A2DP to HFP/HSP. The previous
-                // output device is no longer valid and must be reset.
-                // For now state that no error occurred and feed silence, stream will be
-                // resumed once reinit has completed.
-                cubeb_logv!(
-                    "({:p}) input: reinit pending feeding silence instead",
-                    core_stream_data.stm_ptr
-                );
-                let elements =
-                    (input_frames * core_stream_data.input_desc.mChannelsPerFrame) as usize;
-                core_stream_data
-                    .input_linear_buffer
-                    .as_mut()
-                    .unwrap()
-                    .push_zeros(elements);
-                ErrorHandle::Reinit
-            } else {
-                assert_eq!(status, NO_ERR);
-                // Copy input data in linear buffer.
-                let elements =
-                    (input_frames * core_stream_data.input_desc.mChannelsPerFrame) as usize;
-                core_stream_data
-                    .input_linear_buffer
-                    .as_mut()
-                    .unwrap()
-                    .push(input_buffer_list.mBuffers[0].mData, elements);
-                ErrorHandle::Return(status)
-            };
-
-            // Advance input frame counter.
-            stm.frames_read
-                .fetch_add(i64::from(input_frames), atomic::Ordering::SeqCst);
-
-            cubeb_logv!(
-                "({:p}) input: buffers {}, size {}, channels {}, rendered frames {}, total frames {}.",
-                core_stream_data.stm_ptr,
-                input_buffer_list.mNumberBuffers,
-                input_buffer_list.mBuffers[0].mDataByteSize,
-                input_buffer_list.mBuffers[0].mNumberChannels,
-                input_frames,
-                core_stream_data.input_linear_buffer.as_ref().unwrap().elements()
-                    / core_stream_data.input_desc.mChannelsPerFrame as usize
-            );
-
-            // Full Duplex. We'll call data_callback in the AudioUnit output callback.
-            if !core_stream_data.output_unit.is_null() {
-                return (handle, None);
-            }
-
-            // Input only. Call the user callback through resampler.
-            // Resampler will deliver input buffer in the correct rate.
-            assert!(
-                input_frames as usize
-                    <= core_stream_data
-                        .input_linear_buffer
-                        .as_ref()
-                        .unwrap()
-                        .elements()
-                        / core_stream_data.input_desc.mChannelsPerFrame as usize
-            );
-            let mut total_input_frames = (core_stream_data
-                .input_linear_buffer
-                .as_ref()
-                .unwrap()
-                .elements()
-                / core_stream_data.input_desc.mChannelsPerFrame as usize)
-                as i64;
-            assert!(!core_stream_data
-                .input_linear_buffer
-                .as_ref()
-                .unwrap()
-                .as_ptr()
-                .is_null());
-            let input_buffer = core_stream_data
-                .input_linear_buffer
-                .as_mut()
-                .unwrap()
-                .as_mut_ptr();
-            let outframes = core_stream_data.resampler.fill(
-                input_buffer,
-                &mut total_input_frames,
-                ptr::null_mut(),
-                0,
-            );
-            if outframes < total_input_frames {
-                assert_eq!(audio_output_unit_stop(core_stream_data.input_unit), NO_ERR);
-                return (handle, Some(State::Drained));
-            }
-            // Reset input buffer
-            core_stream_data
-                .input_linear_buffer
-                .as_mut()
-                .unwrap()
-                .clear();
-
-            (handle, None)
-        };
+    let handler = |stm: &mut AudioUnitStream,
+                   flags: *mut AudioUnitRenderActionFlags,
+                   tstamp: *const AudioTimeStamp,
+                   bus: u32,
+                   input_frames: u32|
+     -> (ErrorHandle, Option<State>) {
+        assert_eq!(
+            stm.core_stream_data.stm_ptr,
+            user_ptr as *const AudioUnitStream
+        );
+
+        // Create the AudioBufferList to store input.
+        let mut input_buffer_list = AudioBufferList::default();
+        input_buffer_list.mBuffers[0].mDataByteSize =
+            stm.core_stream_data.input_desc.mBytesPerFrame * input_frames;
+        input_buffer_list.mBuffers[0].mData = ptr::null_mut();
+        input_buffer_list.mBuffers[0].mNumberChannels =
+            stm.core_stream_data.input_desc.mChannelsPerFrame;
+        input_buffer_list.mNumberBuffers = 1;
+
+        assert!(!stm.core_stream_data.input_unit.is_null());
+        let status = audio_unit_render(
+            stm.core_stream_data.input_unit,
+            flags,
+            tstamp,
+            bus,
+            input_frames,
+            &mut input_buffer_list,
+        );
+        if (status != NO_ERR)
+            && (status != kAudioUnitErr_CannotDoInCurrentContext
+                || stm.core_stream_data.output_unit.is_null())
+        {
+            return (ErrorHandle::Return(status), None);
+        }
+        let handle = if status == kAudioUnitErr_CannotDoInCurrentContext {
+            assert!(!stm.core_stream_data.output_unit.is_null());
+            // kAudioUnitErr_CannotDoInCurrentContext is returned when using a BT
+            // headset and the profile is changed from A2DP to HFP/HSP. The previous
+            // output device is no longer valid and must be reset.
+            // For now state that no error occurred and feed silence, stream will be
+            // resumed once reinit has completed.
+            cubeb_logv!(
+                "({:p}) input: reinit pending feeding silence instead",
+                stm.core_stream_data.stm_ptr
+            );
+            let elements =
+                (input_frames * stm.core_stream_data.input_desc.mChannelsPerFrame) as usize;
+            stm.core_stream_data
+                .input_linear_buffer
+                .as_mut()
+                .unwrap()
+                .push_zeros(elements);
+            ErrorHandle::Reinit
+        } else {
+            assert_eq!(status, NO_ERR);
+            // Copy input data in linear buffer.
+            let elements =
+                (input_frames * stm.core_stream_data.input_desc.mChannelsPerFrame) as usize;
+            stm.core_stream_data
+                .input_linear_buffer
+                .as_mut()
+                .unwrap()
+                .push(input_buffer_list.mBuffers[0].mData, elements);
+            ErrorHandle::Return(status)
+        };
+
+        // Advance input frame counter.
+        stm.frames_read
+            .fetch_add(i64::from(input_frames), atomic::Ordering::SeqCst);
+
+        cubeb_logv!(
+            "({:p}) input: buffers {}, size {}, channels {}, rendered frames {}, total frames {}.",
+            stm.core_stream_data.stm_ptr,
+            input_buffer_list.mNumberBuffers,
+            input_buffer_list.mBuffers[0].mDataByteSize,
+            input_buffer_list.mBuffers[0].mNumberChannels,
+            input_frames,
+            stm.core_stream_data
+                .input_linear_buffer
+                .as_ref()
+                .unwrap()
+                .elements()
+                / stm.core_stream_data.input_desc.mChannelsPerFrame as usize
+        );
+
+        // Full Duplex. We'll call data_callback in the AudioUnit output callback.
+        if !stm.core_stream_data.output_unit.is_null() {
+            return (handle, None);
+        }
+
+        // Input only. Call the user callback through resampler.
+        // Resampler will deliver input buffer in the correct rate.
+        assert!(
+            input_frames as usize
+                <= stm
+                    .core_stream_data
+                    .input_linear_buffer
+                    .as_ref()
+                    .unwrap()
+                    .elements()
+                    / stm.core_stream_data.input_desc.mChannelsPerFrame as usize
+        );
+        let mut total_input_frames =
+            (stm.core_stream_data
+                .input_linear_buffer
+                .as_ref()
+                .unwrap()
+                .elements()
+                / stm.core_stream_data.input_desc.mChannelsPerFrame as usize) as i64;
+        assert!(!stm
+            .core_stream_data
+            .input_linear_buffer
+            .as_ref()
+            .unwrap()
+            .as_ptr()
+            .is_null());
+        let input_buffer = stm
+            .core_stream_data
+            .input_linear_buffer
+            .as_mut()
+            .unwrap()
+            .as_mut_ptr();
+        let outframes = stm.core_stream_data.resampler.fill(
+            input_buffer,
+            &mut total_input_frames,
+            ptr::null_mut(),
+            0,
+        );
+        if outframes < total_input_frames {
+            assert_eq!(
+                audio_output_unit_stop(stm.core_stream_data.input_unit),
+                NO_ERR
+            );
+            return (handle, Some(State::Drained));
+        }
+        // Reset input buffer
+        stm.core_stream_data
+            .input_linear_buffer
+            .as_mut()
+            .unwrap()
+            .clear();
+
+        (handle, None)
+    };
 
     let (handle, notification) = handler(stm, flags, tstamp, bus, input_frames);
     if let Some(state) = notification {
@@ -600,10 +611,7 @@ extern "C" fn audiounit_output_callback(
     }
 
     if stm.draining.load(Ordering::SeqCst) {
-        {
-            let core_stream_data = stm.core_stream_data.lock().unwrap();
-            core_stream_data.stop_audiounits();
-        }
+        stm.core_stream_data.stop_audiounits();
         stm.notify_state_changed(State::Drained);
         audiounit_make_silent(&mut buffers[0]);
         return NO_ERR;
@@ -613,10 +621,8 @@ extern "C" fn audiounit_output_callback(
                      output_frames: u32,
                      buffers: &mut [AudioBuffer]|
      -> (OSStatus, Option<State>) {
-        let mut core_stream_data = stm.core_stream_data.lock().unwrap();
-
         // Get output buffer
-        let output_buffer = match core_stream_data.mixer.as_mut() {
+        let output_buffer = match stm.core_stream_data.mixer.as_mut() {
             None => buffers[0].mData,
             Some(mixer) => {
                 // If remixing needs to occur, we can't directly work in our final
@@ -630,17 +636,18 @@ extern "C" fn audiounit_output_callback(
             .fetch_add(i64::from(output_frames), Ordering::SeqCst);
 
         // Also get the input buffer if the stream is duplex
-        let (input_buffer, mut input_frames) = if !core_stream_data.input_unit.is_null() {
-            assert!(core_stream_data.input_linear_buffer.is_some());
-            let input_frames = core_stream_data
+        let (input_buffer, mut input_frames) = if !stm.core_stream_data.input_unit.is_null() {
+            assert!(stm.core_stream_data.input_linear_buffer.is_some());
+            let input_frames = stm
+                .core_stream_data
                 .input_linear_buffer
                 .as_ref()
                 .unwrap()
                 .elements()
-                / core_stream_data.input_desc.mChannelsPerFrame as usize;
+                / stm.core_stream_data.input_desc.mChannelsPerFrame as usize;
             cubeb_logv!("Total input frames: {}", input_frames);
-            assert_ne!(core_stream_data.input_desc.mChannelsPerFrame, 0);
+            assert_ne!(stm.core_stream_data.input_desc.mChannelsPerFrame, 0);
             // If the output callback came first and this is a duplex stream, we need to
             // fill in some additional silence in the resampler.
             // Otherwise, if we had more than expected callbacks in a row, or we're
@@ -648,16 +655,16 @@ extern "C" fn audiounit_output_callback(
             // fact that we're lacking some input data.
             let frames_written = stm.frames_written.load(Ordering::SeqCst);
             let input_frames_needed = minimum_resampling_input_frames(
-                core_stream_data.input_hw_rate,
-                f64::from(core_stream_data.output_stream_params.rate()),
+                stm.core_stream_data.input_hw_rate,
+                f64::from(stm.core_stream_data.output_stream_params.rate()),
                 frames_written,
             );
             let missing_frames = input_frames_needed - stm.frames_read.load(Ordering::SeqCst);
             let elements = (missing_frames
-                * i64::from(core_stream_data.input_desc.mChannelsPerFrame))
+                * i64::from(stm.core_stream_data.input_desc.mChannelsPerFrame))
                 as usize;
             if missing_frames > 0 {
-                core_stream_data
+                stm.core_stream_data
                     .input_linear_buffer
                     .as_mut()
                     .unwrap()
@@ -665,7 +672,7 @@ extern "C" fn audiounit_output_callback(
                 stm.frames_read.store(input_frames_needed, Ordering::SeqCst);
                 cubeb_log!(
                     "({:p}) {} pushed {} frames of input silence.",
-                    core_stream_data.stm_ptr,
+                    stm.core_stream_data.stm_ptr,
                     if stm.frames_read.load(Ordering::SeqCst) == 0 {
                         "Input hasn't started,"
                     } else if stm.switching_device.load(Ordering::SeqCst) {
@@ -677,7 +684,7 @@ extern "C" fn audiounit_output_callback(
                 );
             }
             (
-                core_stream_data
+                stm.core_stream_data
                     .input_linear_buffer
                     .as_mut()
                     .unwrap()
@@ -690,7 +697,7 @@ extern "C" fn audiounit_output_callback(
 
         // Call user callback through resampler.
         assert!(!output_buffer.is_null());
-        let outframes = core_stream_data.resampler.fill(
+        let outframes = stm.core_stream_data.resampler.fill(
             input_buffer,
             if input_buffer.is_null() {
                 ptr::null_mut()
@@ -703,8 +710,8 @@ extern "C" fn audiounit_output_callback(
         if !input_buffer.is_null() {
             // Pop from the buffer the frames used by the the resampler.
             let elements =
-                input_frames as usize * core_stream_data.input_desc.mChannelsPerFrame as usize;
-            core_stream_data
+                input_frames as usize * stm.core_stream_data.input_desc.mChannelsPerFrame as usize;
+            stm.core_stream_data
                 .input_linear_buffer
                 .as_mut()
                 .unwrap()
@@ -713,7 +720,7 @@ extern "C" fn audiounit_output_callback(
 
         if outframes < 0 || outframes > i64::from(output_frames) {
             *stm.shutdown.get_mut() = true;
-            core_stream_data.stop_audiounits();
+            stm.core_stream_data.stop_audiounits();
             audiounit_make_silent(&mut buffers[0]);
             return (NO_ERR, Some(State::Error));
         }
@@ -723,8 +730,8 @@ extern "C" fn audiounit_output_callback(
             .store(stm.frames_queued, atomic::Ordering::SeqCst);
         stm.frames_queued += outframes as u64;
 
-        let outaff = core_stream_data.output_desc.mFormatFlags;
-        let panning = if core_stream_data.output_desc.mChannelsPerFrame == 2 {
+        let outaff = stm.core_stream_data.output_desc.mFormatFlags;
+        let panning = if stm.core_stream_data.output_desc.mChannelsPerFrame == 2 {
             stm.panning.load(Ordering::Relaxed)
         } else {
             0.0
@@ -734,7 +741,8 @@ extern "C" fn audiounit_output_callback(
         if stm.draining.load(Ordering::SeqCst) {
            // Clear missing frames (silence)
            let count_bytes = |frames: usize| -> usize {
-                let sample_size = cubeb_sample_size(core_stream_data.output_stream_params.format());
+                let sample_size =
+                    cubeb_sample_size(stm.core_stream_data.output_stream_params.format());
                 frames * sample_size / mem::size_of::<u8>()
             };
             let out_bytes = unsafe {
@@ -750,7 +758,7 @@ extern "C" fn audiounit_output_callback(
         }
 
         // Mixing
-        if core_stream_data.mixer.is_none() {
+        if stm.core_stream_data.mixer.is_none() {
             // Pan stereo.
             if panning != 0.0 {
                 unsafe {
@@ -772,9 +780,9 @@ extern "C" fn audiounit_output_callback(
         } else {
             assert!(
                 buffers[0].mDataByteSize
-                    >= core_stream_data.output_desc.mBytesPerFrame * output_frames
+                    >= stm.core_stream_data.output_desc.mBytesPerFrame * output_frames
             );
-            core_stream_data.mixer.as_mut().unwrap().mix(
+            stm.core_stream_data.mixer.as_mut().unwrap().mix(
                 output_frames as usize,
                 buffers[0].mData,
                 buffers[0].mDataByteSize as usize,
@@ -841,8 +849,8 @@ extern "C" fn audiounit_property_listener_callback(
     );
     // If this is the default input device ignore the event,
     // kAudioHardwarePropertyDefaultInputDevice will take care of the switch
-    let core_stream_data = stm.core_stream_data.lock().unwrap();
-    if core_stream_data
+    if stm
+        .core_stream_data
         .input_device
         .flags
         .contains(device_flags::DEV_SYSTEM_DEFAULT)
@@ -2464,16 +2472,10 @@ impl ContextOps for AudioUnitContext {
             global_latency_frames,
         ));
 
-        boxed_stream.core_stream_data = Mutex::new(CoreStreamData::new(
-            boxed_stream.as_ref(),
-            in_stm_settings,
-            out_stm_settings,
-        ));
+        boxed_stream.core_stream_data =
+            CoreStreamData::new(boxed_stream.as_ref(), in_stm_settings, out_stm_settings);
 
-        if let Err(r) = {
-            let mut core_stream_data = boxed_stream.core_stream_data.lock().unwrap();
-            core_stream_data.setup()
-        } {
+        if let Err(r) = boxed_stream.core_stream_data.setup() {
             cubeb_log!(
                 "({:p}) Could not setup the audiounit stream.",
                 boxed_stream.as_ref()
@@ -2531,12 +2533,6 @@ impl Drop for AudioUnitContext {
 unsafe impl Send for AudioUnitContext {}
 unsafe impl Sync for AudioUnitContext {}
 
-// In the process of defusing our own custom mutex, those variables in the critical sections
-// created by our own custom mutex will be moved to this struct. As a result, they will still
-// be in the critical sections but the sections are created by this struct. However, some
-// struct members don't really need locks to be used since their code paths give thread-safe
-// guarantees. In fact, the mutex around this struct will be removed at the end of the
-// mutex-defusing refactoring so it's fine to keep them in this struct for now.
 #[derive(Debug)]
 struct CoreStreamData<'ctx> {
     stm_ptr: *const AudioUnitStream<'ctx>,
@@ -3348,7 +3344,7 @@ struct AudioUnitStream<'ctx> {
     panning: atomic::Atomic<f32>,
     // This is true if a device change callback is currently running.
     switching_device: AtomicBool,
-    core_stream_data: Mutex<CoreStreamData<'ctx>>,
+    core_stream_data: CoreStreamData<'ctx>,
 }
 
 impl<'ctx> AudioUnitStream<'ctx> {
@@ -3377,7 +3373,7 @@ impl<'ctx> AudioUnitStream<'ctx> {
             current_latency_frames: AtomicU32::new(0),
             panning: atomic::Atomic::new(0.0_f32),
             switching_device: AtomicBool::new(false),
-            core_stream_data: Mutex::new(CoreStreamData::default()),
+            core_stream_data: CoreStreamData::default(),
         }
     }
@@ -3414,99 +3410,105 @@ impl<'ctx> AudioUnitStream<'ctx> {
     }
 
     fn reinit(&mut self) -> Result<()> {
+        // Call stop_audiounits to avoid potential data race. If there is a running data callback,
+        // which locks a mutex inside CoreAudio framework, then this call will block the current
+        // thread until the callback is finished since this call asks to lock a mutex inside
+        // CoreAudio framework that is used by the data callback.
         if !self.shutdown.load(Ordering::SeqCst) {
-            let core_stream_data = self.core_stream_data.lock().unwrap();
-            core_stream_data.stop_audiounits();
+            self.core_stream_data.stop_audiounits();
         }
 
-        {
-            let mut core_stream_data = self.core_stream_data.lock().unwrap();
-
-            assert!(
-                !core_stream_data.input_unit.is_null() || !core_stream_data.output_unit.is_null()
-            );
-            let vol_rv = if core_stream_data.output_unit.is_null() {
-                Err(Error::error())
-            } else {
-                get_volume(core_stream_data.output_unit)
-            };
-
-            core_stream_data.close();
-
-            // Reinit occurs in one of the following case:
-            // - When the device is not alive any more
-            // - When the default system device change.
-            // - The bluetooth device changed from A2DP to/from HFP/HSP profile
-            // We first attempt to re-use the same device id, should that fail we will
-            // default to the (potentially new) default device.
-            let has_input = !core_stream_data.input_unit.is_null();
-            let input_device = if has_input {
-                core_stream_data.input_device.id
-            } else {
-                kAudioObjectUnknown
-            };
-
-            if has_input {
-                core_stream_data.input_device = create_device_info(input_device, DeviceType::INPUT).map_err(|e| {
-                    cubeb_log!(
-                        "({:p}) Create input device info failed. This can happen when last media device is unplugged",
-                        core_stream_data.stm_ptr
-                    );
-                    core_stream_data.close();
-                    e
-                })?;
-            }
-
-            // Always use the default output on reinit. This is not correct in every
-            // case but it is sufficient for Firefox and prevent reinit from reporting
-            // failures. It will change soon when reinit mechanism will be updated.
-            core_stream_data.output_device = create_device_info(kAudioObjectUnknown, DeviceType::OUTPUT).map_err(|e| {
-                cubeb_log!(
-                    "({:p}) Create output device info failed. This can happen when last media device is unplugged",
-                    core_stream_data.stm_ptr
-                );
-                core_stream_data.close();
-                e
-            })?;
-
-            if core_stream_data.setup().is_err() {
-                cubeb_log!("({:p}) Stream reinit failed.", core_stream_data.stm_ptr);
-                if has_input && input_device != kAudioObjectUnknown {
-                    // Attempt to re-use the same device-id failed, so attempt again with
-                    // default input device.
-                    core_stream_data.input_device = create_device_info(kAudioObjectUnknown, DeviceType::INPUT).map_err(|e| {
-                        cubeb_log!(
-                            "({:p}) Create input device info failed. This can happen when last media device is unplugged",
-                            core_stream_data.stm_ptr
-                        );
-                        core_stream_data.close();
-                        e
-                    })?;
-                    core_stream_data.setup().map_err(|e| {
-                        cubeb_log!(
-                            "({:p}) Second stream reinit failed.",
-                            core_stream_data.stm_ptr
-                        );
-                        core_stream_data.close();
-                        e
-                    })?;
-                }
-            }
-
-            if vol_rv.is_ok() {
-                set_volume(core_stream_data.output_unit, vol_rv.unwrap());
-            }
-
-            // If the stream was running, start it again.
-            if !self.shutdown.load(Ordering::SeqCst) {
-                core_stream_data.start_audiounits().map_err(|e| {
-                    cubeb_log!("({:p}) Start audiounit failed.", core_stream_data.stm_ptr);
-                    core_stream_data.close();
-                    e
-                })?;
-            }
-        }
+        assert!(
+            !self.core_stream_data.input_unit.is_null()
+                || !self.core_stream_data.output_unit.is_null()
+        );
+        let vol_rv = if self.core_stream_data.output_unit.is_null() {
+            Err(Error::error())
+        } else {
+            get_volume(self.core_stream_data.output_unit)
+        };
+
+        self.core_stream_data.close();
+
+        // Reinit occurs in one of the following case:
+        // - When the device is not alive any more
+        // - When the default system device change.
+        // - The bluetooth device changed from A2DP to/from HFP/HSP profile
+        // We first attempt to re-use the same device id, should that fail we will
+        // default to the (potentially new) default device.
+        let has_input = !self.core_stream_data.input_unit.is_null();
+        let input_device = if has_input {
+            self.core_stream_data.input_device.id
+        } else {
+            kAudioObjectUnknown
+        };
+
+        if has_input {
+            self.core_stream_data.input_device = create_device_info(input_device, DeviceType::INPUT).map_err(|e| {
+                cubeb_log!(
+                    "({:p}) Create input device info failed. This can happen when last media device is unplugged",
+                    self.core_stream_data.stm_ptr
+                );
+                self.core_stream_data.close();
+                e
+            })?;
+        }
+
+        // Always use the default output on reinit. This is not correct in every
+        // case but it is sufficient for Firefox and prevent reinit from reporting
+        // failures. It will change soon when reinit mechanism will be updated.
+        self.core_stream_data.output_device = create_device_info(kAudioObjectUnknown, DeviceType::OUTPUT).map_err(|e| {
+            cubeb_log!(
+                "({:p}) Create output device info failed. This can happen when last media device is unplugged",
+                self.core_stream_data.stm_ptr
+            );
+            self.core_stream_data.close();
+            e
+        })?;
+
+        if self.core_stream_data.setup().is_err() {
+            cubeb_log!(
+                "({:p}) Stream reinit failed.",
+                self.core_stream_data.stm_ptr
+            );
+            if has_input && input_device != kAudioObjectUnknown {
+                // Attempt to re-use the same device-id failed, so attempt again with
+                // default input device.
+                self.core_stream_data.input_device = create_device_info(kAudioObjectUnknown, DeviceType::INPUT).map_err(|e| {
+                    cubeb_log!(
+                        "({:p}) Create input device info failed. This can happen when last media device is unplugged",
+                        self.core_stream_data.stm_ptr
+                    );
+                    self.core_stream_data.close();
+                    e
+                })?;
+                self.core_stream_data.setup().map_err(|e| {
+                    cubeb_log!(
+                        "({:p}) Second stream reinit failed.",
+                        self.core_stream_data.stm_ptr
+                    );
+                    self.core_stream_data.close();
+                    e
+                })?;
+            }
+        }
+
+        if vol_rv.is_ok() {
+            set_volume(self.core_stream_data.output_unit, vol_rv.unwrap());
+        }
+
+        // If the stream was running, start it again.
+        if !self.shutdown.load(Ordering::SeqCst) {
+            self.core_stream_data.start_audiounits().map_err(|e| {
+                cubeb_log!(
+                    "({:p}) Start audiounit failed.",
+                    self.core_stream_data.stm_ptr
+                );
+                self.core_stream_data.close();
+                e
+            })?;
+        }
 
         Ok(())
     }
@@ -3549,18 +3551,18 @@ impl<'ctx> AudioUnitStream<'ctx> {
     }
 
     fn destroy_internal(&mut self) {
-        let mut core_stream_data = self.core_stream_data.lock().unwrap();
-        core_stream_data.close();
+        self.core_stream_data.close();
         assert!(self.context.active_streams() >= 1);
         self.context.update_latency_by_removing_stream();
     }
 
     fn destroy(&mut self) {
+        // Call stop_audiounits to avoid potential data race. If there is a running data callback,
+        // which locks a mutex inside CoreAudio framework, then this call will block the current
+        // thread until the callback is finished since this call asks to lock a mutex inside
+        // CoreAudio framework that is used by the data callback.
         if !self.shutdown.load(Ordering::SeqCst) {
-            {
-                let core_stream_data = self.core_stream_data.lock().unwrap();
-                core_stream_data.stop_audiounits();
-            }
+            self.core_stream_data.stop_audiounits();
             *self.shutdown.get_mut() = true;
         }
@@ -3593,24 +3595,7 @@ impl<'ctx> StreamOps for AudioUnitStream<'ctx> {
         *self.shutdown.get_mut() = false;
         *self.draining.get_mut() = false;
 
-        // The underlying CoreAudio API in start_audiounit will require to lock a mutex inside
-        // the CoreAudio framework itself. Locking `core_stream_data` first and requires another
-        // lock inside CoreAudio again will cause a deadlock when calling this function at the
-        // same time with a running data callback. When CoreAudio data callback is running, it
-        // holds the mutex inside CoreAudio and asks to lock `core_stream_data`. These two lock the
-        // mutexes in reverse order and lead to a potential deadlock.
-        let (input_unit, output_unit) = {
-            let core_stream_data = self.core_stream_data.lock().unwrap();
-            (core_stream_data.input_unit, core_stream_data.output_unit)
-        };
-        if !input_unit.is_null() {
-            start_audiounit(input_unit)?;
-        }
-        if !output_unit.is_null() {
-            start_audiounit(output_unit)?;
-        }
+        self.core_stream_data.start_audiounits()?;
 
         self.notify_state_changed(State::Started);
@@ -3623,24 +3608,7 @@ impl<'ctx> StreamOps for AudioUnitStream<'ctx> {
     fn stop(&mut self) -> Result<()> {
         *self.shutdown.get_mut() = true;
 
-        // The underlying CoreAudio API in stop_audiounit will require to lock a mutex inside
-        // the CoreAudio framework itself. Locking `core_stream_data` first and requires another
-        // lock inside CoreAudio again will cause a deadlock when calling this function at the
-        // same time with a running data callback. When CoreAudio data callback is running, it
-        // holds the mutex inside CoreAudio and asks to lock `core_stream_data`. These two lock the
-        // mutexes in reverse order and lead to a potential deadlock.
-        let (input_unit, output_unit) = {
-            let core_stream_data = self.core_stream_data.lock().unwrap();
-            (core_stream_data.input_unit, core_stream_data.output_unit)
-        };
-        if !input_unit.is_null() {
-            stop_audiounit(input_unit)?;
-        }
-        if !output_unit.is_null() {
-            stop_audiounit(output_unit)?;
-        }
+        self.core_stream_data.stop_audiounits();
 
         self.notify_state_changed(State::Stopped);
@@ -3672,18 +3640,11 @@ impl<'ctx> StreamOps for AudioUnitStream<'ctx> {
         Ok(self.current_latency_frames.load(Ordering::SeqCst))
     }
 
     fn set_volume(&mut self, volume: f32) -> Result<()> {
-        let output_unit = {
-            let core_stream_data = self.core_stream_data.lock().unwrap();
-            core_stream_data.output_unit
-        };
-        set_volume(output_unit, volume)
+        set_volume(self.core_stream_data.output_unit, volume)
     }
 
     fn set_panning(&mut self, panning: f32) -> Result<()> {
-        {
-            let core_stream_data = self.core_stream_data.lock().unwrap();
-            if core_stream_data.output_desc.mChannelsPerFrame > 2 {
-                return Err(Error::invalid_format());
-            }
+        if self.core_stream_data.output_desc.mChannelsPerFrame > 2 {
+            return Err(Error::invalid_format());
         }
         self.panning.store(panning, Ordering::Relaxed);
         Ok(())