From 38559127250fadde5130a6254a459d43628e613c Mon Sep 17 00:00:00 2001 From: Dan Glastonbury Date: Tue, 13 Feb 2018 14:39:06 +1000 Subject: [PATCH] Bug 1440538 - P2: Update audioipc to commit 933fb48. r=kinetik MozReview-Commit-ID: 29VUZKxz3xR --HG-- extra : rebase_source : 52c8d16aadbd677d22102b9fb989055cb21bb607 --- media/audioipc/README_MOZILLA | 2 +- media/audioipc/audioipc/Cargo.toml | 2 +- media/audioipc/audioipc/src/async.rs | 25 +- media/audioipc/audioipc/src/cmsg.rs | 28 +- media/audioipc/audioipc/src/codec.rs | 32 +-- media/audioipc/audioipc/src/core.rs | 24 +- media/audioipc/audioipc/src/errors.rs | 4 +- media/audioipc/audioipc/src/fd_passing.rs | 38 ++- media/audioipc/audioipc/src/frame.rs | 4 +- media/audioipc/audioipc/src/lib.rs | 19 +- media/audioipc/audioipc/src/messages.rs | 121 ++++---- media/audioipc/audioipc/src/msg.rs | 12 +- media/audioipc/audioipc/src/rpc/client/mod.rs | 22 +- .../audioipc/audioipc/src/rpc/client/proxy.rs | 18 +- media/audioipc/audioipc/src/rpc/driver.rs | 22 +- media/audioipc/audioipc/src/rpc/server.rs | 23 +- media/audioipc/audioipc/src/shm.rs | 34 +-- media/audioipc/client/Cargo.toml | 6 +- media/audioipc/client/src/context.rs | 110 ++++---- media/audioipc/client/src/lib.rs | 7 +- media/audioipc/client/src/send_recv.rs | 8 +- media/audioipc/client/src/stream.rs | 118 ++++---- media/audioipc/gecko.patch | 45 --- media/audioipc/server/Cargo.toml | 3 +- media/audioipc/server/src/lib.rs | 265 ++++++++---------- 25 files changed, 449 insertions(+), 543 deletions(-) diff --git a/media/audioipc/README_MOZILLA b/media/audioipc/README_MOZILLA index 9e0652c57bde..f77e5fbbcfb1 100644 --- a/media/audioipc/README_MOZILLA +++ b/media/audioipc/README_MOZILLA @@ -5,4 +5,4 @@ Makefile.in build files for the Mozilla build system. The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git -The git commit ID used was d7798606aa590ef402344b7a519a0053725a9805 (2018-01-27 09:07:03 +1000) +The git commit ID used was 933fb48b252a10569ba8d598541577c6f2dc308f (2018-02-21 17:13:04 +1000) diff --git a/media/audioipc/audioipc/Cargo.toml b/media/audioipc/audioipc/Cargo.toml index 32924feb9ae5..0561b4cb99f8 100644 --- a/media/audioipc/audioipc/Cargo.toml +++ b/media/audioipc/audioipc/Cargo.toml @@ -8,7 +8,7 @@ authors = [ description = "Remote Cubeb IPC" [dependencies] -cubeb-core = { path = "../../cubeb-rs/cubeb-core" } +cubeb = "0.4" bincode = "0.8" bytes = "0.4" # rayon-core in Gecko uses futures 0.1.13 diff --git a/media/audioipc/audioipc/src/async.rs b/media/audioipc/audioipc/src/async.rs index d34a25b6a274..0c8a19563044 100644 --- a/media/audioipc/audioipc/src/async.rs +++ b/media/audioipc/audioipc/src/async.rs @@ -113,11 +113,11 @@ impl AsyncRecvMsg for UnixStream { cmsg.advance_mut(cmsg_len); } Ok((n, flags).into()) - }, + } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.need_read(); Ok(Async::NotReady) - }, + } Err(e) => Err(e), } } @@ -139,22 +139,7 @@ impl AsyncSendMsg for UnixStream { static DUMMY: &[u8] = &[0]; let nom = <&IoVec>::from(DUMMY); let mut bufs = [ - nom, - nom, - nom, - nom, - nom, - nom, - nom, - nom, - nom, - nom, - nom, - nom, - nom, - nom, - nom, - nom, + nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom ]; let n = buf.bytes_vec(&mut bufs); self.send_msg(&bufs[..n], cmsg.bytes()) @@ -163,11 +148,11 @@ impl AsyncSendMsg for UnixStream { Ok(n) => { buf.advance(n); Ok(Async::Ready(n)) - }, + } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.need_write(); Ok(Async::NotReady) - }, + } Err(e) => Err(e), } } diff --git a/media/audioipc/audioipc/src/cmsg.rs b/media/audioipc/audioipc/src/cmsg.rs index 33f367f8d242..4385a7635bea 100644 --- a/media/audioipc/audioipc/src/cmsg.rs +++ b/media/audioipc/audioipc/src/cmsg.rs @@ -10,7 +10,7 @@ use std::os::unix::io::RawFd; #[derive(Clone, Debug)] pub struct Fds { - fds: Bytes + fds: Bytes, } impl convert::AsRef<[RawFd]> for Fds { @@ -30,13 +30,11 @@ impl ops::Deref for Fds { } pub struct ControlMsgIter { - control: Bytes + control: Bytes, } pub fn iterator(c: Bytes) -> ControlMsgIter { - ControlMsgIter { - control: c - } + ControlMsgIter { control: c } } impl Iterator for ControlMsgIter { @@ -73,12 +71,12 @@ impl Iterator for ControlMsgIter { (libc::SOL_SOCKET, libc::SCM_RIGHTS) => { trace!("Found SCM_RIGHTS..."); return Some(Fds { - fds: control.slice(cmsghdr_len, cmsg_len as _) + fds: control.slice(cmsghdr_len, cmsg_len as _), }); - }, + } (level, kind) => { trace!("Skipping cmsg level, {}, type={}...", level, kind); - }, + } } } } @@ -87,19 +85,17 @@ impl Iterator for ControlMsgIter { #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub enum Error { /// Not enough space in storage to insert control mesage. - NoSpace + NoSpace, } #[must_use] pub struct ControlMsgBuilder { - result: Result + result: Result, } pub fn builder(buf: &mut BytesMut) -> ControlMsgBuilder { let buf = aligned(buf); - ControlMsgBuilder { - result: Ok(buf) - } + ControlMsgBuilder { result: Ok(buf) } } impl ControlMsgBuilder { @@ -113,10 +109,12 @@ impl ControlMsgBuilder { let cmsghdr = cmsghdr { cmsg_len: cmsg_len as _, cmsg_level: level, - cmsg_type: kind + cmsg_type: kind, }; - let cmsghdr = unsafe { slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::()) }; + let cmsghdr = unsafe { + slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::()) + }; cmsg.put_slice(cmsghdr); let mut cmsg = try!(align_buf(cmsg)); cmsg.put_slice(msg); diff --git a/media/audioipc/audioipc/src/codec.rs b/media/audioipc/audioipc/src/codec.rs index 1a0e74a22961..c7779b99d3dd 100644 --- a/media/audioipc/audioipc/src/codec.rs +++ b/media/audioipc/audioipc/src/codec.rs @@ -35,8 +35,8 @@ pub trait Codec { Some(frame) => Ok(frame), None => Err(io::Error::new( io::ErrorKind::Other, - "bytes remaining on stream" - )) + "bytes remaining on stream", + )), } } @@ -53,12 +53,12 @@ pub trait Codec { pub struct LengthDelimitedCodec { state: State, __in: PhantomData, - __out: PhantomData + __out: PhantomData, } enum State { Length, - Data(u16) + Data(u16), } impl Default for LengthDelimitedCodec { @@ -66,7 +66,7 @@ impl Default for LengthDelimitedCodec { LengthDelimitedCodec { state: State::Length, __in: PhantomData, - __out: PhantomData + __out: PhantomData, } } } @@ -89,7 +89,7 @@ impl LengthDelimitedCodec { fn decode_data(&mut self, buf: &mut BytesMut, n: u16) -> io::Result> where - Out: DeserializeOwned + Debug + Out: DeserializeOwned + Debug, { // At this point, the buffer has already had the required capacity // reserved. All there is to do is read. @@ -103,7 +103,7 @@ impl LengthDelimitedCodec { trace!("Attempting to decode"); let msg = try!(deserialize::(buf.as_ref()).map_err(|e| match *e { bincode::ErrorKind::IoError(e) => e, - _ => io::Error::new(io::ErrorKind::Other, *e) + _ => io::Error::new(io::ErrorKind::Other, *e), })); trace!("... Decoded {:?}", msg); @@ -114,7 +114,7 @@ impl LengthDelimitedCodec { impl Codec for LengthDelimitedCodec where In: Serialize + Debug, - Out: DeserializeOwned + Debug + Out: DeserializeOwned + Debug, { type In = In; type Out = Out; @@ -131,11 +131,11 @@ where buf.reserve(n as usize); n - }, - None => return Ok(None) + } + None => return Ok(None), } - }, - State::Data(n) => n + } + State::Data(n) => n, }; match try!(self.decode_data(buf, n)) { @@ -147,8 +147,8 @@ where buf.reserve(2); Ok(Some(data)) - }, - None => Ok(None) + } + None => Ok(None), } } @@ -158,7 +158,7 @@ where if encoded_len > 8 * 1024 { return Err(io::Error::new( io::ErrorKind::InvalidInput, - "encoded message too big" + "encoded message too big", )); } @@ -171,7 +171,7 @@ where { match *e { bincode::ErrorKind::IoError(e) => return Err(e), - _ => return Err(io::Error::new(io::ErrorKind::Other, *e)) + _ => return Err(io::Error::new(io::ErrorKind::Other, *e)), } } diff --git a/media/audioipc/audioipc/src/core.rs b/media/audioipc/audioipc/src/core.rs index 86fa32a2d987..40e3bc1b5611 100644 --- a/media/audioipc/audioipc/src/core.rs +++ b/media/audioipc/audioipc/src/core.rs @@ -16,7 +16,7 @@ pub fn handle() -> Handle { pub fn spawn(f: F) where - F: Future + 'static + F: Future + 'static, { HANDLE.with(|handle| handle.spawn(f)) } @@ -24,19 +24,19 @@ where pub fn spawn_fn(f: F) where F: FnOnce() -> R + 'static, - R: IntoFuture + 'static + R: IntoFuture + 'static, { HANDLE.with(|handle| handle.spawn_fn(f)) } struct Inner { join: thread::JoinHandle<()>, - shutdown: oneshot::Sender<()> + shutdown: oneshot::Sender<()>, } pub struct CoreThread { inner: Option, - remote: Remote + remote: Remote, } impl CoreThread { @@ -65,7 +65,7 @@ impl fmt::Debug for CoreThread { pub fn spawn_thread(name: S, f: F) -> io::Result where S: Into, - F: FnOnce() -> io::Result<()> + Send + 'static + F: FnOnce() -> io::Result<()> + Send + 'static, { let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); let (remote_tx, remote_rx) = mpsc::channel::(); @@ -85,18 +85,16 @@ where trace!("thread shutdown..."); })); - let remote = try!(remote_rx.recv().or_else(|_| { - Err(io::Error::new( - io::ErrorKind::Other, - "Failed to receive remote handle from spawned thread" - )) - })); + let remote = try!(remote_rx.recv().or_else(|_| Err(io::Error::new( + io::ErrorKind::Other, + "Failed to receive remote handle from spawned thread" + )))); Ok(CoreThread { inner: Some(Inner { join: join, - shutdown: shutdown_tx + shutdown: shutdown_tx, }), - remote: remote + remote: remote, }) } diff --git a/media/audioipc/audioipc/src/errors.rs b/media/audioipc/audioipc/src/errors.rs index 2731eb66f439..d2eeb51ab933 100644 --- a/media/audioipc/audioipc/src/errors.rs +++ b/media/audioipc/audioipc/src/errors.rs @@ -1,5 +1,5 @@ use bincode; -use cubeb_core; +use cubeb; use std; error_chain! { @@ -7,7 +7,7 @@ error_chain! { foreign_links { Bincode(bincode::Error); Io(std::io::Error); - Cubeb(cubeb_core::Error); + Cubeb(cubeb::Error); } // Replace bail!(str) with explicit errors. diff --git a/media/audioipc/audioipc/src/fd_passing.rs b/media/audioipc/audioipc/src/fd_passing.rs index 24b56f65935d..00119913f162 100644 --- a/media/audioipc/audioipc/src/fd_passing.rs +++ b/media/audioipc/audioipc/src/fd_passing.rs @@ -20,7 +20,7 @@ const FDS_CAPACITY: usize = 16; struct IncomingFds { cmsg: BytesMut, - recv_fds: Option + recv_fds: Option, } impl IncomingFds { @@ -28,7 +28,7 @@ impl IncomingFds { let capacity = c * cmsg::space(mem::size_of::<[RawFd; 3]>()); IncomingFds { cmsg: BytesMut::with_capacity(capacity), - recv_fds: None + recv_fds: None, } } @@ -60,7 +60,7 @@ impl IncomingFds { #[derive(Debug)] struct Frame { msgs: Bytes, - fds: Option + fds: Option, } /// A unified `Stream` and `Sink` interface over an I/O object, using @@ -76,12 +76,12 @@ pub struct FramedWithFds { // Sink frames: VecDeque, write_buf: BytesMut, - outgoing_fds: BytesMut + outgoing_fds: BytesMut, } impl FramedWithFds where - A: AsyncSendMsg + A: AsyncSendMsg, { // If there is a buffered frame, try to write it to `A` fn do_write(&mut self) -> Poll<(), io::Error> { @@ -102,10 +102,10 @@ where let mut msgs = frame.msgs.clone().into_buf(); let mut fds = match frame.fds { Some(ref fds) => fds.clone(), - None => Bytes::new() + None => Bytes::new(), }.into_buf(); try_ready!(self.io.send_msg_buf(&mut msgs, &fds)) - }, + } _ => { // No pending frames. return Ok(().into()); @@ -137,8 +137,8 @@ where self.frames.push_front(frame); break; } - }, - _ => panic!() + } + _ => panic!(), } } debug!("process {} frames", processed); @@ -158,10 +158,7 @@ where let msgs = self.write_buf.take().freeze(); trace!("set_frame: msgs={:?} fds={:?}", msgs, fds); - self.frames.push_back(Frame { - msgs, - fds - }); + self.frames.push_back(Frame { msgs, fds }); } } @@ -169,7 +166,7 @@ impl Stream for FramedWithFds where A: AsyncRecvMsg, C: Codec, - C::Out: AssocRawFd + C::Out: AssocRawFd, { type Item = C::Out; type Error = io::Error; @@ -226,15 +223,12 @@ impl Sink for FramedWithFds where A: AsyncSendMsg, C: Codec, - C::In: AssocRawFd + fmt::Debug + C::In: AssocRawFd + fmt::Debug, { type SinkItem = C::In; type SinkError = io::Error; - fn start_send( - &mut self, - item: Self::SinkItem - ) -> StartSend { + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { trace!("start_send: item={:?}", item); // If the buffer is already over BACKPRESSURE_THRESHOLD, @@ -295,8 +289,8 @@ pub fn framed_with_fds(io: A, codec: C) -> FramedWithFds { frames: VecDeque::new(), write_buf: BytesMut::with_capacity(INITIAL_CAPACITY), outgoing_fds: BytesMut::with_capacity( - FDS_CAPACITY * cmsg::space(mem::size_of::<[RawFd; 3]>()) - ) + FDS_CAPACITY * cmsg::space(mem::size_of::<[RawFd; 3]>()), + ), } } @@ -307,7 +301,7 @@ fn write_zero() -> io::Error { fn clone_into_array(slice: &[T]) -> A where A: Sized + Default + AsMut<[T]>, - T: Clone + T: Clone, { let mut a = Default::default(); >::as_mut(&mut a).clone_from_slice(slice); diff --git a/media/audioipc/audioipc/src/frame.rs b/media/audioipc/audioipc/src/frame.rs index 33b894f2682c..832f5bb16daf 100644 --- a/media/audioipc/audioipc/src/frame.rs +++ b/media/audioipc/audioipc/src/frame.rs @@ -21,7 +21,7 @@ pub struct Framed { write_buf: BytesMut, frame: Option<::Buf>, is_readable: bool, - eof: bool + eof: bool, } impl Framed @@ -159,6 +159,6 @@ pub fn framed(io: A, codec: C) -> Framed { write_buf: BytesMut::with_capacity(INITIAL_CAPACITY), frame: None, is_readable: false, - eof: false + eof: false, } } diff --git a/media/audioipc/audioipc/src/lib.rs b/media/audioipc/audioipc/src/lib.rs index 7150de97d2a0..f05c5b2a7d53 100644 --- a/media/audioipc/audioipc/src/lib.rs +++ b/media/audioipc/audioipc/src/lib.rs @@ -3,7 +3,6 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details #![allow(dead_code)] // TODO: Remove. - #![recursion_limit = "1024"] #[macro_use] extern crate error_chain; @@ -16,15 +15,15 @@ extern crate serde_derive; extern crate bincode; extern crate bytes; -extern crate cubeb_core; +extern crate cubeb; #[macro_use] extern crate futures; extern crate iovec; extern crate libc; extern crate memmap; -extern crate serde; #[macro_use] extern crate scoped_tls; +extern crate serde; extern crate tokio_core; #[macro_use] extern crate tokio_io; @@ -43,11 +42,9 @@ mod msg; pub mod shm; use iovec::IoVec; - #[cfg(target_os = "linux")] use libc::MSG_CMSG_CLOEXEC; pub use messages::{ClientMessage, ServerMessage}; - use std::env::temp_dir; use std::io; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; @@ -59,7 +56,11 @@ const MSG_CMSG_CLOEXEC: libc::c_int = 0; // We can extend UnixStream by using traits, eliminating the need to introduce a new wrapped // UnixStream type. pub trait RecvMsg { - fn recv_msg(&mut self, iov: &mut [&mut IoVec], cmsg: &mut [u8]) -> io::Result<(usize, usize, i32)>; + fn recv_msg( + &mut self, + iov: &mut [&mut IoVec], + cmsg: &mut [u8], + ) -> io::Result<(usize, usize, i32)>; } pub trait SendMsg { @@ -67,7 +68,11 @@ pub trait SendMsg { } impl RecvMsg for T { - fn recv_msg(&mut self, iov: &mut [&mut IoVec], cmsg: &mut [u8]) -> io::Result<(usize, usize, i32)> { + fn recv_msg( + &mut self, + iov: &mut [&mut IoVec], + cmsg: &mut [u8], + ) -> io::Result<(usize, usize, i32)> { msg::recv_msg_with_flags(self.as_raw_fd(), iov, cmsg, MSG_CMSG_CLOEXEC) } } diff --git a/media/audioipc/audioipc/src/messages.rs b/media/audioipc/audioipc/src/messages.rs index 7cf7b06c1f70..5250353225c9 100644 --- a/media/audioipc/audioipc/src/messages.rs +++ b/media/audioipc/audioipc/src/messages.rs @@ -3,23 +3,23 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details -use cubeb_core::{self, ffi}; +use cubeb::{self, ffi}; use std::ffi::{CStr, CString}; -use std::os::raw::c_char; +use std::os::raw::{c_char, c_int, c_uint}; use std::os::unix::io::RawFd; use std::ptr; #[derive(Debug, Serialize, Deserialize)] pub struct Device { pub output_name: Option>, - pub input_name: Option> + pub input_name: Option>, } -impl<'a> From> for Device { - fn from(info: cubeb_core::Device) -> Self { +impl<'a> From<&'a cubeb::DeviceRef> for Device { + fn from(info: &'a cubeb::DeviceRef) -> Self { Self { output_name: info.output_name_bytes().map(|s| s.to_vec()), - input_name: info.input_name_bytes().map(|s| s.to_vec()) + input_name: info.input_name_bytes().map(|s| s.to_vec()), } } } @@ -28,7 +28,7 @@ impl From for Device { fn from(info: ffi::cubeb_device) -> Self { Self { output_name: dup_str(info.output_name), - input_name: dup_str(info.input_name) + input_name: dup_str(info.input_name), } } } @@ -37,7 +37,7 @@ impl From for ffi::cubeb_device { fn from(info: Device) -> Self { Self { output_name: opt_str(info.output_name), - input_name: opt_str(info.input_name) + input_name: opt_str(info.input_name), } } } @@ -62,11 +62,12 @@ pub struct DeviceInfo { pub min_rate: u32, pub latency_lo: u32, - pub latency_hi: u32 + pub latency_hi: u32, } -impl<'a> From<&'a ffi::cubeb_device_info> for DeviceInfo { - fn from(info: &'a ffi::cubeb_device_info) -> Self { +impl<'a> From<&'a cubeb::DeviceInfoRef> for DeviceInfo { + fn from(info: &'a cubeb::DeviceInfoRef) -> Self { + let info = unsafe { &*info.as_ptr() }; DeviceInfo { devid: info.devid as _, device_id: dup_str(info.device_id), @@ -86,7 +87,7 @@ impl<'a> From<&'a ffi::cubeb_device_info> for DeviceInfo { min_rate: info.min_rate, latency_lo: info.latency_lo, - latency_hi: info.latency_hi + latency_hi: info.latency_hi, } } } @@ -112,43 +113,23 @@ impl From for ffi::cubeb_device_info { min_rate: info.min_rate, latency_lo: info.latency_lo, - latency_hi: info.latency_hi + latency_hi: info.latency_hi, } } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, Deserialize, Serialize)] pub struct StreamParams { - pub format: u32, - pub rate: u16, - pub channels: u8, - pub layout: i32, - pub prefs: i32 + pub format: ffi::cubeb_sample_format, + pub rate: c_uint, + pub channels: c_uint, + pub layout: ffi::cubeb_channel_layout, + pub prefs: ffi::cubeb_stream_prefs, } -impl<'a> From<&'a ffi::cubeb_stream_params> for StreamParams { - fn from(params: &'a ffi::cubeb_stream_params) -> Self { - assert!(params.channels <= u32::from(u8::max_value())); - - StreamParams { - format: params.format, - rate: params.rate as u16, - channels: params.channels as u8, - layout: params.layout, - prefs: params.prefs - } - } -} - -impl<'a> From<&'a StreamParams> for ffi::cubeb_stream_params { - fn from(params: &StreamParams) -> Self { - ffi::cubeb_stream_params { - format: params.format, - rate: u32::from(params.rate), - channels: u32::from(params.channels), - layout: params.layout, - prefs: params.prefs - } +impl<'a> From<&'a cubeb::StreamParamsRef> for StreamParams { + fn from(x: &cubeb::StreamParamsRef) -> StreamParams { + unsafe { *(x.as_ptr() as *mut StreamParams) } } } @@ -159,7 +140,7 @@ pub struct StreamInitParams { pub input_stream_params: Option, pub output_device: usize, pub output_stream_params: Option, - pub latency_frames: u32 + pub latency_frames: u32, } fn dup_str(s: *const c_char) -> Option> { @@ -171,23 +152,23 @@ fn dup_str(s: *const c_char) -> Option> { } } -fn opt_str(v: Option>) -> *const c_char { +fn opt_str(v: Option>) -> *mut c_char { match v { Some(v) => match CString::new(v) { Ok(s) => s.into_raw(), Err(_) => { debug!("Failed to convert bytes to CString"); - ptr::null() + ptr::null_mut() } }, - None => ptr::null() + None => ptr::null_mut(), } } #[derive(Debug, Serialize, Deserialize)] pub struct StreamCreate { pub token: usize, - pub fds: [RawFd; 3] + pub fds: [RawFd; 3], } // Client -> Server messages. @@ -215,7 +196,7 @@ pub enum ServerMessage { StreamGetLatency(usize), StreamSetVolume(usize, f32), StreamSetPanning(usize, f32), - StreamGetCurrentDevice(usize) + StreamGetCurrentDevice(usize), } // Server -> Client messages. @@ -244,19 +225,19 @@ pub enum ClientMessage { StreamPanningSet, StreamCurrentDevice(Device), - Error(ffi::cubeb_error_code) + Error(c_int), } #[derive(Debug, Deserialize, Serialize)] pub enum CallbackReq { Data(isize, usize), - State(ffi::cubeb_state) + State(ffi::cubeb_state), } #[derive(Debug, Deserialize, Serialize)] pub enum CallbackResp { Data(isize), - State + State, } pub trait AssocRawFd { @@ -265,7 +246,7 @@ pub trait AssocRawFd { } fn take_fd(&mut self, _: F) where - F: FnOnce() -> Option<[RawFd; 3]> + F: FnOnce() -> Option<[RawFd; 3]>, { } } @@ -275,16 +256,48 @@ impl AssocRawFd for ClientMessage { fn fd(&self) -> Option<[RawFd; 3]> { match *self { ClientMessage::StreamCreated(ref data) => Some(data.fds), - _ => None + _ => None, } } fn take_fd(&mut self, f: F) where - F: FnOnce() -> Option<[RawFd; 3]> + F: FnOnce() -> Option<[RawFd; 3]>, { if let ClientMessage::StreamCreated(ref mut data) = *self { data.fds = f().unwrap(); } } } + +#[cfg(test)] +mod test { + use super::StreamParams; + use cubeb::ffi; + use std::mem; + + #[test] + fn stream_params_size_check() { + assert_eq!( + mem::size_of::(), + mem::size_of::() + ) + } + + #[test] + fn stream_params_from() { + let mut raw = ffi::cubeb_stream_params::default(); + raw.format = ffi::CUBEB_SAMPLE_FLOAT32BE; + raw.rate = 96_000; + raw.channels = 32; + raw.layout = ffi::CUBEB_LAYOUT_3F1_LFE; + raw.prefs = ffi::CUBEB_STREAM_PREF_LOOPBACK; + let wrapped = ::cubeb::StreamParams::from(raw); + let params = StreamParams::from(wrapped.as_ref()); + assert_eq!(params.format, raw.format); + assert_eq!(params.rate, raw.rate); + assert_eq!(params.channels, raw.channels); + assert_eq!(params.layout, raw.layout); + assert_eq!(params.prefs, raw.prefs); + } +} diff --git a/media/audioipc/audioipc/src/msg.rs b/media/audioipc/audioipc/src/msg.rs index a884de8d900e..e8043261ead2 100644 --- a/media/audioipc/audioipc/src/msg.rs +++ b/media/audioipc/audioipc/src/msg.rs @@ -16,8 +16,8 @@ fn cvt(r: libc::ssize_t) -> io::Result { fn cvt_r libc::ssize_t>(mut f: F) -> io::Result { loop { match cvt(f()) { - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}, - other => return other + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + other => return other, } } } @@ -26,7 +26,7 @@ pub fn recv_msg_with_flags( socket: RawFd, bufs: &mut [&mut IoVec], cmsg: &mut [u8], - flags: libc::c_int + flags: libc::c_int, ) -> io::Result<(usize, usize, libc::c_int)> { let slice = iovec::as_os_slice_mut(bufs); let len = cmp::min(::max_value() as usize, slice.len()); @@ -56,7 +56,7 @@ pub fn send_msg_with_flags( socket: RawFd, bufs: &[&IoVec], cmsg: &[u8], - flags: libc::c_int + flags: libc::c_int, ) -> io::Result { let slice = iovec::as_os_slice(bufs); let len = cmp::min(::max_value() as usize, slice.len()); @@ -74,7 +74,5 @@ pub fn send_msg_with_flags( msghdr.msg_control = control; msghdr.msg_controllen = controllen as _; - cvt_r(|| unsafe { - libc::sendmsg(socket, &msghdr as *const _, flags) - }) + cvt_r(|| unsafe { libc::sendmsg(socket, &msghdr as *const _, flags) }) } diff --git a/media/audioipc/audioipc/src/rpc/client/mod.rs b/media/audioipc/audioipc/src/rpc/client/mod.rs index 539948a49d38..4d9b151a650a 100644 --- a/media/audioipc/audioipc/src/rpc/client/mod.rs +++ b/media/audioipc/audioipc/src/rpc/client/mod.rs @@ -53,10 +53,10 @@ pub use self::proxy::{ClientProxy, Response}; pub fn bind_client( transport: C::Transport, - handle: &Handle + handle: &Handle, ) -> proxy::ClientProxy where - C: Client + C: Client, { let (tx, rx) = proxy::channel(); @@ -64,7 +64,7 @@ where let handler = ClientHandler:: { transport: transport, requests: rx, - in_flight: VecDeque::with_capacity(32) + in_flight: VecDeque::with_capacity(32), }; Driver::new(handler) }; @@ -92,16 +92,16 @@ pub trait Client: 'static { struct ClientHandler where - C: Client + C: Client, { transport: C::Transport, requests: proxy::Receiver, - in_flight: VecDeque> + in_flight: VecDeque>, } impl Handler for ClientHandler where - C: Client + C: Client, { type In = C::Response; type Out = C::Request; @@ -118,7 +118,7 @@ where } else { return Err(io::Error::new( io::ErrorKind::Other, - "request / response mismatch" + "request / response mismatch", )); } @@ -138,16 +138,16 @@ where self.in_flight.push_back(complete); Ok(Some(request).into()) - }, + } Ok(Async::Ready(None)) => { trace!(" --> client dropped"); Ok(None.into()) - }, + } Ok(Async::NotReady) => { trace!(" --> not ready"); Ok(Async::NotReady) - }, - Err(_) => unreachable!() + } + Err(_) => unreachable!(), } } diff --git a/media/audioipc/audioipc/src/rpc/client/proxy.rs b/media/audioipc/audioipc/src/rpc/client/proxy.rs index 3fe1c45dda76..8ef59f4e2e92 100644 --- a/media/audioipc/audioipc/src/rpc/client/proxy.rs +++ b/media/audioipc/audioipc/src/rpc/client/proxy.rs @@ -57,17 +57,17 @@ pub type Receiver = mpsc::UnboundedReceiver>; /// Response future returned from a client pub struct Response { - inner: oneshot::Receiver + inner: oneshot::Receiver, } pub struct ClientProxy { - tx: mpsc::UnboundedSender> + tx: mpsc::UnboundedSender>, } impl Clone for ClientProxy { fn clone(&self) -> Self { ClientProxy { - tx: self.tx.clone() + tx: self.tx.clone(), } } } @@ -78,9 +78,7 @@ pub fn channel() -> (ClientProxy, Receiver) { // Wrap the `tx` part in ClientProxy so the rpc call interface // can be implemented. - let client = ClientProxy { - tx - }; + let client = ClientProxy { tx }; (client, rx) } @@ -97,16 +95,14 @@ impl ClientProxy { // into a BrokenPipe, which conveys the proper error. let _ = self.tx.send((request, tx)); - Response { - inner: rx - } + Response { inner: rx } } } impl fmt::Debug for ClientProxy where R: fmt::Debug, - Q: fmt::Debug + Q: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "ClientProxy {{ ... }}") @@ -132,7 +128,7 @@ impl Future for Response { impl fmt::Debug for Response where - Q: fmt::Debug + Q: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "Response {{ ... }}") diff --git a/media/audioipc/audioipc/src/rpc/driver.rs b/media/audioipc/audioipc/src/rpc/driver.rs index f78f8f569897..e3d36e5c31f3 100644 --- a/media/audioipc/audioipc/src/rpc/driver.rs +++ b/media/audioipc/audioipc/src/rpc/driver.rs @@ -10,7 +10,7 @@ use std::io; pub struct Driver where - T: Handler + T: Handler, { // Glue handler: T, @@ -19,19 +19,19 @@ where run: bool, // True when the transport is fully flushed - is_flushed: bool + is_flushed: bool, } impl Driver where - T: Handler + T: Handler, { /// Create a new rpc driver with the given service and transport. pub fn new(handler: T) -> Driver { Driver { handler: handler, run: true, - is_flushed: true + is_flushed: true, } } @@ -65,7 +65,7 @@ where // TODO: Should handler be infalliable? panic!("unimplemented error handling: {:?}", e); } - }, + } None => { trace!("received None"); // At this point, we just return. This works @@ -86,14 +86,14 @@ where Async::Ready(Some(message)) => { trace!(" --> got message"); try!(self.process_outgoing(message)); - }, + } Async::Ready(None) => { trace!(" --> got None"); // The service is done with the connection. break; - }, + } // Nothing to dispatch - Async::NotReady => break + Async::NotReady => break, } } @@ -121,7 +121,7 @@ where impl Future for Driver where - T: Handler + T: Handler, { type Item = (); type Error = io::Error; @@ -153,13 +153,13 @@ fn assert_send(s: &mut S, item: S::SinkItem) -> Result<(), S::SinkError AsyncSink::NotReady(_) => panic!( "sink reported itself as ready after `poll_ready` but was \ then unable to accept a message" - ) + ), } } impl fmt::Debug for Driver where - T: Handler + fmt::Debug + T: Handler + fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("rpc::Handler") diff --git a/media/audioipc/audioipc/src/rpc/server.rs b/media/audioipc/audioipc/src/rpc/server.rs index 8bd942ff2c2b..34fe07d37064 100644 --- a/media/audioipc/audioipc/src/rpc/server.rs +++ b/media/audioipc/audioipc/src/rpc/server.rs @@ -49,13 +49,13 @@ use tokio_core::reactor::Handle; /// Bind an async I/O object `io` to the `server`. pub fn bind_server(transport: S::Transport, server: S, handle: &Handle) where - S: Server + S: Server, { let fut = { let handler = ServerHandler { server: server, transport: transport, - in_flight: VecDeque::with_capacity(32) + in_flight: VecDeque::with_capacity(32), }; Driver::new(handler) }; @@ -84,24 +84,23 @@ pub trait Server: 'static { fn process(&mut self, req: Self::Request) -> Self::Future; } - //////////////////////////////////////////////////////////////////////////////// struct ServerHandler where - S: Server + S: Server, { // The service handling the connection server: S, // The transport responsible for sending/receving messages over the wire transport: S::Transport, // FIFO of "in flight" responses to requests. - in_flight: VecDeque> + in_flight: VecDeque>, } impl Handler for ServerHandler where - S: Server + S: Server, { type In = S::Request; type Out = S::Response; @@ -133,7 +132,7 @@ where // Is the head of the queue ready? match self.in_flight.front() { - Some(&InFlight::Done(_)) => {}, + Some(&InFlight::Done(_)) => {} _ => { trace!(" --> not ready"); return Ok(Async::NotReady); @@ -145,8 +144,8 @@ where Some(InFlight::Done(res)) => { trace!(" --> received response"); Ok(Async::Ready(Some(res))) - }, - _ => panic!() + } + _ => panic!(), } } @@ -160,7 +159,7 @@ where enum InFlight> { Active(F), - Done(F::Item) + Done(F::Item), } impl> InFlight { @@ -169,9 +168,9 @@ impl> InFlight { InFlight::Active(ref mut f) => match f.poll() { Ok(Async::Ready(e)) => e, Err(_) => unreachable!(), - Ok(Async::NotReady) => return + Ok(Async::NotReady) => return, }, - _ => return + _ => return, }; *self = InFlight::Done(res); } diff --git a/media/audioipc/audioipc/src/shm.rs b/media/audioipc/audioipc/src/shm.rs index 2097b242ddfd..290f485caec0 100644 --- a/media/audioipc/audioipc/src/shm.rs +++ b/media/audioipc/audioipc/src/shm.rs @@ -5,7 +5,7 @@ use std::path::Path; use std::sync::atomic; pub struct SharedMemReader { - mmap: Mmap + mmap: Mmap, } impl SharedMemReader { @@ -19,12 +19,7 @@ impl SharedMemReader { file.set_len(size as u64)?; let mmap = Mmap::open(&file, Protection::Read)?; assert_eq!(mmap.len(), size); - Ok(( - SharedMemReader { - mmap - }, - file - )) + Ok((SharedMemReader { mmap }, file)) } pub fn read(&self, buf: &mut [u8]) -> Result<()> { @@ -46,7 +41,7 @@ impl SharedMemReader { } pub struct SharedMemSlice { - view: MmapViewSync + view: MmapViewSync, } impl SharedMemSlice { @@ -54,9 +49,7 @@ impl SharedMemSlice { let mmap = Mmap::open(file, Protection::Read)?; assert_eq!(mmap.len(), size); let view = mmap.into_view_sync(); - Ok(SharedMemSlice { - view - }) + Ok(SharedMemSlice { view }) } pub fn get_slice(&self, size: usize) -> Result<&[u8]> { @@ -79,13 +72,13 @@ impl SharedMemSlice { /// underlying the view is not illegally aliased. pub unsafe fn clone_view(&self) -> Self { SharedMemSlice { - view: self.view.clone() + view: self.view.clone(), } } } pub struct SharedMemWriter { - mmap: Mmap + mmap: Mmap, } impl SharedMemWriter { @@ -98,12 +91,7 @@ impl SharedMemWriter { let _ = remove_file(path); file.set_len(size as u64)?; let mmap = Mmap::open(&file, Protection::ReadWrite)?; - Ok(( - SharedMemWriter { - mmap - }, - file - )) + Ok((SharedMemWriter { mmap }, file)) } pub fn write(&mut self, buf: &[u8]) -> Result<()> { @@ -124,7 +112,7 @@ impl SharedMemWriter { } pub struct SharedMemMutSlice { - view: MmapViewSync + view: MmapViewSync, } impl SharedMemMutSlice { @@ -132,9 +120,7 @@ impl SharedMemMutSlice { let mmap = Mmap::open(file, Protection::ReadWrite)?; assert_eq!(mmap.len(), size); let view = mmap.into_view_sync(); - Ok(SharedMemMutSlice { - view - }) + Ok(SharedMemMutSlice { view }) } pub fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> { @@ -158,7 +144,7 @@ impl SharedMemMutSlice { /// aliased. pub unsafe fn clone_view(&self) -> Self { SharedMemMutSlice { - view: self.view.clone() + view: self.view.clone(), } } } diff --git a/media/audioipc/client/Cargo.toml b/media/audioipc/client/Cargo.toml index 9e3f8a52204d..06848317e4ba 100644 --- a/media/audioipc/client/Cargo.toml +++ b/media/audioipc/client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "audioipc-client" -version = "0.2.0" +version = "0.3.0" authors = [ "Matthew Gregan ", "Dan Glastonbury " @@ -9,8 +9,8 @@ description = "Cubeb Backend for talking to remote cubeb server." [dependencies] audioipc = { path="../audioipc" } -cubeb-backend = { path = "../../cubeb-rs/cubeb-backend" } -cubeb-core = { path = "../../cubeb-rs/cubeb-core" } +cubeb-backend = "0.4" +foreign-types = "0.3" # rayon-core in Gecko uses futures 0.1.13 futures = { version="=0.1.13", default-features=false, features=["use_std"] } # futures-cpupool 0.1.5 matches futures 0.1.13 diff --git a/media/audioipc/client/src/context.rs b/media/audioipc/client/src/context.rs index eb6cd3f75428..3acd459730f6 100644 --- a/media/audioipc/client/src/context.rs +++ b/media/audioipc/client/src/context.rs @@ -9,13 +9,12 @@ use audioipc::{messages, ClientMessage, ServerMessage}; use audioipc::{core, rpc}; use audioipc::codec::LengthDelimitedCodec; use audioipc::fd_passing::{framed_with_fds, FramedWithFds}; -use cubeb_backend::{Context, Ops}; -use cubeb_core::{ffi, DeviceId, DeviceType, Error, Result, StreamParams}; -use cubeb_core::binding::Binding; +use cubeb_backend::{ffi, ChannelLayout, Context, ContextOps, DeviceCollectionRef, DeviceId, + DeviceType, Error, Ops, Result, Stream, StreamParams, StreamParamsRef}; use futures::Future; use futures_cpupool::{self, CpuPool}; use libc; -use std::{fmt, io, mem}; +use std::{fmt, io, mem, ptr}; use std::ffi::{CStr, CString}; use std::os::raw::c_void; use std::os::unix::io::FromRawFd; @@ -30,8 +29,7 @@ struct CubebClient; impl rpc::Client for CubebClient { type Request = ServerMessage; type Response = ClientMessage; - type Transport = - FramedWithFds>; + type Transport = FramedWithFds>; } macro_rules! t( @@ -48,7 +46,7 @@ pub struct ClientContext { _ops: *const Ops, rpc: rpc::ClientProxy, core: core::CoreThread, - cpu_pool: CpuPool + cpu_pool: CpuPool, } impl ClientContext { @@ -79,12 +77,12 @@ fn open_server_stream() -> Result { } } -impl Context for ClientContext { - fn init(_context_name: Option<&CStr>) -> Result<*mut ffi::cubeb> { +impl ContextOps for ClientContext { + fn init(_context_name: Option<&CStr>) -> Result { fn bind_and_send_client( stream: UnixStream, handle: &Handle, - tx_rpc: &mpsc::Sender> + tx_rpc: &mpsc::Sender>, ) -> Option<()> { let transport = framed_with_fds(stream, Default::default()); let rpc = rpc::bind_client::(transport, handle); @@ -101,13 +99,16 @@ impl Context for ClientContext { let core = t!(core::spawn_thread("AudioIPC Client RPC", move || { let handle = core::handle(); - open_server_stream().ok() + open_server_stream() + .ok() .and_then(|stream| UnixStream::from_stream(stream, &handle).ok()) .and_then(|stream| bind_and_send_client(stream, &handle, &tx_rpc)) - .ok_or_else(|| io::Error::new( - io::ErrorKind::Other, - "Failed to open stream and create rpc." - )) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + "Failed to open stream and create rpc.", + ) + }) })); let rpc = t!(rx_rpc.recv()); @@ -120,66 +121,72 @@ impl Context for ClientContext { _ops: &CLIENT_OPS as *const _, rpc: rpc, core: core, - cpu_pool: cpupool + cpu_pool: cpupool, }); - Ok(Box::into_raw(ctx) as *mut _) + Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) }) } - fn backend_id(&self) -> &'static CStr { + fn backend_id(&mut self) -> &'static CStr { assert_not_in_callback(); unsafe { CStr::from_ptr(b"remote\0".as_ptr() as *const _) } } - fn max_channel_count(&self) -> Result { + fn max_channel_count(&mut self) -> Result { assert_not_in_callback(); send_recv!(self.rpc(), ContextGetMaxChannelCount => ContextMaxChannelCount()) } - fn min_latency(&self, params: &StreamParams) -> Result { + fn min_latency(&mut self, params: StreamParams) -> Result { assert_not_in_callback(); - let params = messages::StreamParams::from(unsafe { &*params.raw() }); + let params = messages::StreamParams::from(params.as_ref()); send_recv!(self.rpc(), ContextGetMinLatency(params) => ContextMinLatency()) } - fn preferred_sample_rate(&self) -> Result { + fn preferred_sample_rate(&mut self) -> Result { assert_not_in_callback(); send_recv!(self.rpc(), ContextGetPreferredSampleRate => ContextPreferredSampleRate()) } - fn preferred_channel_layout(&self) -> Result { + fn preferred_channel_layout(&mut self) -> Result { assert_not_in_callback(); send_recv!(self.rpc(), ContextGetPreferredChannelLayout => ContextPreferredChannelLayout()) + .map(|l| { + ChannelLayout::from(l) + }) } - fn enumerate_devices(&self, devtype: DeviceType) -> Result { + fn enumerate_devices( + &mut self, + devtype: DeviceType, + collection: &DeviceCollectionRef, + ) -> Result<()> { assert_not_in_callback(); let v: Vec = match send_recv!(self.rpc(), ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices()) { Ok(mut v) => v.drain(..).map(|i| i.into()).collect(), - Err(e) => return Err(e) - }; - let vs = v.into_boxed_slice(); - let coll = ffi::cubeb_device_collection { - count: vs.len(), - device: vs.as_ptr() + Err(e) => return Err(e), }; + let mut vs = v.into_boxed_slice(); + let coll = unsafe { &mut *collection.as_ptr() }; + coll.device = vs.as_mut_ptr(); + coll.count = vs.len(); // Giving away the memory owned by vs. Don't free it! // Reclaimed in `device_collection_destroy`. mem::forget(vs); - Ok(coll) + Ok(()) } - fn device_collection_destroy(&self, collection: *mut ffi::cubeb_device_collection) { + fn device_collection_destroy(&mut self, collection: &mut DeviceCollectionRef) -> Result<()> { assert_not_in_callback(); unsafe { - let coll = &*collection; + let coll = &mut *collection.as_ptr(); let mut devices = Vec::from_raw_parts( coll.device as *mut ffi::cubeb_device_info, coll.count, - coll.count + coll.count, ); for dev in &mut devices { if !dev.device_id.is_null() { @@ -195,36 +202,37 @@ impl Context for ClientContext { let _ = CString::from_raw(dev.friendly_name as *mut _); } } + coll.device = ptr::null_mut(); + coll.count = 0; + Ok(()) } } fn stream_init( - &self, + &mut self, stream_name: Option<&CStr>, input_device: DeviceId, - input_stream_params: Option<&ffi::cubeb_stream_params>, + input_stream_params: Option<&StreamParamsRef>, output_device: DeviceId, - output_stream_params: Option<&ffi::cubeb_stream_params>, + output_stream_params: Option<&StreamParamsRef>, latency_frame: u32, // These params aren't sent to the server data_callback: ffi::cubeb_data_callback, state_callback: ffi::cubeb_state_callback, - user_ptr: *mut c_void - ) -> Result<*mut ffi::cubeb_stream> { + user_ptr: *mut c_void, + ) -> Result { assert_not_in_callback(); - fn opt_stream_params( - p: Option<&ffi::cubeb_stream_params> - ) -> Option { + fn opt_stream_params(p: Option<&StreamParamsRef>) -> Option { match p { - Some(raw) => Some(messages::StreamParams::from(raw)), - None => None + Some(p) => Some(messages::StreamParams::from(p)), + None => None, } } let stream_name = match stream_name { Some(s) => Some(s.to_bytes().to_vec()), - None => None + None => None, }; let input_stream_params = opt_stream_params(input_stream_params); @@ -232,20 +240,20 @@ impl Context for ClientContext { let init_params = messages::StreamInitParams { stream_name: stream_name, - input_device: input_device.raw() as _, + input_device: input_device as usize, input_stream_params: input_stream_params, - output_device: output_device.raw() as _, + output_device: output_device as usize, output_stream_params: output_stream_params, - latency_frames: latency_frame + latency_frames: latency_frame, }; stream::init(self, init_params, data_callback, state_callback, user_ptr) } fn register_device_collection_changed( - &self, + &mut self, _dev_type: DeviceType, _collection_changed_callback: ffi::cubeb_device_collection_changed_callback, - _user_ptr: *mut c_void + _user_ptr: *mut c_void, ) -> Result<()> { assert_not_in_callback(); Ok(()) @@ -254,7 +262,7 @@ impl Context for ClientContext { impl Drop for ClientContext { fn drop(&mut self) { - info!("ClientContext drop..."); + debug!("ClientContext drop..."); let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected); unsafe { if super::G_SERVER_FD.is_some() { diff --git a/media/audioipc/client/src/lib.rs b/media/audioipc/client/src/lib.rs index 86392238a040..d12bc4038f2d 100644 --- a/media/audioipc/client/src/lib.rs +++ b/media/audioipc/client/src/lib.rs @@ -6,7 +6,7 @@ extern crate audioipc; #[macro_use] extern crate cubeb_backend; -extern crate cubeb_core; +extern crate foreign_types; extern crate futures; extern crate futures_cpupool; extern crate libc; @@ -21,8 +21,7 @@ mod context; mod stream; use context::ClientContext; -use cubeb_backend::capi; -use cubeb_core::ffi; +use cubeb_backend::{capi, ffi}; use std::os::raw::{c_char, c_int}; use std::os::unix::io::RawFd; use stream::ClientStream; @@ -49,7 +48,7 @@ static mut G_SERVER_FD: Option = None; pub unsafe extern "C" fn audioipc_client_init( c: *mut *mut ffi::cubeb, context_name: *const c_char, - server_connection: c_int + server_connection: c_int, ) -> c_int { // TODO: Windows portability (for fd). // TODO: Better way to pass extra parameters to Context impl. diff --git a/media/audioipc/client/src/send_recv.rs b/media/audioipc/client/src/send_recv.rs index 2c64f8b0c8c1..afd16192c762 100644 --- a/media/audioipc/client/src/send_recv.rs +++ b/media/audioipc/client/src/send_recv.rs @@ -1,14 +1,14 @@ -use cubeb_core::Error; -use cubeb_core::ffi; +use cubeb_backend::Error; +use std::os::raw::c_int; #[doc(hidden)] pub fn _err(e: E) -> Error where - E: Into> + E: Into>, { match e.into() { Some(e) => unsafe { Error::from_raw(e) }, - None => Error::new() + None => Error::error(), } } diff --git a/media/audioipc/client/src/stream.rs b/media/audioipc/client/src/stream.rs index 5dd01b997170..86890a1db892 100644 --- a/media/audioipc/client/src/stream.rs +++ b/media/audioipc/client/src/stream.rs @@ -10,8 +10,7 @@ use audioipc::frame::{framed, Framed}; use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage}; use audioipc::rpc; use audioipc::shm::{SharedMemMutSlice, SharedMemSlice}; -use cubeb_backend::Stream; -use cubeb_core::{ffi, Result}; +use cubeb_backend::{ffi, DeviceRef, Error, Result, Stream, StreamOps}; use futures::Future; use futures_cpupool::{CpuFuture, CpuPool}; use std::ffi::CString; @@ -26,10 +25,27 @@ use tokio_uds::UnixStream; // TODO: Remove and let caller allocate based on cubeb backend requirements. const SHM_AREA_SIZE: usize = 2 * 1024 * 1024; +pub struct Device(ffi::cubeb_device); + +impl Drop for Device { + fn drop(&mut self) { + unsafe { + if !self.0.input_name.is_null() { + let _ = CString::from_raw(self.0.input_name as *mut _); + } + if !self.0.output_name.is_null() { + let _ = CString::from_raw(self.0.output_name as *mut _); + } + } + } +} + +#[derive(Debug)] pub struct ClientStream<'ctx> { - // This must be a reference to Context for cubeb, cubeb accesses stream methods via stream->context->ops + // This must be a reference to Context for cubeb, cubeb accesses + // stream methods via stream->context->ops context: &'ctx ClientContext, - token: usize + token: usize, } struct CallbackServer { @@ -38,7 +54,7 @@ struct CallbackServer { data_cb: ffi::cubeb_data_callback, state_cb: ffi::cubeb_state_callback, user_ptr: usize, - cpu_pool: CpuPool + cpu_pool: CpuPool, } impl rpc::Server for CallbackServer { @@ -50,17 +66,16 @@ impl rpc::Server for CallbackServer { fn process(&mut self, req: Self::Request) -> Self::Future { match req { CallbackReq::Data(nframes, frame_size) => { - info!( + debug!( "stream_thread: Data Callback: nframes={} frame_size={}", - nframes, - frame_size + nframes, frame_size ); // Clone values that need to be moved into the cpu pool thread. let input_shm = unsafe { self.input_shm.clone_view() }; let mut output_shm = unsafe { self.output_shm.clone_view() }; let user_ptr = self.user_ptr; - let cb = self.data_cb; + let cb = self.data_cb.unwrap(); self.cpu_pool.spawn_fn(move || { // TODO: This is proof-of-concept. Make it better. @@ -74,25 +89,29 @@ impl rpc::Server for CallbackServer { .as_mut_ptr(); set_in_callback(true); - let nframes = cb( - ptr::null_mut(), - user_ptr as *mut c_void, - input_ptr as *const _, - output_ptr as *mut _, - nframes as _ - ); + let nframes = unsafe { + cb( + ptr::null_mut(), + user_ptr as *mut c_void, + input_ptr as *const _, + output_ptr as *mut _, + nframes as _, + ) + }; set_in_callback(false); Ok(CallbackResp::Data(nframes as isize)) }) - }, + } CallbackReq::State(state) => { - info!("stream_thread: State Callback: {:?}", state); + debug!("stream_thread: State Callback: {:?}", state); let user_ptr = self.user_ptr; - let cb = self.state_cb; + let cb = self.state_cb.unwrap(); self.cpu_pool.spawn_fn(move || { set_in_callback(true); - cb(ptr::null_mut(), user_ptr as *mut _, state); + unsafe { + cb(ptr::null_mut(), user_ptr as *mut _, state); + } set_in_callback(false); Ok(CallbackResp::State) @@ -108,8 +127,8 @@ impl<'ctx> ClientStream<'ctx> { init_params: messages::StreamInitParams, data_callback: ffi::cubeb_data_callback, state_callback: ffi::cubeb_state_callback, - user_ptr: *mut c_void - ) -> Result<*mut ffi::cubeb_stream> { + user_ptr: *mut c_void, + ) -> Result { assert_not_in_callback(); let rpc = ctx.rpc(); @@ -138,7 +157,7 @@ impl<'ctx> ClientStream<'ctx> { data_cb: data_callback, state_cb: state_callback, user_ptr: user_data, - cpu_pool: cpu_pool + cpu_pool: cpu_pool, }; let (wait_tx, wait_rx) = mpsc::channel(); @@ -151,10 +170,11 @@ impl<'ctx> ClientStream<'ctx> { }); wait_rx.recv().unwrap(); - Ok(Box::into_raw(Box::new(ClientStream { + let stream = Box::into_raw(Box::new(ClientStream { context: ctx, - token: data.token - })) as _) + token: data.token, + })); + Ok(unsafe { Stream::from_ptr(stream as *mut _) }) } } @@ -166,79 +186,75 @@ impl<'ctx> Drop for ClientStream<'ctx> { } } -impl<'ctx> Stream for ClientStream<'ctx> { - fn start(&self) -> Result<()> { +impl<'ctx> StreamOps for ClientStream<'ctx> { + fn start(&mut self) -> Result<()> { assert_not_in_callback(); let rpc = self.context.rpc(); send_recv!(rpc, StreamStart(self.token) => StreamStarted) } - fn stop(&self) -> Result<()> { + fn stop(&mut self) -> Result<()> { assert_not_in_callback(); let rpc = self.context.rpc(); send_recv!(rpc, StreamStop(self.token) => StreamStopped) } - fn reset_default_device(&self) -> Result<()> { + fn reset_default_device(&mut self) -> Result<()> { assert_not_in_callback(); let rpc = self.context.rpc(); send_recv!(rpc, StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset) } - fn position(&self) -> Result { + fn position(&mut self) -> Result { assert_not_in_callback(); let rpc = self.context.rpc(); send_recv!(rpc, StreamGetPosition(self.token) => StreamPosition()) } - fn latency(&self) -> Result { + fn latency(&mut self) -> Result { assert_not_in_callback(); let rpc = self.context.rpc(); send_recv!(rpc, StreamGetLatency(self.token) => StreamLatency()) } - fn set_volume(&self, volume: f32) -> Result<()> { + fn set_volume(&mut self, volume: f32) -> Result<()> { assert_not_in_callback(); let rpc = self.context.rpc(); send_recv!(rpc, StreamSetVolume(self.token, volume) => StreamVolumeSet) } - fn set_panning(&self, panning: f32) -> Result<()> { + fn set_panning(&mut self, panning: f32) -> Result<()> { assert_not_in_callback(); let rpc = self.context.rpc(); send_recv!(rpc, StreamSetPanning(self.token, panning) => StreamPanningSet) } - fn current_device(&self) -> Result<*const ffi::cubeb_device> { + fn current_device(&mut self) -> Result<&DeviceRef> { assert_not_in_callback(); let rpc = self.context.rpc(); match send_recv!(rpc, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) { - Ok(d) => Ok(Box::into_raw(Box::new(d.into()))), - Err(e) => Err(e) + Ok(d) => Ok(unsafe { DeviceRef::from_ptr(Box::into_raw(Box::new(d.into()))) }), + Err(e) => Err(e), } } - fn device_destroy(&self, device: *const ffi::cubeb_device) -> Result<()> { + fn device_destroy(&mut self, device: &DeviceRef) -> Result<()> { assert_not_in_callback(); // It's all unsafe... - if !device.is_null() { + if device.as_ptr().is_null() { + Err(Error::error()) + } else { unsafe { - if !(*device).output_name.is_null() { - let _ = CString::from_raw((*device).output_name as *mut _); - } - if !(*device).input_name.is_null() { - let _ = CString::from_raw((*device).input_name as *mut _); - } - let _: Box = Box::from_raw(device as *mut _); + let _: Box = Box::from_raw(device.as_ptr() as *mut _); } + Ok(()) } - Ok(()) } // TODO: How do we call this back? On what thread? fn register_device_changed_callback( - &self, - _device_changed_callback: ffi::cubeb_device_changed_callback + &mut self, + _device_changed_callback: ffi::cubeb_device_changed_callback, ) -> Result<()> { assert_not_in_callback(); Ok(()) @@ -250,7 +266,7 @@ pub fn init( init_params: messages::StreamInitParams, data_callback: ffi::cubeb_data_callback, state_callback: ffi::cubeb_state_callback, - user_ptr: *mut c_void -) -> Result<*mut ffi::cubeb_stream> { + user_ptr: *mut c_void, +) -> Result { ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr) } diff --git a/media/audioipc/gecko.patch b/media/audioipc/gecko.patch index d3ebdebf7d0a..8a7d46532a66 100644 --- a/media/audioipc/gecko.patch +++ b/media/audioipc/gecko.patch @@ -12,49 +12,4 @@ index ede6064..d0a1979 100644 [workspace] -members = ["audioipc", "client", "server", "ipctest"] +members = ["audioipc", "client", "server"] -diff --git a/audioipc/Cargo.toml b/audioipc/Cargo.toml -index 669c6ff..308cb5c 100644 ---- a/media/audioipc/audioipc/Cargo.toml -+++ b/media/audioipc/audioipc/Cargo.toml -@@ -8,7 +8,7 @@ authors = [ - description = "Remote Cubeb IPC" - - [dependencies] --cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" } -+cubeb-core = { path = "../../cubeb-rs/cubeb-core" } - bincode = "0.8" - bytes = "0.4" - # rayon-core in Gecko uses futures 0.1.13 -diff --git a/client/Cargo.toml b/client/Cargo.toml -index c81b19a..9e3f8a5 100644 ---- a/media/audioipc/client/Cargo.toml -+++ b/media/audioipc/client/Cargo.toml -@@ -9,8 +9,8 @@ description = "Cubeb Backend for talking to remote cubeb server." - - [dependencies] - audioipc = { path="../audioipc" } --cubeb-backend = { git="https://github.com/djg/cubeb-rs", version="^0.2" } --cubeb-core = { git="https://github.com/djg/cubeb-rs", version="^0.1" } -+cubeb-backend = { path = "../../cubeb-rs/cubeb-backend" } -+cubeb-core = { path = "../../cubeb-rs/cubeb-core" } - # rayon-core in Gecko uses futures 0.1.13 - futures = { version="=0.1.13", default-features=false, features=["use_std"] } - # futures-cpupool 0.1.5 matches futures 0.1.13 -diff --git a/server/Cargo.toml b/server/Cargo.toml -index 5b79b83..01463be 100644 ---- a/media/audioipc/server/Cargo.toml -+++ b/media/audioipc/server/Cargo.toml -@@ -9,8 +9,8 @@ description = "Remote cubeb server" - - [dependencies] - audioipc = { path = "../audioipc" } --cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" } --cubeb = { git = "https://github.com/djg/cubeb-rs", version="^0.3.2" } -+cubeb-core = { path = "../../cubeb-rs/cubeb-core" } -+cubeb = { path = "../../cubeb-rs/cubeb-api" } - bytes = "0.4" - lazycell = "^0.4" - libc = "0.2" --- -2.10.2 diff --git a/media/audioipc/server/Cargo.toml b/media/audioipc/server/Cargo.toml index f38737daa8fb..ef497c29c225 100644 --- a/media/audioipc/server/Cargo.toml +++ b/media/audioipc/server/Cargo.toml @@ -9,8 +9,7 @@ description = "Remote cubeb server" [dependencies] audioipc = { path = "../audioipc" } -cubeb-core = { path = "../../cubeb-rs/cubeb-core" } -cubeb = { path = "../../cubeb-rs/cubeb-api" } +cubeb = "0.4" bytes = "0.4" lazycell = "^0.4" libc = "0.2" diff --git a/media/audioipc/server/src/lib.rs b/media/audioipc/server/src/lib.rs index 7be60f7fb787..38283f3459f7 100644 --- a/media/audioipc/server/src/lib.rs +++ b/media/audioipc/server/src/lib.rs @@ -7,7 +7,6 @@ extern crate log; extern crate audioipc; extern crate bytes; extern crate cubeb; -extern crate cubeb_core; extern crate futures; extern crate lazycell; extern crate libc; @@ -19,12 +18,10 @@ use audioipc::codec::LengthDelimitedCodec; use audioipc::core; use audioipc::fd_passing::{framed_with_fds, FramedWithFds}; use audioipc::frame::{framed, Framed}; -use audioipc::messages::{CallbackReq, CallbackResp, ClientMessage, DeviceInfo, ServerMessage, - StreamCreate, StreamInitParams, StreamParams}; +use audioipc::messages::{CallbackReq, CallbackResp, ClientMessage, Device, DeviceInfo, + ServerMessage, StreamCreate, StreamInitParams, StreamParams}; use audioipc::rpc; use audioipc::shm::{SharedMemReader, SharedMemWriter}; -use cubeb_core::binding::Binding; -use cubeb_core::ffi; use futures::Future; use futures::future::{self, FutureResult}; use futures::sync::oneshot; @@ -44,7 +41,7 @@ pub mod errors { AudioIPC(::audioipc::errors::Error, ::audioipc::errors::ErrorKind); } foreign_links { - Cubeb(::cubeb_core::Error); + Cubeb(::cubeb::Error); Io(::std::io::Error); Canceled(::futures::sync::oneshot::Canceled); } @@ -53,16 +50,17 @@ pub mod errors { use errors::*; -thread_local!(static CONTEXT_KEY: RefCell>> = RefCell::new(None)); +type ContextKey = RefCell>>; +thread_local!(static CONTEXT_KEY:ContextKey = RefCell::new(None)); fn with_local_context(f: F) -> T where - F: FnOnce(&cubeb::Result) -> T + F: FnOnce(&cubeb::Result) -> T, { CONTEXT_KEY.with(|k| { let mut context = k.borrow_mut(); if context.is_none() { - *context = Some(cubeb::Context::init("AudioIPC Server", None)); + *context = Some(cubeb::init("AudioIPC Server")); } f(context.as_ref().unwrap()) }) @@ -82,97 +80,23 @@ impl rpc::Client for CallbackClient { type Transport = Framed>; } -// TODO: this should forward to the client. -struct Callback { - /// Size of input frame in bytes - input_frame_size: u16, - /// Size of output frame in bytes - output_frame_size: u16, - input_shm: SharedMemWriter, - output_shm: SharedMemReader, - rpc: rpc::ClientProxy -} - -impl cubeb::StreamCallback for Callback { - type Frame = u8; - - fn data_callback(&mut self, input: &[u8], output: &mut [u8]) -> isize { - trace!("Stream data callback: {} {}", input.len(), output.len()); - - // len is of input and output is frame len. Turn these into the real lengths. - let real_input = unsafe { - let size_bytes = input.len() * self.input_frame_size as usize; - slice::from_raw_parts(input.as_ptr(), size_bytes) - }; - let real_output = unsafe { - let size_bytes = output.len() * self.output_frame_size as usize; - trace!("Resize output to {}", size_bytes); - slice::from_raw_parts_mut(output.as_mut_ptr(), size_bytes) - }; - - self.input_shm.write(real_input).unwrap(); - - let r = self.rpc - .call(CallbackReq::Data( - output.len() as isize, - self.output_frame_size as usize - )) - .wait(); - - match r { - Ok(CallbackResp::Data(cb_result)) => if cb_result >= 0 { - let len = cb_result as usize * self.output_frame_size as usize; - self.output_shm.read(&mut real_output[..len]).unwrap(); - cb_result - } else { - cb_result - }, - _ => { - debug!("Unexpected message {:?} during data_callback", r); - -1 - } - } - } - - fn state_callback(&mut self, state: cubeb::State) { - info!("Stream state callback: {:?}", state); - // TODO: Share this conversion with the same code in cubeb-rs? - let state = match state { - cubeb::State::Started => ffi::CUBEB_STATE_STARTED, - cubeb::State::Stopped => ffi::CUBEB_STATE_STOPPED, - cubeb::State::Drained => ffi::CUBEB_STATE_DRAINED, - cubeb::State::Error => ffi::CUBEB_STATE_ERROR - }; - - let r = self.rpc.call(CallbackReq::State(state)).wait(); - - match r { - Ok(CallbackResp::State) => {}, - _ => { - debug!("Unexpected message {:?} during callback", r); - } - }; - } -} - -type StreamSlab = slab::Slab, usize>; +type StreamSlab = slab::Slab, usize>; pub struct CubebServer { cb_remote: Remote, - streams: StreamSlab + streams: StreamSlab, } impl rpc::Server for CubebServer { type Request = ServerMessage; type Response = ClientMessage; type Future = FutureResult; - type Transport = - FramedWithFds>; + type Transport = FramedWithFds>; fn process(&mut self, req: Self::Request) -> Self::Future { let resp = with_local_context(|context| match *context { - Err(_) => error(cubeb::Error::new()), - Ok(ref context) => self.process_msg(context, &req) + Err(_) => error(cubeb::Error::error()), + Ok(ref context) => self.process_msg(context, &req), }); future::ok(resp) } @@ -182,7 +106,7 @@ impl CubebServer { pub fn new(cb_remote: Remote) -> Self { CubebServer { cb_remote: cb_remote, - streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE) + streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE), } } @@ -195,7 +119,7 @@ impl CubebServer { // TODO: //self.connection.client_disconnect(); ClientMessage::ClientDisconnected - }, + } ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(), @@ -219,7 +143,7 @@ impl CubebServer { .min_latency(¶ms) .map(ClientMessage::ContextMinLatency) .unwrap_or_else(error) - }, + } ServerMessage::ContextGetPreferredSampleRate => context .preferred_sample_rate() @@ -234,33 +158,33 @@ impl CubebServer { ServerMessage::ContextGetDeviceEnumeration(device_type) => context .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type)) .map(|devices| { - let v: Vec = devices.iter().map(|i| i.raw().into()).collect(); + let v: Vec = devices.iter().map(|i| i.as_ref().into()).collect(); ClientMessage::ContextEnumeratedDevices(v) }) .unwrap_or_else(error), ServerMessage::StreamInit(ref params) => self.process_stream_init(context, params) - .unwrap_or_else(|_| error(cubeb::Error::new())), + .unwrap_or_else(|_| error(cubeb::Error::error())), ServerMessage::StreamDestroy(stm_tok) => { self.streams.remove(stm_tok); ClientMessage::StreamDestroyed - }, + } ServerMessage::StreamStart(stm_tok) => { let _ = self.streams[stm_tok].start(); ClientMessage::StreamStarted - }, + } ServerMessage::StreamStop(stm_tok) => { let _ = self.streams[stm_tok].stop(); ClientMessage::StreamStopped - }, + } ServerMessage::StreamResetDefaultDevice(stm_tok) => { let _ = self.streams[stm_tok].reset_default_device(); ClientMessage::StreamDefaultDeviceReset - }, + } ServerMessage::StreamGetPosition(stm_tok) => self.streams[stm_tok] .position() @@ -284,8 +208,8 @@ impl CubebServer { ServerMessage::StreamGetCurrentDevice(stm_tok) => self.streams[stm_tok] .current_device() - .map(|device| ClientMessage::StreamCurrentDevice(device.into())) - .unwrap_or_else(error) + .map(|device| ClientMessage::StreamCurrentDevice(Device::from(device))) + .unwrap_or_else(error), }; debug!("process_msg: req={:?}, resp={:?}", msg, resp); @@ -297,63 +221,37 @@ impl CubebServer { fn process_stream_init( &mut self, context: &cubeb::Context, - params: &StreamInitParams + params: &StreamInitParams, ) -> Result { - fn opt_stream_params(params: Option<&StreamParams>) -> Option { - params.and_then(|p| { - let raw = ffi::cubeb_stream_params::from(p); - Some(unsafe { cubeb::StreamParams::from_raw(&raw as *const _) }) - }) - } - - fn frame_size_in_bytes(params: Option) -> u16 { + fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 { params .map(|p| { - let sample_size = match p.format() { + let format = p.format.into(); + let sample_size = match format { cubeb::SampleFormat::S16LE | cubeb::SampleFormat::S16BE | cubeb::SampleFormat::S16NE => 2, cubeb::SampleFormat::Float32LE | cubeb::SampleFormat::Float32BE - | cubeb::SampleFormat::Float32NE => 4 + | cubeb::SampleFormat::Float32NE => 4, }; - let channel_count = p.channels() as u16; + let channel_count = p.channels as u16; sample_size * channel_count }) .unwrap_or(0u16) } // TODO: Yuck! - let input_device = - unsafe { cubeb::DeviceId::from_raw(params.input_device as *const _) }; - let output_device = - unsafe { cubeb::DeviceId::from_raw(params.output_device as *const _) }; + let input_device = params.input_device as *const _; + let output_device = params.output_device as *const _; let latency = params.latency_frames; - let mut builder = cubeb::StreamInitOptionsBuilder::new(); - builder - .input_device(input_device) - .output_device(output_device) - .latency(latency); - if let Some(ref stream_name) = params.stream_name { - builder.stream_name(stream_name); - } - let input_stream_params = opt_stream_params(params.input_stream_params.as_ref()); - if let Some(ref isp) = input_stream_params { - builder.input_stream_param(isp); - } - let output_stream_params = opt_stream_params(params.output_stream_params.as_ref()); - if let Some(ref osp) = output_stream_params { - builder.output_stream_param(osp); - } - let params = builder.take(); - - let input_frame_size = frame_size_in_bytes(input_stream_params); - let output_frame_size = frame_size_in_bytes(output_stream_params); + let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref()); + let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref()); let (stm1, stm2) = net::UnixStream::pair()?; - info!("Created callback pair: {:?}-{:?}", stm1, stm2); - let (input_shm, input_file) = + debug!("Created callback pair: {:?}-{:?}", stm1, stm2); + let (mut input_shm, input_file) = SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?; let (output_shm, output_file) = SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?; @@ -377,22 +275,81 @@ impl CubebServer { Ok(()) }); - let rpc: rpc::ClientProxy = match rx.wait() { + let rpc_data: rpc::ClientProxy = match rx.wait() { Ok(rpc) => rpc, - Err(_) => bail!("Failed to create callback rpc.") + Err(_) => bail!("Failed to create callback rpc."), }; + let rpc_state = rpc_data.clone(); - context - .stream_init( - ¶ms, - Callback { - input_frame_size: input_frame_size, - output_frame_size: output_frame_size, - input_shm: input_shm, - output_shm: output_shm, - rpc: rpc + let mut builder = cubeb::StreamBuilder::new(); + + if let Some(ref stream_name) = params.stream_name { + builder.name(stream_name.clone()); + } + + if let Some(ref isp) = params.input_stream_params { + let input_stream_params = + unsafe { cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _) }; + builder.input(input_device, input_stream_params); + } + + if let Some(ref osp) = params.output_stream_params { + let output_stream_params = + unsafe { cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _) }; + builder.output(output_device, output_stream_params); + } + + builder + .latency(latency) + .data_callback(move |input, output| { + trace!("Stream data callback: {} {}", input.len(), output.len()); + + // len is of input and output is frame len. Turn these into the real lengths. + let real_input = unsafe { + let nbytes = input.len() * input_frame_size as usize; + slice::from_raw_parts(input.as_ptr(), nbytes) + }; + + input_shm.write(real_input).unwrap(); + + let r = rpc_data + .call(CallbackReq::Data( + output.len() as isize, + output_frame_size as usize, + )) + .wait(); + + match r { + Ok(CallbackResp::Data(frames)) => { + if frames >= 0 { + let nbytes = frames as usize * output_frame_size as usize; + let real_output = unsafe { + trace!("Resize output to {}", nbytes); + slice::from_raw_parts_mut(output.as_mut_ptr(), nbytes) + }; + output_shm.read(&mut real_output[..nbytes]).unwrap(); + } + frames + } + _ => { + debug!("Unexpected message {:?} during data_callback", r); + -1 + } } - ) + }) + .state_callback(move |state| { + trace!("Stream state callback: {:?}", state); + let r = rpc_state.call(CallbackReq::State(state.into())).wait(); + match r { + Ok(CallbackResp::State) => {} + _ => { + debug!("Unexpected message {:?} during callback", r); + } + } + }); + + builder + .init(context) .and_then(|stream| { if !self.streams.has_available() { trace!( @@ -407,7 +364,7 @@ impl CubebServer { debug!("Registering stream {:?}", entry.index(),); entry.insert(stream).index() - }, + } None => { // TODO: Turn into error panic!("Failed to insert stream into slab. No entries") @@ -420,7 +377,7 @@ impl CubebServer { stm1.into_raw_fd(), input_file.into_raw_fd(), output_file.into_raw_fd(), - ] + ], })) }) .map_err(|e| e.into()) @@ -429,7 +386,7 @@ impl CubebServer { struct ServerWrapper { core_thread: core::CoreThread, - callback_thread: core::CoreThread + callback_thread: core::CoreThread, } fn run() -> Result { @@ -460,7 +417,7 @@ fn run() -> Result { Ok(ServerWrapper { core_thread: core_thread, - callback_thread: callback_thread + callback_thread: callback_thread, }) } @@ -468,7 +425,7 @@ fn run() -> Result { pub extern "C" fn audioipc_server_start() -> *mut c_void { match run() { Ok(server) => Box::into_raw(Box::new(server)) as *mut _, - Err(_) => ptr::null_mut() as *mut _ + Err(_) => ptr::null_mut() as *mut _, } }