The order of events we want to mimic is:
- input callback
  - reinit_async
- output callback
  - data callback error -> state_callback(error)
- reinit
  - setup error -> state_callback(error)

For a particular stream these events should result in only one
state_callback(error).
This commit is contained in:
Andreas Pehrson 2024-05-22 11:07:24 +02:00 коммит произвёл Andreas Pehrson
Родитель d472f1b80a
Коммит c5d04f2a43
2 изменённых файлов: 123 добавлений и 2 удалений

Просмотреть файл

@ -2,9 +2,9 @@ extern crate itertools;
use self::itertools::iproduct;
use super::utils::{
draining_data_callback, get_devices_info_in_scope, noop_data_callback,
draining_data_callback, get_devices_info_in_scope, noop_data_callback, state_tracking_cb,
test_device_channels_in_scope, test_get_default_device, test_ops_context_operation,
test_ops_stream_operation, test_ops_stream_operation_on_context, Scope,
test_ops_stream_operation, test_ops_stream_operation_on_context, Scope, StateCallbackData,
};
use super::*;
use std::thread;
@ -1177,6 +1177,77 @@ fn test_ops_stream_device_destroy() {
});
}
pub extern "C" fn reiniting_and_erroring_data_callback(
stream: *mut ffi::cubeb_stream,
_user_ptr: *mut c_void,
_input_buffer: *const c_void,
output_buffer: *mut c_void,
nframes: i64,
) -> i64 {
assert!(!stream.is_null());
let stm = unsafe { &mut *(stream as *mut AudioUnitStream) };
// Feed silence data to output buffer
if !output_buffer.is_null() {
let channels = stm.core_stream_data.output_stream_params.channels();
let samples = nframes as usize * channels as usize;
let sample_size = cubeb_sample_size(stm.core_stream_data.output_stream_params.format());
unsafe {
ptr::write_bytes(output_buffer, 0, samples * sample_size);
}
}
// Trigger an async reinit before the backend handles the error below.
// This scenario could happen in the backend's internal input callback.
stm.reinit_async();
ffi::CUBEB_ERROR.into()
}
#[test]
fn test_ops_stream_racy_reinit() {
// Make sure the parameters meet the requirements of AudioUnitContext::stream_init
// (in the comments).
let mut input_params = ffi::cubeb_stream_params::default();
input_params.format = ffi::CUBEB_SAMPLE_FLOAT32NE;
input_params.rate = 48000;
input_params.channels = 1;
input_params.layout = ffi::CUBEB_LAYOUT_UNDEFINED;
input_params.prefs = ffi::CUBEB_STREAM_PREF_NONE;
let mut output_params = ffi::cubeb_stream_params::default();
output_params.format = ffi::CUBEB_SAMPLE_FLOAT32NE;
output_params.rate = 44100;
output_params.channels = 2;
output_params.layout = ffi::CUBEB_LAYOUT_UNDEFINED;
output_params.prefs = ffi::CUBEB_STREAM_PREF_NONE;
let mut data = StateCallbackData::default();
test_ops_stream_operation(
"stream: racy reinit",
ptr::null_mut(), // Use default input device.
&mut input_params,
ptr::null_mut(), // Use default output device.
&mut output_params,
4096, // TODO: Get latency by get_min_latency instead ?
Some(reiniting_and_erroring_data_callback),
Some(state_tracking_cb),
&mut data as *mut StateCallbackData as *mut c_void,
|stream| {
assert_eq!(unsafe { OPS.stream_start.unwrap()(stream) }, ffi::CUBEB_OK);
while data.error_cnt() == 0 && data.stopped_cnt() == 0 {
thread::sleep(Duration::from_millis(1));
}
assert_eq!(unsafe { OPS.stream_stop.unwrap()(stream) }, ffi::CUBEB_OK);
},
);
assert_eq!(data.started_cnt(), 1);
assert_eq!(data.stopped_cnt(), 0);
assert_eq!(data.drained_cnt(), 0);
assert_eq!(data.error_cnt(), 1);
}
#[test]
fn test_ops_stream_register_device_changed_callback() {
extern "C" fn callback(_: *mut c_void) {}

Просмотреть файл

@ -48,6 +48,56 @@ pub extern "C" fn draining_data_callback(
nframes - 1
}
#[derive(Default)]
pub struct StateCallbackData {
started_cnt: AtomicU32,
stopped_cnt: AtomicU32,
drained_cnt: AtomicU32,
error_cnt: AtomicU32,
}
impl StateCallbackData {
pub fn started_cnt(&self) -> u32 {
self.started_cnt.load(Ordering::SeqCst)
}
pub fn stopped_cnt(&self) -> u32 {
self.stopped_cnt.load(Ordering::SeqCst)
}
pub fn drained_cnt(&self) -> u32 {
self.drained_cnt.load(Ordering::SeqCst)
}
pub fn error_cnt(&self) -> u32 {
self.error_cnt.load(Ordering::SeqCst)
}
}
pub extern "C" fn state_tracking_cb(
stream: *mut ffi::cubeb_stream,
_usr_ptr: *mut c_void,
state: u32,
) {
let data = unsafe { (_usr_ptr as *mut StateCallbackData).as_mut() }.unwrap();
match state {
ffi::CUBEB_STATE_STARTED => {
data.started_cnt.fetch_add(1, Ordering::SeqCst);
cubeb_log!("({:p}) state is now started", stream);
}
ffi::CUBEB_STATE_STOPPED => {
data.stopped_cnt.fetch_add(1, Ordering::SeqCst);
cubeb_log!("({:p}) state is now stopped", stream);
}
ffi::CUBEB_STATE_DRAINED => {
data.drained_cnt.fetch_add(1, Ordering::SeqCst);
cubeb_log!("({:p}) state is now drained", stream);
}
ffi::CUBEB_STATE_ERROR => {
data.error_cnt.fetch_add(1, Ordering::SeqCst);
cubeb_log!("({:p}) state is now error", stream);
}
_ => unreachable!("unknown state"),
};
}
#[derive(Clone, Debug, PartialEq)]
pub enum Scope {
Input,