* Create a friendly wrapper around dispatch_queue_t

Instead of using low-level dispatch APIs directly, we should create a
wrapper around `dispatch_queue_t` that can retain or release the
reference count automatically.

* Hide low-level dispatch APIs

With the new `Queue` struct is introduced, there is no need to expose
the low-level dispatch APIs. Those low-level dispatch APIs should be
prevented from being used directly.

* Make the task queue stream-local

The stream start, stop, destroy, reinit tasks, from different streams,
will be dispatched to the same task queue and then be executed one by
one. In fact, those tasks dispatched from different streams can be run
separately in parallel since those tasks don't share data in common. We
can create a task queue per stream and dispatch the stream's own task to
its own task queue so those tasks fired from different streams can be
run in parallel. This should boost the speed of creating, staring,
stopping, or reinitializing multiple streams at the same time.

* Implement a sync task runner that runs the final task

Any task appended after the stream-destroy task should be cancelled.
After moving the task queue from being owned by the cubeb context to the
cubeb stream, we can easily cancel the stream X's job in the task queue
after stream X is destroyed, without interfering other stream's tasks.

* Give task queue an unique label

The task queue in the cubeb stream should have a unique lable so it
would be easier to identify the task queue's owner when debugging
issues.

* Merge create_dispatch_queue to Queue::new

* Merge release_dispatch_queue to Queue::drop

* Merge retain_dispatch_queue to Queue::clone

* Merge get_dispatch_context to Queue::get_context

* Merge set_dispatch_context to Queue::set_context

* Init should_cancel to false in new instead of run_final

* Replace generic type by AtomicBool type in {get, set}_context

* Rename {get,set}_context to {get,set}_should_cancel

* Merge async_dispatch to run_async

* Merge sync_dispatch to Queue::run_{sync, final}

* Move create_closure_and_executor into Queue

* Destroy AudioUnitContext properly

* Revise comments
This commit is contained in:
Chun-Min Chang 2020-03-19 11:04:15 -07:00 коммит произвёл GitHub
Родитель 6e3e8e8359
Коммит 799518a033
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 201 добавлений и 141 удалений

Просмотреть файл

