Bug 1429847 - Allow promoting CpuPool threads for audio remoting separately from the client creation. r=kinetik

Differential Revision: https://phabricator.services.mozilla.com/D34887

--HG--
extra : moz-landing-system : lando
This commit is contained in:
Paul Adenot 2019-06-21 19:49:08 +00:00
Родитель 356d945008
Коммит 88c421c259
6 изменённых файлов: 143 добавлений и 21 удалений

2
Cargo.lock сгенерированный
Просмотреть файл

@ -117,12 +117,14 @@ dependencies = [
name = "audioipc-client"
version = "0.4.0"
dependencies = [
"audio_thread_priority 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
"audioipc 0.2.4",
"cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"cubeb-backend 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
"foreign-types 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",

Просмотреть файл

@ -17,3 +17,6 @@ libc = "0.2"
log = "0.4"
tokio-core = "0.1"
tokio-uds = "0.1.7"
audio_thread_priority = "0.13.0"
lazy_static = "1.2.0"
cfg-if = "0.1.0"

Просмотреть файл

@ -4,6 +4,7 @@
// accompanying file LICENSE for details
use assert_not_in_callback;
use audio_thread_priority::promote_current_thread_to_real_time;
use audioipc::codec::LengthDelimitedCodec;
use audioipc::platformhandle_passing::{framed_with_platformhandles, FramedWithPlatformHandles};
use audioipc::{core, rpc};
@ -13,7 +14,7 @@ use cubeb_backend::{
Stream, StreamParams, StreamParamsRef,
};
use futures::Future;
use futures_cpupool::{self, CpuPool};
use futures_cpupool::CpuPool;
use std::ffi::{CStr, CString};
use std::os::raw::c_void;
use std::sync::mpsc;
@ -21,7 +22,12 @@ use std::thread;
use std::{fmt, io, mem, ptr};
use stream;
use tokio_core::reactor::{Handle, Remote};
use {ClientStream, CPUPOOL_INIT_PARAMS, G_SERVER_FD};
use {ClientStream, CpuPoolInitParams, CPUPOOL_INIT_PARAMS, G_SERVER_FD};
cfg_if! {
if #[cfg(target_os = "linux")] {
use {G_THREAD_POOL};
}
}
struct CubebClient;
@ -79,6 +85,50 @@ fn open_server_stream() -> Result<audioipc::MessageStream> {
}
}
fn register_thread(callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>) {
match promote_current_thread_to_real_time(0, 48000) {
Ok(_) => {
debug!("Audio thread promoted to real-time.");
}
Err(_) => {
error!("Could not promote thread to real-time.");
}
}
if let Some(func) = callback {
let thr = thread::current();
let name = CString::new(thr.name().unwrap()).unwrap();
func(name.as_ptr());
}
}
fn create_thread_pool(init_params: CpuPoolInitParams) -> CpuPool {
futures_cpupool::Builder::new()
.name_prefix("AudioIPC")
.after_start(move || register_thread(init_params.thread_create_callback))
.pool_size(init_params.pool_size)
.stack_size(init_params.stack_size)
.create()
}
cfg_if! {
if #[cfg(target_os = "linux")] {
fn get_thread_pool(init_params: CpuPoolInitParams) -> CpuPool {
let mut guard = G_THREAD_POOL.lock().unwrap();
if guard.is_some() {
// Sandbox is on, and the thread pool was created earlier, before the lockdown.
guard.take().unwrap()
} else {
// Sandbox is off, let's create the pool now, promoting the threads will work.
create_thread_pool(init_params)
}
}
} else {
fn get_thread_pool(init_params: CpuPoolInitParams) -> CpuPool {
create_thread_pool(init_params)
}
}
}
impl ContextOps for ClientContext {
fn init(_context_name: Option<&CStr>) -> Result<Context> {
fn bind_and_send_client(
@ -100,20 +150,10 @@ impl ContextOps for ClientContext {
let params = CPUPOOL_INIT_PARAMS.with(|p| p.replace(None).unwrap());
let thread_create_callback = params.thread_create_callback;
let register_thread = move || {
if let Some(func) = thread_create_callback {
let thr = thread::current();
let name = CString::new(thr.name().unwrap()).unwrap();
func(name.as_ptr());
}
};
let core = t!(core::spawn_thread("AudioIPC Client RPC", move || {
let handle = core::handle();
register_thread();
register_thread(params.thread_create_callback);
open_server_stream()
.ok()
@ -129,22 +169,17 @@ impl ContextOps for ClientContext {
let rpc = t!(rx_rpc.recv());
let cpupool = futures_cpupool::Builder::new()
.name_prefix("AudioIPC")
.after_start(register_thread)
.pool_size(params.pool_size)
.stack_size(params.stack_size)
.create();
// Don't let errors bubble from here. Later calls against this context
// will return errors the caller expects to handle.
let _ = send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected);
let pool = get_thread_pool(params);
let ctx = Box::new(ClientContext {
_ops: &CLIENT_OPS as *const _,
rpc: rpc,
core: core,
cpu_pool: cpupool,
cpu_pool: pool,
});
Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) })
}

Просмотреть файл

@ -14,6 +14,11 @@ extern crate libc;
extern crate log;
extern crate tokio_core;
extern crate tokio_uds;
extern crate audio_thread_priority;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate cfg_if;
#[macro_use]
mod send_recv;
@ -25,11 +30,27 @@ use context::ClientContext;
use cubeb_backend::{capi, ffi};
use std::os::raw::{c_char, c_int};
use stream::ClientStream;
use std::sync::{Mutex};
use futures_cpupool::CpuPool;
use audio_thread_priority::RtPriorityHandle;
cfg_if! {
if #[cfg(target_os = "linux")] {
use std::sync::{Arc, Condvar};
use std::ffi::CString;
use std::thread;
use audio_thread_priority::promote_current_thread_to_real_time;
}
}
type InitParamsTls = std::cell::RefCell<Option<CpuPoolInitParams>>;
thread_local!(static IN_CALLBACK: std::cell::RefCell<bool> = std::cell::RefCell::new(false));
thread_local!(static CPUPOOL_INIT_PARAMS: InitParamsTls = std::cell::RefCell::new(None));
thread_local!(static G_PRIORITY_HANDLES: std::cell::RefCell<Vec<RtPriorityHandle>> = std::cell::RefCell::new(vec![]));
lazy_static! {
static ref G_THREAD_POOL: Mutex<Option<CpuPool>> = Mutex::new(None);
}
// This must match the definition of AudioIpcInitParams in
// dom/media/CubebUtils.cpp in Gecko.
@ -84,6 +105,58 @@ where
static mut G_SERVER_FD: Option<PlatformHandle> = None;
cfg_if! {
if #[cfg(target_os = "linux")] {
#[no_mangle]
pub unsafe extern "C" fn audioipc_init_threads(init_params: *const AudioIpcInitParams) {
let thread_create_callback = (*init_params).thread_create_callback;
// It is critical that this function waits until the various threads are created, promoted to
// real-time, and _then_ return, because the sandbox lockdown happens right after returning
// from here.
let pair = Arc::new((Mutex::new((*init_params).pool_size), Condvar::new()));
let pair2 = pair.clone();
let register_thread = move || {
if let Some(func) = thread_create_callback {
match promote_current_thread_to_real_time(0, 48000) {
Ok(handle) => {
G_PRIORITY_HANDLES.with(|handles| {
(handles.borrow_mut()).push(handle);
});
}
Err(_) => {
error!("Could not promote audio threads to real-time during initialization.");
}
}
let thr = thread::current();
let name = CString::new(thr.name().unwrap()).unwrap();
func(name.as_ptr());
let &(ref lock, ref cvar) = &*pair2;
let mut count = lock.lock().unwrap();
*count -= 1;
cvar.notify_one();
}
};
let mut pool = G_THREAD_POOL.lock().unwrap();
*pool = Some(futures_cpupool::Builder::new()
.name_prefix("AudioIPC")
.after_start(register_thread)
.pool_size((*init_params).pool_size)
.stack_size((*init_params).stack_size)
.create());
let &(ref lock, ref cvar) = &*pair;
let mut count = lock.lock().unwrap();
while *count != 0 {
count = cvar.wait(count).unwrap();
}
}
}
}
#[no_mangle]
/// Entry point from C code.
pub unsafe extern "C" fn audioipc_client_init(

Просмотреть файл

@ -18,6 +18,7 @@ slab = "0.3.0"
futures = "0.1.18"
tokio-core = "0.1"
tokio-uds = "0.1.7"
audio_thread_priority = "0.13.0"
[dependencies.error-chain]
version = "0.11.0"

Просмотреть файл

@ -18,6 +18,7 @@ extern crate libc;
extern crate slab;
extern crate tokio_core;
extern crate tokio_uds;
extern crate audio_thread_priority;
use audioipc::core;
use audioipc::platformhandle_passing::framed_with_platformhandles;
@ -28,6 +29,7 @@ use futures::Future;
use std::error::Error;
use std::os::raw::c_void;
use std::ptr;
use audio_thread_priority::promote_current_thread_to_real_time;
mod server;
@ -57,6 +59,12 @@ fn run() -> Result<ServerWrapper> {
let callback_thread = try!(
core::spawn_thread("AudioIPC Callback RPC", || {
match promote_current_thread_to_real_time(0, 48000) {
Ok(_) => { }
Err(_) => {
debug!("Failed to promote audio callback thread to real-time.");
}
}
trace!("Starting up cubeb audio callback event loop thread...");
Ok(())
}).or_else(|e| {