Bug 1530715 - P42: Remove unnecessary mutex. r=padenot

There are three potential data-race operations that may run at the same
time:
1. Data callback and stream reinitialization
2. Data callback and stream destroying
3. Stream reinitialization and stream destroying

The case 1 and 2 won't happen as long as the AudioOutputUnitStop is
called at the beginning of stream reinitialization and stream
destorying. The AudioOutputUnitStop requires to lock a mutex inside
CoreAudio framework that is also used by the data callback. Thus, if
there is a running callback, which holds the mutex inside CoreAudio
framework, when AudioOutputUnitStop is called, then the calling will
block the current thread until the data callback ends since it is
waiting for the mutex. By calling AudioOutputUnitStop at the beginning
of the stream reinitialization and stream destroying, the data race of
case 1 and 2 can be avoided.

On the other hand, the case 3 won't happen since the stream
initialization and destroying is run on the same task queue. The two
tasks on the same serial task queue are impossible to be run at the same
time. The mutex in AudioUnitStream is unnecessary because it's used for
the case 3.

Differential Revision: https://phabricator.services.mozilla.com/D34076

--HG--
extra : moz-landing-system : lando
This commit is contained in:
Chun-Min Chang 2019-07-09 19:57:07 +00:00
Родитель a97f2bb079
Коммит 97c91ba03d
2 изменённых файлов: 254 добавлений и 293 удалений

Просмотреть файл

@ -3,4 +3,4 @@ git repository using the update.sh script.
The cubeb-coreaudio-rs git repository is: https://github.com/ChunMinChang/cubeb-coreaudio-rs
The git commit ID used was 6d49464259fa527203d7494e323be54845403b47 (2019-06-25 11:32:23 -0700)
The git commit ID used was c5b109e0c51bcd24fddd069bcf994aa0005a6497 (2019-06-25 11:37:14 -0700)

Просмотреть файл