@ -4,148 +4,203 @@ use std::ffi::CString;
use std::mem;
use std::os::raw::c_void;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
pub const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t = ptr::null_mut::<dispatch_queue_attr_s>();
// Queue: A wrapper around `dispatch_queue_t`.
// ------------------------------------------------------------------------------------------------
#[derive(Debug)]
pub struct Queue(dispatch_queue_t);
pub fn create_dispatch_queue(
label: &'static str,
queue_attr: dispatch_queue_attr_t,
) -> dispatch_queue_t {
let label = CString::new(label).unwrap();
let c_string = label.as_ptr();
unsafe { dispatch_queue_create(c_string, queue_attr) }
}
pub fn release_dispatch_queue(queue: dispatch_queue_t) {
// TODO: This is incredibly unsafe. Find another way to release the queue.
unsafe {
dispatch_release(mem::transmute::<dispatch_queue_t, dispatch_object_t>(queue));
impl Queue {
pub fn new(label: &str) -> Self {
const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t =
ptr::null_mut::<dispatch_queue_attr_s>();
let label = CString::new(label).unwrap();
let c_string = label.as_ptr();
let queue = Self(unsafe { dispatch_queue_create(c_string, DISPATCH_QUEUE_SERIAL) });
queue.set_should_cancel(Box::new(AtomicBool::new(false)));
queue
}
}
pub fn async_dispatch<F>(queue: dispatch_queue_t, work: F)
where
F: Send + FnOnce(),
{
let (closure, executor) = create_closure_and_executor(work);
unsafe {
dispatch_async_f(queue, closure, executor);
pub fn run_async<F>(&self, work: F)
where
F: Send + FnOnce(),
{
let should_cancel = self.get_should_cancel();
let (closure, executor) = Self::create_closure_and_executor(|| {
if should_cancel.map_or(false, |v| v.load(Ordering::SeqCst)) {
return;
}
work();
});
unsafe {
dispatch_async_f(self.0, closure, executor);
}
}
}
pub fn sync_dispatch<F>(queue: dispatch_queue_t, work: F)
where
F: Send + FnOnce(),
{
let (closure, executor) = create_closure_and_executor(work);
unsafe {
dispatch_sync_f(queue, closure, executor);
pub fn run_sync<F>(&self, work: F)
where
F: Send + FnOnce(),
{
let should_cancel = self.get_should_cancel();
let (closure, executor) = Self::create_closure_and_executor(|| {
if should_cancel.map_or(false, |v| v.load(Ordering::SeqCst)) {
return;
}
work();
});
unsafe {
dispatch_sync_f(self.0, closure, executor);
}
}
}
// Return an raw pointer to a (unboxed) closure and an executor that
// will run the closure (after re-boxing the closure) when it's called.
fn create_closure_and_executor<F>(closure: F) -> (*mut c_void, dispatch_function_t)
where
F: FnOnce(),
{
extern "C" fn closure_executer<F>(unboxed_closure: *mut c_void)
pub fn run_final<F>(&self, work: F)
where
F: Send + FnOnce(),
{
let should_cancel = self.get_should_cancel();
let (closure, executor) = Self::create_closure_and_executor(|| {
work();
should_cancel
.expect("dispatch context should be allocated!")
.store(true, Ordering::SeqCst);
});
unsafe {
dispatch_sync_f(self.0, closure, executor);
}
}
fn get_should_cancel(&self) -> Option<&mut AtomicBool> {
unsafe {
let context = dispatch_get_context(
mem::transmute::<dispatch_queue_t, dispatch_object_t>(self.0),
) as *mut AtomicBool;
context.as_mut()
}
}
fn set_should_cancel(&self, context: Box<AtomicBool>) {
unsafe {
let queue = mem::transmute::<dispatch_queue_t, dispatch_object_t>(self.0);
// Leak the context from Box.
dispatch_set_context(queue, Box::into_raw(context) as *mut c_void);
extern "C" fn finalizer(context: *mut c_void) {
// Retake the leaked context into box and then drop it.
let _ = unsafe { Box::from_raw(context as *mut AtomicBool) };
}
// The `finalizer` is only run if the `context` in `queue` is set by `dispatch_set_context`.
dispatch_set_finalizer_f(queue, Some(finalizer));
}
}
fn release(&self) {
unsafe {
// This will release the inner `dispatch_queue_t` asynchronously.
// TODO: It's incredibly unsafe to call `transmute` directly.
// Find another way to release the queue.
dispatch_release(mem::transmute::<dispatch_queue_t, dispatch_object_t>(
self.0,
));
}
}
fn create_closure_and_executor<F>(closure: F) -> (*mut c_void, dispatch_function_t)
where
F: FnOnce(),
{
// Retake the leaked closure.
let closure = unsafe { Box::from_raw(unboxed_closure as *mut F) };
// Execute the closure.
(*closure)();
// closure is released after finishing this function call.
extern "C" fn closure_executer<F>(unboxed_closure: *mut c_void)
where
F: FnOnce(),
{
// Retake the leaked closure.
let closure = unsafe { Box::from_raw(unboxed_closure as *mut F) };
// Execute the closure.
(*closure)();
// closure is released after finishing this function call.
}
let closure = Box::new(closure); // Allocate closure on heap.
let executor: dispatch_function_t = Some(closure_executer::<F>);
(
Box::into_raw(closure) as *mut c_void, // Leak the closure.
executor,
)
}
let closure = Box::new(closure); // Allocate closure on heap.
let executor: dispatch_function_t = Some(closure_executer::<F>);
(
Box::into_raw(closure) as *mut c_void, // Leak the closure.
executor,
)
}
#[cfg(test)]
mod test {
use super::*;
use std::sync::{Arc, Mutex};
const COUNT: u32 = 10;
#[test]
fn test_async_dispatch() {
use std::sync::mpsc::channel;
get_queue_and_resource("Run with async dispatch api wrappers", |queue, resource| {
let (tx, rx) = channel();
for i in 0..COUNT {
let (res, tx) = (Arc::clone(&resource), tx.clone());
async_dispatch(queue, move || {
let mut res = res.lock().unwrap();
assert_eq!(res.last_touched, if i == 0 { None } else { Some(i - 1) });
assert_eq!(res.touched_count, i);
res.touch(i);
if i == COUNT - 1 {
tx.send(()).unwrap();
}
});
}
rx.recv().unwrap(); // Wait until it's touched COUNT times.
let resource = resource.lock().unwrap();
assert_eq!(resource.touched_count, COUNT);
assert_eq!(resource.last_touched.unwrap(), COUNT - 1);
});
impl Drop for Queue {
fn drop(&mut self) {
self.release();
}
}
#[test]
fn test_sync_dispatch() {
get_queue_and_resource("Run with sync dispatch api wrappers", |queue, resource| {
for i in 0..COUNT {
let res = Arc::clone(&resource);
sync_dispatch(queue, move || {
let mut res = res.lock().unwrap();
assert_eq!(res.last_touched, if i == 0 { None } else { Some(i - 1) });
assert_eq!(res.touched_count, i);
res.touch(i);
});
}
let resource = resource.lock().unwrap();
assert_eq!(resource.touched_count, COUNT);
assert_eq!(resource.last_touched.unwrap(), COUNT - 1);
});
}
struct Resource {
last_touched: Option<u32>,
touched_count: u32,
}
impl Resource {
fn new() -> Self {
Resource {
last_touched: None,
touched_count: 0,
}
}
fn touch(&mut self, who: u32) {
self.last_touched = Some(who);
self.touched_count += 1;
impl Clone for Queue {
fn clone(&self) -> Self {
// TODO: It's incredibly unsafe to call `transmute` directly.
// Find another way to release the queue.
unsafe {
dispatch_retain(mem::transmute::<dispatch_queue_t, dispatch_object_t>(
self.0,
));
}
Self(self.0)
}
}
#[test]
fn run_tasks_in_order() {
let mut visited = Vec::<u32>::new();
// Rust compilter doesn't allow a pointer to be passed across threads.
// A hacky way to do that is to cast the pointer into a value, then
// the value, which is actually an address, can be copied into threads.
let ptr = &mut visited as *mut Vec<u32> as usize;
fn visit(v: u32, visited_ptr: usize) {
let visited = unsafe { &mut *(visited_ptr as *mut Vec<u32>) };
visited.push(v);
};
let queue = Queue::new("Run tasks in order");
queue.run_sync(move || visit(1, ptr));
queue.run_sync(move || visit(2, ptr));
queue.run_async(move || visit(3, ptr));
queue.run_async(move || visit(4, ptr));
// Call sync here to block the current thread and make sure all the tasks are done.
queue.run_sync(move || visit(5, ptr));
assert_eq!(visited, vec![1, 2, 3, 4, 5]);
}
#[test]
fn run_final_task() {
let mut visited = Vec::<u32>::new();
fn get_queue_and_resource<F>(label: &'static str, callback: F)
where
F: FnOnce(dispatch_queue_t, Arc<Mutex<Resource>>),
{
let queue = create_dispatch_queue(label, DISPATCH_QUEUE_SERIAL);
let resource = Arc::new(Mutex::new(Resource::new()));
// Rust compilter doesn't allow a pointer to be passed across threads.
// A hacky way to do that is to cast the pointer into a value, then
// the value, which is actually an address, can be copied into threads.
let ptr = &mut visited as *mut Vec<u32> as usize;
callback(queue, resource);
fn visit(v: u32, visited_ptr: usize) {
let visited = unsafe { &mut *(visited_ptr as *mut Vec<u32>) };
visited.push(v);
};
// Release the queue.
release_dispatch_queue(queue);
let queue = Queue::new("Task after run_final will be cancelled");
queue.run_sync(move || visit(1, ptr));
queue.run_async(move || visit(2, ptr));
queue.run_final(move || visit(3, ptr));
queue.run_async(move || visit(4, ptr));
queue.run_sync(move || visit(5, ptr));
}
// `queue` will be dropped asynchronously and then the `finalizer` of the `queue`
// should be fired to clean up the `context` set in the `queue`.
assert_eq!(visited, vec![1, 2, 3]);
}

Просмотреть файл

@ -1696,12 +1696,12 @@ extern "C" fn audiounit_collection_changed_callback(
) -> OSStatus {
let context = unsafe { &mut *(in_client_data as *mut AudioUnitContext) };
let queue = context.serial_queue;
let queue = context.serial_queue.clone();
let mutexed_context = Arc::new(Mutex::new(context));
let also_mutexed_context = Arc::clone(&mutexed_context);
// This can be called from inside an AudioUnit function, dispatch to another queue.
async_dispatch(queue, move || {
queue.run_async(move || {
let ctx_guard = also_mutexed_context.lock().unwrap();
let ctx_ptr = *ctx_guard as *const AudioUnitContext;
@ -1849,10 +1849,7 @@ pub const OPS: Ops = capi_new!(AudioUnitContext, AudioUnitStream);
#[derive(Debug)]
pub struct AudioUnitContext {
_ops: *const Ops,
// serial_queue will be created by dispatch_queue_create(create_dispatch_queue)
// without ARC(Automatic Reference Counting) support, so it should be released
// by dispatch_release(release_dispatch_queue).
serial_queue: dispatch_queue_t,
serial_queue: Queue,
latency_controller: Mutex<LatencyController>,
devices: Mutex<SharedDevices>,
}
@ -1861,7 +1858,7 @@ impl AudioUnitContext {
fn new() -> Self {
Self {
_ops: &OPS as *const _,
serial_queue: create_dispatch_queue(DISPATCH_QUEUE_LABEL, DISPATCH_QUEUE_SERIAL),
serial_queue: Queue::new(DISPATCH_QUEUE_LABEL),
latency_controller: Mutex::new(LatencyController::default()),
devices: Mutex::new(SharedDevices::default()),
}
@ -2168,6 +2165,10 @@ impl ContextOps for AudioUnitContext {
global_latency_frames,
));
// Rename the task queue to be an unique label.
let queue_label = format!("{}.{:p}", DISPATCH_QUEUE_LABEL, boxed_stream.as_ref());
boxed_stream.queue = Queue::new(queue_label.as_str());
boxed_stream.core_stream_data =
CoreStreamData::new(boxed_stream.as_ref(), in_stm_settings, out_stm_settings);
@ -2218,11 +2219,14 @@ impl Drop for AudioUnitContext {
}
}
// Unregister the callback if necessary.
self.remove_devices_changed_listener(DeviceType::INPUT);
self.remove_devices_changed_listener(DeviceType::OUTPUT);
release_dispatch_queue(self.serial_queue);
// Make sure all the pending (device-collection-changed-callback) tasks
// in queue are done, and cancel all the tasks appended after `drop` is executed.
let queue = self.serial_queue.clone();
queue.run_final(|| {
// Unregister the callback if necessary.
self.remove_devices_changed_listener(DeviceType::INPUT);
self.remove_devices_changed_listener(DeviceType::OUTPUT);
});
}
}
@ -3071,6 +3075,8 @@ impl<'ctx> Drop for CoreStreamData<'ctx> {
struct AudioUnitStream<'ctx> {
context: &'ctx mut AudioUnitContext,
user_ptr: *mut c_void,
// Task queue for the stream.
queue: Queue,
data_callback: ffi::cubeb_data_callback,
state_callback: ffi::cubeb_state_callback,
@ -3107,6 +3113,7 @@ impl<'ctx> AudioUnitStream<'ctx> {
AudioUnitStream {
context,
user_ptr,
queue: Queue::new(DISPATCH_QUEUE_LABEL),
data_callback,
state_callback,
device_changed_callback: Mutex::new(None),
@ -3266,12 +3273,12 @@ impl<'ctx> AudioUnitStream<'ctx> {
return;
}
let queue = self.context.serial_queue;
let queue = self.queue.clone();
let mutexed_stm = Arc::new(Mutex::new(self));
let also_mutexed_stm = Arc::clone(&mutexed_stm);
// Use a new thread, through the queue, to avoid deadlock when calling
// Get/SetProperties method from inside notify callback
async_dispatch(queue, move || {
queue.run_async(move || {
let mut stm_guard = also_mutexed_stm.lock().unwrap();
let stm_ptr = *stm_guard as *const AudioUnitStream;
if stm_guard.destroy_pending.load(Ordering::SeqCst) {
@ -3327,12 +3334,12 @@ impl<'ctx> AudioUnitStream<'ctx> {
// Execute the stream destroy work.
self.destroy_pending.store(true, Ordering::SeqCst);
let queue = self.context.serial_queue;
let queue = self.queue.clone();
let stream_ptr = self as *const AudioUnitStream;
// Execute close in serial queue to avoid collision
// with reinit when un/plug devices
sync_dispatch(queue, move || {
queue.run_final(move || {
// Call stop_audiounits to avoid potential data race. If there is a running data callback,
// which locks a mutex inside CoreAudio framework, then this call will block the current
// thread until the callback is finished since this call asks to lock a mutex inside
@ -3361,11 +3368,10 @@ impl<'ctx> StreamOps for AudioUnitStream<'ctx> {
self.draining.store(false, Ordering::SeqCst);
// Execute start in serial queue to avoid racing with destroy or reinit.
let queue = self.context.serial_queue;
let mut result = Err(Error::error());
let started = &mut result;
let stream = &self;
sync_dispatch(queue, move || {
self.queue.run_sync(move || {
*started = stream.core_stream_data.start_audiounits();
});
@ -3385,9 +3391,8 @@ impl<'ctx> StreamOps for AudioUnitStream<'ctx> {
self.shutdown.store(true, Ordering::SeqCst);
// Execute stop in serial queue to avoid racing with destroy or reinit.
let queue = self.context.serial_queue;
let stream = &self;
sync_dispatch(queue, move || {
self.queue.run_sync(move || {
stream.core_stream_data.stop_audiounits();
});