Bug 1445067 - P1: Update audioipc to commit 219a811. r=kinetik

This pulls in the fixes to shutdown RPC channels correctly when all
client proxies are dropped. This stops leaking fd and shmem.

MozReview-Commit-ID: 8Kb0iFPn8Pc

--HG--
extra : rebase_source : 8fcce9dfbec570f4d3ec035a6dd6576d1d137cd5
This commit is contained in:
Dan Glastonbury 2018-04-18 13:42:43 +10:00
Родитель 7f1b6e0663
Коммит 774e019c62
10 изменённых файлов: 241 добавлений и 122 удалений

Просмотреть файл

@ -5,4 +5,4 @@ Makefile.in build files for the Mozilla build system.
The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git
The git commit ID used was b93386611d7d9689c4f0177a4704f0adc16bc2d1 (2018-03-09 14:45:24 +1000)
The git commit ID used was 219a811b62b83bd6e4cb54ae6aebc56bbb43203c (2018-04-18 13:57:59 +1200)

Просмотреть файл

@ -1,6 +1,6 @@
[package]
name = "audioipc"
version = "0.2.3"
version = "0.2.4"
authors = [
"Matthew Gregan <kinetik@flim.org>",
"Dan Glastonbury <dan.glastonbury@gmail.com>"

Просмотреть файл

@ -47,7 +47,7 @@ impl CoreThread {
impl Drop for CoreThread {
fn drop(&mut self) {
trace!("Shutting down {:?}", self);
debug!("Shutting down {:?}", self);
if let Some(inner) = self.inner.take() {
let _ = inner.shutdown.send(());
drop(inner.join.join());

Просмотреть файл

@ -85,7 +85,7 @@ where
{
// If there is a buffered frame, try to write it to `A`
fn do_write(&mut self) -> Poll<(), io::Error> {
debug!("do_write...");
trace!("do_write...");
// Create a frame from any pending message in `write_buf`.
if !self.write_buf.is_empty() {
self.set_frame(None);
@ -141,8 +141,7 @@ where
_ => panic!(),
}
}
debug!("process {} frames", processed);
trace!("process {} frames", processed);
trace!("pending frames: {:?}", self.frames);
Ok(().into())

Просмотреть файл

@ -118,6 +118,7 @@ impl From<DeviceInfo> for ffi::cubeb_device_info {
}
}
#[repr(C)]
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
pub struct StreamParams {
pub format: ffi::cubeb_sample_format,

Просмотреть файл

@ -90,6 +90,7 @@ where
Async::Ready(None) => {
trace!(" --> got None");
// The service is done with the connection.
self.run = false;
break;
}
// Nothing to dispatch
@ -139,6 +140,7 @@ where
try!(self.flush());
if self.is_done() {
trace!(" --> is done.");
return Ok(().into());
}

Просмотреть файл

@ -42,6 +42,9 @@ macro_rules! t(
pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream);
// ClientContext's layout *must* match cubeb.c's `struct cubeb` for the
// common fields.
#[repr(C)]
pub struct ClientContext {
_ops: *const Ops,
rpc: rpc::ClientProxy<ServerMessage, ClientMessage>,
@ -227,7 +230,7 @@ impl ContextOps for ClientContext {
}
let stream_name = match stream_name {
Some(s) => Some(s.to_bytes().to_vec()),
Some(s) => Some(s.to_bytes_with_nul().to_vec()),
None => None,
};
@ -258,7 +261,7 @@ impl ContextOps for ClientContext {
impl Drop for ClientContext {
fn drop(&mut self) {
debug!("ClientContext drop...");
debug!("ClientContext dropped...");
let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected);
unsafe {
if G_SERVER_FD.is_some() {

Просмотреть файл

@ -40,11 +40,15 @@ impl Drop for Device {
}
}
// ClientStream's layout *must* match cubeb.c's `struct cubeb_stream` for the
// common fields.
#[repr(C)]
#[derive(Debug)]
pub struct ClientStream<'ctx> {
// This must be a reference to Context for cubeb, cubeb accesses
// stream methods via stream->context->ops
context: &'ctx ClientContext,
user_ptr: *mut c_void,
token: usize,
}
@ -66,9 +70,10 @@ impl rpc::Server for CallbackServer {
fn process(&mut self, req: Self::Request) -> Self::Future {
match req {
CallbackReq::Data(nframes, frame_size) => {
debug!(
trace!(
"stream_thread: Data Callback: nframes={} frame_size={}",
nframes, frame_size
nframes,
frame_size
);
// Clone values that need to be moved into the cpu pool thread.
@ -104,7 +109,7 @@ impl rpc::Server for CallbackServer {
})
}
CallbackReq::State(state) => {
debug!("stream_thread: State Callback: {:?}", state);
trace!("stream_thread: State Callback: {:?}", state);
let user_ptr = self.user_ptr;
let cb = self.state_cb.unwrap();
self.cpu_pool.spawn_fn(move || {
@ -134,7 +139,7 @@ impl<'ctx> ClientStream<'ctx> {
let rpc = ctx.rpc();
let data = try!(send_recv!(rpc, StreamInit(init_params) => StreamCreated()));
trace!("token = {}, fds = {:?}", data.token, data.fds);
debug!("token = {}, fds = {:?}", data.token, data.fds);
let stm = data.fds[0];
let stream = unsafe { net::UnixStream::from_raw_fd(stm) };
@ -172,6 +177,7 @@ impl<'ctx> ClientStream<'ctx> {
let stream = Box::into_raw(Box::new(ClientStream {
context: ctx,
user_ptr: user_ptr,
token: data.token,
}));
Ok(unsafe { Stream::from_ptr(stream as *mut _) })
@ -180,7 +186,7 @@ impl<'ctx> ClientStream<'ctx> {
impl<'ctx> Drop for ClientStream<'ctx> {
fn drop(&mut self) {
trace!("ClientStream drop...");
debug!("ClientStream dropped...");
let rpc = self.context.rpc();
let _ = send_recv!(rpc, StreamDestroy(self.token) => StreamDestroyed);
}
@ -268,5 +274,13 @@ pub fn init(
state_callback: ffi::cubeb_state_callback,
user_ptr: *mut c_void,
) -> Result<Stream> {
ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)
let stm = try!(ClientStream::init(
ctx,
init_params,
data_callback,
state_callback,
user_ptr
));
debug_assert_eq!(stm.user_ptr(), user_ptr);
Ok(stm)
}

Просмотреть файл

@ -1,6 +1,6 @@
[package]
name = "audioipc-server"
version = "0.2.2"
version = "0.2.3"
authors = [
"Matthew Gregan <kinetik@flim.org>",
"Dan Glastonbury <dan.glastonbury@gmail.com>"
@ -9,7 +9,7 @@ description = "Remote cubeb server"
[dependencies]
audioipc = { path = "../audioipc" }
cubeb = "0.5.2"
cubeb-core = "0.5.1"
bytes = "0.4"
lazycell = "^0.4"
libc = "0.2"

Просмотреть файл

@ -6,7 +6,7 @@ extern crate log;
extern crate audioipc;
extern crate bytes;
extern crate cubeb;
extern crate cubeb_core as cubeb;
extern crate futures;
extern crate lazycell;
extern crate libc;
@ -22,16 +22,19 @@ use audioipc::messages::{CallbackReq, CallbackResp, ClientMessage, Device, Devic
ServerMessage, StreamCreate, StreamInitParams, StreamParams};
use audioipc::rpc;
use audioipc::shm::{SharedMemReader, SharedMemWriter};
use futures::Future;
use cubeb::ffi;
use futures::future::{self, FutureResult};
use futures::sync::oneshot;
use std::{ptr, slice};
use futures::Future;
use std::cell::RefCell;
use std::convert::From;
use std::error::Error;
use std::os::raw::c_void;
use std::ffi::{CStr, CString};
use std::mem::{size_of, ManuallyDrop};
use std::os::raw::{c_long, c_void};
use std::os::unix::net;
use std::os::unix::prelude::*;
use std::{panic, ptr, slice};
use tokio_core::reactor::Remote;
use tokio_uds::UnixStream;
@ -60,7 +63,8 @@ where
CONTEXT_KEY.with(|k| {
let mut context = k.borrow_mut();
if context.is_none() {
*context = Some(cubeb::init("AudioIPC Server"));
let name = CString::new("AudioIPC Server").unwrap();
*context = Some(cubeb::Context::init(Some(name.as_c_str()), None));
}
f(context.as_ref().unwrap())
})
@ -80,7 +84,84 @@ impl rpc::Client for CallbackClient {
type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}
type StreamSlab = slab::Slab<cubeb::Stream<u8>, usize>;
struct ServerStreamCallbacks {
/// Size of input frame in bytes
input_frame_size: u16,
/// Size of output frame in bytes
output_frame_size: u16,
/// Shared memory buffer for sending input data to client
input_shm: SharedMemWriter,
/// Shared memory buffer for receiving output data from client
output_shm: SharedMemReader,
/// RPC interface to callback server running in client
rpc: rpc::ClientProxy<CallbackReq, CallbackResp>,
}
impl ServerStreamCallbacks {
fn data_callback(&mut self, input: &[u8], output: &mut [u8]) -> isize {
trace!("Stream data callback: {} {}", input.len(), output.len());
// len is of input and output is frame len. Turn these into the real lengths.
let real_input = unsafe {
let nbytes = input.len() * self.input_frame_size as usize;
slice::from_raw_parts(input.as_ptr(), nbytes)
};
self.input_shm.write(real_input).unwrap();
let r = self.rpc
.call(CallbackReq::Data(
output.len() as isize,
self.output_frame_size as usize,
))
.wait();
match r {
Ok(CallbackResp::Data(frames)) => {
if frames >= 0 {
let nbytes = frames as usize * self.output_frame_size as usize;
let real_output = unsafe {
trace!("Resize output to {}", nbytes);
slice::from_raw_parts_mut(output.as_mut_ptr(), nbytes)
};
self.output_shm.read(&mut real_output[..nbytes]).unwrap();
}
frames
}
_ => {
debug!("Unexpected message {:?} during data_callback", r);
-1
}
}
}
fn state_callback(&mut self, state: cubeb::State) {
trace!("Stream state callback: {:?}", state);
let r = self.rpc.call(CallbackReq::State(state.into())).wait();
match r {
Ok(CallbackResp::State) => {}
_ => {
debug!("Unexpected message {:?} during callback", r);
}
}
}
}
struct ServerStream {
stream: ManuallyDrop<cubeb::Stream>,
cbs: ManuallyDrop<Box<ServerStreamCallbacks>>,
}
impl Drop for ServerStream {
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.stream);
ManuallyDrop::drop(&mut self.cbs);
}
}
}
type StreamSlab = slab::Slab<ServerStream, usize>;
pub struct CubebServer {
cb_remote: Remote,
@ -167,47 +248,55 @@ impl CubebServer {
}
ServerMessage::StreamStart(stm_tok) => self.streams[stm_tok]
.stream
.start()
.map(|_| ClientMessage::StreamStarted)
.unwrap_or_else(error),
ServerMessage::StreamStop(stm_tok) => self.streams[stm_tok]
.stream
.stop()
.map(|_| ClientMessage::StreamStopped)
.unwrap_or_else(error),
ServerMessage::StreamResetDefaultDevice(stm_tok) => self.streams[stm_tok]
.stream
.reset_default_device()
.map(|_| ClientMessage::StreamDefaultDeviceReset)
.unwrap_or_else(error),
ServerMessage::StreamGetPosition(stm_tok) => self.streams[stm_tok]
.stream
.position()
.map(ClientMessage::StreamPosition)
.unwrap_or_else(error),
ServerMessage::StreamGetLatency(stm_tok) => self.streams[stm_tok]
.stream
.latency()
.map(ClientMessage::StreamLatency)
.unwrap_or_else(error),
ServerMessage::StreamSetVolume(stm_tok, volume) => self.streams[stm_tok]
.stream
.set_volume(volume)
.map(|_| ClientMessage::StreamVolumeSet)
.unwrap_or_else(error),
ServerMessage::StreamSetPanning(stm_tok, panning) => self.streams[stm_tok]
.stream
.set_panning(panning)
.map(|_| ClientMessage::StreamPanningSet)
.unwrap_or_else(error),
ServerMessage::StreamGetCurrentDevice(stm_tok) => self.streams[stm_tok]
.stream
.current_device()
.map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
.unwrap_or_else(error),
};
debug!("process_msg: req={:?}, resp={:?}", msg, resp);
trace!("process_msg: req={:?}, resp={:?}", msg, resp);
resp
}
@ -236,17 +325,13 @@ impl CubebServer {
.unwrap_or(0u16)
}
// TODO: Yuck!
let input_device = params.input_device as *const _;
let output_device = params.output_device as *const _;
let latency = params.latency_frames;
// Create the callback handling struct which is attached the cubeb stream.
let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());
let (stm1, stm2) = net::UnixStream::pair()?;
debug!("Created callback pair: {:?}-{:?}", stm1, stm2);
let (mut input_shm, input_file) =
let (input_shm, input_file) =
SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
let (output_shm, output_file) =
SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
@ -270,112 +355,89 @@ impl CubebServer {
Ok(())
});
let rpc_data: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
let rpc: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
Ok(rpc) => rpc,
Err(_) => bail!("Failed to create callback rpc."),
};
let rpc_state = rpc_data.clone();
let mut builder = cubeb::StreamBuilder::new();
let cbs = Box::new(ServerStreamCallbacks {
input_frame_size,
output_frame_size,
input_shm,
output_shm,
rpc,
});
if let Some(ref stream_name) = params.stream_name {
builder.name(stream_name.clone());
}
// Create cubeb stream from params
let stream_name = params
.stream_name
.as_ref()
.and_then(|name| CStr::from_bytes_with_nul(name).ok());
if let Some(ref isp) = params.input_stream_params {
let input_stream_params =
unsafe { cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _) };
builder.input(input_device, input_stream_params);
}
let input_device = params.input_device as *const _;
let input_stream_params = params.input_stream_params.as_ref().map(|isp| unsafe {
cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _)
});
if let Some(ref osp) = params.output_stream_params {
let output_stream_params =
unsafe { cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _) };
builder.output(output_device, output_stream_params);
}
let output_device = params.output_device as *const _;
let output_stream_params = params.output_stream_params.as_ref().map(|osp| unsafe {
cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _)
});
builder
.latency(latency)
.data_callback(move |input, output| {
trace!("Stream data callback: {} {}", input.len(), output.len());
let latency = params.latency_frames;
assert!(size_of::<Box<ServerStreamCallbacks>>() == size_of::<usize>());
let user_ptr = cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void;
// len is of input and output is frame len. Turn these into the real lengths.
let real_input = unsafe {
let nbytes = input.len() * input_frame_size as usize;
slice::from_raw_parts(input.as_ptr(), nbytes)
};
unsafe {
context
.stream_init(
stream_name,
input_device,
input_stream_params,
output_device,
output_stream_params,
latency,
Some(data_cb_c),
Some(state_cb_c),
user_ptr,
)
.and_then(|stream| {
if !self.streams.has_available() {
trace!(
"server connection ran out of stream slots. reserving {} more.",
STREAM_CONN_CHUNK_SIZE
);
self.streams.reserve_exact(STREAM_CONN_CHUNK_SIZE);
}
input_shm.write(real_input).unwrap();
let stm_tok = match self.streams.vacant_entry() {
Some(entry) => {
debug!("Registering stream {:?}", entry.index(),);
let r = rpc_data
.call(CallbackReq::Data(
output.len() as isize,
output_frame_size as usize,
))
.wait();
match r {
Ok(CallbackResp::Data(frames)) => {
if frames >= 0 {
let nbytes = frames as usize * output_frame_size as usize;
let real_output = unsafe {
trace!("Resize output to {}", nbytes);
slice::from_raw_parts_mut(output.as_mut_ptr(), nbytes)
};
output_shm.read(&mut real_output[..nbytes]).unwrap();
entry
.insert(ServerStream {
stream: ManuallyDrop::new(stream),
cbs: ManuallyDrop::new(cbs),
})
.index()
}
frames
}
_ => {
debug!("Unexpected message {:?} during data_callback", r);
-1
}
}
})
.state_callback(move |state| {
trace!("Stream state callback: {:?}", state);
let r = rpc_state.call(CallbackReq::State(state.into())).wait();
match r {
Ok(CallbackResp::State) => {}
_ => {
debug!("Unexpected message {:?} during callback", r);
}
}
});
None => {
// TODO: Turn into error
panic!("Failed to insert stream into slab. No entries")
}
};
builder
.init(context)
.and_then(|stream| {
if !self.streams.has_available() {
trace!(
"server connection ran out of stream slots. reserving {} more.",
STREAM_CONN_CHUNK_SIZE
);
self.streams.reserve_exact(STREAM_CONN_CHUNK_SIZE);
}
let stm_tok = match self.streams.vacant_entry() {
Some(entry) => {
debug!("Registering stream {:?}", entry.index(),);
entry.insert(stream).index()
}
None => {
// TODO: Turn into error
panic!("Failed to insert stream into slab. No entries")
}
};
Ok(ClientMessage::StreamCreated(StreamCreate {
token: stm_tok,
fds: [
stm1.into_raw_fd(),
input_file.into_raw_fd(),
output_file.into_raw_fd(),
],
}))
})
.map_err(|e| e.into())
Ok(ClientMessage::StreamCreated(StreamCreate {
token: stm_tok,
fds: [
stm1.into_raw_fd(),
input_file.into_raw_fd(),
output_file.into_raw_fd(),
],
}))
})
.map_err(|e| e.into())
}
}
}
@ -467,3 +529,41 @@ pub extern "C" fn audioipc_server_stop(p: *mut c_void) {
fn error(error: cubeb::Error) -> ClientMessage {
ClientMessage::Error(error.raw_code())
}
// C callable callbacks
unsafe extern "C" fn data_cb_c(
_: *mut ffi::cubeb_stream,
user_ptr: *mut c_void,
input_buffer: *const c_void,
output_buffer: *mut c_void,
nframes: c_long,
) -> c_long {
let ok = panic::catch_unwind(|| {
let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
let input = if input_buffer.is_null() {
&[]
} else {
slice::from_raw_parts(input_buffer as *const u8, nframes as usize)
};
let output: &mut [u8] = if output_buffer.is_null() {
&mut []
} else {
slice::from_raw_parts_mut(output_buffer as *mut u8, nframes as usize)
};
cbs.data_callback(input, output) as c_long
});
ok.unwrap_or(0)
}
unsafe extern "C" fn state_cb_c(
_: *mut ffi::cubeb_stream,
user_ptr: *mut c_void,
state: ffi::cubeb_state,
) {
let ok = panic::catch_unwind(|| {
let state = cubeb::State::from(state);
let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
cbs.state_callback(state);
});
ok.expect("State callback panicked");
}