diff --git a/.cargo/config.in b/.cargo/config.in index c927aad26afd..af3b7faed9a9 100644 --- a/.cargo/config.in +++ b/.cargo/config.in @@ -32,5 +32,14 @@ git = "https://github.com/ChunMinChang/coreaudio-sys" branch = "gecko-build" replace-with = "vendored-sources" +[source."https://github.com/alexcrichton/mio-named-pipes"] +git = "https://github.com/alexcrichton/mio-named-pipes" +replace-with = "vendored-sources" + +[source."https://github.com/NikVolf/tokio-named-pipes"] +git = "https://github.com/NikVolf/tokio-named-pipes" +branch = "stable" +replace-with = "vendored-sources" + [source.vendored-sources] directory = '@top_srcdir@/third_party/rust' diff --git a/media/audioipc/Cargo.toml b/media/audioipc/Cargo.toml index d0a19799bff9..64d3e2bdb1e5 100644 --- a/media/audioipc/Cargo.toml +++ b/media/audioipc/Cargo.toml @@ -1,2 +1,3 @@ [workspace] members = ["audioipc", "client", "server"] + diff --git a/media/audioipc/README_MOZILLA b/media/audioipc/README_MOZILLA index 2cd56d4fb5db..b8f3a527d856 100644 --- a/media/audioipc/README_MOZILLA +++ b/media/audioipc/README_MOZILLA @@ -5,4 +5,4 @@ Makefile.in build files for the Mozilla build system. The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git -The git commit ID used was 1c6e33942d726d71cb911fb8f57138aadf577057 (2019-07-23 15:56:25 +1200) +The git commit ID used was 959be8b71bd8d931a9e94276dde1dd718f52fca4 (2019-07-26 16:40:02 +1200) diff --git a/media/audioipc/audioipc/Cargo.toml b/media/audioipc/audioipc/Cargo.toml index f5eb7e94db2c..af57ee28682d 100644 --- a/media/audioipc/audioipc/Cargo.toml +++ b/media/audioipc/audioipc/Cargo.toml @@ -6,24 +6,32 @@ authors = [ "Dan Glastonbury " ] description = "Remote Cubeb IPC" +edition = "2018" [dependencies] -cubeb = "0.5.4" bincode = "1.0" bytes = "0.4" +cubeb = "0.5.5" futures = "0.1.18" -iovec = "0.1" -libc = "0.2" log = "0.4" -memmap = "0.5.2" +memmap = "0.7" scoped-tls = "0.1" serde = "1.*.*" serde_derive = "1.*.*" -tokio-core = "0.1" +tokio = "0.1" tokio-io = "0.1" -tokio-uds = "0.1.7" + +[target.'cfg(unix)'.dependencies] +iovec = "0.1" +libc = "0.2" +mio = "0.6.19" +mio-uds = "0.6.7" +tokio-reactor = "0.1" + +[target.'cfg(windows)'.dependencies] +mio-named-pipes = { git = "https://github.com/alexcrichton/mio-named-pipes" } +tokio-named-pipes = { git = "https://github.com/NikVolf/tokio-named-pipes", branch = "stable" } winapi = "0.3.6" -mio-named-pipes = "=0.1.5" [dependencies.error-chain] version = "0.11.0" diff --git a/media/audioipc/audioipc/src/async.rs b/media/audioipc/audioipc/src/async_msg.rs similarity index 94% rename from media/audioipc/audioipc/src/async.rs rename to media/audioipc/audioipc/src/async_msg.rs index 376b073f2129..fe682a71ed4c 100644 --- a/media/audioipc/audioipc/src/async.rs +++ b/media/audioipc/audioipc/src/async_msg.rs @@ -5,6 +5,8 @@ //! Various async helpers modelled after futures-rs and tokio-io. +#[cfg(unix)] +use crate::msg::{RecvMsg, SendMsg}; use bytes::{Buf, BufMut}; #[cfg(unix)] use futures::Async; @@ -12,7 +14,7 @@ use futures::Poll; #[cfg(unix)] use iovec::IoVec; #[cfg(unix)] -use msg::{RecvMsg, SendMsg}; +use mio::Ready; use std::io; use tokio_io::{AsyncRead, AsyncWrite}; @@ -63,7 +65,9 @@ impl AsyncRecvMsg for super::AsyncMessageStream { where B: BufMut, { - if let Async::NotReady = ::poll_read(self) { + if let Async::NotReady = + ::poll_read_ready(self, Ready::readable())? + { return Ok(Async::NotReady); } let r = unsafe { @@ -119,7 +123,7 @@ impl AsyncRecvMsg for super::AsyncMessageStream { Ok((n, flags).into()) } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.need_read(); + self.clear_read_ready(mio::Ready::readable())?; Ok(Async::NotReady) } Err(e) => Err(e), @@ -134,7 +138,7 @@ impl AsyncSendMsg for super::AsyncMessageStream { B: Buf, C: Buf, { - if let Async::NotReady = ::poll_write(self) { + if let Async::NotReady = ::poll_write_ready(self)? { return Ok(Async::NotReady); } let r = { @@ -155,7 +159,7 @@ impl AsyncSendMsg for super::AsyncMessageStream { Ok(Async::Ready(n)) } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.need_write(); + self.clear_write_ready()?; Ok(Async::NotReady) } Err(e) => Err(e), diff --git a/media/audioipc/audioipc/src/cmsg.rs b/media/audioipc/audioipc/src/cmsg.rs index 012e1878361e..97ff81f35f77 100644 --- a/media/audioipc/audioipc/src/cmsg.rs +++ b/media/audioipc/audioipc/src/cmsg.rs @@ -111,6 +111,7 @@ impl ControlMsgBuilder { // that, just use a pre-zeroed struct to fill out any // fields we don't care about. let zeroed = unsafe { mem::zeroed() }; + #[allow(clippy::needless_update)] let cmsghdr = cmsghdr { cmsg_len: cmsg_len as _, cmsg_level: level, @@ -122,7 +123,7 @@ impl ControlMsgBuilder { slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::()) }; cmsg.put_slice(cmsghdr); - let mut cmsg = try!(align_buf(cmsg)); + let mut cmsg = align_buf(cmsg)?; cmsg.put_slice(msg); Ok(cmsg) diff --git a/media/audioipc/audioipc/src/codec.rs b/media/audioipc/audioipc/src/codec.rs index 6b60b0904561..a7e1825513b8 100644 --- a/media/audioipc/audioipc/src/codec.rs +++ b/media/audioipc/audioipc/src/codec.rs @@ -31,7 +31,7 @@ pub trait Codec { /// A default method available to be called when there are no more bytes /// available to be read from the I/O. fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result { - match try!(self.decode(buf)) { + match self.decode(buf)? { Some(frame) => Ok(frame), None => Err(io::Error::new( io::ErrorKind::Other, @@ -101,10 +101,10 @@ impl LengthDelimitedCodec { let buf = buf.split_to(n).freeze(); trace!("Attempting to decode"); - let msg = try!(deserialize::(buf.as_ref()).map_err(|e| match *e { + let msg = deserialize::(buf.as_ref()).map_err(|e| match *e { bincode::ErrorKind::Io(e) => e, _ => io::Error::new(io::ErrorKind::Other, *e), - })); + })?; trace!("... Decoded {:?}", msg); Ok(Some(msg)) @@ -122,7 +122,7 @@ where fn decode(&mut self, buf: &mut BytesMut) -> io::Result> { let n = match self.state { State::Length => { - match try!(self.decode_length(buf)) { + match self.decode_length(buf)? { Some(n) => { self.state = State::Data(n); @@ -138,7 +138,7 @@ where State::Data(n) => n, }; - match try!(self.decode_data(buf, n)) { + match self.decode_data(buf, n)? { Some(data) => { // Update the decode state self.state = State::Length; diff --git a/media/audioipc/audioipc/src/core.rs b/media/audioipc/audioipc/src/core.rs index 5e2a027aea14..b2bde5bb6d6f 100644 --- a/media/audioipc/audioipc/src/core.rs +++ b/media/audioipc/audioipc/src/core.rs @@ -6,33 +6,9 @@ // Ease accessing reactor::Core handles. use futures::sync::oneshot; -use futures::{Future, IntoFuture}; use std::sync::mpsc; use std::{fmt, io, thread}; -use tokio_core::reactor::{Core, Handle, Remote}; - -scoped_thread_local! { - static HANDLE: Handle -} - -pub fn handle() -> Handle { - HANDLE.with(|handle| handle.clone()) -} - -pub fn spawn(f: F) -where - F: Future + 'static, -{ - HANDLE.with(|handle| handle.spawn(f)) -} - -pub fn spawn_fn(f: F) -where - F: FnOnce() -> R + 'static, - R: IntoFuture + 'static, -{ - HANDLE.with(|handle| handle.spawn_fn(f)) -} +use tokio::runtime::current_thread; struct Inner { join: thread::JoinHandle<()>, @@ -41,12 +17,12 @@ struct Inner { pub struct CoreThread { inner: Option, - remote: Remote, + handle: current_thread::Handle, } impl CoreThread { - pub fn remote(&self) -> Remote { - self.remote.clone() + pub fn handle(&self) -> current_thread::Handle { + self.handle.clone() } } @@ -61,9 +37,9 @@ impl Drop for CoreThread { } impl fmt::Debug for CoreThread { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // f.debug_tuple("CoreThread").field(&"...").finish() - f.debug_tuple("CoreThread").field(&self.remote).finish() + f.debug_tuple("CoreThread").field(&self.handle).finish() } } @@ -73,33 +49,35 @@ where F: FnOnce() -> io::Result<()> + Send + 'static, { let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); - let (remote_tx, remote_rx) = mpsc::channel::(); + let (handle_tx, handle_rx) = mpsc::channel::(); - let join = try!(thread::Builder::new().name(name.into()).spawn(move || { - let mut core = Core::new().expect("Failed to create reactor::Core"); - let handle = core.handle(); - let remote = handle.remote().clone(); - drop(remote_tx.send(remote)); + let join = thread::Builder::new().name(name.into()).spawn(move || { + let mut rt = + current_thread::Runtime::new().expect("Failed to create current_thread::Runtime"); + let handle = rt.handle(); + drop(handle_tx.send(handle.clone())); - drop(HANDLE.set(&handle, || { - f().and_then(|_| { - let _ = core.run(shutdown_rx); - Ok(()) - }) + rt.spawn(futures::future::lazy(|| { + let _ = f(); + Ok(()) })); - trace!("thread shutdown..."); - })); - let remote = try!(remote_rx.recv().or_else(|_| Err(io::Error::new( - io::ErrorKind::Other, - "Failed to receive remote handle from spawned thread" - )))); + let _ = rt.block_on(shutdown_rx); + trace!("thread shutdown..."); + })?; + + let handle = handle_rx.recv().or_else(|_| { + Err(io::Error::new( + io::ErrorKind::Other, + "Failed to receive remote handle from spawned thread", + )) + })?; Ok(CoreThread { inner: Some(Inner { - join: join, + join, shutdown: shutdown_tx, }), - remote: remote, + handle, }) } diff --git a/media/audioipc/audioipc/src/fd_passing.rs b/media/audioipc/audioipc/src/fd_passing.rs index b0584a4cca2b..3132bcb7de86 100644 --- a/media/audioipc/audioipc/src/fd_passing.rs +++ b/media/audioipc/audioipc/src/fd_passing.rs @@ -3,12 +3,12 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details -use async::{AsyncRecvMsg, AsyncSendMsg}; +use crate::async_msg::{AsyncRecvMsg, AsyncSendMsg}; +use crate::cmsg; +use crate::codec::Codec; +use crate::messages::AssocRawPlatformHandle; use bytes::{Bytes, BytesMut, IntoBuf}; -use cmsg; -use codec::Codec; use futures::{AsyncSink, Poll, Sink, StartSend, Stream}; -use messages::AssocRawPlatformHandle; use std::collections::VecDeque; use std::os::unix::io::RawFd; use std::{fmt, io, mem}; @@ -103,7 +103,8 @@ where let fds = match frame.fds { Some(ref fds) => fds.clone(), None => Bytes::new(), - }.into_buf(); + } + .into_buf(); try_ready!(self.io.send_msg_buf(&mut msgs, &fds)) } _ => { @@ -179,14 +180,14 @@ where // readable again, at which point the stream is terminated. if self.is_readable { if self.eof { - let mut item = try!(self.codec.decode_eof(&mut self.read_buf)); + let mut item = self.codec.decode_eof(&mut self.read_buf)?; item.take_platform_handles(|| self.incoming_fds.take_fds()); return Ok(Some(item).into()); } trace!("attempting to decode a frame"); - if let Some(mut item) = try!(self.codec.decode(&mut self.read_buf)) { + if let Some(mut item) = self.codec.decode(&mut self.read_buf)? { trace!("frame decoded from buffer"); item.take_platform_handles(|| self.incoming_fds.take_fds()); return Ok(Some(item).into()); @@ -200,10 +201,9 @@ where // Otherwise, try to read more data and try again. Make sure we've // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF - let (n, _) = try_ready!( - self.io - .recv_msg_buf(&mut self.read_buf, self.incoming_fds.cmsg()) - ); + let (n, _) = try_ready!(self + .io + .recv_msg_buf(&mut self.read_buf, self.incoming_fds.cmsg())); if n == 0 { self.eof = true; @@ -230,14 +230,14 @@ where // then attempt to flush it. If after flush it's *still* // over BACKPRESSURE_THRESHOLD, then reject the send. if self.write_buf.len() > BACKPRESSURE_THRESHOLD { - try!(self.poll_complete()); + self.poll_complete()?; if self.write_buf.len() > BACKPRESSURE_THRESHOLD { return Ok(AsyncSink::NotReady(item)); } } let fds = item.platform_handles(); - try!(self.codec.encode(item, &mut self.write_buf)); + self.codec.encode(item, &mut self.write_buf)?; let fds = fds.and_then(|fds| { cmsg::builder(&mut self.outgoing_fds) .rights(&fds.0[..]) @@ -275,8 +275,8 @@ where pub fn framed_with_platformhandles(io: A, codec: C) -> FramedWithPlatformHandles { FramedWithPlatformHandles { - io: io, - codec: codec, + io, + codec, read_buf: BytesMut::with_capacity(INITIAL_CAPACITY), incoming_fds: IncomingFds::new(FDS_CAPACITY), is_readable: false, @@ -313,8 +313,8 @@ mod tests { use libc; use std; - extern { - fn cmsghdr_bytes(size: *mut libc::size_t) -> *const libc::uint8_t; + extern "C" { + fn cmsghdr_bytes(size: *mut libc::size_t) -> *const u8; } fn cmsg_bytes() -> &'static [u8] { diff --git a/media/audioipc/audioipc/src/frame.rs b/media/audioipc/audioipc/src/frame.rs index 0bb1d93d1ae4..1b510c9bafc3 100644 --- a/media/audioipc/audioipc/src/frame.rs +++ b/media/audioipc/audioipc/src/frame.rs @@ -3,8 +3,8 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details +use crate::codec::Codec; use bytes::{Buf, Bytes, BytesMut, IntoBuf}; -use codec::Codec; use futures::{AsyncSink, Poll, Sink, StartSend, Stream}; use std::io; use tokio_io::{AsyncRead, AsyncWrite}; @@ -79,13 +79,13 @@ where // readable again, at which point the stream is terminated. if self.is_readable { if self.eof { - let frame = try!(self.codec.decode_eof(&mut self.read_buf)); + let frame = self.codec.decode_eof(&mut self.read_buf)?; return Ok(Some(frame).into()); } trace!("attempting to decode a frame"); - if let Some(frame) = try!(self.codec.decode(&mut self.read_buf)) { + if let Some(frame) = self.codec.decode(&mut self.read_buf)? { trace!("frame decoded from buffer"); return Ok(Some(frame).into()); } @@ -120,13 +120,13 @@ where // then attempt to flush it. If after flush it's *still* // over BACKPRESSURE_THRESHOLD, then reject the send. if self.write_buf.len() > BACKPRESSURE_THRESHOLD { - try!(self.poll_complete()); + self.poll_complete()?; if self.write_buf.len() > BACKPRESSURE_THRESHOLD { return Ok(AsyncSink::NotReady(item)); } } - try!(self.codec.encode(item, &mut self.write_buf)); + self.codec.encode(item, &mut self.write_buf)?; Ok(AsyncSink::Ready) } @@ -149,8 +149,8 @@ where pub fn framed(io: A, codec: C) -> Framed { Framed { - io: io, - codec: codec, + io, + codec, read_buf: BytesMut::with_capacity(INITIAL_CAPACITY), write_buf: BytesMut::with_capacity(INITIAL_CAPACITY), frame: None, diff --git a/media/audioipc/audioipc/src/handle_passing.rs b/media/audioipc/audioipc/src/handle_passing.rs index b177047937c7..e2833b8fd22b 100644 --- a/media/audioipc/audioipc/src/handle_passing.rs +++ b/media/audioipc/audioipc/src/handle_passing.rs @@ -3,13 +3,13 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details -use tokio_io::{AsyncRead, AsyncWrite}; +use crate::codec::Codec; +use crate::messages::AssocRawPlatformHandle; use bytes::{Bytes, BytesMut, IntoBuf}; -use codec::Codec; use futures::{AsyncSink, Poll, Sink, StartSend, Stream}; -use messages::AssocRawPlatformHandle; use std::collections::VecDeque; use std::{fmt, io}; +use tokio_io::{AsyncRead, AsyncWrite}; const INITIAL_CAPACITY: usize = 1024; const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY; @@ -117,13 +117,13 @@ where // readable again, at which point the stream is terminated. if self.is_readable { if self.eof { - let item = try!(self.codec.decode_eof(&mut self.read_buf)); + let item = self.codec.decode_eof(&mut self.read_buf)?; return Ok(Some(item).into()); } trace!("attempting to decode a frame"); - if let Some(item) = try!(self.codec.decode(&mut self.read_buf)) { + if let Some(item) = self.codec.decode(&mut self.read_buf)? { trace!("frame decoded from buffer"); return Ok(Some(item).into()); } @@ -136,10 +136,7 @@ where // Otherwise, try to read more data and try again. Make sure we've // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF - let n = try_ready!( - self.io - .read_buf(&mut self.read_buf) - ); + let n = try_ready!(self.io.read_buf(&mut self.read_buf)); if n == 0 { self.eof = true; @@ -159,14 +156,17 @@ where type SinkItem = C::In; type SinkError = io::Error; - fn start_send(&mut self, mut item: Self::SinkItem) -> StartSend { + fn start_send( + &mut self, + mut item: Self::SinkItem, + ) -> StartSend { trace!("start_send: item={:?}", item); // If the buffer is already over BACKPRESSURE_THRESHOLD, // then attempt to flush it. If after flush it's *still* // over BACKPRESSURE_THRESHOLD, then reject the send. if self.write_buf.len() > BACKPRESSURE_THRESHOLD { - try!(self.poll_complete()); + self.poll_complete()?; if self.write_buf.len() > BACKPRESSURE_THRESHOLD { return Ok(AsyncSink::NotReady(item)); } @@ -176,15 +176,21 @@ where if let Some((handles, target_pid)) = item.platform_handles() { got_handles = true; let remote_handles = unsafe { - [duplicate_platformhandle(handles[0], target_pid)?, - duplicate_platformhandle(handles[1], target_pid)?, - duplicate_platformhandle(handles[2], target_pid)?] + [ + duplicate_platformhandle(handles[0], target_pid)?, + duplicate_platformhandle(handles[1], target_pid)?, + duplicate_platformhandle(handles[2], target_pid)?, + ] }; - trace!("item handles: {:?} remote_handles: {:?}", handles, remote_handles); + trace!( + "item handles: {:?} remote_handles: {:?}", + handles, + remote_handles + ); item.take_platform_handles(|| Some(remote_handles)); } - try!(self.codec.encode(item, &mut self.write_buf)); + self.codec.encode(item, &mut self.write_buf)?; if got_handles { // Enforce splitting sends on messages that contain file @@ -214,8 +220,8 @@ where pub fn framed_with_platformhandles(io: A, codec: C) -> FramedWithPlatformHandles { FramedWithPlatformHandles { - io: io, - codec: codec, + io, + codec, read_buf: BytesMut::with_capacity(INITIAL_CAPACITY), is_readable: false, eof: false, @@ -224,33 +230,41 @@ pub fn framed_with_platformhandles(io: A, codec: C) -> FramedWithPlatformH } } -use winapi::um::{processthreadsapi, winnt, handleapi}; -use winapi::shared::minwindef::{DWORD, FALSE}; use super::PlatformHandleType; +use winapi::shared::minwindef::{DWORD, FALSE}; +use winapi::um::{handleapi, processthreadsapi, winnt}; // source_handle is effectively taken ownership of (consumed) and // closed when duplicate_platformhandle is called. // TODO: Make this transfer more explicit via the type system. -unsafe fn duplicate_platformhandle(source_handle: PlatformHandleType, - target_pid: DWORD) -> Result { +unsafe fn duplicate_platformhandle( + source_handle: PlatformHandleType, + target_pid: DWORD, +) -> Result { let source = processthreadsapi::GetCurrentProcess(); - let target = processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE, - FALSE, - target_pid); + let target = processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE, FALSE, target_pid); if !super::valid_handle(target) { - return Err(std::io::Error::new(std::io::ErrorKind::Other, "invalid target process")); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "invalid target process", + )); } let mut target_handle = std::ptr::null_mut(); - let ok = handleapi::DuplicateHandle(source, - source_handle, - target, - &mut target_handle, - 0, - FALSE, - winnt::DUPLICATE_CLOSE_SOURCE | winnt::DUPLICATE_SAME_ACCESS); + let ok = handleapi::DuplicateHandle( + source, + source_handle, + target, + &mut target_handle, + 0, + FALSE, + winnt::DUPLICATE_CLOSE_SOURCE | winnt::DUPLICATE_SAME_ACCESS, + ); if ok == FALSE { - return Err(std::io::Error::new(std::io::ErrorKind::Other, "DuplicateHandle failed")); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "DuplicateHandle failed", + )); } Ok(target_handle) } diff --git a/media/audioipc/audioipc/src/lib.rs b/media/audioipc/audioipc/src/lib.rs index f95097345ad3..8dd05ce26f25 100644 --- a/media/audioipc/audioipc/src/lib.rs +++ b/media/audioipc/audioipc/src/lib.rs @@ -2,36 +2,20 @@ // // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details - +#![warn(unused_extern_crates)] #![recursion_limit = "1024"] #[macro_use] extern crate error_chain; - #[macro_use] extern crate log; - #[macro_use] extern crate serde_derive; - -extern crate bincode; -extern crate bytes; -extern crate cubeb; #[macro_use] extern crate futures; -extern crate iovec; -extern crate libc; -extern crate memmap; -#[macro_use] -extern crate scoped_tls; -extern crate serde; -extern crate tokio_core; #[macro_use] extern crate tokio_io; -extern crate tokio_uds; -#[cfg(windows)] -extern crate winapi; -mod async; +mod async_msg; #[cfg(unix)] mod cmsg; pub mod codec; @@ -41,7 +25,7 @@ pub mod errors; #[cfg(unix)] pub mod fd_passing; #[cfg(unix)] -pub use fd_passing as platformhandle_passing; +pub use crate::fd_passing as platformhandle_passing; #[cfg(windows)] pub mod handle_passing; #[cfg(windows)] @@ -53,14 +37,21 @@ mod msg; pub mod rpc; pub mod shm; -pub use messages::{ClientMessage, ServerMessage}; +// TODO: Remove local fork when https://github.com/tokio-rs/tokio/pull/1294 is resolved. +#[cfg(unix)] +mod tokio_uds_stream; + +pub use crate::messages::{ClientMessage, ServerMessage}; use std::env::temp_dir; use std::path::PathBuf; -#[cfg(windows)] -use std::os::windows::io::{FromRawHandle, IntoRawHandle}; +// TODO: Remove hardcoded size and allow allocation based on cubeb backend requirements. +pub const SHM_AREA_SIZE: usize = 2 * 1024 * 1024; + #[cfg(unix)] use std::os::unix::io::{FromRawFd, IntoRawFd}; +#[cfg(windows)] +use std::os::windows::io::{FromRawHandle, IntoRawHandle}; // This must match the definition of // ipc::FileDescriptor::PlatformHandleType in Gecko. @@ -93,7 +84,7 @@ struct PlatformHandleVisitor; impl<'de> serde::de::Visitor<'de> for PlatformHandleVisitor { type Value = PlatformHandle; - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { formatter.write_str("an integer between -2^63 and 2^63") } @@ -158,7 +149,7 @@ impl PlatformHandle { std::fs::File::from_raw_fd(self.0) } - pub fn as_raw(&self) -> PlatformHandleType { + pub fn as_raw(self) -> PlatformHandleType { self.0 } @@ -187,11 +178,9 @@ pub fn get_shm_path(dir: &str) -> PathBuf { #[cfg(unix)] pub mod messagestream_unix; #[cfg(unix)] -pub use messagestream_unix::*; +pub use crate::messagestream_unix::*; #[cfg(windows)] pub mod messagestream_win; #[cfg(windows)] pub use messagestream_win::*; -#[cfg(windows)] -mod tokio_named_pipes; diff --git a/media/audioipc/audioipc/src/messages.rs b/media/audioipc/audioipc/src/messages.rs index 6a6fa39a57b3..9deb79ff85d7 100644 --- a/media/audioipc/audioipc/src/messages.rs +++ b/media/audioipc/audioipc/src/messages.rs @@ -3,8 +3,8 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details -use PlatformHandle; -use PlatformHandleType; +use crate::PlatformHandle; +use crate::PlatformHandleType; use cubeb::{self, ffi}; use std::ffi::{CStr, CString}; use std::os::raw::{c_char, c_int, c_uint}; @@ -174,6 +174,12 @@ pub struct StreamCreate { pub target_pid: u32, } +#[derive(Debug, Serialize, Deserialize)] +pub struct RegisterDeviceCollectionChanged { + pub platform_handles: [PlatformHandle; 3], + pub target_pid: u32, +} + // Client -> Server messages. // TODO: Callbacks should be different messages types so // ServerConn::process_msg doesn't have a catch-all case. @@ -187,6 +193,8 @@ pub enum ServerMessage { ContextGetMinLatency(StreamParams), ContextGetPreferredSampleRate, ContextGetDeviceEnumeration(ffi::cubeb_device_type), + ContextSetupDeviceCollectionCallback, + ContextRegisterDeviceCollectionChanged(ffi::cubeb_device_type, bool), StreamInit(StreamInitParams), StreamDestroy(usize), @@ -199,6 +207,7 @@ pub enum ServerMessage { StreamSetVolume(usize, f32), StreamSetPanning(usize, f32), StreamGetCurrentDevice(usize), + StreamRegisterDeviceChangeCallback(usize, bool), } // Server -> Client messages. @@ -208,11 +217,13 @@ pub enum ClientMessage { ClientConnected, ClientDisconnected, - ContextBackendId(), + ContextBackendId(String), ContextMaxChannelCount(u32), ContextMinLatency(u32), ContextPreferredSampleRate(u32), ContextEnumeratedDevices(Vec), + ContextSetupDeviceCollectionCallback(RegisterDeviceCollectionChanged), + ContextRegisteredDeviceCollectionChanged, StreamCreated(StreamCreate), StreamDestroyed, @@ -225,22 +236,37 @@ pub enum ClientMessage { StreamVolumeSet, StreamPanningSet, StreamCurrentDevice(Device), + StreamRegisterDeviceChangeCallback, Error(c_int), } #[derive(Debug, Deserialize, Serialize)] pub enum CallbackReq { - Data { nframes: isize, - input_frame_size: usize, - output_frame_size: usize }, + Data { + nframes: isize, + input_frame_size: usize, + output_frame_size: usize, + }, State(ffi::cubeb_state), + DeviceChange, } #[derive(Debug, Deserialize, Serialize)] pub enum CallbackResp { Data(isize), State, + DeviceChange, +} + +#[derive(Debug, Deserialize, Serialize)] +pub enum DeviceCollectionReq { + DeviceChange(ffi::cubeb_device_type), +} + +#[derive(Debug, Deserialize, Serialize)] +pub enum DeviceCollectionResp { + DeviceChange, } pub trait AssocRawPlatformHandle { @@ -250,7 +276,8 @@ pub trait AssocRawPlatformHandle { fn take_platform_handles(&mut self, f: F) where - F: FnOnce() -> Option<[PlatformHandleType; 3]> { + F: FnOnce() -> Option<[PlatformHandleType; 3]>, + { assert!(f().is_none()); } } @@ -260,10 +287,22 @@ impl AssocRawPlatformHandle for ServerMessage {} impl AssocRawPlatformHandle for ClientMessage { fn platform_handles(&self) -> Option<([PlatformHandleType; 3], u32)> { match *self { - ClientMessage::StreamCreated(ref data) => Some(([data.platform_handles[0].as_raw(), - data.platform_handles[1].as_raw(), - data.platform_handles[2].as_raw()], - data.target_pid)), + ClientMessage::StreamCreated(ref data) => Some(( + [ + data.platform_handles[0].as_raw(), + data.platform_handles[1].as_raw(), + data.platform_handles[2].as_raw(), + ], + data.target_pid, + )), + ClientMessage::ContextSetupDeviceCollectionCallback(ref data) => Some(( + [ + data.platform_handles[0].as_raw(), + data.platform_handles[1].as_raw(), + data.platform_handles[2].as_raw(), + ], + data.target_pid, + )), _ => None, } } @@ -272,11 +311,25 @@ impl AssocRawPlatformHandle for ClientMessage { where F: FnOnce() -> Option<[PlatformHandleType; 3]>, { - if let ClientMessage::StreamCreated(ref mut data) = *self { - let handles = f().expect("platform_handles must be available when processing StreamCreated"); - data.platform_handles = [PlatformHandle::new(handles[0]), - PlatformHandle::new(handles[1]), - PlatformHandle::new(handles[2])] + match *self { + ClientMessage::StreamCreated(ref mut data) => { + let handles = + f().expect("platform_handles must be available when processing StreamCreated"); + data.platform_handles = [ + PlatformHandle::new(handles[0]), + PlatformHandle::new(handles[1]), + PlatformHandle::new(handles[2]), + ] + } + ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => { + let handles = f().expect("platform_handles must be available when processing ContextSetupDeviceCollectionCallback"); + data.platform_handles = [ + PlatformHandle::new(handles[0]), + PlatformHandle::new(handles[1]), + PlatformHandle::new(handles[2]), + ] + } + _ => {} } } } diff --git a/media/audioipc/audioipc/src/messagestream_unix.rs b/media/audioipc/audioipc/src/messagestream_unix.rs index 4f6846e2b484..624e66f546a2 100644 --- a/media/audioipc/audioipc/src/messagestream_unix.rs +++ b/media/audioipc/audioipc/src/messagestream_unix.rs @@ -3,7 +3,10 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details -use std::os::unix::io::{IntoRawFd, FromRawFd, AsRawFd, RawFd}; +use super::tokio_uds_stream as tokio_uds; +use futures::Poll; +use mio::Ready; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::os::unix::net; use tokio_io::{AsyncRead, AsyncWrite}; @@ -16,7 +19,8 @@ impl MessageStream { MessageStream(stream) } - pub fn anonymous_ipc_pair() -> std::result::Result<(MessageStream, MessageStream), std::io::Error> { + pub fn anonymous_ipc_pair( + ) -> std::result::Result<(MessageStream, MessageStream), std::io::Error> { let pair = net::UnixStream::pair()?; Ok((MessageStream::new(pair.0), MessageStream::new(pair.1))) } @@ -25,8 +29,13 @@ impl MessageStream { MessageStream::new(net::UnixStream::from_raw_fd(raw)) } - pub fn into_tokio_ipc(self, handle: &tokio_core::reactor::Handle) -> std::result::Result { - Ok(AsyncMessageStream::new(tokio_uds::UnixStream::from_stream(self.0, handle)?)) + pub fn into_tokio_ipc( + self, + handle: &tokio::reactor::Handle, + ) -> std::result::Result { + Ok(AsyncMessageStream::new(tokio_uds::UnixStream::from_std( + self.0, handle, + )?)) } } @@ -35,20 +44,20 @@ impl AsyncMessageStream { AsyncMessageStream(stream) } - pub fn poll_read(&self) -> futures::Async<()> { - self.0.poll_read() + pub fn poll_read_ready(&self, ready: Ready) -> Poll { + self.0.poll_read_ready(ready) } - pub fn poll_write(&self) -> futures::Async<()> { - self.0.poll_write() + pub fn clear_read_ready(&self, ready: Ready) -> Result<(), std::io::Error> { + self.0.clear_read_ready(ready) } - pub fn need_read(&self) { - self.0.need_read() + pub fn poll_write_ready(&self) -> Poll { + self.0.poll_write_ready() } - pub fn need_write(&self) { - self.0.need_write() + pub fn clear_write_ready(&self) -> Result<(), std::io::Error> { + self.0.clear_write_ready() } } diff --git a/media/audioipc/audioipc/src/messagestream_win.rs b/media/audioipc/audioipc/src/messagestream_win.rs index 78f30ec49d80..e0159cd51558 100644 --- a/media/audioipc/audioipc/src/messagestream_win.rs +++ b/media/audioipc/audioipc/src/messagestream_win.rs @@ -3,8 +3,8 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details -extern crate mio_named_pipes; -use std::os::windows::io::{IntoRawHandle, FromRawHandle, AsRawHandle, RawHandle}; +use mio_named_pipes; +use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle}; use std::sync::atomic::{AtomicUsize, Ordering}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_named_pipes; @@ -18,8 +18,8 @@ impl MessageStream { MessageStream(stream) } - - pub fn anonymous_ipc_pair() -> std::result::Result<(MessageStream, MessageStream), std::io::Error> { + pub fn anonymous_ipc_pair( + ) -> std::result::Result<(MessageStream, MessageStream), std::io::Error> { let pipe1 = mio_named_pipes::NamedPipe::new(get_pipe_name())?; let pipe2 = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(pipe1.as_raw_handle()) }; Ok((MessageStream::new(pipe1), MessageStream::new(pipe2))) @@ -29,8 +29,13 @@ impl MessageStream { MessageStream::new(mio_named_pipes::NamedPipe::from_raw_handle(raw)) } - pub fn into_tokio_ipc(self, handle: &tokio_core::reactor::Handle) -> std::result::Result { - Ok(AsyncMessageStream::new(tokio_named_pipes::NamedPipe::from_pipe(self.0, handle)?)) + pub fn into_tokio_ipc( + self, + handle: &tokio::reactor::Handle, + ) -> std::result::Result { + Ok(AsyncMessageStream::new( + tokio_named_pipes::NamedPipe::from_pipe(self.0, handle)?, + )) } } @@ -38,22 +43,6 @@ impl AsyncMessageStream { fn new(stream: tokio_named_pipes::NamedPipe) -> AsyncMessageStream { AsyncMessageStream(stream) } - - pub fn poll_read(&self) -> futures::Async<()> { - self.0.poll_read() - } - - pub fn poll_write(&self) -> futures::Async<()> { - self.0.poll_write() - } - - pub fn need_read(&self) { - self.0.need_read() - } - - pub fn need_write(&self) { - self.0.need_write() - } } impl std::io::Read for AsyncMessageStream { diff --git a/media/audioipc/audioipc/src/msg.rs b/media/audioipc/audioipc/src/msg.rs index de7c0748c859..d41cc5870349 100644 --- a/media/audioipc/audioipc/src/msg.rs +++ b/media/audioipc/audioipc/src/msg.rs @@ -3,7 +3,7 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details. -use iovec::unix as iovec; +use iovec::unix; use iovec::IoVec; use libc; use std::os::unix::io::{AsRawFd, RawFd}; @@ -68,7 +68,7 @@ pub fn recv_msg_with_flags( cmsg: &mut [u8], flags: libc::c_int, ) -> io::Result<(usize, usize, libc::c_int)> { - let slice = iovec::as_os_slice_mut(bufs); + let slice = unix::as_os_slice_mut(bufs); let len = cmp::min(::max_value() as usize, slice.len()); let (control, controllen) = if cmsg.is_empty() { (ptr::null_mut(), 0) @@ -84,9 +84,7 @@ pub fn recv_msg_with_flags( msghdr.msg_control = control; msghdr.msg_controllen = controllen as _; - let n = try!(cvt_r(|| unsafe { - libc::recvmsg(socket, &mut msghdr as *mut _, flags) - })); + let n = cvt_r(|| unsafe { libc::recvmsg(socket, &mut msghdr as *mut _, flags) })?; let controllen = msghdr.msg_controllen as usize; Ok((n, controllen, msghdr.msg_flags)) @@ -98,7 +96,7 @@ pub fn send_msg_with_flags( cmsg: &[u8], flags: libc::c_int, ) -> io::Result { - let slice = iovec::as_os_slice(bufs); + let slice = unix::as_os_slice(bufs); let len = cmp::min(::max_value() as usize, slice.len()); let (control, controllen) = if cmsg.is_empty() { (ptr::null_mut(), 0) diff --git a/media/audioipc/audioipc/src/rpc/client/mod.rs b/media/audioipc/audioipc/src/rpc/client/mod.rs index 58f2ca4de0ac..da489a82e1f3 100644 --- a/media/audioipc/audioipc/src/rpc/client/mod.rs +++ b/media/audioipc/audioipc/src/rpc/client/mod.rs @@ -39,22 +39,19 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::rpc::driver::Driver; +use crate::rpc::Handler; use futures::sync::oneshot; use futures::{Async, Future, Poll, Sink, Stream}; -use rpc::driver::Driver; -use rpc::Handler; use std::collections::VecDeque; use std::io; -use tokio_core::reactor::Handle; +use tokio::runtime::current_thread; mod proxy; pub use self::proxy::{ClientProxy, Response}; -pub fn bind_client( - transport: C::Transport, - handle: &Handle, -) -> proxy::ClientProxy +pub fn bind_client(transport: C::Transport) -> proxy::ClientProxy where C: Client, { @@ -62,7 +59,7 @@ where let fut = { let handler = ClientHandler:: { - transport: transport, + transport, requests: rx, in_flight: VecDeque::with_capacity(32), }; @@ -70,7 +67,7 @@ where }; // Spawn the RPC driver into task - handle.spawn(Box::new(fut.map_err(|_| ()))); + current_thread::spawn(fut.map_err(|_| ())); tx } diff --git a/media/audioipc/audioipc/src/rpc/client/proxy.rs b/media/audioipc/audioipc/src/rpc/client/proxy.rs index d388a071eabb..bd44110d5985 100644 --- a/media/audioipc/audioipc/src/rpc/client/proxy.rs +++ b/media/audioipc/audioipc/src/rpc/client/proxy.rs @@ -104,7 +104,7 @@ where R: fmt::Debug, Q: fmt::Debug, { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "ClientProxy {{ ... }}") } } @@ -130,7 +130,7 @@ impl fmt::Debug for Response where Q: fmt::Debug, { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Response {{ ... }}") } } diff --git a/media/audioipc/audioipc/src/rpc/driver.rs b/media/audioipc/audioipc/src/rpc/driver.rs index 35189ab3062e..91463deaa7f9 100644 --- a/media/audioipc/audioipc/src/rpc/driver.rs +++ b/media/audioipc/audioipc/src/rpc/driver.rs @@ -3,8 +3,8 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details +use crate::rpc::Handler; use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; -use rpc::Handler; use std::fmt; use std::io; @@ -29,7 +29,7 @@ where /// Create a new rpc driver with the given service and transport. pub fn new(handler: T) -> Driver { Driver { - handler: handler, + handler, run: true, is_flushed: true, } @@ -43,8 +43,8 @@ where /// Process incoming messages off the transport. fn receive_incoming(&mut self) -> io::Result<()> { while self.run { - if let Async::Ready(req) = try!(self.handler.transport().poll()) { - try!(self.process_incoming(req)); + if let Async::Ready(req) = self.handler.transport().poll()? { + self.process_incoming(req)?; } else { break; } @@ -82,10 +82,10 @@ where fn send_outgoing(&mut self) -> io::Result<()> { trace!("send_responses"); loop { - match try!(self.handler.produce()) { + match self.handler.produce()? { Async::Ready(Some(message)) => { trace!(" --> got message"); - try!(self.process_outgoing(message)); + self.process_outgoing(message)?; } Async::Ready(None) => { trace!(" --> got None"); @@ -103,13 +103,13 @@ where fn process_outgoing(&mut self, message: T::Out) -> io::Result<()> { trace!("process_outgoing"); - try!(assert_send(&mut self.handler.transport(), message)); + assert_send(&mut self.handler.transport(), message)?; Ok(()) } fn flush(&mut self) -> io::Result<()> { - self.is_flushed = try!(self.handler.transport().poll_complete()).is_ready(); + self.is_flushed = self.handler.transport().poll_complete()?.is_ready(); // TODO: Ok(()) @@ -131,13 +131,13 @@ where trace!("rpc::Driver::tick"); // First read off data from the socket - try!(self.receive_incoming()); + self.receive_incoming()?; // Handle completed responses - try!(self.send_outgoing()); + self.send_outgoing()?; // Try flushing buffered writes - try!(self.flush()); + self.flush()?; if self.is_done() { trace!(" --> is done."); @@ -150,7 +150,7 @@ where } fn assert_send(s: &mut S, item: S::SinkItem) -> Result<(), S::SinkError> { - match try!(s.start_send(item)) { + match s.start_send(item)? { AsyncSink::Ready => Ok(()), AsyncSink::NotReady(_) => panic!( "sink reported itself as ready after `poll_ready` but was \ @@ -163,7 +163,7 @@ impl fmt::Debug for Driver where T: Handler + fmt::Debug, { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("rpc::Handler") .field("handler", &self.handler) .field("run", &self.run) diff --git a/media/audioipc/audioipc/src/rpc/server.rs b/media/audioipc/audioipc/src/rpc/server.rs index 92156c241d4d..d08fedd93606 100644 --- a/media/audioipc/audioipc/src/rpc/server.rs +++ b/media/audioipc/audioipc/src/rpc/server.rs @@ -39,29 +39,29 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::rpc::driver::Driver; +use crate::rpc::Handler; use futures::{Async, Future, Poll, Sink, Stream}; -use rpc::driver::Driver; -use rpc::Handler; use std::collections::VecDeque; use std::io; -use tokio_core::reactor::Handle; +use tokio::runtime::current_thread; /// Bind an async I/O object `io` to the `server`. -pub fn bind_server(transport: S::Transport, server: S, handle: &Handle) +pub fn bind_server(transport: S::Transport, server: S) where S: Server, { let fut = { let handler = ServerHandler { - server: server, - transport: transport, + server, + transport, in_flight: VecDeque::with_capacity(32), }; Driver::new(handler) }; // Spawn the RPC driver into task - handle.spawn(Box::new(fut.map_err(|_| ()))) + current_thread::spawn(fut.map_err(|_| ())) } pub trait Server: 'static { diff --git a/media/audioipc/audioipc/src/shm.rs b/media/audioipc/audioipc/src/shm.rs index b7f7576e3a27..66f6a64e7645 100644 --- a/media/audioipc/audioipc/src/shm.rs +++ b/media/audioipc/audioipc/src/shm.rs @@ -3,11 +3,12 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details. -use errors::*; -use memmap::{Mmap, MmapViewSync, Protection}; +use crate::errors::*; +use memmap::{Mmap, MmapMut, MmapOptions}; +use std::cell::UnsafeCell; use std::fs::{remove_file, File, OpenOptions}; use std::path::Path; -use std::sync::atomic; +use std::sync::{atomic, Arc}; pub struct SharedMemReader { mmap: Mmap, @@ -22,7 +23,7 @@ impl SharedMemReader { .open(path)?; let _ = remove_file(path); file.set_len(size as u64)?; - let mmap = Mmap::open(&file, Protection::Read)?; + let mmap = unsafe { MmapOptions::new().map(&file)? }; assert_eq!(mmap.len(), size); Ok((SharedMemReader { mmap }, file)) } @@ -34,10 +35,8 @@ impl SharedMemReader { // TODO: Track how much is in the shm area. if buf.len() <= self.mmap.len() { atomic::fence(atomic::Ordering::Acquire); - unsafe { - let len = buf.len(); - buf.copy_from_slice(&self.mmap.as_slice()[..len]); - } + let len = buf.len(); + buf.copy_from_slice(&self.mmap[..len]); Ok(()) } else { bail!("mmap size"); @@ -46,15 +45,15 @@ impl SharedMemReader { } pub struct SharedMemSlice { - view: MmapViewSync, + mmap: Arc, } impl SharedMemSlice { pub fn from(file: &File, size: usize) -> Result { - let mmap = Mmap::open(file, Protection::Read)?; + let mmap = unsafe { MmapOptions::new().map(file)? }; assert_eq!(mmap.len(), size); - let view = mmap.into_view_sync(); - Ok(SharedMemSlice { view }) + let mmap = Arc::new(mmap); + Ok(SharedMemSlice { mmap }) } pub fn get_slice(&self, size: usize) -> Result<&[u8]> { @@ -62,28 +61,30 @@ impl SharedMemSlice { return Ok(&[]); } // TODO: Track how much is in the shm area. - if size <= self.view.len() { + if size <= self.mmap.len() { atomic::fence(atomic::Ordering::Acquire); - let buf = unsafe { &self.view.as_slice()[..size] }; + let buf = &self.mmap[..size]; Ok(buf) } else { bail!("mmap size"); } } - /// Clones the view of the memory map. + /// Clones the memory map. /// - /// The underlying memory map is shared, and thus the caller must ensure that the memory - /// underlying the view is not illegally aliased. - pub unsafe fn clone_view(&self) -> Self { + /// The underlying memory map is shared, and thus the caller must + /// ensure that the memory is not illegally aliased. + pub unsafe fn unsafe_clone(&self) -> Self { SharedMemSlice { - view: self.view.clone(), + mmap: self.mmap.clone(), } } } +unsafe impl Send for SharedMemSlice {} + pub struct SharedMemWriter { - mmap: Mmap, + mmap: MmapMut, } impl SharedMemWriter { @@ -95,7 +96,7 @@ impl SharedMemWriter { .open(path)?; let _ = remove_file(path); file.set_len(size as u64)?; - let mmap = Mmap::open(&file, Protection::ReadWrite)?; + let mmap = unsafe { MmapOptions::new().map_mut(&file)? }; Ok((SharedMemWriter { mmap }, file)) } @@ -105,9 +106,7 @@ impl SharedMemWriter { } // TODO: Track how much is in the shm area. if buf.len() <= self.mmap.len() { - unsafe { - self.mmap.as_mut_slice()[..buf.len()].copy_from_slice(buf); - } + self.mmap[..buf.len()].copy_from_slice(buf); atomic::fence(atomic::Ordering::Release); Ok(()) } else { @@ -117,15 +116,15 @@ impl SharedMemWriter { } pub struct SharedMemMutSlice { - view: MmapViewSync, + mmap: Arc>, } impl SharedMemMutSlice { pub fn from(file: &File, size: usize) -> Result { - let mmap = Mmap::open(file, Protection::ReadWrite)?; + let mmap = unsafe { MmapOptions::new().map_mut(file)? }; assert_eq!(mmap.len(), size); - let view = mmap.into_view_sync(); - Ok(SharedMemMutSlice { view }) + let mmap = Arc::new(UnsafeCell::new(mmap)); + Ok(SharedMemMutSlice { mmap }) } pub fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> { @@ -133,8 +132,8 @@ impl SharedMemMutSlice { return Ok(&mut []); } // TODO: Track how much is in the shm area. - if size <= self.view.len() { - let buf = unsafe { &mut self.view.as_mut_slice()[..size] }; + if size <= self.inner().len() { + let buf = &mut self.inner_mut()[..size]; atomic::fence(atomic::Ordering::Release); Ok(buf) } else { @@ -142,14 +141,23 @@ impl SharedMemMutSlice { } } - /// Clones the view of the memory map. + /// Clones the memory map. /// /// The underlying memory map is shared, and thus the caller must - /// ensure that the memory underlying the view is not illegally - /// aliased. - pub unsafe fn clone_view(&self) -> Self { + /// ensure that the memory is not illegally aliased. + pub unsafe fn unsafe_clone(&self) -> Self { SharedMemMutSlice { - view: self.view.clone(), + mmap: self.mmap.clone(), } } + + fn inner(&self) -> &MmapMut { + unsafe { &*self.mmap.get() } + } + + fn inner_mut(&mut self) -> &mut MmapMut { + unsafe { &mut *self.mmap.get() } + } } + +unsafe impl Send for SharedMemMutSlice {} diff --git a/media/audioipc/audioipc/src/tokio_named_pipes.rs b/media/audioipc/audioipc/src/tokio_named_pipes.rs deleted file mode 100644 index b7be123446b6..000000000000 --- a/media/audioipc/audioipc/src/tokio_named_pipes.rs +++ /dev/null @@ -1,175 +0,0 @@ -// From https://github.com/alexcrichton/tokio-named-pipes/commit/3a22f8fc9a441b548aec25bd5df3b1e0ab99fabe -// License MIT/Apache-2.0 -// Sloppily updated to be compatible with tokio_io -// To be replaced with tokio_named_pipes crate after tokio 0.1 update. -#![cfg(windows)] - -extern crate bytes; -extern crate tokio_core; -extern crate mio_named_pipes; -extern crate futures; - -use std::ffi::OsStr; -use std::fmt; -use std::io::{self, Read, Write}; -use std::os::windows::io::*; - -use futures::{Async, Poll}; -use bytes::{BufMut, Buf}; -#[allow(deprecated)] -use tokio_core::io::Io; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_core::reactor::{PollEvented, Handle}; - -pub struct NamedPipe { - io: PollEvented, -} - -impl NamedPipe { - pub fn new>(p: P, handle: &Handle) -> io::Result { - NamedPipe::_new(p.as_ref(), handle) - } - - fn _new(p: &OsStr, handle: &Handle) -> io::Result { - let inner = try!(mio_named_pipes::NamedPipe::new(p)); - NamedPipe::from_pipe(inner, handle) - } - - pub fn from_pipe(pipe: mio_named_pipes::NamedPipe, - handle: &Handle) - -> io::Result { - Ok(NamedPipe { - io: try!(PollEvented::new(pipe, handle)), - }) - } - - pub fn connect(&self) -> io::Result<()> { - self.io.get_ref().connect() - } - - pub fn disconnect(&self) -> io::Result<()> { - self.io.get_ref().disconnect() - } - - pub fn need_read(&self) { - self.io.need_read() - } - - pub fn need_write(&self) { - self.io.need_write() - } - - pub fn poll_read(&self) -> Async<()> { - self.io.poll_read() - } - - pub fn poll_write(&self) -> Async<()> { - self.io.poll_write() - } -} - -impl Read for NamedPipe { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.io.read(buf) - } -} - -impl Write for NamedPipe { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.io.write(buf) - } - fn flush(&mut self) -> io::Result<()> { - self.io.flush() - } -} - -#[allow(deprecated)] -impl Io for NamedPipe { - fn poll_read(&mut self) -> Async<()> { - ::poll_read(self) - } - - fn poll_write(&mut self) -> Async<()> { - ::poll_write(self) - } -} - -impl AsyncRead for NamedPipe { - unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { - false - } - - fn read_buf(&mut self, buf: &mut B) -> Poll { - if NamedPipe::poll_read(self).is_not_ready() { - return Ok(Async::NotReady) - } - - let mut stack_buf = [0u8; 1024]; - let bytes_read = self.io.read(&mut stack_buf); - match bytes_read { - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - self.io.need_read(); - return Ok(Async::NotReady); - }, - Err(e) => Err(e), - Ok(bytes_read) => { - buf.put_slice(&stack_buf[0..bytes_read]); - Ok(Async::Ready(bytes_read)) - } - } - } -} - -impl AsyncWrite for NamedPipe { - fn shutdown(&mut self) -> Poll<(), io::Error> { - Ok(().into()) - } - - fn write_buf(&mut self, buf: &mut B) -> Poll { - if NamedPipe::poll_write(self).is_not_ready() { - return Ok(Async::NotReady) - } - - let bytes_wrt = self.io.write(buf.bytes()); - match bytes_wrt { - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - self.io.need_write(); - return Ok(Async::NotReady); - }, - Err(e) => Err(e), - Ok(bytes_wrt) => { - buf.advance(bytes_wrt); - Ok(Async::Ready(bytes_wrt)) - } - } - } - -} - -impl<'a> Read for &'a NamedPipe { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - (&self.io).read(buf) - } -} - -impl<'a> Write for &'a NamedPipe { - fn write(&mut self, buf: &[u8]) -> io::Result { - (&self.io).write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - (&self.io).flush() - } -} - -impl fmt::Debug for NamedPipe { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.io.get_ref().fmt(f) - } -} - -impl AsRawHandle for NamedPipe { - fn as_raw_handle(&self) -> RawHandle { - self.io.get_ref().as_raw_handle() - } -} diff --git a/media/audioipc/audioipc/src/tokio_uds_stream.rs b/media/audioipc/audioipc/src/tokio_uds_stream.rs new file mode 100644 index 000000000000..db005d728761 --- /dev/null +++ b/media/audioipc/audioipc/src/tokio_uds_stream.rs @@ -0,0 +1,363 @@ +// Copied from tokio-uds/src/stream.rs revision 25e835c5b7e2cfeb9c22b1fd576844f6814a9477 (tokio-uds 0.2.5) +// License MIT per upstream: https://github.com/tokio-rs/tokio/blob/master/tokio-uds/LICENSE +// - Removed ucred for build simplicity +// - Added clear_{read,write}_ready per: https://github.com/tokio-rs/tokio/pull/1294 + +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_reactor::{Handle, PollEvented}; + +use bytes::{Buf, BufMut}; +use futures::{Async, Future, Poll}; +use iovec::{self, IoVec}; +use libc; +use mio::Ready; +use mio_uds; + +use std::fmt; +use std::io::{self, Read, Write}; +use std::net::Shutdown; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::net::{self, SocketAddr}; +use std::path::Path; + +/// A structure representing a connected Unix socket. +/// +/// This socket can be connected directly with `UnixStream::connect` or accepted +/// from a listener with `UnixListener::incoming`. Additionally, a pair of +/// anonymous Unix sockets can be created with `UnixStream::pair`. +pub struct UnixStream { + io: PollEvented, +} + +/// Future returned by `UnixStream::connect` which will resolve to a +/// `UnixStream` when the stream is connected. +#[derive(Debug)] +pub struct ConnectFuture { + inner: State, +} + +#[derive(Debug)] +enum State { + Waiting(UnixStream), + Error(io::Error), + Empty, +} + +impl UnixStream { + /// Connects to the socket named by `path`. + /// + /// This function will create a new Unix socket and connect to the path + /// specified, associating the returned stream with the default event loop's + /// handle. + pub fn connect

