diff --git a/Cargo.lock b/Cargo.lock index b6bc1e5b721c..88ec0f30d55e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -117,12 +117,14 @@ dependencies = [ name = "audioipc-client" version = "0.4.0" dependencies = [ + "audio_thread_priority 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", "audioipc 0.2.4", "cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "cubeb-backend 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "foreign-types 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/media/audioipc/client/Cargo.toml b/media/audioipc/client/Cargo.toml index 3f1f5cd2557b..c84183201205 100644 --- a/media/audioipc/client/Cargo.toml +++ b/media/audioipc/client/Cargo.toml @@ -17,3 +17,6 @@ libc = "0.2" log = "0.4" tokio-core = "0.1" tokio-uds = "0.1.7" +audio_thread_priority = "0.13.0" +lazy_static = "1.2.0" +cfg-if = "0.1.0" diff --git a/media/audioipc/client/src/context.rs b/media/audioipc/client/src/context.rs index f44a3a9b1ecc..c596a93783b8 100644 --- a/media/audioipc/client/src/context.rs +++ b/media/audioipc/client/src/context.rs @@ -4,6 +4,7 @@ // accompanying file LICENSE for details use assert_not_in_callback; +use audio_thread_priority::promote_current_thread_to_real_time; use audioipc::codec::LengthDelimitedCodec; use audioipc::platformhandle_passing::{framed_with_platformhandles, FramedWithPlatformHandles}; use audioipc::{core, rpc}; @@ -13,7 +14,7 @@ use cubeb_backend::{ Stream, StreamParams, StreamParamsRef, }; use futures::Future; -use futures_cpupool::{self, CpuPool}; +use futures_cpupool::CpuPool; use std::ffi::{CStr, CString}; use std::os::raw::c_void; use std::sync::mpsc; @@ -21,7 +22,12 @@ use std::thread; use std::{fmt, io, mem, ptr}; use stream; use tokio_core::reactor::{Handle, Remote}; -use {ClientStream, CPUPOOL_INIT_PARAMS, G_SERVER_FD}; +use {ClientStream, CpuPoolInitParams, CPUPOOL_INIT_PARAMS, G_SERVER_FD}; +cfg_if! { + if #[cfg(target_os = "linux")] { + use {G_THREAD_POOL}; + } +} struct CubebClient; @@ -79,6 +85,50 @@ fn open_server_stream() -> Result { } } +fn register_thread(callback: Option) { + match promote_current_thread_to_real_time(0, 48000) { + Ok(_) => { + debug!("Audio thread promoted to real-time."); + } + Err(_) => { + error!("Could not promote thread to real-time."); + } + } + if let Some(func) = callback { + let thr = thread::current(); + let name = CString::new(thr.name().unwrap()).unwrap(); + func(name.as_ptr()); + } +} + +fn create_thread_pool(init_params: CpuPoolInitParams) -> CpuPool { + futures_cpupool::Builder::new() + .name_prefix("AudioIPC") + .after_start(move || register_thread(init_params.thread_create_callback)) + .pool_size(init_params.pool_size) + .stack_size(init_params.stack_size) + .create() +} + +cfg_if! { + if #[cfg(target_os = "linux")] { + fn get_thread_pool(init_params: CpuPoolInitParams) -> CpuPool { + let mut guard = G_THREAD_POOL.lock().unwrap(); + if guard.is_some() { + // Sandbox is on, and the thread pool was created earlier, before the lockdown. + guard.take().unwrap() + } else { + // Sandbox is off, let's create the pool now, promoting the threads will work. + create_thread_pool(init_params) + } + } + } else { + fn get_thread_pool(init_params: CpuPoolInitParams) -> CpuPool { + create_thread_pool(init_params) + } + } +} + impl ContextOps for ClientContext { fn init(_context_name: Option<&CStr>) -> Result { fn bind_and_send_client( @@ -100,20 +150,10 @@ impl ContextOps for ClientContext { let params = CPUPOOL_INIT_PARAMS.with(|p| p.replace(None).unwrap()); - let thread_create_callback = params.thread_create_callback; - - let register_thread = move || { - if let Some(func) = thread_create_callback { - let thr = thread::current(); - let name = CString::new(thr.name().unwrap()).unwrap(); - func(name.as_ptr()); - } - }; - let core = t!(core::spawn_thread("AudioIPC Client RPC", move || { let handle = core::handle(); - register_thread(); + register_thread(params.thread_create_callback); open_server_stream() .ok() @@ -129,22 +169,17 @@ impl ContextOps for ClientContext { let rpc = t!(rx_rpc.recv()); - let cpupool = futures_cpupool::Builder::new() - .name_prefix("AudioIPC") - .after_start(register_thread) - .pool_size(params.pool_size) - .stack_size(params.stack_size) - .create(); - // Don't let errors bubble from here. Later calls against this context // will return errors the caller expects to handle. let _ = send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected); + let pool = get_thread_pool(params); + let ctx = Box::new(ClientContext { _ops: &CLIENT_OPS as *const _, rpc: rpc, core: core, - cpu_pool: cpupool, + cpu_pool: pool, }); Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) }) } diff --git a/media/audioipc/client/src/lib.rs b/media/audioipc/client/src/lib.rs index 1ace11175a46..f9110e31323a 100644 --- a/media/audioipc/client/src/lib.rs +++ b/media/audioipc/client/src/lib.rs @@ -14,6 +14,11 @@ extern crate libc; extern crate log; extern crate tokio_core; extern crate tokio_uds; +extern crate audio_thread_priority; +#[macro_use] +extern crate lazy_static; +#[macro_use] +extern crate cfg_if; #[macro_use] mod send_recv; @@ -25,11 +30,27 @@ use context::ClientContext; use cubeb_backend::{capi, ffi}; use std::os::raw::{c_char, c_int}; use stream::ClientStream; +use std::sync::{Mutex}; +use futures_cpupool::CpuPool; +use audio_thread_priority::RtPriorityHandle; +cfg_if! { + if #[cfg(target_os = "linux")] { + use std::sync::{Arc, Condvar}; + use std::ffi::CString; + use std::thread; + use audio_thread_priority::promote_current_thread_to_real_time; + } +} type InitParamsTls = std::cell::RefCell>; thread_local!(static IN_CALLBACK: std::cell::RefCell = std::cell::RefCell::new(false)); thread_local!(static CPUPOOL_INIT_PARAMS: InitParamsTls = std::cell::RefCell::new(None)); +thread_local!(static G_PRIORITY_HANDLES: std::cell::RefCell> = std::cell::RefCell::new(vec![])); + +lazy_static! { + static ref G_THREAD_POOL: Mutex> = Mutex::new(None); +} // This must match the definition of AudioIpcInitParams in // dom/media/CubebUtils.cpp in Gecko. @@ -84,6 +105,58 @@ where static mut G_SERVER_FD: Option = None; +cfg_if! { + if #[cfg(target_os = "linux")] { + #[no_mangle] + pub unsafe extern "C" fn audioipc_init_threads(init_params: *const AudioIpcInitParams) { + let thread_create_callback = (*init_params).thread_create_callback; + + // It is critical that this function waits until the various threads are created, promoted to + // real-time, and _then_ return, because the sandbox lockdown happens right after returning + // from here. + let pair = Arc::new((Mutex::new((*init_params).pool_size), Condvar::new())); + let pair2 = pair.clone(); + + let register_thread = move || { + if let Some(func) = thread_create_callback { + match promote_current_thread_to_real_time(0, 48000) { + Ok(handle) => { + G_PRIORITY_HANDLES.with(|handles| { + (handles.borrow_mut()).push(handle); + }); + } + Err(_) => { + error!("Could not promote audio threads to real-time during initialization."); + } + } + let thr = thread::current(); + let name = CString::new(thr.name().unwrap()).unwrap(); + func(name.as_ptr()); + let &(ref lock, ref cvar) = &*pair2; + let mut count = lock.lock().unwrap(); + *count -= 1; + cvar.notify_one(); + } + }; + + let mut pool = G_THREAD_POOL.lock().unwrap(); + + *pool = Some(futures_cpupool::Builder::new() + .name_prefix("AudioIPC") + .after_start(register_thread) + .pool_size((*init_params).pool_size) + .stack_size((*init_params).stack_size) + .create()); + + let &(ref lock, ref cvar) = &*pair; + let mut count = lock.lock().unwrap(); + while *count != 0 { + count = cvar.wait(count).unwrap(); + } + } + } +} + #[no_mangle] /// Entry point from C code. pub unsafe extern "C" fn audioipc_client_init( diff --git a/media/audioipc/server/Cargo.toml b/media/audioipc/server/Cargo.toml index 62b6d838eab5..b0f1b02448ba 100644 --- a/media/audioipc/server/Cargo.toml +++ b/media/audioipc/server/Cargo.toml @@ -18,6 +18,7 @@ slab = "0.3.0" futures = "0.1.18" tokio-core = "0.1" tokio-uds = "0.1.7" +audio_thread_priority = "0.13.0" [dependencies.error-chain] version = "0.11.0" diff --git a/media/audioipc/server/src/lib.rs b/media/audioipc/server/src/lib.rs index df1a6a339669..eff54e6bb6a0 100644 --- a/media/audioipc/server/src/lib.rs +++ b/media/audioipc/server/src/lib.rs @@ -18,6 +18,7 @@ extern crate libc; extern crate slab; extern crate tokio_core; extern crate tokio_uds; +extern crate audio_thread_priority; use audioipc::core; use audioipc::platformhandle_passing::framed_with_platformhandles; @@ -28,6 +29,7 @@ use futures::Future; use std::error::Error; use std::os::raw::c_void; use std::ptr; +use audio_thread_priority::promote_current_thread_to_real_time; mod server; @@ -57,6 +59,12 @@ fn run() -> Result { let callback_thread = try!( core::spawn_thread("AudioIPC Callback RPC", || { + match promote_current_thread_to_real_time(0, 48000) { + Ok(_) => { } + Err(_) => { + debug!("Failed to promote audio callback thread to real-time."); + } + } trace!("Starting up cubeb audio callback event loop thread..."); Ok(()) }).or_else(|e| {