@ -413,140 +413,151 @@ extern "C" fn audiounit_input_callback(
return NO_ERR;
}
let handler =
|stm: &mut AudioUnitStream,
flags: *mut AudioUnitRenderActionFlags,
tstamp: *const AudioTimeStamp,
bus: u32,
input_frames: u32|
-> (ErrorHandle, Option<State>) {
let mut core_stream_data = stm.core_stream_data.lock().unwrap();
assert_eq!(core_stream_data.stm_ptr, user_ptr as *const AudioUnitStream);
let handler = |stm: &mut AudioUnitStream,
flags: *mut AudioUnitRenderActionFlags,
tstamp: *const AudioTimeStamp,
bus: u32,
input_frames: u32|
-> (ErrorHandle, Option<State>) {
assert_eq!(
stm.core_stream_data.stm_ptr,
user_ptr as *const AudioUnitStream
);
// Create the AudioBufferList to store input.
let mut input_buffer_list = AudioBufferList::default();
input_buffer_list.mBuffers[0].mDataByteSize =
core_stream_data.input_desc.mBytesPerFrame * input_frames;
input_buffer_list.mBuffers[0].mData = ptr::null_mut();
input_buffer_list.mBuffers[0].mNumberChannels =
core_stream_data.input_desc.mChannelsPerFrame;
input_buffer_list.mNumberBuffers = 1;
assert!(!core_stream_data.input_unit.is_null());
let status = audio_unit_render(
core_stream_data.input_unit,
flags,
tstamp,
bus,
input_frames,
&mut input_buffer_list,
);
if (status != NO_ERR)
&& (status != kAudioUnitErr_CannotDoInCurrentContext
|| core_stream_data.output_unit.is_null())
{
return (ErrorHandle::Return(status), None);
}
let handle = if status == kAudioUnitErr_CannotDoInCurrentContext {
assert!(!core_stream_data.output_unit.is_null());
// kAudioUnitErr_CannotDoInCurrentContext is returned when using a BT
// headset and the profile is changed from A2DP to HFP/HSP. The previous
// output device is no longer valid and must be reset.
// For now state that no error occurred and feed silence, stream will be
// resumed once reinit has completed.
cubeb_logv!(
"({:p}) input: reinit pending feeding silence instead",
core_stream_data.stm_ptr
);
let elements =
(input_frames * core_stream_data.input_desc.mChannelsPerFrame) as usize;
core_stream_data
.input_linear_buffer
.as_mut()
.unwrap()
.push_zeros(elements);
ErrorHandle::Reinit
} else {
assert_eq!(status, NO_ERR);
// Copy input data in linear buffer.
let elements =
(input_frames * core_stream_data.input_desc.mChannelsPerFrame) as usize;
core_stream_data
.input_linear_buffer
.as_mut()
.unwrap()
.push(input_buffer_list.mBuffers[0].mData, elements);
ErrorHandle::Return(status)
};
// Advance input frame counter.
stm.frames_read
.fetch_add(i64::from(input_frames), atomic::Ordering::SeqCst);
// Create the AudioBufferList to store input.
let mut input_buffer_list = AudioBufferList::default();
input_buffer_list.mBuffers[0].mDataByteSize =
stm.core_stream_data.input_desc.mBytesPerFrame * input_frames;
input_buffer_list.mBuffers[0].mData = ptr::null_mut();
input_buffer_list.mBuffers[0].mNumberChannels =
stm.core_stream_data.input_desc.mChannelsPerFrame;
input_buffer_list.mNumberBuffers = 1;
assert!(!stm.core_stream_data.input_unit.is_null());
let status = audio_unit_render(
stm.core_stream_data.input_unit,
flags,
tstamp,
bus,
input_frames,
&mut input_buffer_list,
);
if (status != NO_ERR)
&& (status != kAudioUnitErr_CannotDoInCurrentContext
|| stm.core_stream_data.output_unit.is_null())
{
return (ErrorHandle::Return(status), None);
}
let handle = if status == kAudioUnitErr_CannotDoInCurrentContext {
assert!(!stm.core_stream_data.output_unit.is_null());
// kAudioUnitErr_CannotDoInCurrentContext is returned when using a BT
// headset and the profile is changed from A2DP to HFP/HSP. The previous
// output device is no longer valid and must be reset.
// For now state that no error occurred and feed silence, stream will be
// resumed once reinit has completed.
cubeb_logv!(
"({:p}) input: reinit pending feeding silence instead",
stm.core_stream_data.stm_ptr
);
let elements =
(input_frames * stm.core_stream_data.input_desc.mChannelsPerFrame) as usize;
stm.core_stream_data
.input_linear_buffer
.as_mut()
.unwrap()
.push_zeros(elements);
ErrorHandle::Reinit
} else {
assert_eq!(status, NO_ERR);
// Copy input data in linear buffer.
let elements =
(input_frames * stm.core_stream_data.input_desc.mChannelsPerFrame) as usize;
stm.core_stream_data
.input_linear_buffer
.as_mut()
.unwrap()
.push(input_buffer_list.mBuffers[0].mData, elements);
ErrorHandle::Return(status)
};
// Advance input frame counter.
stm.frames_read
.fetch_add(i64::from(input_frames), atomic::Ordering::SeqCst);
cubeb_logv!(
"({:p}) input: buffers {}, size {}, channels {}, rendered frames {}, total frames {}.",
core_stream_data.stm_ptr,
stm.core_stream_data.stm_ptr,
input_buffer_list.mNumberBuffers,
input_buffer_list.mBuffers[0].mDataByteSize,
input_buffer_list.mBuffers[0].mNumberChannels,
input_frames,
core_stream_data.input_linear_buffer.as_ref().unwrap().elements()
/ core_stream_data.input_desc.mChannelsPerFrame as usize
);
// Full Duplex. We'll call data_callback in the AudioUnit output callback.
if !core_stream_data.output_unit.is_null() {
return (handle, None);
}
// Input only. Call the user callback through resampler.
// Resampler will deliver input buffer in the correct rate.
assert!(
input_frames as usize
<= core_stream_data
.input_linear_buffer
.as_ref()
.unwrap()
.elements()
/ core_stream_data.input_desc.mChannelsPerFrame as usize
);
let mut total_input_frames = (core_stream_data
stm.core_stream_data
.input_linear_buffer
.as_ref()
.unwrap()
.elements()
/ core_stream_data.input_desc.mChannelsPerFrame as usize)
as i64;
assert!(!core_stream_data
/ stm.core_stream_data.input_desc.mChannelsPerFrame as usize
);
// Full Duplex. We'll call data_callback in the AudioUnit output callback.
if !stm.core_stream_data.output_unit.is_null() {
return (handle, None);
}
// Input only. Call the user callback through resampler.
// Resampler will deliver input buffer in the correct rate.
assert!(
input_frames as usize
<= stm
.core_stream_data
.input_linear_buffer
.as_ref()
.unwrap()
.elements()
/ stm.core_stream_data.input_desc.mChannelsPerFrame as usize
);
let mut total_input_frames =
(stm.core_stream_data
.input_linear_buffer
.as_ref()
.unwrap()
.as_ptr()
.is_null());
let input_buffer = core_stream_data
.input_linear_buffer
.as_mut()
.unwrap()
.as_mut_ptr();
let outframes = core_stream_data.resampler.fill(
input_buffer,
&mut total_input_frames,
ptr::null_mut(),
0,
.elements()
/ stm.core_stream_data.input_desc.mChannelsPerFrame as usize) as i64;
assert!(!stm
.core_stream_data
.input_linear_buffer
.as_ref()
.unwrap()
.as_ptr()
.is_null());
let input_buffer = stm
.core_stream_data
.input_linear_buffer
.as_mut()
.unwrap()
.as_mut_ptr();
let outframes = stm.core_stream_data.resampler.fill(
input_buffer,
&mut total_input_frames,
ptr::null_mut(),
0,
);
if outframes < total_input_frames {
assert_eq!(
audio_output_unit_stop(stm.core_stream_data.input_unit),
NO_ERR
);
if outframes < total_input_frames {
assert_eq!(audio_output_unit_stop(core_stream_data.input_unit), NO_ERR);
return (handle, Some(State::Drained));
}
// Reset input buffer
core_stream_data
.input_linear_buffer
.as_mut()
.unwrap()
.clear();
return (handle, Some(State::Drained));
}
// Reset input buffer
stm.core_stream_data
.input_linear_buffer
.as_mut()
.unwrap()
.clear();
(handle, None)
};
(handle, None)
};
let (handle, notification) = handler(stm, flags, tstamp, bus, input_frames);
if let Some(state) = notification {
@ -600,10 +611,7 @@ extern "C" fn audiounit_output_callback(
}
if stm.draining.load(Ordering::SeqCst) {
{
let core_stream_data = stm.core_stream_data.lock().unwrap();
core_stream_data.stop_audiounits();
}
stm.core_stream_data.stop_audiounits();
stm.notify_state_changed(State::Drained);
audiounit_make_silent(&mut buffers[0]);
return NO_ERR;
@ -613,10 +621,8 @@ extern "C" fn audiounit_output_callback(
output_frames: u32,
buffers: &mut [AudioBuffer]|
-> (OSStatus, Option<State>) {
let mut core_stream_data = stm.core_stream_data.lock().unwrap();
// Get output buffer
let output_buffer = match core_stream_data.mixer.as_mut() {
let output_buffer = match stm.core_stream_data.mixer.as_mut() {
None => buffers[0].mData,
Some(mixer) => {
// If remixing needs to occur, we can't directly work in our final
@ -630,17 +636,18 @@ extern "C" fn audiounit_output_callback(
.fetch_add(i64::from(output_frames), Ordering::SeqCst);
// Also get the input buffer if the stream is duplex
let (input_buffer, mut input_frames) = if !core_stream_data.input_unit.is_null() {
assert!(core_stream_data.input_linear_buffer.is_some());
let input_frames = core_stream_data
let (input_buffer, mut input_frames) = if !stm.core_stream_data.input_unit.is_null() {
assert!(stm.core_stream_data.input_linear_buffer.is_some());
let input_frames = stm
.core_stream_data
.input_linear_buffer
.as_ref()
.unwrap()
.elements()
/ core_stream_data.input_desc.mChannelsPerFrame as usize;
/ stm.core_stream_data.input_desc.mChannelsPerFrame as usize;
cubeb_logv!("Total input frames: {}", input_frames);
assert_ne!(core_stream_data.input_desc.mChannelsPerFrame, 0);
assert_ne!(stm.core_stream_data.input_desc.mChannelsPerFrame, 0);
// If the output callback came first and this is a duplex stream, we need to
// fill in some additional silence in the resampler.
// Otherwise, if we had more than expected callbacks in a row, or we're
@ -648,16 +655,16 @@ extern "C" fn audiounit_output_callback(
// fact that we're lacking some input data.
let frames_written = stm.frames_written.load(Ordering::SeqCst);
let input_frames_needed = minimum_resampling_input_frames(
core_stream_data.input_hw_rate,
f64::from(core_stream_data.output_stream_params.rate()),
stm.core_stream_data.input_hw_rate,
f64::from(stm.core_stream_data.output_stream_params.rate()),
frames_written,
);
let missing_frames = input_frames_needed - stm.frames_read.load(Ordering::SeqCst);
let elements = (missing_frames
* i64::from(core_stream_data.input_desc.mChannelsPerFrame))
* i64::from(stm.core_stream_data.input_desc.mChannelsPerFrame))
as usize;
if missing_frames > 0 {
core_stream_data
stm.core_stream_data
.input_linear_buffer
.as_mut()
.unwrap()
@ -665,7 +672,7 @@ extern "C" fn audiounit_output_callback(
stm.frames_read.store(input_frames_needed, Ordering::SeqCst);
cubeb_log!(
"({:p}) {} pushed {} frames of input silence.",
core_stream_data.stm_ptr,
stm.core_stream_data.stm_ptr,
if stm.frames_read.load(Ordering::SeqCst) == 0 {
"Input hasn't started,"
} else if stm.switching_device.load(Ordering::SeqCst) {
@ -677,7 +684,7 @@ extern "C" fn audiounit_output_callback(
);
}
(
core_stream_data
stm.core_stream_data
.input_linear_buffer
.as_mut()
.unwrap()
@ -690,7 +697,7 @@ extern "C" fn audiounit_output_callback(
// Call user callback through resampler.
assert!(!output_buffer.is_null());
let outframes = core_stream_data.resampler.fill(
let outframes = stm.core_stream_data.resampler.fill(
input_buffer,
if input_buffer.is_null() {
ptr::null_mut()
@ -703,8 +710,8 @@ extern "C" fn audiounit_output_callback(
if !input_buffer.is_null() {
// Pop from the buffer the frames used by the the resampler.
let elements =
input_frames as usize * core_stream_data.input_desc.mChannelsPerFrame as usize;
core_stream_data
input_frames as usize * stm.core_stream_data.input_desc.mChannelsPerFrame as usize;
stm.core_stream_data
.input_linear_buffer
.as_mut()
.unwrap()
@ -713,7 +720,7 @@ extern "C" fn audiounit_output_callback(
if outframes < 0 || outframes > i64::from(output_frames) {
*stm.shutdown.get_mut() = true;
core_stream_data.stop_audiounits();
stm.core_stream_data.stop_audiounits();
audiounit_make_silent(&mut buffers[0]);
return (NO_ERR, Some(State::Error));
}
@ -723,8 +730,8 @@ extern "C" fn audiounit_output_callback(
.store(stm.frames_queued, atomic::Ordering::SeqCst);
stm.frames_queued += outframes as u64;
let outaff = core_stream_data.output_desc.mFormatFlags;
let panning = if core_stream_data.output_desc.mChannelsPerFrame == 2 {
let outaff = stm.core_stream_data.output_desc.mFormatFlags;
let panning = if stm.core_stream_data.output_desc.mChannelsPerFrame == 2 {
stm.panning.load(Ordering::Relaxed)
} else {
0.0
@ -734,7 +741,8 @@ extern "C" fn audiounit_output_callback(
if stm.draining.load(Ordering::SeqCst) {
// Clear missing frames (silence)
let count_bytes = |frames: usize| -> usize {
let sample_size = cubeb_sample_size(core_stream_data.output_stream_params.format());
let sample_size =
cubeb_sample_size(stm.core_stream_data.output_stream_params.format());
frames * sample_size / mem::size_of::<u8>()
};
let out_bytes = unsafe {
@ -750,7 +758,7 @@ extern "C" fn audiounit_output_callback(
}
// Mixing
if core_stream_data.mixer.is_none() {
if stm.core_stream_data.mixer.is_none() {
// Pan stereo.
if panning != 0.0 {
unsafe {
@ -772,9 +780,9 @@ extern "C" fn audiounit_output_callback(
} else {
assert!(
buffers[0].mDataByteSize
>= core_stream_data.output_desc.mBytesPerFrame * output_frames
>= stm.core_stream_data.output_desc.mBytesPerFrame * output_frames
);
core_stream_data.mixer.as_mut().unwrap().mix(
stm.core_stream_data.mixer.as_mut().unwrap().mix(
output_frames as usize,
buffers[0].mData,
buffers[0].mDataByteSize as usize,
@ -841,8 +849,8 @@ extern "C" fn audiounit_property_listener_callback(
);
// If this is the default input device ignore the event,
// kAudioHardwarePropertyDefaultInputDevice will take care of the switch
let core_stream_data = stm.core_stream_data.lock().unwrap();
if core_stream_data
if stm
.core_stream_data
.input_device
.flags
.contains(device_flags::DEV_SYSTEM_DEFAULT)
@ -2464,16 +2472,10 @@ impl ContextOps for AudioUnitContext {
global_latency_frames,
));
boxed_stream.core_stream_data = Mutex::new(CoreStreamData::new(
boxed_stream.as_ref(),
in_stm_settings,
out_stm_settings,
));
boxed_stream.core_stream_data =
CoreStreamData::new(boxed_stream.as_ref(), in_stm_settings, out_stm_settings);
if let Err(r) = {
let mut core_stream_data = boxed_stream.core_stream_data.lock().unwrap();
core_stream_data.setup()
} {
if let Err(r) = boxed_stream.core_stream_data.setup() {
cubeb_log!(
"({:p}) Could not setup the audiounit stream.",
boxed_stream.as_ref()
@ -2531,12 +2533,6 @@ impl Drop for AudioUnitContext {
unsafe impl Send for AudioUnitContext {}
unsafe impl Sync for AudioUnitContext {}
// In the process of defusing our own custom mutex, those variables in the critical sections
// created by our own custom mutex will be moved to this struct. As a result, they will still
// be in the critical sections but the sections are created by this struct. However, some
// struct members don't really need locks to be used since their code paths give thread-safe
// guarantees. In fact, the mutex around this struct will be removed at the end of the
// mutex-defusing refactoring so it's fine to keep them in this struct for now.
#[derive(Debug)]
struct CoreStreamData<'ctx> {
stm_ptr: *const AudioUnitStream<'ctx>,
@ -3348,7 +3344,7 @@ struct AudioUnitStream<'ctx> {
panning: atomic::Atomic<f32>,
// This is true if a device change callback is currently running.
switching_device: AtomicBool,
core_stream_data: Mutex<CoreStreamData<'ctx>>,
core_stream_data: CoreStreamData<'ctx>,
}
impl<'ctx> AudioUnitStream<'ctx> {
@ -3377,7 +3373,7 @@ impl<'ctx> AudioUnitStream<'ctx> {
current_latency_frames: AtomicU32::new(0),
panning: atomic::Atomic::new(0.0_f32),
switching_device: AtomicBool::new(false),
core_stream_data: Mutex::new(CoreStreamData::default()),
core_stream_data: CoreStreamData::default(),
}
}
@ -3414,99 +3410,105 @@ impl<'ctx> AudioUnitStream<'ctx> {
}
fn reinit(&mut self) -> Result<()> {
// Call stop_audiounits to avoid potential data race. If there is a running data callback,
// which locks a mutex inside CoreAudio framework, then this call will block the current
// thread until the callback is finished since this call asks to lock a mutex inside
// CoreAudio framework that is used by the data callback.
if !self.shutdown.load(Ordering::SeqCst) {
let core_stream_data = self.core_stream_data.lock().unwrap();
core_stream_data.stop_audiounits();
self.core_stream_data.stop_audiounits();
}
{
let mut core_stream_data = self.core_stream_data.lock().unwrap();
assert!(
!self.core_stream_data.input_unit.is_null()
|| !self.core_stream_data.output_unit.is_null()
);
let vol_rv = if self.core_stream_data.output_unit.is_null() {
Err(Error::error())
} else {
get_volume(self.core_stream_data.output_unit)
};
assert!(
!core_stream_data.input_unit.is_null() || !core_stream_data.output_unit.is_null()
);
let vol_rv = if core_stream_data.output_unit.is_null() {
Err(Error::error())
} else {
get_volume(core_stream_data.output_unit)
};
self.core_stream_data.close();
core_stream_data.close();
// Reinit occurs in one of the following case:
// - When the device is not alive any more
// - When the default system device change.
// - The bluetooth device changed from A2DP to/from HFP/HSP profile
// We first attempt to re-use the same device id, should that fail we will
// default to the (potentially new) default device.
let has_input = !self.core_stream_data.input_unit.is_null();
let input_device = if has_input {
self.core_stream_data.input_device.id
} else {
kAudioObjectUnknown
};
// Reinit occurs in one of the following case:
// - When the device is not alive any more
// - When the default system device change.
// - The bluetooth device changed from A2DP to/from HFP/HSP profile
// We first attempt to re-use the same device id, should that fail we will
// default to the (potentially new) default device.
let has_input = !core_stream_data.input_unit.is_null();
let input_device = if has_input {
core_stream_data.input_device.id
} else {
kAudioObjectUnknown
};
if has_input {
core_stream_data.input_device = create_device_info(input_device, DeviceType::INPUT).map_err(|e| {
cubeb_log!(
"({:p}) Create input device info failed. This can happen when last media device is unplugged",
core_stream_data.stm_ptr
);
core_stream_data.close();
e
})?;
}
// Always use the default output on reinit. This is not correct in every
// case but it is sufficient for Firefox and prevent reinit from reporting
// failures. It will change soon when reinit mechanism will be updated.
core_stream_data.output_device = create_device_info(kAudioObjectUnknown, DeviceType::OUTPUT).map_err(|e| {
if has_input {
self.core_stream_data.input_device = create_device_info(input_device, DeviceType::INPUT).map_err(|e| {
cubeb_log!(
"({:p}) Create output device info failed. This can happen when last media device is unplugged",
core_stream_data.stm_ptr
"({:p}) Create input device info failed. This can happen when last media device is unplugged",
self.core_stream_data.stm_ptr
);
core_stream_data.close();
self.core_stream_data.close();
e
})?;
}
if core_stream_data.setup().is_err() {
cubeb_log!("({:p}) Stream reinit failed.", core_stream_data.stm_ptr);
if has_input && input_device != kAudioObjectUnknown {
// Attempt to re-use the same device-id failed, so attempt again with
// default input device.
core_stream_data.input_device = create_device_info(kAudioObjectUnknown, DeviceType::INPUT).map_err(|e| {
cubeb_log!(
"({:p}) Create input device info failed. This can happen when last media device is unplugged",
core_stream_data.stm_ptr
);
core_stream_data.close();
e
})?;
core_stream_data.setup().map_err(|e| {
cubeb_log!(
"({:p}) Second stream reinit failed.",
core_stream_data.stm_ptr
);
core_stream_data.close();
e
})?;
}
}
// Always use the default output on reinit. This is not correct in every
// case but it is sufficient for Firefox and prevent reinit from reporting
// failures. It will change soon when reinit mechanism will be updated.
self.core_stream_data.output_device = create_device_info(kAudioObjectUnknown, DeviceType::OUTPUT).map_err(|e| {
cubeb_log!(
"({:p}) Create output device info failed. This can happen when last media device is unplugged",
self.core_stream_data.stm_ptr
);
self.core_stream_data.close();
e
})?;
if vol_rv.is_ok() {
set_volume(core_stream_data.output_unit, vol_rv.unwrap());
}
// If the stream was running, start it again.
if !self.shutdown.load(Ordering::SeqCst) {
core_stream_data.start_audiounits().map_err(|e| {
cubeb_log!("({:p}) Start audiounit failed.", core_stream_data.stm_ptr);
core_stream_data.close();
if self.core_stream_data.setup().is_err() {
cubeb_log!(
"({:p}) Stream reinit failed.",
self.core_stream_data.stm_ptr
);
if has_input && input_device != kAudioObjectUnknown {
// Attempt to re-use the same device-id failed, so attempt again with
// default input device.
self.core_stream_data.input_device = create_device_info(kAudioObjectUnknown, DeviceType::INPUT).map_err(|e| {
cubeb_log!(
"({:p}) Create input device info failed. This can happen when last media device is unplugged",
self.core_stream_data.stm_ptr
);
self.core_stream_data.close();
e
})?;
self.core_stream_data.setup().map_err(|e| {
cubeb_log!(
"({:p}) Second stream reinit failed.",
self.core_stream_data.stm_ptr
);
self.core_stream_data.close();
e
})?;
}
}
if vol_rv.is_ok() {
set_volume(self.core_stream_data.output_unit, vol_rv.unwrap());
}
// If the stream was running, start it again.
if !self.shutdown.load(Ordering::SeqCst) {
self.core_stream_data.start_audiounits().map_err(|e| {
cubeb_log!(
"({:p}) Start audiounit failed.",
self.core_stream_data.stm_ptr
);
self.core_stream_data.close();
e
})?;
}
Ok(())
}
@ -3549,18 +3551,18 @@ impl<'ctx> AudioUnitStream<'ctx> {
}
fn destroy_internal(&mut self) {
let mut core_stream_data = self.core_stream_data.lock().unwrap();
core_stream_data.close();
self.core_stream_data.close();
assert!(self.context.active_streams() >= 1);
self.context.update_latency_by_removing_stream();
}
fn destroy(&mut self) {
// Call stop_audiounits to avoid potential data race. If there is a running data callback,
// which locks a mutex inside CoreAudio framework, then this call will block the current
// thread until the callback is finished since this call asks to lock a mutex inside
// CoreAudio framework that is used by the data callback.
if !self.shutdown.load(Ordering::SeqCst) {
{
let core_stream_data = self.core_stream_data.lock().unwrap();
core_stream_data.stop_audiounits();
}
self.core_stream_data.stop_audiounits();
*self.shutdown.get_mut() = true;
}
@ -3593,24 +3595,7 @@ impl<'ctx> StreamOps for AudioUnitStream<'ctx> {
*self.shutdown.get_mut() = false;
*self.draining.get_mut() = false;
// The underlying CoreAudio API in start_audiounit will require to lock a mutex inside
// the CoreAudio framework itself. Locking `core_stream_data` first and requires another
// lock inside CoreAudio again will cause a deadlock when calling this function at the
// same time with a running data callback. When CoreAudio data callback is running, it
// holds the mutex inside CoreAudio and asks to lock `core_stream_data`. These two lock the
// mutexes in reverse order and lead to a potential deadlock.
let (input_unit, output_unit) = {
let core_stream_data = self.core_stream_data.lock().unwrap();
(core_stream_data.input_unit, core_stream_data.output_unit)
};
if !input_unit.is_null() {
start_audiounit(input_unit)?;
}
if !output_unit.is_null() {
start_audiounit(output_unit)?;
}
self.core_stream_data.start_audiounits()?;
self.notify_state_changed(State::Started);
@ -3623,24 +3608,7 @@ impl<'ctx> StreamOps for AudioUnitStream<'ctx> {
fn stop(&mut self) -> Result<()> {
*self.shutdown.get_mut() = true;
// The underlying CoreAudio API in stop_audiounit will require to lock a mutex inside
// the CoreAudio framework itself. Locking `core_stream_data` first and requires another
// lock inside CoreAudio again will cause a deadlock when calling this function at the
// same time with a running data callback. When CoreAudio data callback is running, it
// holds the mutex inside CoreAudio and asks to lock `core_stream_data`. These two lock the
// mutexes in reverse order and lead to a potential deadlock.
let (input_unit, output_unit) = {
let core_stream_data = self.core_stream_data.lock().unwrap();
(core_stream_data.input_unit, core_stream_data.output_unit)
};
if !input_unit.is_null() {
stop_audiounit(input_unit)?;
}
if !output_unit.is_null() {
stop_audiounit(output_unit)?;
}
self.core_stream_data.stop_audiounits();
self.notify_state_changed(State::Stopped);
@ -3672,18 +3640,11 @@ impl<'ctx> StreamOps for AudioUnitStream<'ctx> {
Ok(self.current_latency_frames.load(Ordering::SeqCst))
}
fn set_volume(&mut self, volume: f32) -> Result<()> {
let output_unit = {
let core_stream_data = self.core_stream_data.lock().unwrap();
core_stream_data.output_unit
};
set_volume(output_unit, volume)
set_volume(self.core_stream_data.output_unit, volume)
}
fn set_panning(&mut self, panning: f32) -> Result<()> {
{
let core_stream_data = self.core_stream_data.lock().unwrap();
if core_stream_data.output_desc.mChannelsPerFrame > 2 {
return Err(Error::invalid_format());
}
if self.core_stream_data.output_desc.mChannelsPerFrame > 2 {
return Err(Error::invalid_format());
}
self.panning.store(panning, Ordering::Relaxed);
Ok(())