Bug 1530715 - P29: Move mixer to a struct within a mutex. r=padenot

The mixer of the stream will be created, reinitialized, used or
destroyed on different threads, so its operations should be in the
critical sections. We do create critical sections by our custom mutex.
However, this custom mutex will be gradually replaced by the standard
Rust mutex in the following patches.

To replace the custom mutex, we put the mixer to the struct wrapped by a
Rust mutex and do all the mixer operations in the critical section
created by this struct. At the end when the custom mutex is removed,
those operations are still in critical sections.

Calling notify_state_changed needs to borrow AudioUnitStream as a
mutuable. To avoid the borrowing-twice issue, the notify_state_changed
calling is moved out the scope of the critical section created in the
output data callback.

Differential Revision: https://phabricator.services.mozilla.com/D34062

--HG--
extra : moz-landing-system : lando
This commit is contained in:
Chun-Min Chang 2019-07-09 19:57:04 +00:00
Родитель c7c35b3626
Коммит c47370ecc1
2 изменённых файлов: 176 добавлений и 164 удалений

Просмотреть файл

@ -3,4 +3,4 @@ git repository using the update.sh script.
The cubeb-coreaudio-rs git repository is: https://github.com/ChunMinChang/cubeb-coreaudio-rs
The git commit ID used was 5c9b94910dabb1f64b1c6e6d5907304d08c53d44 (2019-06-25 11:32:22 -0700)
The git commit ID used was b37d939bb7d2530047d624a5769ce2cfaa6d0dd5 (2019-06-25 11:32:22 -0700)

Просмотреть файл