(path: P) -> ConnectFuture + where + P: AsRef, + { + let res = mio_uds::UnixStream::connect(path).map(UnixStream::new); + + let inner = match res { + Ok(stream) => State::Waiting(stream), + Err(e) => State::Error(e), + }; + + ConnectFuture { inner } + } + + /// Consumes a `UnixStream` in the standard library and returns a + /// nonblocking `UnixStream` from this crate. + /// + /// The returned stream will be associated with the given event loop + /// specified by `handle` and is ready to perform I/O. + pub fn from_std(stream: net::UnixStream, handle: &Handle) -> io::Result { + let stream = mio_uds::UnixStream::from_stream(stream)?; + let io = PollEvented::new_with_handle(stream, handle)?; + + Ok(UnixStream { io }) + } + + /// Creates an unnamed pair of connected sockets. + /// + /// This function will create a pair of interconnected Unix sockets for + /// communicating back and forth between one another. Each socket will + /// be associated with the default event loop's handle. + pub fn pair() -> io::Result<(UnixStream, UnixStream)> { + let (a, b) = mio_uds::UnixStream::pair()?; + let a = UnixStream::new(a); + let b = UnixStream::new(b); + + Ok((a, b)) + } + + pub(crate) fn new(stream: mio_uds::UnixStream) -> UnixStream { + let io = PollEvented::new(stream); + UnixStream { io } + } + + /// Test whether this socket is ready to be read or not. + pub fn poll_read_ready(&self, ready: Ready) -> Poll { + self.io.poll_read_ready(ready) + } + + /// Clear read ready state. + pub fn clear_read_ready(&self, ready: mio::Ready) -> io::Result<()> { + self.io.clear_read_ready(ready) + } + + /// Test whether this socket is ready to be written to or not. + pub fn poll_write_ready(&self) -> Poll { + self.io.poll_write_ready() + } + + /// Clear write ready state. + pub fn clear_write_ready(&self) -> io::Result<()> { + self.io.clear_write_ready() + } + + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result { + self.io.get_ref().local_addr() + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result { + self.io.get_ref().peer_addr() + } + + /// Returns the value of the `SO_ERROR` option. + pub fn take_error(&self) -> io::Result> { + self.io.get_ref().take_error() + } + + /// Shuts down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O calls on the + /// specified portions to immediately return with an appropriate value + /// (see the documentation of `Shutdown`). + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.io.get_ref().shutdown(how) + } +} + +impl Read for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.io.read(buf) + } +} + +impl Write for UnixStream { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.io.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + self.io.flush() + } +} + +impl AsyncRead for UnixStream { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + + fn read_buf(&mut self, buf: &mut B) -> Poll { + <&UnixStream>::read_buf(&mut &*self, buf) + } +} + +impl AsyncWrite for UnixStream { + fn shutdown(&mut self) -> Poll<(), io::Error> { + <&UnixStream>::shutdown(&mut &*self) + } + + fn write_buf(&mut self, buf: &mut B) -> Poll { + <&UnixStream>::write_buf(&mut &*self, buf) + } +} + +impl<'a> Read for &'a UnixStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (&self.io).read(buf) + } +} + +impl<'a> Write for &'a UnixStream { + fn write(&mut self, buf: &[u8]) -> io::Result { + (&self.io).write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + (&self.io).flush() + } +} + +impl<'a> AsyncRead for &'a UnixStream { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + + fn read_buf(&mut self, buf: &mut B) -> Poll { + if let Async::NotReady = ::poll_read_ready(self, Ready::readable())? { + return Ok(Async::NotReady); + } + unsafe { + let r = read_ready(buf, self.as_raw_fd()); + if r == -1 { + let e = io::Error::last_os_error(); + if e.kind() == io::ErrorKind::WouldBlock { + self.io.clear_read_ready(Ready::readable())?; + Ok(Async::NotReady) + } else { + Err(e) + } + } else { + let r = r as usize; + buf.advance_mut(r); + Ok(r.into()) + } + } + } +} + +impl<'a> AsyncWrite for &'a UnixStream { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } + + fn write_buf(&mut self, buf: &mut B) -> Poll { + if let Async::NotReady = ::poll_write_ready(self)? { + return Ok(Async::NotReady); + } + unsafe { + let r = write_ready(buf, self.as_raw_fd()); + if r == -1 { + let e = io::Error::last_os_error(); + if e.kind() == io::ErrorKind::WouldBlock { + self.io.clear_write_ready()?; + Ok(Async::NotReady) + } else { + Err(e) + } + } else { + let r = r as usize; + buf.advance(r); + Ok(r.into()) + } + } + } +} + +impl fmt::Debug for UnixStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +impl AsRawFd for UnixStream { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } +} + +impl Future for ConnectFuture { + type Item = UnixStream; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + use std::mem; + + match self.inner { + State::Waiting(ref mut stream) => { + if let Async::NotReady = stream.io.poll_write_ready()? { + return Ok(Async::NotReady); + } + + if let Some(e) = stream.io.get_ref().take_error()? { + return Err(e); + } + } + State::Error(_) => { + let e = match mem::replace(&mut self.inner, State::Empty) { + State::Error(e) => e, + _ => unreachable!(), + }; + + return Err(e); + } + State::Empty => panic!("can't poll stream twice"), + } + + match mem::replace(&mut self.inner, State::Empty) { + State::Waiting(stream) => Ok(Async::Ready(stream)), + _ => unreachable!(), + } + } +} + +unsafe fn read_ready(buf: &mut B, raw_fd: RawFd) -> isize { + // The `IoVec` type can't have a 0-length size, so we create a bunch + // of dummy versions on the stack with 1 length which we'll quickly + // overwrite. + let b1: &mut [u8] = &mut [0]; + let b2: &mut [u8] = &mut [0]; + let b3: &mut [u8] = &mut [0]; + let b4: &mut [u8] = &mut [0]; + let b5: &mut [u8] = &mut [0]; + let b6: &mut [u8] = &mut [0]; + let b7: &mut [u8] = &mut [0]; + let b8: &mut [u8] = &mut [0]; + let b9: &mut [u8] = &mut [0]; + let b10: &mut [u8] = &mut [0]; + let b11: &mut [u8] = &mut [0]; + let b12: &mut [u8] = &mut [0]; + let b13: &mut [u8] = &mut [0]; + let b14: &mut [u8] = &mut [0]; + let b15: &mut [u8] = &mut [0]; + let b16: &mut [u8] = &mut [0]; + let mut bufs: [&mut IoVec; 16] = [ + b1.into(), + b2.into(), + b3.into(), + b4.into(), + b5.into(), + b6.into(), + b7.into(), + b8.into(), + b9.into(), + b10.into(), + b11.into(), + b12.into(), + b13.into(), + b14.into(), + b15.into(), + b16.into(), + ]; + + let n = buf.bytes_vec_mut(&mut bufs); + read_ready_vecs(&mut bufs[..n], raw_fd) +} + +unsafe fn read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize { + let iovecs = iovec::unix::as_os_slice_mut(bufs); + + libc::readv(raw_fd, iovecs.as_ptr(), iovecs.len() as i32) +} + +unsafe fn write_ready(buf: &mut B, raw_fd: RawFd) -> isize { + // The `IoVec` type can't have a zero-length size, so create a dummy + // version from a 1-length slice which we'll overwrite with the + // `bytes_vec` method. + static DUMMY: &[u8] = &[0]; + let iovec = <&IoVec>::from(DUMMY); + let mut bufs = [ + iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, + iovec, iovec, iovec, + ]; + + let n = buf.bytes_vec(&mut bufs); + write_ready_vecs(&bufs[..n], raw_fd) +} + +unsafe fn write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize { + let iovecs = iovec::unix::as_os_slice(bufs); + + libc::writev(raw_fd, iovecs.as_ptr(), iovecs.len() as i32) +} diff --git a/media/audioipc/client/Cargo.toml b/media/audioipc/client/Cargo.toml index 90fa464f2a98..ffab183b8d32 100644 --- a/media/audioipc/client/Cargo.toml +++ b/media/audioipc/client/Cargo.toml @@ -6,17 +6,14 @@ authors = [ "Dan Glastonbury " ] description = "Cubeb Backend for talking to remote cubeb server." +edition = "2018" [dependencies] +audio_thread_priority = "0.15.0" audioipc = { path="../audioipc" } -cubeb-backend = "0.5.4" -foreign-types = "0.3" +cubeb-backend = "0.5.5" futures = { version="0.1.18", default-features=false, features=["use_std"] } futures-cpupool = { version="0.1.8", default-features=false } -libc = "0.2" -log = "0.4" -tokio-core = "0.1" -tokio-uds = "0.1.7" -audio_thread_priority = "0.15.0" lazy_static = "1.2.0" -cfg-if = "0.1.0" +log = "0.4" +tokio = "0.1" diff --git a/media/audioipc/client/src/context.rs b/media/audioipc/client/src/context.rs index 6c959d4faccc..86d0ac988a15 100644 --- a/media/audioipc/client/src/context.rs +++ b/media/audioipc/client/src/context.rs @@ -3,48 +3,46 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details -use assert_not_in_callback; +use crate::assert_not_in_callback; +use crate::stream; +#[cfg(target_os = "linux")] +use crate::G_THREAD_POOL; +use crate::{ClientStream, CpuPoolInitParams, CPUPOOL_INIT_PARAMS, G_SERVER_FD}; use audio_thread_priority::promote_current_thread_to_real_time; use audioipc::codec::LengthDelimitedCodec; +use audioipc::frame::{framed, Framed}; use audioipc::platformhandle_passing::{framed_with_platformhandles, FramedWithPlatformHandles}; use audioipc::{core, rpc}; -use audioipc::{messages, ClientMessage, ServerMessage}; +use audioipc::{ + messages, messages::DeviceCollectionReq, messages::DeviceCollectionResp, ClientMessage, + ServerMessage, +}; use cubeb_backend::{ ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error, Ops, Result, Stream, StreamParams, StreamParamsRef, }; use futures::Future; -use futures_cpupool::CpuPool; +use futures_cpupool::{CpuFuture, CpuPool}; use std::ffi::{CStr, CString}; use std::os::raw::c_void; use std::sync::mpsc; +use std::sync::{Arc, Mutex}; use std::thread; use std::{fmt, io, mem, ptr}; -use stream; -use tokio_core::reactor::{Handle, Remote}; -use {ClientStream, CpuPoolInitParams, CPUPOOL_INIT_PARAMS, G_SERVER_FD}; -cfg_if! { - if #[cfg(target_os = "linux")] { - use {G_THREAD_POOL}; - } -} +use tokio::reactor; +use tokio::runtime::current_thread; struct CubebClient; impl rpc::Client for CubebClient { type Request = ServerMessage; type Response = ClientMessage; - type Transport = FramedWithPlatformHandles>; + type Transport = FramedWithPlatformHandles< + audioipc::AsyncMessageStream, + LengthDelimitedCodec, + >; } -macro_rules! t( - ($e:expr) => ( - match $e { - Ok(e) => e, - Err(_) => return Err(Error::default()) - } - )); - pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream); // ClientContext's layout *must* match cubeb.c's `struct cubeb` for the @@ -55,12 +53,16 @@ pub struct ClientContext { rpc: rpc::ClientProxy, core: core::CoreThread, cpu_pool: CpuPool, + backend_id: CString, + device_collection_rpc: bool, + input_device_callback: Arc>, + output_device_callback: Arc>, } impl ClientContext { #[doc(hidden)] - pub fn remote(&self) -> Remote { - self.core.remote() + pub fn handle(&self) -> current_thread::Handle { + self.core.handle() } #[doc(hidden)] @@ -75,13 +77,16 @@ impl ClientContext { } // TODO: encapsulate connect, etc inside audioipc. -fn open_server_stream() -> Result { +fn open_server_stream() -> io::Result { unsafe { if let Some(fd) = G_SERVER_FD { return Ok(audioipc::MessageStream::from_raw_fd(fd.as_raw())); } - Err(Error::default()) + Err(io::Error::new( + io::ErrorKind::Other, + "Failed to get server connection.", + )) } } @@ -110,21 +115,74 @@ fn create_thread_pool(init_params: CpuPoolInitParams) -> CpuPool { .create() } -cfg_if! { - if #[cfg(target_os = "linux")] { - fn get_thread_pool(init_params: CpuPoolInitParams) -> CpuPool { - let mut guard = G_THREAD_POOL.lock().unwrap(); - if guard.is_some() { - // Sandbox is on, and the thread pool was created earlier, before the lockdown. - guard.take().unwrap() - } else { - // Sandbox is off, let's create the pool now, promoting the threads will work. - create_thread_pool(init_params) - } - } +#[cfg(target_os = "linux")] +fn get_thread_pool(init_params: CpuPoolInitParams) -> CpuPool { + let mut guard = G_THREAD_POOL.lock().unwrap(); + if guard.is_some() { + // Sandbox is on, and the thread pool was created earlier, before the lockdown. + guard.take().unwrap() } else { - fn get_thread_pool(init_params: CpuPoolInitParams) -> CpuPool { - create_thread_pool(init_params) + // Sandbox is off, let's create the pool now, promoting the threads will work. + create_thread_pool(init_params) + } +} + +#[cfg(not(target_os = "linux"))] +fn get_thread_pool(init_params: CpuPoolInitParams) -> CpuPool { + create_thread_pool(init_params) +} + +#[derive(Default)] +struct DeviceCollectionCallback { + cb: ffi::cubeb_device_collection_changed_callback, + user_ptr: usize, +} + +struct DeviceCollectionServer { + input_device_callback: Arc>, + output_device_callback: Arc>, + cpu_pool: CpuPool, +} + +impl rpc::Server for DeviceCollectionServer { + type Request = DeviceCollectionReq; + type Response = DeviceCollectionResp; + type Future = CpuFuture; + type Transport = + Framed>; + + fn process(&mut self, req: Self::Request) -> Self::Future { + match req { + DeviceCollectionReq::DeviceChange(device_type) => { + trace!( + "ctx_thread: DeviceChange Callback: device_type={}", + device_type + ); + + let devtype = cubeb_backend::DeviceType::from_bits_truncate(device_type); + + let (input_cb, input_user_ptr) = { + let dcb = self.input_device_callback.lock().unwrap(); + (dcb.cb, dcb.user_ptr) + }; + let (output_cb, output_user_ptr) = { + let dcb = self.output_device_callback.lock().unwrap(); + (dcb.cb, dcb.user_ptr) + }; + + self.cpu_pool.spawn_fn(move || { + if devtype.contains(cubeb_backend::DeviceType::INPUT) { + unsafe { input_cb.unwrap()(ptr::null_mut(), input_user_ptr as *mut c_void) } + } + if devtype.contains(cubeb_backend::DeviceType::OUTPUT) { + unsafe { + output_cb.unwrap()(ptr::null_mut(), output_user_ptr as *mut c_void) + } + } + + Ok(DeviceCollectionResp::DeviceChange) + }) + } } } } @@ -133,15 +191,14 @@ impl ContextOps for ClientContext { fn init(_context_name: Option<&CStr>) -> Result { fn bind_and_send_client( stream: audioipc::AsyncMessageStream, - handle: &Handle, tx_rpc: &mpsc::Sender>, - ) -> Option<()> { + ) -> io::Result<()> { let transport = framed_with_platformhandles(stream, Default::default()); - let rpc = rpc::bind_client::(transport, handle); + let rpc = rpc::bind_client::(transport); // If send fails then the rx end has closed // which is unlikely here. let _ = tx_rpc.send(rpc); - Some(()) + Ok(()) } assert_not_in_callback(); @@ -150,43 +207,45 @@ impl ContextOps for ClientContext { let params = CPUPOOL_INIT_PARAMS.with(|p| p.replace(None).unwrap()); - let core = t!(core::spawn_thread("AudioIPC Client RPC", move || { - let handle = core::handle(); + let core = core::spawn_thread("AudioIPC Client RPC", move || { + let handle = reactor::Handle::default(); register_thread(params.thread_create_callback); open_server_stream() - .ok() - .and_then(|stream| stream.into_tokio_ipc(&handle).ok()) - .and_then(|stream| bind_and_send_client(stream, &handle, &tx_rpc)) - .ok_or_else(|| { - io::Error::new( - io::ErrorKind::Other, - "Failed to open stream and create rpc.", - ) - }) - })); + .and_then(|stream| stream.into_tokio_ipc(&handle)) + .and_then(|stream| bind_and_send_client(stream, &tx_rpc)) + }) + .map_err(|_| Error::default())?; - let rpc = t!(rx_rpc.recv()); + let rpc = rx_rpc.recv().map_err(|_| Error::default())?; // Don't let errors bubble from here. Later calls against this context // will return errors the caller expects to handle. let _ = send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected); - let pool = get_thread_pool(params); + let backend_id = send_recv!(rpc, ContextGetBackendId => ContextBackendId()) + .unwrap_or_else(|_| "(remote error)".to_string()); + let backend_id = CString::new(backend_id).expect("backend_id query failed"); + + let cpu_pool = get_thread_pool(params); let ctx = Box::new(ClientContext { _ops: &CLIENT_OPS as *const _, - rpc: rpc, - core: core, - cpu_pool: pool, + rpc, + core, + cpu_pool, + backend_id, + device_collection_rpc: false, + input_device_callback: Arc::new(Mutex::new(Default::default())), + output_device_callback: Arc::new(Mutex::new(Default::default())), }); Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) }) } - fn backend_id(&mut self) -> &'static CStr { + fn backend_id(&mut self) -> &CStr { assert_not_in_callback(); - unsafe { CStr::from_ptr(b"remote\0".as_ptr() as *const _) } + self.backend_id.as_c_str() } fn max_channel_count(&mut self) -> Result { @@ -264,7 +323,7 @@ impl ContextOps for ClientContext { input_stream_params: Option<&StreamParamsRef>, output_device: DeviceId, output_stream_params: Option<&StreamParamsRef>, - latency_frame: u32, + latency_frames: u32, // These params aren't sent to the server data_callback: ffi::cubeb_data_callback, state_callback: ffi::cubeb_state_callback, @@ -288,24 +347,75 @@ impl ContextOps for ClientContext { let output_stream_params = opt_stream_params(output_stream_params); let init_params = messages::StreamInitParams { - stream_name: stream_name, + stream_name, input_device: input_device as usize, - input_stream_params: input_stream_params, + input_stream_params, output_device: output_device as usize, - output_stream_params: output_stream_params, - latency_frames: latency_frame, + output_stream_params, + latency_frames, }; stream::init(self, init_params, data_callback, state_callback, user_ptr) } fn register_device_collection_changed( &mut self, - _dev_type: DeviceType, - _collection_changed_callback: ffi::cubeb_device_collection_changed_callback, - _user_ptr: *mut c_void, + devtype: DeviceType, + collection_changed_callback: ffi::cubeb_device_collection_changed_callback, + user_ptr: *mut c_void, ) -> Result<()> { assert_not_in_callback(); - Err(Error::not_supported()) + + if !self.device_collection_rpc { + let fds = send_recv!(self.rpc(), + ContextSetupDeviceCollectionCallback => + ContextSetupDeviceCollectionCallback())?; + + let stream = + unsafe { audioipc::MessageStream::from_raw_fd(fds.platform_handles[0].as_raw()) }; + + // TODO: The lowest comms layer expects exactly 3 PlatformHandles, but we only + // need one here. Drop the dummy handles the other side sent us to discard. + unsafe { + fds.platform_handles[1].into_file(); + fds.platform_handles[2].into_file(); + } + + let server = DeviceCollectionServer { + input_device_callback: self.input_device_callback.clone(), + output_device_callback: self.output_device_callback.clone(), + cpu_pool: self.cpu_pool(), + }; + + let (wait_tx, wait_rx) = mpsc::channel(); + self.handle() + .spawn(futures::future::lazy(move || { + let handle = reactor::Handle::default(); + let stream = stream.into_tokio_ipc(&handle).unwrap(); + let transport = framed(stream, Default::default()); + rpc::bind_server(transport, server); + wait_tx.send(()).unwrap(); + Ok(()) + })) + .expect("Failed to spawn DeviceCollectionServer"); + wait_rx.recv().unwrap(); + self.device_collection_rpc = true; + } + + if devtype.contains(cubeb_backend::DeviceType::INPUT) { + let mut cb = self.input_device_callback.lock().unwrap(); + cb.cb = collection_changed_callback; + cb.user_ptr = user_ptr as usize; + } + if devtype.contains(cubeb_backend::DeviceType::OUTPUT) { + let mut cb = self.output_device_callback.lock().unwrap(); + cb.cb = collection_changed_callback; + cb.user_ptr = user_ptr as usize; + } + + let enable = collection_changed_callback.is_some(); + send_recv!(self.rpc(), + ContextRegisterDeviceCollectionChanged(devtype.bits(), enable) => + ContextRegisteredDeviceCollectionChanged) } } @@ -322,7 +432,7 @@ impl Drop for ClientContext { } impl fmt::Debug for ClientContext { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ClientContext") .field("_ops", &self._ops) .field("rpc", &self.rpc) diff --git a/media/audioipc/client/src/lib.rs b/media/audioipc/client/src/lib.rs index 302ca87a919f..a31bc62d5f56 100644 --- a/media/audioipc/client/src/lib.rs +++ b/media/audioipc/client/src/lib.rs @@ -2,45 +2,36 @@ // // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details. +#![warn(unused_extern_crates)] -extern crate audioipc; #[macro_use] extern crate cubeb_backend; -extern crate foreign_types; -extern crate futures; -extern crate futures_cpupool; -extern crate libc; #[macro_use] extern crate log; -extern crate tokio_core; -extern crate tokio_uds; -extern crate audio_thread_priority; #[macro_use] extern crate lazy_static; -#[macro_use] -extern crate cfg_if; #[macro_use] mod send_recv; mod context; mod stream; -use audioipc::{PlatformHandleType, PlatformHandle}; -use context::ClientContext; -use cubeb_backend::{capi, ffi}; -use std::os::raw::{c_char, c_int}; -use stream::ClientStream; -use std::sync::{Mutex}; -use futures_cpupool::CpuPool; +use crate::context::ClientContext; +use crate::stream::ClientStream; +#[cfg(target_os = "linux")] +use audio_thread_priority::promote_current_thread_to_real_time; use audio_thread_priority::RtPriorityHandle; -cfg_if! { - if #[cfg(target_os = "linux")] { - use std::sync::{Arc, Condvar}; - use std::ffi::CString; - use std::thread; - use audio_thread_priority::promote_current_thread_to_real_time; - } -} +use audioipc::{PlatformHandle, PlatformHandleType}; +use cubeb_backend::{capi, ffi}; +use futures_cpupool::CpuPool; +#[cfg(target_os = "linux")] +use std::ffi::CString; +use std::os::raw::{c_char, c_int}; +use std::sync::Mutex; +#[cfg(target_os = "linux")] +use std::sync::{Arc, Condvar}; +#[cfg(target_os = "linux")] +use std::thread; type InitParamsTls = std::cell::RefCell>; @@ -105,58 +96,63 @@ where static mut G_SERVER_FD: Option = None; -cfg_if! { - if #[cfg(target_os = "linux")] { - #[no_mangle] - pub unsafe extern "C" fn audioipc_init_threads(init_params: *const AudioIpcInitParams) { - let thread_create_callback = (*init_params).thread_create_callback; +#[cfg(target_os = "linux")] +#[no_mangle] +pub unsafe extern "C" fn audioipc_init_threads(init_params: *const AudioIpcInitParams) { + let thread_create_callback = (*init_params).thread_create_callback; - // It is critical that this function waits until the various threads are created, promoted to - // real-time, and _then_ return, because the sandbox lockdown happens right after returning - // from here. - let pair = Arc::new((Mutex::new((*init_params).pool_size), Condvar::new())); - let pair2 = pair.clone(); + // It is critical that this function waits until the various threads are created, promoted to + // real-time, and _then_ return, because the sandbox lockdown happens right after returning + // from here. + let pair = Arc::new((Mutex::new((*init_params).pool_size), Condvar::new())); + let pair2 = pair.clone(); - let register_thread = move || { - if let Some(func) = thread_create_callback { - match promote_current_thread_to_real_time(0, 48000) { - Ok(handle) => { - G_PRIORITY_HANDLES.with(|handles| { - (handles.borrow_mut()).push(handle); - }); - } - Err(_) => { - warn!("Could not promote audio threads to real-time during initialization."); - } - } - let thr = thread::current(); - let name = CString::new(thr.name().unwrap()).unwrap(); - func(name.as_ptr()); - let &(ref lock, ref cvar) = &*pair2; - let mut count = lock.lock().unwrap(); - *count -= 1; - cvar.notify_one(); + let register_thread = move || { + if let Some(func) = thread_create_callback { + match promote_current_thread_to_real_time(0, 48000) { + Ok(handle) => { + G_PRIORITY_HANDLES.with(|handles| { + (handles.borrow_mut()).push(handle); + }); + } + Err(_) => { + warn!("Could not promote audio threads to real-time during initialization."); } - }; - - let mut pool = G_THREAD_POOL.lock().unwrap(); - - *pool = Some(futures_cpupool::Builder::new() - .name_prefix("AudioIPC") - .after_start(register_thread) - .pool_size((*init_params).pool_size) - .stack_size((*init_params).stack_size) - .create()); - - let &(ref lock, ref cvar) = &*pair; - let mut count = lock.lock().unwrap(); - while *count != 0 { - count = cvar.wait(count).unwrap(); } + let thr = thread::current(); + let name = CString::new(thr.name().unwrap()).unwrap(); + func(name.as_ptr()); + let &(ref lock, ref cvar) = &*pair2; + let mut count = lock.lock().unwrap(); + *count -= 1; + cvar.notify_one(); } + }; + + let mut pool = G_THREAD_POOL.lock().unwrap(); + + *pool = Some( + futures_cpupool::Builder::new() + .name_prefix("AudioIPC") + .after_start(register_thread) + .pool_size((*init_params).pool_size) + .stack_size((*init_params).stack_size) + .create(), + ); + + let &(ref lock, ref cvar) = &*pair; + let mut count = lock.lock().unwrap(); + while *count != 0 { + count = cvar.wait(count).unwrap(); } } +#[cfg(not(target_os = "linux"))] +#[no_mangle] +pub unsafe extern "C" fn audioipc_init_threads(_: *const AudioIpcInitParams) { + unimplemented!(); +} + #[no_mangle] /// Entry point from C code. pub unsafe extern "C" fn audioipc_client_init( diff --git a/media/audioipc/client/src/stream.rs b/media/audioipc/client/src/stream.rs index 5511d00e87a8..b7c59d7317bd 100644 --- a/media/audioipc/client/src/stream.rs +++ b/media/audioipc/client/src/stream.rs @@ -3,6 +3,8 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details +use crate::ClientContext; +use crate::{assert_not_in_callback, set_in_callback}; use audioipc::codec::LengthDelimitedCodec; use audioipc::frame::{framed, Framed}; use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage}; @@ -15,11 +17,8 @@ use std::ffi::CString; use std::os::raw::c_void; use std::ptr; use std::sync::mpsc; -use ClientContext; -use {assert_not_in_callback, set_in_callback}; - -// TODO: Remove and let caller allocate based on cubeb backend requirements. -const SHM_AREA_SIZE: usize = 2 * 1024 * 1024; +use std::sync::{Arc, Mutex}; +use tokio::reactor; pub struct Device(ffi::cubeb_device); @@ -46,6 +45,7 @@ pub struct ClientStream<'ctx> { context: &'ctx ClientContext, user_ptr: *mut c_void, token: usize, + device_change_cb: Arc>, } struct CallbackServer { @@ -55,17 +55,23 @@ struct CallbackServer { state_cb: ffi::cubeb_state_callback, user_ptr: usize, cpu_pool: CpuPool, + device_change_cb: Arc>, } impl rpc::Server for CallbackServer { type Request = CallbackReq; type Response = CallbackResp; type Future = CpuFuture; - type Transport = Framed>; + type Transport = + Framed>; fn process(&mut self, req: Self::Request) -> Self::Future { match req { - CallbackReq::Data { nframes, input_frame_size, output_frame_size } => { + CallbackReq::Data { + nframes, + input_frame_size, + output_frame_size, + } => { trace!( "stream_thread: Data Callback: nframes={} input_fs={} output_fs={}", nframes, @@ -75,11 +81,11 @@ impl rpc::Server for CallbackServer { // Clone values that need to be moved into the cpu pool thread. let input_shm = match self.input_shm { - Some(ref shm) => unsafe { Some(shm.clone_view()) }, + Some(ref shm) => unsafe { Some(shm.unsafe_clone()) }, None => None, }; let mut output_shm = match self.output_shm { - Some(ref shm) => unsafe { Some(shm.clone_view()) }, + Some(ref shm) => unsafe { Some(shm.unsafe_clone()) }, None => None, }; let user_ptr = self.user_ptr; @@ -131,6 +137,24 @@ impl rpc::Server for CallbackServer { Ok(CallbackResp::State) }) } + CallbackReq::DeviceChange => { + let cb = self.device_change_cb.clone(); + let user_ptr = self.user_ptr; + self.cpu_pool.spawn_fn(move || { + set_in_callback(true); + let cb = cb.lock().unwrap(); + if let Some(cb) = *cb { + unsafe { + cb(user_ptr as *mut _); + } + } else { + warn!("DeviceChange received with null callback"); + } + set_in_callback(false); + + Ok(CallbackResp::DeviceChange) + }) + } } } } @@ -149,9 +173,12 @@ impl<'ctx> ClientStream<'ctx> { let has_output = init_params.output_stream_params.is_some(); let rpc = ctx.rpc(); - let data = try!(send_recv!(rpc, StreamInit(init_params) => StreamCreated())); + let data = send_recv!(rpc, StreamInit(init_params) => StreamCreated())?; - debug!("token = {}, handles = {:?}", data.token, data.platform_handles); + debug!( + "token = {}, handles = {:?}", + data.token, data.platform_handles + ); let stm = data.platform_handles[0]; let stream = unsafe { audioipc::MessageStream::from_raw_fd(stm.as_raw()) }; @@ -159,7 +186,7 @@ impl<'ctx> ClientStream<'ctx> { let input = data.platform_handles[1]; let input_file = unsafe { input.into_file() }; let input_shm = if has_input { - Some(SharedMemSlice::from(&input_file, SHM_AREA_SIZE).unwrap()) + Some(SharedMemSlice::from(&input_file, audioipc::SHM_AREA_SIZE).unwrap()) } else { None }; @@ -167,7 +194,7 @@ impl<'ctx> ClientStream<'ctx> { let output = data.platform_handles[2]; let output_file = unsafe { output.into_file() }; let output_shm = if has_output { - Some(SharedMemMutSlice::from(&output_file, SHM_AREA_SIZE).unwrap()) + Some(SharedMemMutSlice::from(&output_file, audioipc::SHM_AREA_SIZE).unwrap()) } else { None }; @@ -176,29 +203,37 @@ impl<'ctx> ClientStream<'ctx> { let cpu_pool = ctx.cpu_pool(); + let null_cb: ffi::cubeb_device_changed_callback = None; + let device_change_cb = Arc::new(Mutex::new(null_cb)); + let server = CallbackServer { - input_shm: input_shm, - output_shm: output_shm, + input_shm, + output_shm, data_cb: data_callback, state_cb: state_callback, user_ptr: user_data, - cpu_pool: cpu_pool, + cpu_pool, + device_change_cb: device_change_cb.clone(), }; let (wait_tx, wait_rx) = mpsc::channel(); - ctx.remote().spawn(move |handle| { - let stream = stream.into_tokio_ipc(handle).unwrap(); - let transport = framed(stream, Default::default()); - rpc::bind_server(transport, server, handle); - wait_tx.send(()).unwrap(); - Ok(()) - }); + ctx.handle() + .spawn(futures::future::lazy(move || { + let handle = reactor::Handle::default(); + let stream = stream.into_tokio_ipc(&handle).unwrap(); + let transport = framed(stream, Default::default()); + rpc::bind_server(transport, server); + wait_tx.send(()).unwrap(); + Ok(()) + })) + .expect("Failed to spawn CallbackServer"); wait_rx.recv().unwrap(); let stream = Box::into_raw(Box::new(ClientStream { context: ctx, - user_ptr: user_ptr, + user_ptr, token: data.token, + device_change_cb, })); Ok(unsafe { Stream::from_ptr(stream as *mut _) }) } @@ -277,13 +312,15 @@ impl<'ctx> StreamOps for ClientStream<'ctx> { } } - // TODO: How do we call this back? On what thread? fn register_device_changed_callback( &mut self, - _device_changed_callback: ffi::cubeb_device_changed_callback, + device_changed_callback: ffi::cubeb_device_changed_callback, ) -> Result<()> { assert_not_in_callback(); - Ok(()) + let rpc = self.context.rpc(); + let enable = device_changed_callback.is_some(); + *self.device_change_cb.lock().unwrap() = device_changed_callback; + send_recv!(rpc, StreamRegisterDeviceChangeCallback(self.token, enable) => StreamRegisterDeviceChangeCallback) } } @@ -294,13 +331,7 @@ pub fn init( state_callback: ffi::cubeb_state_callback, user_ptr: *mut c_void, ) -> Result { - let stm = try!(ClientStream::init( - ctx, - init_params, - data_callback, - state_callback, - user_ptr - )); + let stm = ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)?; debug_assert_eq!(stm.user_ptr(), user_ptr); Ok(stm) } diff --git a/media/audioipc/server/Cargo.toml b/media/audioipc/server/Cargo.toml index 7ece6dd45b6c..dc39ccb18c08 100644 --- a/media/audioipc/server/Cargo.toml +++ b/media/audioipc/server/Cargo.toml @@ -6,20 +6,17 @@ authors = [ "Dan Glastonbury " ] description = "Remote cubeb server" +edition = "2018" [dependencies] +audio_thread_priority = "0.15.0" audioipc = { path = "../audioipc" } -cubeb-core = "0.5.4" -bytes = "0.4" -lazycell = "^0.4" -libc = "0.2" +cubeb-core = "0.5.5" +futures = "0.1.18" +lazy_static = "1.2.0" log = "0.4" slab = "0.3.0" -futures = "0.1.18" -tokio-core = "0.1" -tokio-uds = "0.1.7" -audio_thread_priority = "0.15.0" -lazy_static = "1.2.0" +tokio = "0.1" [dependencies.error-chain] version = "0.11.0" diff --git a/media/audioipc/server/src/lib.rs b/media/audioipc/server/src/lib.rs index adb8af7dd98a..63b373fe8f84 100644 --- a/media/audioipc/server/src/lib.rs +++ b/media/audioipc/server/src/lib.rs @@ -2,26 +2,16 @@ // // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details +#![warn(unused_extern_crates)] #[macro_use] extern crate error_chain; - #[macro_use] extern crate log; - -extern crate audioipc; -extern crate bytes; -extern crate cubeb_core as cubeb; -extern crate futures; -extern crate lazycell; -extern crate libc; -extern crate slab; -extern crate tokio_core; -extern crate tokio_uds; -extern crate audio_thread_priority; #[macro_use] extern crate lazy_static; +use audio_thread_priority::promote_current_thread_to_real_time; use audioipc::core; use audioipc::platformhandle_passing::framed_with_platformhandles; use audioipc::rpc; @@ -29,11 +19,11 @@ use audioipc::{MessageStream, PlatformHandle, PlatformHandleType}; use futures::sync::oneshot; use futures::Future; use std::error::Error; +use std::ffi::{CStr, CString}; use std::os::raw::c_void; use std::ptr; -use audio_thread_priority::promote_current_thread_to_real_time; -use std::ffi::{CStr, CString}; use std::sync::Mutex; +use tokio::reactor; mod server; @@ -56,14 +46,14 @@ pub mod errors { AudioIPC(::audioipc::errors::Error, ::audioipc::errors::ErrorKind); } foreign_links { - Cubeb(::cubeb::Error); + Cubeb(cubeb_core::Error); Io(::std::io::Error); Canceled(::futures::sync::oneshot::Canceled); } } } -use errors::*; +use crate::errors::*; struct ServerWrapper { core_thread: core::CoreThread, @@ -73,50 +63,49 @@ struct ServerWrapper { fn run() -> Result { trace!("Starting up cubeb audio server event loop thread..."); - let callback_thread = try!( - core::spawn_thread("AudioIPC Callback RPC", || { - match promote_current_thread_to_real_time(0, 48000) { - Ok(_) => { } - Err(_) => { - debug!("Failed to promote audio callback thread to real-time."); - } + let callback_thread = core::spawn_thread("AudioIPC Callback RPC", || { + match promote_current_thread_to_real_time(0, 48000) { + Ok(_) => {} + Err(_) => { + debug!("Failed to promote audio callback thread to real-time."); } - trace!("Starting up cubeb audio callback event loop thread..."); - Ok(()) - }).or_else(|e| { - debug!( - "Failed to start cubeb audio callback event loop thread: {:?}", - e.description() - ); - Err(e) - }) - ); + } + trace!("Starting up cubeb audio callback event loop thread..."); + Ok(()) + }) + .or_else(|e| { + debug!( + "Failed to start cubeb audio callback event loop thread: {:?}", + e.description() + ); + Err(e) + })?; - let core_thread = try!( - core::spawn_thread("AudioIPC Server RPC", move || Ok(())).or_else(|e| { - debug!( - "Failed to cubeb audio core event loop thread: {:?}", - e.description() - ); - Err(e) - }) - ); + let core_thread = core::spawn_thread("AudioIPC Server RPC", move || Ok(())).or_else(|e| { + debug!( + "Failed to cubeb audio core event loop thread: {:?}", + e.description() + ); + Err(e) + })?; Ok(ServerWrapper { - core_thread: core_thread, - callback_thread: callback_thread, + core_thread, + callback_thread, }) } #[no_mangle] -pub extern "C" fn audioipc_server_start(context_name: *const std::os::raw::c_char, - backend_name: *const std::os::raw::c_char) -> *mut c_void { +pub unsafe extern "C" fn audioipc_server_start( + context_name: *const std::os::raw::c_char, + backend_name: *const std::os::raw::c_char, +) -> *mut c_void { let mut params = G_CUBEB_CONTEXT_PARAMS.lock().unwrap(); if !context_name.is_null() { - params.context_name = unsafe { CStr::from_ptr(context_name) }.to_owned(); + params.context_name = CStr::from_ptr(context_name).to_owned(); } if !backend_name.is_null() { - let backend_string = unsafe { CStr::from_ptr(backend_name) }.to_owned(); + let backend_string = CStr::from_ptr(backend_name).to_owned(); params.backend_name = Some(backend_string); } match run() { @@ -130,7 +119,7 @@ pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> PlatformHandleTy let (wait_tx, wait_rx) = oneshot::channel(); let wrapper: &ServerWrapper = unsafe { &*(p as *mut _) }; - let cb_remote = wrapper.callback_thread.remote(); + let core_handle = wrapper.callback_thread.handle(); // We create a connected pair of anonymous IPC endpoints. One side // is registered with the reactor core, the other side is returned @@ -139,22 +128,28 @@ pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> PlatformHandleTy .and_then(|(sock1, sock2)| { // Spawn closure to run on same thread as reactor::Core // via remote handle. - wrapper.core_thread.remote().spawn(|handle| { - trace!("Incoming connection"); - sock2.into_tokio_ipc(handle) + wrapper + .core_thread + .handle() + .spawn(futures::future::lazy(|| { + trace!("Incoming connection"); + let handle = reactor::Handle::default(); + sock2.into_tokio_ipc(&handle) .and_then(|sock| { let transport = framed_with_platformhandles(sock, Default::default()); - rpc::bind_server(transport, server::CubebServer::new(cb_remote), handle); + rpc::bind_server(transport, server::CubebServer::new(core_handle)); Ok(()) }).map_err(|_| ()) // Notify waiting thread that sock2 has been registered. .and_then(|_| wait_tx.send(())) - }); + })) + .expect("Failed to spawn CubebServer"); // Wait for notification that sock2 has been registered // with reactor::Core. let _ = wait_rx.wait(); Ok(PlatformHandle::from(sock1).as_raw()) - }).unwrap_or(-1isize as PlatformHandleType) + }) + .unwrap_or(-1isize as PlatformHandleType) } #[no_mangle] diff --git a/media/audioipc/server/src/server.rs b/media/audioipc/server/src/server.rs index fb57858477e7..9d70cd3d7e9c 100644 --- a/media/audioipc/server/src/server.rs +++ b/media/audioipc/server/src/server.rs @@ -4,19 +4,19 @@ // accompanying file LICENSE for details use audioipc; -use audioipc::{MessageStream, PlatformHandle}; use audioipc::codec::LengthDelimitedCodec; -use audioipc::core; -use audioipc::platformhandle_passing::FramedWithPlatformHandles; use audioipc::frame::{framed, Framed}; use audioipc::messages::{ - CallbackReq, CallbackResp, ClientMessage, Device, DeviceInfo, ServerMessage, StreamCreate, - StreamInitParams, StreamParams, + CallbackReq, CallbackResp, ClientMessage, Device, DeviceCollectionReq, DeviceCollectionResp, + DeviceInfo, RegisterDeviceCollectionChanged, ServerMessage, StreamCreate, StreamInitParams, + StreamParams, }; +use audioipc::platformhandle_passing::FramedWithPlatformHandles; use audioipc::rpc; use audioipc::shm::{SharedMemReader, SharedMemWriter}; -use cubeb; -use cubeb::ffi; +use audioipc::{MessageStream, PlatformHandle}; +use cubeb_core as cubeb; +use cubeb_core::ffi; use futures::future::{self, FutureResult}; use futures::sync::oneshot; use futures::Future; @@ -26,25 +26,134 @@ use std::convert::From; use std::ffi::CStr; use std::mem::{size_of, ManuallyDrop}; use std::os::raw::{c_long, c_void}; +use std::rc::Rc; use std::{panic, slice}; -use tokio_core::reactor::Remote; +use tokio::reactor; +use tokio::runtime::current_thread; -use errors::*; +use crate::errors::*; fn error(error: cubeb::Error) -> ClientMessage { ClientMessage::Error(error.raw_code()) } -type ContextKey = RefCell>>; -thread_local!(static CONTEXT_KEY: ContextKey = RefCell::new(None)); +struct CubebDeviceCollectionManager { + servers: Vec>>, + devtype: cubeb::DeviceType, +} + +impl CubebDeviceCollectionManager { + fn new() -> CubebDeviceCollectionManager { + CubebDeviceCollectionManager { + servers: Vec::new(), + devtype: cubeb::DeviceType::empty(), + } + } + + fn register(&mut self, context: &cubeb::Context, server: &Rc>) { + if self + .servers + .iter() + .find(|s| Rc::ptr_eq(s, server)) + .is_none() + { + self.servers.push(server.clone()); + } + self.update(context); + } + + fn unregister(&mut self, context: &cubeb::Context, server: &Rc>) { + self.servers + .retain(|s| !(Rc::ptr_eq(&s, server) && s.borrow().devtype.is_empty())); + self.update(context); + } + + fn update(&mut self, context: &cubeb::Context) { + let mut devtype = cubeb::DeviceType::empty(); + for s in &self.servers { + devtype |= s.borrow().devtype; + } + for &dir in &[cubeb::DeviceType::INPUT, cubeb::DeviceType::OUTPUT] { + match (devtype.contains(dir), self.devtype.contains(dir)) { + (true, false) => self.internal_register(context, dir, true), + (false, true) => self.internal_register(context, dir, false), + _ => {} + } + } + } + + fn internal_register( + &mut self, + context: &cubeb::Context, + devtype: cubeb::DeviceType, + enable: bool, + ) { + let user_ptr = if enable { + self as *const CubebDeviceCollectionManager as *mut c_void + } else { + std::ptr::null_mut() + }; + for &(dir, cb) in &[ + ( + cubeb::DeviceType::INPUT, + device_collection_changed_input_cb_c as _, + ), + ( + cubeb::DeviceType::OUTPUT, + device_collection_changed_output_cb_c as _, + ), + ] { + if devtype.contains(dir) { + assert_eq!(self.devtype.contains(dir), !enable); + unsafe { + context + .register_device_collection_changed( + dir, + if enable { Some(cb) } else { None }, + user_ptr, + ) + .expect("devcol register failed"); + } + if enable { + self.devtype.insert(dir); + } else { + self.devtype.remove(dir); + } + } + } + } + + // Warning: this is called from an internal cubeb thread, so we must not mutate unprotected shared state. + unsafe fn device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type) { + self.servers.iter().for_each(|server| { + if server + .borrow() + .devtype + .contains(cubeb::DeviceType::from_bits_truncate(device_type)) + { + server + .borrow_mut() + .device_collection_changed_callback(device_type) + } + }); + } +} + +struct CubebContextState { + context: cubeb::Result, + manager: CubebDeviceCollectionManager, +} + +type ContextKey = RefCell>; +thread_local!(static CONTEXT_KEY:ContextKey = RefCell::new(None)); fn with_local_context(f: F) -> T where - F: FnOnce(&cubeb::Result) -> T, + F: FnOnce(&cubeb::Result, &mut CubebDeviceCollectionManager) -> T, { CONTEXT_KEY.with(|k| { - let mut context = k.borrow_mut(); - if context.is_none() { + let mut state = k.borrow_mut(); + if state.is_none() { let params = super::G_CUBEB_CONTEXT_PARAMS.lock().unwrap(); let context_name = Some(params.context_name.as_c_str()); let backend_name = if let Some(ref name) = params.backend_name { @@ -52,24 +161,34 @@ where } else { None }; - *context = Some(cubeb::Context::init(context_name, backend_name)); + let context = cubeb::Context::init(context_name, backend_name); + let manager = CubebDeviceCollectionManager::new(); + *state = Some(CubebContextState { context, manager }); } - f(context.as_ref().unwrap()) + let state = state.as_mut().unwrap(); + f(&state.context, &mut state.manager) }) } -// TODO: Remove and let caller allocate based on cubeb backend requirements. -const SHM_AREA_SIZE: usize = 2 * 1024 * 1024; - // The size in which the stream slab is grown. const STREAM_CONN_CHUNK_SIZE: usize = 64; +struct DeviceCollectionClient; + +impl rpc::Client for DeviceCollectionClient { + type Request = DeviceCollectionReq; + type Response = DeviceCollectionResp; + type Transport = + Framed>; +} + struct CallbackClient; impl rpc::Client for CallbackClient { type Request = CallbackReq; type Response = CallbackResp; - type Transport = Framed>; + type Transport = + Framed>; } struct ServerStreamCallbacks { @@ -87,17 +206,23 @@ struct ServerStreamCallbacks { impl ServerStreamCallbacks { fn data_callback(&mut self, input: &[u8], output: &mut [u8], nframes: isize) -> isize { - trace!("Stream data callback: {} {} {}", nframes, input.len(), output.len()); + trace!( + "Stream data callback: {} {} {}", + nframes, + input.len(), + output.len() + ); self.input_shm.write(input).unwrap(); let r = self .rpc .call(CallbackReq::Data { - nframes: nframes, + nframes, input_frame_size: self.input_frame_size as usize, output_frame_size: self.output_frame_size as usize, - }).wait(); + }) + .wait(); match r { Ok(CallbackResp::Data(frames)) => { @@ -121,7 +246,18 @@ impl ServerStreamCallbacks { match r { Ok(CallbackResp::State) => {} _ => { - debug!("Unexpected message {:?} during callback", r); + debug!("Unexpected message {:?} during state callback", r); + } + } + } + + fn device_change_callback(&mut self) { + trace!("Stream device change callback"); + let r = self.rpc.call(CallbackReq::DeviceChange).wait(); + match r { + Ok(CallbackResp::DeviceChange) => {} + _ => { + debug!("Unexpected message {:?} during device change callback", r); } } } @@ -143,38 +279,67 @@ impl Drop for ServerStream { type StreamSlab = slab::Slab; +struct CubebServerCallbacks { + rpc: rpc::ClientProxy, + devtype: cubeb::DeviceType, +} + +impl CubebServerCallbacks { + fn device_collection_changed_callback(&mut self, device_type: ffi::cubeb_device_type) { + // TODO: Assert device_type is in devtype. + debug!( + "Sending device collection ({:?}) changed event", + device_type + ); + let _ = self + .rpc + .call(DeviceCollectionReq::DeviceChange(device_type)) + .wait(); + } +} + pub struct CubebServer { - cb_remote: Remote, + handle: current_thread::Handle, streams: StreamSlab, remote_pid: Option, + cbs: Option>>, } impl rpc::Server for CubebServer { type Request = ServerMessage; type Response = ClientMessage; type Future = FutureResult; - type Transport = FramedWithPlatformHandles>; + type Transport = FramedWithPlatformHandles< + audioipc::AsyncMessageStream, + LengthDelimitedCodec, + >; fn process(&mut self, req: Self::Request) -> Self::Future { - let resp = with_local_context(|context| match *context { + let resp = with_local_context(|context, manager| match *context { Err(_) => error(cubeb::Error::error()), - Ok(ref context) => self.process_msg(context, &req), + Ok(ref context) => self.process_msg(context, manager, &req), }); future::ok(resp) } } impl CubebServer { - pub fn new(cb_remote: Remote) -> Self { + pub fn new(handle: current_thread::Handle) -> Self { CubebServer { - cb_remote: cb_remote, + handle, streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE), remote_pid: None, + cbs: None, } } // Process a request coming from the client. - fn process_msg(&mut self, context: &cubeb::Context, msg: &ServerMessage) -> ClientMessage { + fn process_msg( + &mut self, + context: &cubeb::Context, + manager: &mut CubebDeviceCollectionManager, + msg: &ServerMessage, + ) -> ClientMessage { let resp: ClientMessage = match *msg { ServerMessage::ClientConnect(pid) => { self.remote_pid = Some(pid); @@ -187,7 +352,9 @@ impl CubebServer { ClientMessage::ClientDisconnected } - ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(), + ServerMessage::ContextGetBackendId => { + ClientMessage::ContextBackendId(context.backend_id().to_string()) + } ServerMessage::ContextGetMaxChannelCount => context .max_channel_count() @@ -200,8 +367,8 @@ impl CubebServer { let params = cubeb::StreamParamsBuilder::new() .format(format) - .rate(u32::from(params.rate)) - .channels(u32::from(params.channels)) + .rate(params.rate) + .channels(params.channels) .layout(layout) .take(); @@ -221,7 +388,8 @@ impl CubebServer { .map(|devices| { let v: Vec = devices.iter().map(|i| i.as_ref().into()).collect(); ClientMessage::ContextEnumeratedDevices(v) - }).unwrap_or_else(error), + }) + .unwrap_or_else(error), ServerMessage::StreamInit(ref params) => self .process_stream_init(context, params) @@ -279,6 +447,76 @@ impl CubebServer { .current_device() .map(|device| ClientMessage::StreamCurrentDevice(Device::from(device))) .unwrap_or_else(error), + + ServerMessage::StreamRegisterDeviceChangeCallback(stm_tok, enable) => self.streams + [stm_tok] + .stream + .register_device_changed_callback(if enable { + Some(device_change_cb_c) + } else { + None + }) + .map(|_| ClientMessage::StreamRegisterDeviceChangeCallback) + .unwrap_or_else(error), + + ServerMessage::ContextSetupDeviceCollectionCallback => { + if let Ok((stm1, stm2)) = MessageStream::anonymous_ipc_pair() { + debug!("Created device collection RPC pair: {:?}-{:?}", stm1, stm2); + + // This code is currently running on the Client/Server RPC + // handling thread. We need to move the registration of the + // bind_client to the callback RPC handling thread. This is + // done by spawning a future on `handle`. + + let (tx, rx) = oneshot::channel(); + self.handle + .spawn(futures::future::lazy(move || { + let handle = reactor::Handle::default(); + let stream = stm2.into_tokio_ipc(&handle).unwrap(); + let transport = framed(stream, Default::default()); + let rpc = rpc::bind_client::(transport); + drop(tx.send(rpc)); + Ok(()) + })) + .expect("Failed to spawn DeviceCollectionClient"); + + // TODO: The lowest comms layer expects exactly 3 PlatformHandles, but we only + // need one here. Send some dummy handles over for the other side to discard. + let (dummy1, dummy2) = + MessageStream::anonymous_ipc_pair().expect("need dummy IPC pair"); + if let Ok(rpc) = rx.wait() { + self.cbs = Some(Rc::new(RefCell::new(CubebServerCallbacks { + rpc, + devtype: cubeb::DeviceType::empty(), + }))); + let fds = RegisterDeviceCollectionChanged { + platform_handles: [ + PlatformHandle::from(stm1), + PlatformHandle::from(dummy1), + PlatformHandle::from(dummy2), + ], + target_pid: self.remote_pid.unwrap(), + }; + + ClientMessage::ContextSetupDeviceCollectionCallback(fds) + } else { + warn!("Failed to setup RPC client"); + error(cubeb::Error::error()) + } + } else { + warn!("Failed to create RPC pair"); + error(cubeb::Error::error()) + } + } + + ServerMessage::ContextRegisterDeviceCollectionChanged(device_type, enable) => self + .process_register_device_collection_changed( + context, + manager, + cubeb::DeviceType::from_bits_truncate(device_type), + enable, + ) + .unwrap_or_else(error), }; trace!("process_msg: req={:?}, resp={:?}", msg, resp); @@ -286,6 +524,30 @@ impl CubebServer { resp } + fn process_register_device_collection_changed( + &mut self, + context: &cubeb::Context, + manager: &mut CubebDeviceCollectionManager, + devtype: cubeb::DeviceType, + enable: bool, + ) -> cubeb::Result { + if devtype == cubeb::DeviceType::UNKNOWN { + return Err(cubeb::Error::invalid_parameter()); + } + + assert!(self.cbs.is_some()); + let cbs = self.cbs.as_ref().unwrap(); + + if enable { + cbs.borrow_mut().devtype.insert(devtype); + manager.register(context, cbs); + } else { + cbs.borrow_mut().devtype.remove(devtype); + manager.unregister(context, cbs); + } + Ok(ClientMessage::ContextRegisteredDeviceCollectionChanged) + } + // Stream init is special, so it's been separated from process_msg. fn process_stream_init( &mut self, @@ -306,7 +568,8 @@ impl CubebServer { }; let channel_count = p.channels as u16; sample_size * channel_count - }).unwrap_or(0u16) + }) + .unwrap_or(0u16) } // Create the callback handling struct which is attached the cubeb stream. @@ -316,28 +579,26 @@ impl CubebServer { let (stm1, stm2) = MessageStream::anonymous_ipc_pair()?; debug!("Created callback pair: {:?}-{:?}", stm1, stm2); let (input_shm, input_file) = - SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?; + SharedMemWriter::new(&audioipc::get_shm_path("input"), audioipc::SHM_AREA_SIZE)?; let (output_shm, output_file) = - SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?; + SharedMemReader::new(&audioipc::get_shm_path("output"), audioipc::SHM_AREA_SIZE)?; // This code is currently running on the Client/Server RPC // handling thread. We need to move the registration of the // bind_client to the callback RPC handling thread. This is - // done by spawning a future on cb_remote. - - let id = core::handle().id(); + // done by spawning a future on `handle`. let (tx, rx) = oneshot::channel(); - self.cb_remote.spawn(move |handle| { - // Ensure we're running on a loop different to the one - // invoking spawn_fn. - assert_ne!(id, handle.id()); - let stream = stm2.into_tokio_ipc(handle).unwrap(); - let transport = framed(stream, Default::default()); - let rpc = rpc::bind_client::(transport, handle); - drop(tx.send(rpc)); - Ok(()) - }); + self.handle + .spawn(futures::future::lazy(move || { + let handle = reactor::Handle::default(); + let stream = stm2.into_tokio_ipc(&handle).unwrap(); + let transport = framed(stream, Default::default()); + let rpc = rpc::bind_client::(transport); + drop(tx.send(rpc)); + Ok(()) + })) + .expect("Failed to spawn CallbackClient"); let rpc: rpc::ClientProxy = match rx.wait() { Ok(rpc) => rpc, @@ -384,7 +645,8 @@ impl CubebServer { Some(data_cb_c), Some(state_cb_c), user_ptr, - ).and_then(|stream| { + ) + .and_then(|stream| { if !self.streams.has_available() { trace!( "server connection ran out of stream slots. reserving {} more.", @@ -401,7 +663,8 @@ impl CubebServer { .insert(ServerStream { stream: ManuallyDrop::new(stream), cbs: ManuallyDrop::new(cbs), - }).index() + }) + .index() } None => { // TODO: Turn into error @@ -416,9 +679,10 @@ impl CubebServer { PlatformHandle::from(input_file), PlatformHandle::from(output_file), ], - target_pid: self.remote_pid.unwrap() + target_pid: self.remote_pid.unwrap(), })) - }).map_err(|e| e.into()) + }) + .map_err(|e| e.into()) } } } @@ -436,13 +700,13 @@ unsafe extern "C" fn data_cb_c( let input = if input_buffer.is_null() { &[] } else { - let nbytes = nframes * cbs.input_frame_size as c_long; + let nbytes = nframes * c_long::from(cbs.input_frame_size); slice::from_raw_parts(input_buffer as *const u8, nbytes as usize) }; let output: &mut [u8] = if output_buffer.is_null() { &mut [] } else { - let nbytes = nframes * cbs.output_frame_size as c_long; + let nbytes = nframes * c_long::from(cbs.output_frame_size); slice::from_raw_parts_mut(output_buffer as *mut u8, nbytes as usize) }; cbs.data_callback(input, output, nframes as isize) as c_long @@ -462,3 +726,33 @@ unsafe extern "C" fn state_cb_c( }); ok.expect("State callback panicked"); } + +unsafe extern "C" fn device_change_cb_c(user_ptr: *mut c_void) { + let ok = panic::catch_unwind(|| { + let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks); + cbs.device_change_callback(); + }); + ok.expect("Device change callback panicked"); +} + +unsafe extern "C" fn device_collection_changed_input_cb_c( + _: *mut ffi::cubeb, + user_ptr: *mut c_void, +) { + let ok = panic::catch_unwind(|| { + let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager); + manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_INPUT); + }); + ok.expect("Collection changed (input) callback panicked"); +} + +unsafe extern "C" fn device_collection_changed_output_cb_c( + _: *mut ffi::cubeb, + user_ptr: *mut c_void, +) { + let ok = panic::catch_unwind(|| { + let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager); + manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_OUTPUT); + }); + ok.expect("Collection changed (output) callback panicked"); +}