@ -453,12 +453,13 @@ extern "C" fn audiounit_output_callback(
) -> OSStatus {
assert_eq!(bus, AU_OUT_BUS);
assert!(!out_buffer_list.is_null());
let out_buffer_list_ref = unsafe { &mut (*out_buffer_list) };
assert_eq!(out_buffer_list_ref.mNumberBuffers, 1);
assert!(!user_ptr.is_null());
let stm = unsafe { &mut *(user_ptr as *mut AudioUnitStream) };
let buffers = unsafe {
let out_buffer_list_ref = unsafe { &mut (*out_buffer_list) };
assert_eq!(out_buffer_list_ref.mNumberBuffers, 1);
let mut buffers = unsafe {
let ptr = out_buffer_list_ref.mBuffers.as_mut_ptr();
let len = out_buffer_list_ref.mNumberBuffers as usize;
slice::from_raw_parts_mut(ptr, len)
@ -479,10 +480,6 @@ extern "C" fn audiounit_output_callback(
}
);
let mut input_frames: i64 = 0;
let mut output_buffer = ptr::null_mut::<c_void>();
let mut input_buffer = ptr::null_mut::<c_void>();
if stm.shutdown.load(Ordering::SeqCst) {
cubeb_log!("({:p}) output shutdown.", stm as *const AudioUnitStream);
audiounit_make_silent(&mut buffers[0]);
@ -499,62 +496,70 @@ extern "C" fn audiounit_output_callback(
return NO_ERR;
}
// Get output buffer
output_buffer = match stm.mixer.as_mut() {
None => buffers[0].mData,
Some(mixer) => {
// If remixing needs to occur, we can't directly work in our final
// destination buffer as data may be overwritten or too small to start with.
mixer.update_buffer_size(output_frames as usize);
mixer.get_buffer_mut_ptr() as *mut c_void
}
};
let handler = |stm: &mut AudioUnitStream,
output_frames: u32,
buffers: &mut [AudioBuffer]|
-> (OSStatus, Option<State>) {
let mut input_frames: i64 = 0;
let mut output_buffer = ptr::null_mut::<c_void>();
let mut input_buffer = ptr::null_mut::<c_void>();
stm.frames_written
.fetch_add(i64::from(output_frames), Ordering::SeqCst);
// If Full duplex get also input buffer
if !stm.input_unit.is_null() {
// If the output callback came first and this is a duplex stream, we need to
// fill in some additional silence in the resampler.
// Otherwise, if we had more than expected callbacks in a row, or we're
// currently switching, we add some silence as well to compensate for the
// fact that we're lacking some input data.
let frames_written = stm.frames_written.load(Ordering::SeqCst);
let input_frames_needed = stm.minimum_resampling_input_frames(frames_written);
let missing_frames = input_frames_needed - stm.frames_read.load(Ordering::SeqCst);
if missing_frames > 0 {
stm.input_linear_buffer.as_mut().unwrap().push_zeros(
(missing_frames * i64::from(stm.input_desc.mChannelsPerFrame)) as usize,
);
stm.frames_read.store(input_frames_needed, Ordering::SeqCst);
let stm_ptr = stm as *const AudioUnitStream;
cubeb_log!(
"({:p}) {} pushed {} frames of input silence.",
stm_ptr,
if stm.frames_read.load(Ordering::SeqCst) == 0 {
"Input hasn't started,"
} else if stm.switching_device.load(Ordering::SeqCst) {
"Device switching,"
} else {
"Drop out,"
},
missing_frames
);
}
input_buffer = stm.input_linear_buffer.as_mut().unwrap().as_mut_ptr();
// Number of input frames in the buffer. It will change to actually used frames
// inside fill
assert_ne!(stm.input_desc.mChannelsPerFrame, 0);
input_frames = (stm.input_linear_buffer.as_ref().unwrap().elements()
/ stm.input_desc.mChannelsPerFrame as usize) as i64;
}
// Call user callback through resampler.
assert!(!output_buffer.is_null());
let outframes = {
let mut stream_device = stm.stream_device.lock().unwrap();
stream_device.resampler.fill(
// Get output buffer
output_buffer = match stream_device.mixer.as_mut() {
None => buffers[0].mData,
Some(mixer) => {
// If remixing needs to occur, we can't directly work in our final
// destination buffer as data may be overwritten or too small to start with.
mixer.update_buffer_size(output_frames as usize);
mixer.get_buffer_mut_ptr() as *mut c_void
}
};
stm.frames_written
.fetch_add(i64::from(output_frames), Ordering::SeqCst);
// Also get the input buffer if the stream is duplex
if !stm.input_unit.is_null() {
// If the output callback came first and this is a duplex stream, we need to
// fill in some additional silence in the resampler.
// Otherwise, if we had more than expected callbacks in a row, or we're
// currently switching, we add some silence as well to compensate for the
// fact that we're lacking some input data.
let frames_written = stm.frames_written.load(Ordering::SeqCst);
let input_frames_needed = stm.minimum_resampling_input_frames(frames_written);
let missing_frames = input_frames_needed - stm.frames_read.load(Ordering::SeqCst);
if missing_frames > 0 {
stm.input_linear_buffer.as_mut().unwrap().push_zeros(
(missing_frames * i64::from(stm.input_desc.mChannelsPerFrame)) as usize,
);
stm.frames_read.store(input_frames_needed, Ordering::SeqCst);
let stm_ptr = stm as *const AudioUnitStream;
cubeb_log!(
"({:p}) {} pushed {} frames of input silence.",
stm_ptr,
if stm.frames_read.load(Ordering::SeqCst) == 0 {
"Input hasn't started,"
} else if stm.switching_device.load(Ordering::SeqCst) {
"Device switching,"
} else {
"Drop out,"
},
missing_frames
);
}
input_buffer = stm.input_linear_buffer.as_mut().unwrap().as_mut_ptr();
// Number of input frames in the buffer. It will change to actually used frames
// inside fill
assert_ne!(stm.input_desc.mChannelsPerFrame, 0);
input_frames = (stm.input_linear_buffer.as_ref().unwrap().elements()
/ stm.input_desc.mChannelsPerFrame as usize) as i64;
}
// Call user callback through resampler.
assert!(!output_buffer.is_null());
let outframes = stream_device.resampler.fill(
input_buffer,
if input_buffer.is_null() {
ptr::null_mut()
@ -563,88 +568,93 @@ extern "C" fn audiounit_output_callback(
},
output_buffer,
i64::from(output_frames),
)
};
if !input_buffer.is_null() {
// Pop from the buffer the frames used by the the resampler.
stm.input_linear_buffer
.as_mut()
.unwrap()
.pop(input_frames as usize * stm.input_desc.mChannelsPerFrame as usize);
}
if outframes < 0 || outframes > i64::from(output_frames) {
*stm.shutdown.get_mut() = true;
assert_eq!(audio_output_unit_stop(stm.output_unit), NO_ERR);
if !stm.input_unit.is_null() {
assert_eq!(audio_output_unit_stop(stm.input_unit), NO_ERR);
);
if !input_buffer.is_null() {
// Pop from the buffer the frames used by the the resampler.
stm.input_linear_buffer
.as_mut()
.unwrap()
.pop(input_frames as usize * stm.input_desc.mChannelsPerFrame as usize);
}
stm.notify_state_changed(State::Error);
audiounit_make_silent(&mut buffers[0]);
return NO_ERR;
}
*stm.draining.get_mut() = outframes < i64::from(output_frames);
stm.frames_played
.store(stm.frames_queued, atomic::Ordering::SeqCst);
stm.frames_queued += outframes as u64;
let outaff = stm.output_desc.mFormatFlags;
let panning = if stm.output_desc.mChannelsPerFrame == 2 {
stm.panning.load(Ordering::Relaxed)
} else {
0.0
};
// Post process output samples.
if stm.draining.load(Ordering::SeqCst) {
// Clear missing frames (silence)
let count_bytes = |frames: usize| -> usize {
let sample_size = cubeb_sample_size(stm.output_stream_params.format());
frames * sample_size / mem::size_of::<u8>()
};
let out_bytes = unsafe {
slice::from_raw_parts_mut(
output_buffer as *mut u8,
count_bytes(output_frames as usize),
)
};
let start = count_bytes(outframes as usize);
for i in start..out_bytes.len() {
out_bytes[i] = 0;
if outframes < 0 || outframes > i64::from(output_frames) {
*stm.shutdown.get_mut() = true;
assert_eq!(audio_output_unit_stop(stm.output_unit), NO_ERR);
if !stm.input_unit.is_null() {
assert_eq!(audio_output_unit_stop(stm.input_unit), NO_ERR);
}
audiounit_make_silent(&mut buffers[0]);
return (NO_ERR, Some(State::Error));
}
}
// Mixing
if stm.mixer.is_none() {
// Pan stereo.
if panning != 0.0 {
unsafe {
if outaff & kAudioFormatFlagIsFloat != 0 {
ffi::cubeb_pan_stereo_buffer_float(
output_buffer as *mut f32,
outframes as u32,
panning,
);
} else if outaff & kAudioFormatFlagIsSignedInteger != 0 {
ffi::cubeb_pan_stereo_buffer_int(
output_buffer as *mut i16,
outframes as u32,
panning,
);
}
*stm.draining.get_mut() = outframes < i64::from(output_frames);
stm.frames_played
.store(stm.frames_queued, atomic::Ordering::SeqCst);
stm.frames_queued += outframes as u64;
let outaff = stm.output_desc.mFormatFlags;
let panning = if stm.output_desc.mChannelsPerFrame == 2 {
stm.panning.load(Ordering::Relaxed)
} else {
0.0
};
// Post process output samples.
if stm.draining.load(Ordering::SeqCst) {
// Clear missing frames (silence)
let count_bytes = |frames: usize| -> usize {
let sample_size = cubeb_sample_size(stm.output_stream_params.format());
frames * sample_size / mem::size_of::<u8>()
};
let out_bytes = unsafe {
slice::from_raw_parts_mut(
output_buffer as *mut u8,
count_bytes(output_frames as usize),
)
};
let start = count_bytes(outframes as usize);
for i in start..out_bytes.len() {
out_bytes[i] = 0;
}
}
} else {
assert!(buffers[0].mDataByteSize >= stm.output_desc.mBytesPerFrame * output_frames);
stm.mixer.as_mut().unwrap().mix(
output_frames as usize,
buffers[0].mData,
buffers[0].mDataByteSize as usize,
);
}
NO_ERR
// Mixing
if stream_device.mixer.is_none() {
// Pan stereo.
if panning != 0.0 {
unsafe {
if outaff & kAudioFormatFlagIsFloat != 0 {
ffi::cubeb_pan_stereo_buffer_float(
output_buffer as *mut f32,
outframes as u32,
panning,
);
} else if outaff & kAudioFormatFlagIsSignedInteger != 0 {
ffi::cubeb_pan_stereo_buffer_int(
output_buffer as *mut i16,
outframes as u32,
panning,
);
}
}
}
} else {
assert!(buffers[0].mDataByteSize >= stm.output_desc.mBytesPerFrame * output_frames);
stream_device.mixer.as_mut().unwrap().mix(
output_frames as usize,
buffers[0].mData,
buffers[0].mDataByteSize as usize,
);
}
(NO_ERR, None)
};
let (status, notification) = handler(stm, output_frames, &mut buffers);
if let Some(state) = notification {
stm.notify_state_changed(state);
}
status
}
extern "C" fn audiounit_property_listener_callback(
@ -2392,6 +2402,7 @@ unsafe impl Sync for AudioUnitContext {}
#[derive(Debug)]
struct StreamDevice {
aggregate_device: AggregateDevice,
mixer: Option<Mixer>,
resampler: Resampler,
}
@ -2399,6 +2410,7 @@ impl Default for StreamDevice {
fn default() -> Self {
Self {
aggregate_device: AggregateDevice::default(),
mixer: None,
resampler: Resampler::default(),
}
}
@ -2452,8 +2464,6 @@ struct AudioUnitStream<'ctx> {
panning: atomic::Atomic<f32>,
// This is true if a device change callback is currently running.
switching_device: AtomicBool,
// Mixer interface
mixer: Option<Mixer>,
// Listeners indicating what system events are monitored.
default_input_listener: Option<device_property_listener>,
default_output_listener: Option<device_property_listener>,
@ -2513,7 +2523,6 @@ impl<'ctx> AudioUnitStream<'ctx> {
current_latency_frames: AtomicU32::new(0),
panning: atomic::Atomic::new(0.0_f32),
switching_device: AtomicBool::new(false),
mixer: None,
default_input_listener: None,
default_output_listener: None,
input_alive_listener: None,
@ -3165,28 +3174,32 @@ impl<'ctx> AudioUnitStream<'ctx> {
self,
self.context.layout
);
if self.context.channels != self.output_stream_params.channels()
|| self.context.layout.load(atomic::Ordering::SeqCst)
!= self.output_stream_params.layout()
{
cubeb_log!("Incompatible channel layouts detected, setting up remixer");
self.mixer = Some(Mixer::new(
self.output_stream_params.format(),
self.output_stream_params.channels(),
self.output_stream_params.layout(),
self.context.channels,
self.context.layout.load(atomic::Ordering::SeqCst),
));
// We will be remixing the data before it reaches the output device.
// We need to adjust the number of channels and other
// AudioStreamDescription details.
self.output_desc.mChannelsPerFrame = self.context.channels;
self.output_desc.mBytesPerFrame =
(self.output_desc.mBitsPerChannel / 8) * self.output_desc.mChannelsPerFrame;
self.output_desc.mBytesPerPacket =
self.output_desc.mBytesPerFrame * self.output_desc.mFramesPerPacket;
} else {
self.mixer = None;
let mut stream_device = self.stream_device.lock().unwrap();
stream_device.mixer = if self.context.channels != self.output_stream_params.channels()
|| self.context.layout.load(atomic::Ordering::SeqCst)
!= self.output_stream_params.layout()
{
cubeb_log!("Incompatible channel layouts detected, setting up remixer");
// We will be remixing the data before it reaches the output device.
// We need to adjust the number of channels and other
// AudioStreamDescription details.
self.output_desc.mChannelsPerFrame = self.context.channels;
self.output_desc.mBytesPerFrame =
(self.output_desc.mBitsPerChannel / 8) * self.output_desc.mChannelsPerFrame;
self.output_desc.mBytesPerPacket =
self.output_desc.mBytesPerFrame * self.output_desc.mFramesPerPacket;
Some(Mixer::new(
self.output_stream_params.format(),
self.output_stream_params.channels(),
self.output_stream_params.layout(),
self.context.channels,
self.context.layout.load(atomic::Ordering::SeqCst),
))
} else {
None
};
}
r = audio_unit_set_property(
@ -3448,11 +3461,10 @@ impl<'ctx> AudioUnitStream<'ctx> {
self.output_unit = ptr::null_mut();
}
self.mixer = None;
{
let mut stream_device = self.stream_device.lock().unwrap();
stream_device.resampler.destroy();
stream_device.mixer = None;
stream_device.aggregate_device = AggregateDevice::default();
}
}