Bug 1569090 - Update audioipc to 177ebd96. r=chunmin

Differential Revision: https://phabricator.services.mozilla.com/D39453

--HG--
rename : media/audioipc/audioipc/src/async.rs => media/audioipc/audioipc/src/async_msg.rs
extra : moz-landing-system : lando
This commit is contained in:
Matthew Gregan 2019-07-27 13:14:23 +00:00
Родитель 47358b914e
Коммит e0fdf04539
30 изменённых файлов: 1419 добавлений и 753 удалений

Просмотреть файл

@ -32,5 +32,14 @@ git = "https://github.com/ChunMinChang/coreaudio-sys"
branch = "gecko-build" branch = "gecko-build"
replace-with = "vendored-sources" replace-with = "vendored-sources"
[source."https://github.com/alexcrichton/mio-named-pipes"]
git = "https://github.com/alexcrichton/mio-named-pipes"
replace-with = "vendored-sources"
[source."https://github.com/NikVolf/tokio-named-pipes"]
git = "https://github.com/NikVolf/tokio-named-pipes"
branch = "stable"
replace-with = "vendored-sources"
[source.vendored-sources] [source.vendored-sources]
directory = '@top_srcdir@/third_party/rust' directory = '@top_srcdir@/third_party/rust'

Просмотреть файл

@ -1,2 +1,3 @@
[workspace] [workspace]
members = ["audioipc", "client", "server"] members = ["audioipc", "client", "server"]

Просмотреть файл

@ -5,4 +5,4 @@ Makefile.in build files for the Mozilla build system.
The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git
The git commit ID used was 1c6e33942d726d71cb911fb8f57138aadf577057 (2019-07-23 15:56:25 +1200) The git commit ID used was 959be8b71bd8d931a9e94276dde1dd718f52fca4 (2019-07-26 16:40:02 +1200)

Просмотреть файл

@ -6,24 +6,32 @@ authors = [
"Dan Glastonbury <dan.glastonbury@gmail.com>" "Dan Glastonbury <dan.glastonbury@gmail.com>"
] ]
description = "Remote Cubeb IPC" description = "Remote Cubeb IPC"
edition = "2018"
[dependencies] [dependencies]
cubeb = "0.5.4"
bincode = "1.0" bincode = "1.0"
bytes = "0.4" bytes = "0.4"
cubeb = "0.5.5"
futures = "0.1.18" futures = "0.1.18"
iovec = "0.1"
libc = "0.2"
log = "0.4" log = "0.4"
memmap = "0.5.2" memmap = "0.7"
scoped-tls = "0.1" scoped-tls = "0.1"
serde = "1.*.*" serde = "1.*.*"
serde_derive = "1.*.*" serde_derive = "1.*.*"
tokio-core = "0.1" tokio = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
tokio-uds = "0.1.7"
[target.'cfg(unix)'.dependencies]
iovec = "0.1"
libc = "0.2"
mio = "0.6.19"
mio-uds = "0.6.7"
tokio-reactor = "0.1"
[target.'cfg(windows)'.dependencies]
mio-named-pipes = { git = "https://github.com/alexcrichton/mio-named-pipes" }
tokio-named-pipes = { git = "https://github.com/NikVolf/tokio-named-pipes", branch = "stable" }
winapi = "0.3.6" winapi = "0.3.6"
mio-named-pipes = "=0.1.5"
[dependencies.error-chain] [dependencies.error-chain]
version = "0.11.0" version = "0.11.0"

Просмотреть файл

@ -5,6 +5,8 @@
//! Various async helpers modelled after futures-rs and tokio-io. //! Various async helpers modelled after futures-rs and tokio-io.
#[cfg(unix)]
use crate::msg::{RecvMsg, SendMsg};
use bytes::{Buf, BufMut}; use bytes::{Buf, BufMut};
#[cfg(unix)] #[cfg(unix)]
use futures::Async; use futures::Async;
@ -12,7 +14,7 @@ use futures::Poll;
#[cfg(unix)] #[cfg(unix)]
use iovec::IoVec; use iovec::IoVec;
#[cfg(unix)] #[cfg(unix)]
use msg::{RecvMsg, SendMsg}; use mio::Ready;
use std::io; use std::io;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
@ -63,7 +65,9 @@ impl AsyncRecvMsg for super::AsyncMessageStream {
where where
B: BufMut, B: BufMut,
{ {
if let Async::NotReady = <super::AsyncMessageStream>::poll_read(self) { if let Async::NotReady =
<super::AsyncMessageStream>::poll_read_ready(self, Ready::readable())?
{
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
let r = unsafe { let r = unsafe {
@ -119,7 +123,7 @@ impl AsyncRecvMsg for super::AsyncMessageStream {
Ok((n, flags).into()) Ok((n, flags).into())
} }
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.need_read(); self.clear_read_ready(mio::Ready::readable())?;
Ok(Async::NotReady) Ok(Async::NotReady)
} }
Err(e) => Err(e), Err(e) => Err(e),
@ -134,7 +138,7 @@ impl AsyncSendMsg for super::AsyncMessageStream {
B: Buf, B: Buf,
C: Buf, C: Buf,
{ {
if let Async::NotReady = <super::AsyncMessageStream>::poll_write(self) { if let Async::NotReady = <super::AsyncMessageStream>::poll_write_ready(self)? {
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
let r = { let r = {
@ -155,7 +159,7 @@ impl AsyncSendMsg for super::AsyncMessageStream {
Ok(Async::Ready(n)) Ok(Async::Ready(n))
} }
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.need_write(); self.clear_write_ready()?;
Ok(Async::NotReady) Ok(Async::NotReady)
} }
Err(e) => Err(e), Err(e) => Err(e),

Просмотреть файл

@ -111,6 +111,7 @@ impl ControlMsgBuilder {
// that, just use a pre-zeroed struct to fill out any // that, just use a pre-zeroed struct to fill out any
// fields we don't care about. // fields we don't care about.
let zeroed = unsafe { mem::zeroed() }; let zeroed = unsafe { mem::zeroed() };
#[allow(clippy::needless_update)]
let cmsghdr = cmsghdr { let cmsghdr = cmsghdr {
cmsg_len: cmsg_len as _, cmsg_len: cmsg_len as _,
cmsg_level: level, cmsg_level: level,
@ -122,7 +123,7 @@ impl ControlMsgBuilder {
slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::<cmsghdr>()) slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::<cmsghdr>())
}; };
cmsg.put_slice(cmsghdr); cmsg.put_slice(cmsghdr);
let mut cmsg = try!(align_buf(cmsg)); let mut cmsg = align_buf(cmsg)?;
cmsg.put_slice(msg); cmsg.put_slice(msg);
Ok(cmsg) Ok(cmsg)

Просмотреть файл

@ -31,7 +31,7 @@ pub trait Codec {
/// A default method available to be called when there are no more bytes /// A default method available to be called when there are no more bytes
/// available to be read from the I/O. /// available to be read from the I/O.
fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Self::Out> { fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Self::Out> {
match try!(self.decode(buf)) { match self.decode(buf)? {
Some(frame) => Ok(frame), Some(frame) => Ok(frame),
None => Err(io::Error::new( None => Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,
@ -101,10 +101,10 @@ impl<In, Out> LengthDelimitedCodec<In, Out> {
let buf = buf.split_to(n).freeze(); let buf = buf.split_to(n).freeze();
trace!("Attempting to decode"); trace!("Attempting to decode");
let msg = try!(deserialize::<Out>(buf.as_ref()).map_err(|e| match *e { let msg = deserialize::<Out>(buf.as_ref()).map_err(|e| match *e {
bincode::ErrorKind::Io(e) => e, bincode::ErrorKind::Io(e) => e,
_ => io::Error::new(io::ErrorKind::Other, *e), _ => io::Error::new(io::ErrorKind::Other, *e),
})); })?;
trace!("... Decoded {:?}", msg); trace!("... Decoded {:?}", msg);
Ok(Some(msg)) Ok(Some(msg))
@ -122,7 +122,7 @@ where
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>> { fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>> {
let n = match self.state { let n = match self.state {
State::Length => { State::Length => {
match try!(self.decode_length(buf)) { match self.decode_length(buf)? {
Some(n) => { Some(n) => {
self.state = State::Data(n); self.state = State::Data(n);
@ -138,7 +138,7 @@ where
State::Data(n) => n, State::Data(n) => n,
}; };
match try!(self.decode_data(buf, n)) { match self.decode_data(buf, n)? {
Some(data) => { Some(data) => {
// Update the decode state // Update the decode state
self.state = State::Length; self.state = State::Length;

Просмотреть файл

@ -6,33 +6,9 @@
// Ease accessing reactor::Core handles. // Ease accessing reactor::Core handles.
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::{Future, IntoFuture};
use std::sync::mpsc; use std::sync::mpsc;
use std::{fmt, io, thread}; use std::{fmt, io, thread};
use tokio_core::reactor::{Core, Handle, Remote}; use tokio::runtime::current_thread;
scoped_thread_local! {
static HANDLE: Handle
}
pub fn handle() -> Handle {
HANDLE.with(|handle| handle.clone())
}
pub fn spawn<F>(f: F)
where
F: Future<Item = (), Error = ()> + 'static,
{
HANDLE.with(|handle| handle.spawn(f))
}
pub fn spawn_fn<F, R>(f: F)
where
F: FnOnce() -> R + 'static,
R: IntoFuture<Item = (), Error = ()> + 'static,
{
HANDLE.with(|handle| handle.spawn_fn(f))
}
struct Inner { struct Inner {
join: thread::JoinHandle<()>, join: thread::JoinHandle<()>,
@ -41,12 +17,12 @@ struct Inner {
pub struct CoreThread { pub struct CoreThread {
inner: Option<Inner>, inner: Option<Inner>,
remote: Remote, handle: current_thread::Handle,
} }
impl CoreThread { impl CoreThread {
pub fn remote(&self) -> Remote { pub fn handle(&self) -> current_thread::Handle {
self.remote.clone() self.handle.clone()
} }
} }
@ -61,9 +37,9 @@ impl Drop for CoreThread {
} }
impl fmt::Debug for CoreThread { impl fmt::Debug for CoreThread {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// f.debug_tuple("CoreThread").field(&"...").finish() // f.debug_tuple("CoreThread").field(&"...").finish()
f.debug_tuple("CoreThread").field(&self.remote).finish() f.debug_tuple("CoreThread").field(&self.handle).finish()
} }
} }
@ -73,33 +49,35 @@ where
F: FnOnce() -> io::Result<()> + Send + 'static, F: FnOnce() -> io::Result<()> + Send + 'static,
{ {
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let (remote_tx, remote_rx) = mpsc::channel::<Remote>(); let (handle_tx, handle_rx) = mpsc::channel::<current_thread::Handle>();
let join = try!(thread::Builder::new().name(name.into()).spawn(move || { let join = thread::Builder::new().name(name.into()).spawn(move || {
let mut core = Core::new().expect("Failed to create reactor::Core"); let mut rt =
let handle = core.handle(); current_thread::Runtime::new().expect("Failed to create current_thread::Runtime");
let remote = handle.remote().clone(); let handle = rt.handle();
drop(remote_tx.send(remote)); drop(handle_tx.send(handle.clone()));
drop(HANDLE.set(&handle, || { rt.spawn(futures::future::lazy(|| {
f().and_then(|_| { let _ = f();
let _ = core.run(shutdown_rx); Ok(())
Ok(())
})
})); }));
trace!("thread shutdown...");
}));
let remote = try!(remote_rx.recv().or_else(|_| Err(io::Error::new( let _ = rt.block_on(shutdown_rx);
io::ErrorKind::Other, trace!("thread shutdown...");
"Failed to receive remote handle from spawned thread" })?;
))));
let handle = handle_rx.recv().or_else(|_| {
Err(io::Error::new(
io::ErrorKind::Other,
"Failed to receive remote handle from spawned thread",
))
})?;
Ok(CoreThread { Ok(CoreThread {
inner: Some(Inner { inner: Some(Inner {
join: join, join,
shutdown: shutdown_tx, shutdown: shutdown_tx,
}), }),
remote: remote, handle,
}) })
} }

Просмотреть файл

@ -3,12 +3,12 @@
// This program is made available under an ISC-style license. See the // This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details // accompanying file LICENSE for details
use async::{AsyncRecvMsg, AsyncSendMsg}; use crate::async_msg::{AsyncRecvMsg, AsyncSendMsg};
use crate::cmsg;
use crate::codec::Codec;
use crate::messages::AssocRawPlatformHandle;
use bytes::{Bytes, BytesMut, IntoBuf}; use bytes::{Bytes, BytesMut, IntoBuf};
use cmsg;
use codec::Codec;
use futures::{AsyncSink, Poll, Sink, StartSend, Stream}; use futures::{AsyncSink, Poll, Sink, StartSend, Stream};
use messages::AssocRawPlatformHandle;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::os::unix::io::RawFd; use std::os::unix::io::RawFd;
use std::{fmt, io, mem}; use std::{fmt, io, mem};
@ -103,7 +103,8 @@ where
let fds = match frame.fds { let fds = match frame.fds {
Some(ref fds) => fds.clone(), Some(ref fds) => fds.clone(),
None => Bytes::new(), None => Bytes::new(),
}.into_buf(); }
.into_buf();
try_ready!(self.io.send_msg_buf(&mut msgs, &fds)) try_ready!(self.io.send_msg_buf(&mut msgs, &fds))
} }
_ => { _ => {
@ -179,14 +180,14 @@ where
// readable again, at which point the stream is terminated. // readable again, at which point the stream is terminated.
if self.is_readable { if self.is_readable {
if self.eof { if self.eof {
let mut item = try!(self.codec.decode_eof(&mut self.read_buf)); let mut item = self.codec.decode_eof(&mut self.read_buf)?;
item.take_platform_handles(|| self.incoming_fds.take_fds()); item.take_platform_handles(|| self.incoming_fds.take_fds());
return Ok(Some(item).into()); return Ok(Some(item).into());
} }
trace!("attempting to decode a frame"); trace!("attempting to decode a frame");
if let Some(mut item) = try!(self.codec.decode(&mut self.read_buf)) { if let Some(mut item) = self.codec.decode(&mut self.read_buf)? {
trace!("frame decoded from buffer"); trace!("frame decoded from buffer");
item.take_platform_handles(|| self.incoming_fds.take_fds()); item.take_platform_handles(|| self.incoming_fds.take_fds());
return Ok(Some(item).into()); return Ok(Some(item).into());
@ -200,10 +201,9 @@ where
// Otherwise, try to read more data and try again. Make sure we've // Otherwise, try to read more data and try again. Make sure we've
// got room for at least one byte to read to ensure that we don't // got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF // get a spurious 0 that looks like EOF
let (n, _) = try_ready!( let (n, _) = try_ready!(self
self.io .io
.recv_msg_buf(&mut self.read_buf, self.incoming_fds.cmsg()) .recv_msg_buf(&mut self.read_buf, self.incoming_fds.cmsg()));
);
if n == 0 { if n == 0 {
self.eof = true; self.eof = true;
@ -230,14 +230,14 @@ where
// then attempt to flush it. If after flush it's *still* // then attempt to flush it. If after flush it's *still*
// over BACKPRESSURE_THRESHOLD, then reject the send. // over BACKPRESSURE_THRESHOLD, then reject the send.
if self.write_buf.len() > BACKPRESSURE_THRESHOLD { if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
try!(self.poll_complete()); self.poll_complete()?;
if self.write_buf.len() > BACKPRESSURE_THRESHOLD { if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
return Ok(AsyncSink::NotReady(item)); return Ok(AsyncSink::NotReady(item));
} }
} }
let fds = item.platform_handles(); let fds = item.platform_handles();
try!(self.codec.encode(item, &mut self.write_buf)); self.codec.encode(item, &mut self.write_buf)?;
let fds = fds.and_then(|fds| { let fds = fds.and_then(|fds| {
cmsg::builder(&mut self.outgoing_fds) cmsg::builder(&mut self.outgoing_fds)
.rights(&fds.0[..]) .rights(&fds.0[..])
@ -275,8 +275,8 @@ where
pub fn framed_with_platformhandles<A, C>(io: A, codec: C) -> FramedWithPlatformHandles<A, C> { pub fn framed_with_platformhandles<A, C>(io: A, codec: C) -> FramedWithPlatformHandles<A, C> {
FramedWithPlatformHandles { FramedWithPlatformHandles {
io: io, io,
codec: codec, codec,
read_buf: BytesMut::with_capacity(INITIAL_CAPACITY), read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
incoming_fds: IncomingFds::new(FDS_CAPACITY), incoming_fds: IncomingFds::new(FDS_CAPACITY),
is_readable: false, is_readable: false,
@ -313,8 +313,8 @@ mod tests {
use libc; use libc;
use std; use std;
extern { extern "C" {
fn cmsghdr_bytes(size: *mut libc::size_t) -> *const libc::uint8_t; fn cmsghdr_bytes(size: *mut libc::size_t) -> *const u8;
} }
fn cmsg_bytes() -> &'static [u8] { fn cmsg_bytes() -> &'static [u8] {

Просмотреть файл

@ -3,8 +3,8 @@
// This program is made available under an ISC-style license. See the // This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details // accompanying file LICENSE for details
use crate::codec::Codec;
use bytes::{Buf, Bytes, BytesMut, IntoBuf}; use bytes::{Buf, Bytes, BytesMut, IntoBuf};
use codec::Codec;
use futures::{AsyncSink, Poll, Sink, StartSend, Stream}; use futures::{AsyncSink, Poll, Sink, StartSend, Stream};
use std::io; use std::io;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
@ -79,13 +79,13 @@ where
// readable again, at which point the stream is terminated. // readable again, at which point the stream is terminated.
if self.is_readable { if self.is_readable {
if self.eof { if self.eof {
let frame = try!(self.codec.decode_eof(&mut self.read_buf)); let frame = self.codec.decode_eof(&mut self.read_buf)?;
return Ok(Some(frame).into()); return Ok(Some(frame).into());
} }
trace!("attempting to decode a frame"); trace!("attempting to decode a frame");
if let Some(frame) = try!(self.codec.decode(&mut self.read_buf)) { if let Some(frame) = self.codec.decode(&mut self.read_buf)? {
trace!("frame decoded from buffer"); trace!("frame decoded from buffer");
return Ok(Some(frame).into()); return Ok(Some(frame).into());
} }
@ -120,13 +120,13 @@ where
// then attempt to flush it. If after flush it's *still* // then attempt to flush it. If after flush it's *still*
// over BACKPRESSURE_THRESHOLD, then reject the send. // over BACKPRESSURE_THRESHOLD, then reject the send.
if self.write_buf.len() > BACKPRESSURE_THRESHOLD { if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
try!(self.poll_complete()); self.poll_complete()?;
if self.write_buf.len() > BACKPRESSURE_THRESHOLD { if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
return Ok(AsyncSink::NotReady(item)); return Ok(AsyncSink::NotReady(item));
} }
} }
try!(self.codec.encode(item, &mut self.write_buf)); self.codec.encode(item, &mut self.write_buf)?;
Ok(AsyncSink::Ready) Ok(AsyncSink::Ready)
} }
@ -149,8 +149,8 @@ where
pub fn framed<A, C>(io: A, codec: C) -> Framed<A, C> { pub fn framed<A, C>(io: A, codec: C) -> Framed<A, C> {
Framed { Framed {
io: io, io,
codec: codec, codec,
read_buf: BytesMut::with_capacity(INITIAL_CAPACITY), read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
write_buf: BytesMut::with_capacity(INITIAL_CAPACITY), write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
frame: None, frame: None,

Просмотреть файл

@ -3,13 +3,13 @@
// This program is made available under an ISC-style license. See the // This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details // accompanying file LICENSE for details
use tokio_io::{AsyncRead, AsyncWrite}; use crate::codec::Codec;
use crate::messages::AssocRawPlatformHandle;
use bytes::{Bytes, BytesMut, IntoBuf}; use bytes::{Bytes, BytesMut, IntoBuf};
use codec::Codec;
use futures::{AsyncSink, Poll, Sink, StartSend, Stream}; use futures::{AsyncSink, Poll, Sink, StartSend, Stream};
use messages::AssocRawPlatformHandle;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::{fmt, io}; use std::{fmt, io};
use tokio_io::{AsyncRead, AsyncWrite};
const INITIAL_CAPACITY: usize = 1024; const INITIAL_CAPACITY: usize = 1024;
const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY; const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY;
@ -117,13 +117,13 @@ where
// readable again, at which point the stream is terminated. // readable again, at which point the stream is terminated.
if self.is_readable { if self.is_readable {
if self.eof { if self.eof {
let item = try!(self.codec.decode_eof(&mut self.read_buf)); let item = self.codec.decode_eof(&mut self.read_buf)?;
return Ok(Some(item).into()); return Ok(Some(item).into());
} }
trace!("attempting to decode a frame"); trace!("attempting to decode a frame");
if let Some(item) = try!(self.codec.decode(&mut self.read_buf)) { if let Some(item) = self.codec.decode(&mut self.read_buf)? {
trace!("frame decoded from buffer"); trace!("frame decoded from buffer");
return Ok(Some(item).into()); return Ok(Some(item).into());
} }
@ -136,10 +136,7 @@ where
// Otherwise, try to read more data and try again. Make sure we've // Otherwise, try to read more data and try again. Make sure we've
// got room for at least one byte to read to ensure that we don't // got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF // get a spurious 0 that looks like EOF
let n = try_ready!( let n = try_ready!(self.io.read_buf(&mut self.read_buf));
self.io
.read_buf(&mut self.read_buf)
);
if n == 0 { if n == 0 {
self.eof = true; self.eof = true;
@ -159,14 +156,17 @@ where
type SinkItem = C::In; type SinkItem = C::In;
type SinkError = io::Error; type SinkError = io::Error;
fn start_send(&mut self, mut item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { fn start_send(
&mut self,
mut item: Self::SinkItem,
) -> StartSend<Self::SinkItem, Self::SinkError> {
trace!("start_send: item={:?}", item); trace!("start_send: item={:?}", item);
// If the buffer is already over BACKPRESSURE_THRESHOLD, // If the buffer is already over BACKPRESSURE_THRESHOLD,
// then attempt to flush it. If after flush it's *still* // then attempt to flush it. If after flush it's *still*
// over BACKPRESSURE_THRESHOLD, then reject the send. // over BACKPRESSURE_THRESHOLD, then reject the send.
if self.write_buf.len() > BACKPRESSURE_THRESHOLD { if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
try!(self.poll_complete()); self.poll_complete()?;
if self.write_buf.len() > BACKPRESSURE_THRESHOLD { if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
return Ok(AsyncSink::NotReady(item)); return Ok(AsyncSink::NotReady(item));
} }
@ -176,15 +176,21 @@ where
if let Some((handles, target_pid)) = item.platform_handles() { if let Some((handles, target_pid)) = item.platform_handles() {
got_handles = true; got_handles = true;
let remote_handles = unsafe { let remote_handles = unsafe {
[duplicate_platformhandle(handles[0], target_pid)?, [
duplicate_platformhandle(handles[1], target_pid)?, duplicate_platformhandle(handles[0], target_pid)?,
duplicate_platformhandle(handles[2], target_pid)?] duplicate_platformhandle(handles[1], target_pid)?,
duplicate_platformhandle(handles[2], target_pid)?,
]
}; };
trace!("item handles: {:?} remote_handles: {:?}", handles, remote_handles); trace!(
"item handles: {:?} remote_handles: {:?}",
handles,
remote_handles
);
item.take_platform_handles(|| Some(remote_handles)); item.take_platform_handles(|| Some(remote_handles));
} }
try!(self.codec.encode(item, &mut self.write_buf)); self.codec.encode(item, &mut self.write_buf)?;
if got_handles { if got_handles {
// Enforce splitting sends on messages that contain file // Enforce splitting sends on messages that contain file
@ -214,8 +220,8 @@ where
pub fn framed_with_platformhandles<A, C>(io: A, codec: C) -> FramedWithPlatformHandles<A, C> { pub fn framed_with_platformhandles<A, C>(io: A, codec: C) -> FramedWithPlatformHandles<A, C> {
FramedWithPlatformHandles { FramedWithPlatformHandles {
io: io, io,
codec: codec, codec,
read_buf: BytesMut::with_capacity(INITIAL_CAPACITY), read_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
is_readable: false, is_readable: false,
eof: false, eof: false,
@ -224,33 +230,41 @@ pub fn framed_with_platformhandles<A, C>(io: A, codec: C) -> FramedWithPlatformH
} }
} }
use winapi::um::{processthreadsapi, winnt, handleapi};
use winapi::shared::minwindef::{DWORD, FALSE};
use super::PlatformHandleType; use super::PlatformHandleType;
use winapi::shared::minwindef::{DWORD, FALSE};
use winapi::um::{handleapi, processthreadsapi, winnt};
// source_handle is effectively taken ownership of (consumed) and // source_handle is effectively taken ownership of (consumed) and
// closed when duplicate_platformhandle is called. // closed when duplicate_platformhandle is called.
// TODO: Make this transfer more explicit via the type system. // TODO: Make this transfer more explicit via the type system.
unsafe fn duplicate_platformhandle(source_handle: PlatformHandleType, unsafe fn duplicate_platformhandle(
target_pid: DWORD) -> Result<PlatformHandleType, std::io::Error> { source_handle: PlatformHandleType,
target_pid: DWORD,
) -> Result<PlatformHandleType, std::io::Error> {
let source = processthreadsapi::GetCurrentProcess(); let source = processthreadsapi::GetCurrentProcess();
let target = processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE, let target = processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE, FALSE, target_pid);
FALSE,
target_pid);
if !super::valid_handle(target) { if !super::valid_handle(target) {
return Err(std::io::Error::new(std::io::ErrorKind::Other, "invalid target process")); return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"invalid target process",
));
} }
let mut target_handle = std::ptr::null_mut(); let mut target_handle = std::ptr::null_mut();
let ok = handleapi::DuplicateHandle(source, let ok = handleapi::DuplicateHandle(
source_handle, source,
target, source_handle,
&mut target_handle, target,
0, &mut target_handle,
FALSE, 0,
winnt::DUPLICATE_CLOSE_SOURCE | winnt::DUPLICATE_SAME_ACCESS); FALSE,
winnt::DUPLICATE_CLOSE_SOURCE | winnt::DUPLICATE_SAME_ACCESS,
);
if ok == FALSE { if ok == FALSE {
return Err(std::io::Error::new(std::io::ErrorKind::Other, "DuplicateHandle failed")); return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"DuplicateHandle failed",
));
} }
Ok(target_handle) Ok(target_handle)
} }

Просмотреть файл

@ -2,36 +2,20 @@
// //
// This program is made available under an ISC-style license. See the // This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details // accompanying file LICENSE for details
#![warn(unused_extern_crates)]
#![recursion_limit = "1024"] #![recursion_limit = "1024"]
#[macro_use] #[macro_use]
extern crate error_chain; extern crate error_chain;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use] #[macro_use]
extern crate serde_derive; extern crate serde_derive;
extern crate bincode;
extern crate bytes;
extern crate cubeb;
#[macro_use] #[macro_use]
extern crate futures; extern crate futures;
extern crate iovec;
extern crate libc;
extern crate memmap;
#[macro_use]
extern crate scoped_tls;
extern crate serde;
extern crate tokio_core;
#[macro_use] #[macro_use]
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_uds;
#[cfg(windows)]
extern crate winapi;
mod async; mod async_msg;
#[cfg(unix)] #[cfg(unix)]
mod cmsg; mod cmsg;
pub mod codec; pub mod codec;
@ -41,7 +25,7 @@ pub mod errors;
#[cfg(unix)] #[cfg(unix)]
pub mod fd_passing; pub mod fd_passing;
#[cfg(unix)] #[cfg(unix)]
pub use fd_passing as platformhandle_passing; pub use crate::fd_passing as platformhandle_passing;
#[cfg(windows)] #[cfg(windows)]
pub mod handle_passing; pub mod handle_passing;
#[cfg(windows)] #[cfg(windows)]
@ -53,14 +37,21 @@ mod msg;
pub mod rpc; pub mod rpc;
pub mod shm; pub mod shm;
pub use messages::{ClientMessage, ServerMessage}; // TODO: Remove local fork when https://github.com/tokio-rs/tokio/pull/1294 is resolved.
#[cfg(unix)]
mod tokio_uds_stream;
pub use crate::messages::{ClientMessage, ServerMessage};
use std::env::temp_dir; use std::env::temp_dir;
use std::path::PathBuf; use std::path::PathBuf;
#[cfg(windows)] // TODO: Remove hardcoded size and allow allocation based on cubeb backend requirements.
use std::os::windows::io::{FromRawHandle, IntoRawHandle}; pub const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
#[cfg(unix)] #[cfg(unix)]
use std::os::unix::io::{FromRawFd, IntoRawFd}; use std::os::unix::io::{FromRawFd, IntoRawFd};
#[cfg(windows)]
use std::os::windows::io::{FromRawHandle, IntoRawHandle};
// This must match the definition of // This must match the definition of
// ipc::FileDescriptor::PlatformHandleType in Gecko. // ipc::FileDescriptor::PlatformHandleType in Gecko.
@ -93,7 +84,7 @@ struct PlatformHandleVisitor;
impl<'de> serde::de::Visitor<'de> for PlatformHandleVisitor { impl<'de> serde::de::Visitor<'de> for PlatformHandleVisitor {
type Value = PlatformHandle; type Value = PlatformHandle;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str("an integer between -2^63 and 2^63") formatter.write_str("an integer between -2^63 and 2^63")
} }
@ -158,7 +149,7 @@ impl PlatformHandle {
std::fs::File::from_raw_fd(self.0) std::fs::File::from_raw_fd(self.0)
} }
pub fn as_raw(&self) -> PlatformHandleType { pub fn as_raw(self) -> PlatformHandleType {
self.0 self.0
} }
@ -187,11 +178,9 @@ pub fn get_shm_path(dir: &str) -> PathBuf {
#[cfg(unix)] #[cfg(unix)]
pub mod messagestream_unix; pub mod messagestream_unix;
#[cfg(unix)] #[cfg(unix)]
pub use messagestream_unix::*; pub use crate::messagestream_unix::*;
#[cfg(windows)] #[cfg(windows)]
pub mod messagestream_win; pub mod messagestream_win;
#[cfg(windows)] #[cfg(windows)]
pub use messagestream_win::*; pub use messagestream_win::*;
#[cfg(windows)]
mod tokio_named_pipes;

Просмотреть файл

@ -3,8 +3,8 @@
// This program is made available under an ISC-style license. See the // This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details // accompanying file LICENSE for details
use PlatformHandle; use crate::PlatformHandle;
use PlatformHandleType; use crate::PlatformHandleType;
use cubeb::{self, ffi}; use cubeb::{self, ffi};
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use std::os::raw::{c_char, c_int, c_uint}; use std::os::raw::{c_char, c_int, c_uint};
@ -174,6 +174,12 @@ pub struct StreamCreate {
pub target_pid: u32, pub target_pid: u32,
} }
#[derive(Debug, Serialize, Deserialize)]
pub struct RegisterDeviceCollectionChanged {
pub platform_handles: [PlatformHandle; 3],
pub target_pid: u32,
}
// Client -> Server messages. // Client -> Server messages.
// TODO: Callbacks should be different messages types so // TODO: Callbacks should be different messages types so
// ServerConn::process_msg doesn't have a catch-all case. // ServerConn::process_msg doesn't have a catch-all case.
@ -187,6 +193,8 @@ pub enum ServerMessage {
ContextGetMinLatency(StreamParams), ContextGetMinLatency(StreamParams),
ContextGetPreferredSampleRate, ContextGetPreferredSampleRate,
ContextGetDeviceEnumeration(ffi::cubeb_device_type), ContextGetDeviceEnumeration(ffi::cubeb_device_type),
ContextSetupDeviceCollectionCallback,
ContextRegisterDeviceCollectionChanged(ffi::cubeb_device_type, bool),
StreamInit(StreamInitParams), StreamInit(StreamInitParams),
StreamDestroy(usize), StreamDestroy(usize),
@ -199,6 +207,7 @@ pub enum ServerMessage {
StreamSetVolume(usize, f32), StreamSetVolume(usize, f32),
StreamSetPanning(usize, f32), StreamSetPanning(usize, f32),
StreamGetCurrentDevice(usize), StreamGetCurrentDevice(usize),
StreamRegisterDeviceChangeCallback(usize, bool),
} }
// Server -> Client messages. // Server -> Client messages.
@ -208,11 +217,13 @@ pub enum ClientMessage {
ClientConnected, ClientConnected,
ClientDisconnected, ClientDisconnected,
ContextBackendId(), ContextBackendId(String),
ContextMaxChannelCount(u32), ContextMaxChannelCount(u32),
ContextMinLatency(u32), ContextMinLatency(u32),
ContextPreferredSampleRate(u32), ContextPreferredSampleRate(u32),
ContextEnumeratedDevices(Vec<DeviceInfo>), ContextEnumeratedDevices(Vec<DeviceInfo>),
ContextSetupDeviceCollectionCallback(RegisterDeviceCollectionChanged),
ContextRegisteredDeviceCollectionChanged,
StreamCreated(StreamCreate), StreamCreated(StreamCreate),
StreamDestroyed, StreamDestroyed,
@ -225,22 +236,37 @@ pub enum ClientMessage {
StreamVolumeSet, StreamVolumeSet,
StreamPanningSet, StreamPanningSet,
StreamCurrentDevice(Device), StreamCurrentDevice(Device),
StreamRegisterDeviceChangeCallback,
Error(c_int), Error(c_int),
} }
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
pub enum CallbackReq { pub enum CallbackReq {
Data { nframes: isize, Data {
input_frame_size: usize, nframes: isize,
output_frame_size: usize }, input_frame_size: usize,
output_frame_size: usize,
},
State(ffi::cubeb_state), State(ffi::cubeb_state),
DeviceChange,
} }
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
pub enum CallbackResp { pub enum CallbackResp {
Data(isize), Data(isize),
State, State,
DeviceChange,
}
#[derive(Debug, Deserialize, Serialize)]
pub enum DeviceCollectionReq {
DeviceChange(ffi::cubeb_device_type),
}
#[derive(Debug, Deserialize, Serialize)]
pub enum DeviceCollectionResp {
DeviceChange,
} }
pub trait AssocRawPlatformHandle { pub trait AssocRawPlatformHandle {
@ -250,7 +276,8 @@ pub trait AssocRawPlatformHandle {
fn take_platform_handles<F>(&mut self, f: F) fn take_platform_handles<F>(&mut self, f: F)
where where
F: FnOnce() -> Option<[PlatformHandleType; 3]> { F: FnOnce() -> Option<[PlatformHandleType; 3]>,
{
assert!(f().is_none()); assert!(f().is_none());
} }
} }
@ -260,10 +287,22 @@ impl AssocRawPlatformHandle for ServerMessage {}
impl AssocRawPlatformHandle for ClientMessage { impl AssocRawPlatformHandle for ClientMessage {
fn platform_handles(&self) -> Option<([PlatformHandleType; 3], u32)> { fn platform_handles(&self) -> Option<([PlatformHandleType; 3], u32)> {
match *self { match *self {
ClientMessage::StreamCreated(ref data) => Some(([data.platform_handles[0].as_raw(), ClientMessage::StreamCreated(ref data) => Some((
data.platform_handles[1].as_raw(), [
data.platform_handles[2].as_raw()], data.platform_handles[0].as_raw(),
data.target_pid)), data.platform_handles[1].as_raw(),
data.platform_handles[2].as_raw(),
],
data.target_pid,
)),
ClientMessage::ContextSetupDeviceCollectionCallback(ref data) => Some((
[
data.platform_handles[0].as_raw(),
data.platform_handles[1].as_raw(),
data.platform_handles[2].as_raw(),
],
data.target_pid,
)),
_ => None, _ => None,
} }
} }
@ -272,11 +311,25 @@ impl AssocRawPlatformHandle for ClientMessage {
where where
F: FnOnce() -> Option<[PlatformHandleType; 3]>, F: FnOnce() -> Option<[PlatformHandleType; 3]>,
{ {
if let ClientMessage::StreamCreated(ref mut data) = *self { match *self {
let handles = f().expect("platform_handles must be available when processing StreamCreated"); ClientMessage::StreamCreated(ref mut data) => {
data.platform_handles = [PlatformHandle::new(handles[0]), let handles =
PlatformHandle::new(handles[1]), f().expect("platform_handles must be available when processing StreamCreated");
PlatformHandle::new(handles[2])] data.platform_handles = [
PlatformHandle::new(handles[0]),
PlatformHandle::new(handles[1]),
PlatformHandle::new(handles[2]),
]
}
ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => {
let handles = f().expect("platform_handles must be available when processing ContextSetupDeviceCollectionCallback");
data.platform_handles = [
PlatformHandle::new(handles[0]),
PlatformHandle::new(handles[1]),
PlatformHandle::new(handles[2]),
]
}
_ => {}
} }
} }
} }

Просмотреть файл

@ -3,7 +3,10 @@
// This program is made available under an ISC-style license. See the // This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details // accompanying file LICENSE for details
use std::os::unix::io::{IntoRawFd, FromRawFd, AsRawFd, RawFd}; use super::tokio_uds_stream as tokio_uds;
use futures::Poll;
use mio::Ready;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::os::unix::net; use std::os::unix::net;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
@ -16,7 +19,8 @@ impl MessageStream {
MessageStream(stream) MessageStream(stream)
} }
pub fn anonymous_ipc_pair() -> std::result::Result<(MessageStream, MessageStream), std::io::Error> { pub fn anonymous_ipc_pair(
) -> std::result::Result<(MessageStream, MessageStream), std::io::Error> {
let pair = net::UnixStream::pair()?; let pair = net::UnixStream::pair()?;
Ok((MessageStream::new(pair.0), MessageStream::new(pair.1))) Ok((MessageStream::new(pair.0), MessageStream::new(pair.1)))
} }
@ -25,8 +29,13 @@ impl MessageStream {
MessageStream::new(net::UnixStream::from_raw_fd(raw)) MessageStream::new(net::UnixStream::from_raw_fd(raw))
} }
pub fn into_tokio_ipc(self, handle: &tokio_core::reactor::Handle) -> std::result::Result<AsyncMessageStream, std::io::Error> { pub fn into_tokio_ipc(
Ok(AsyncMessageStream::new(tokio_uds::UnixStream::from_stream(self.0, handle)?)) self,
handle: &tokio::reactor::Handle,
) -> std::result::Result<AsyncMessageStream, std::io::Error> {
Ok(AsyncMessageStream::new(tokio_uds::UnixStream::from_std(
self.0, handle,
)?))
} }
} }
@ -35,20 +44,20 @@ impl AsyncMessageStream {
AsyncMessageStream(stream) AsyncMessageStream(stream)
} }
pub fn poll_read(&self) -> futures::Async<()> { pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, std::io::Error> {
self.0.poll_read() self.0.poll_read_ready(ready)
} }
pub fn poll_write(&self) -> futures::Async<()> { pub fn clear_read_ready(&self, ready: Ready) -> Result<(), std::io::Error> {
self.0.poll_write() self.0.clear_read_ready(ready)
} }
pub fn need_read(&self) { pub fn poll_write_ready(&self) -> Poll<Ready, std::io::Error> {
self.0.need_read() self.0.poll_write_ready()
} }
pub fn need_write(&self) { pub fn clear_write_ready(&self) -> Result<(), std::io::Error> {
self.0.need_write() self.0.clear_write_ready()
} }
} }

Просмотреть файл

@ -3,8 +3,8 @@
// This program is made available under an ISC-style license. See the // This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details // accompanying file LICENSE for details
extern crate mio_named_pipes; use mio_named_pipes;
use std::os::windows::io::{IntoRawHandle, FromRawHandle, AsRawHandle, RawHandle}; use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_named_pipes; use tokio_named_pipes;
@ -18,8 +18,8 @@ impl MessageStream {
MessageStream(stream) MessageStream(stream)
} }
pub fn anonymous_ipc_pair(
pub fn anonymous_ipc_pair() -> std::result::Result<(MessageStream, MessageStream), std::io::Error> { ) -> std::result::Result<(MessageStream, MessageStream), std::io::Error> {
let pipe1 = mio_named_pipes::NamedPipe::new(get_pipe_name())?; let pipe1 = mio_named_pipes::NamedPipe::new(get_pipe_name())?;
let pipe2 = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(pipe1.as_raw_handle()) }; let pipe2 = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(pipe1.as_raw_handle()) };
Ok((MessageStream::new(pipe1), MessageStream::new(pipe2))) Ok((MessageStream::new(pipe1), MessageStream::new(pipe2)))
@ -29,8 +29,13 @@ impl MessageStream {
MessageStream::new(mio_named_pipes::NamedPipe::from_raw_handle(raw)) MessageStream::new(mio_named_pipes::NamedPipe::from_raw_handle(raw))
} }
pub fn into_tokio_ipc(self, handle: &tokio_core::reactor::Handle) -> std::result::Result<AsyncMessageStream, std::io::Error> { pub fn into_tokio_ipc(
Ok(AsyncMessageStream::new(tokio_named_pipes::NamedPipe::from_pipe(self.0, handle)?)) self,
handle: &tokio::reactor::Handle,
) -> std::result::Result<AsyncMessageStream, std::io::Error> {
Ok(AsyncMessageStream::new(
tokio_named_pipes::NamedPipe::from_pipe(self.0, handle)?,
))
} }
} }
@ -38,22 +43,6 @@ impl AsyncMessageStream {
fn new(stream: tokio_named_pipes::NamedPipe) -> AsyncMessageStream { fn new(stream: tokio_named_pipes::NamedPipe) -> AsyncMessageStream {
AsyncMessageStream(stream) AsyncMessageStream(stream)
} }
pub fn poll_read(&self) -> futures::Async<()> {
self.0.poll_read()
}
pub fn poll_write(&self) -> futures::Async<()> {
self.0.poll_write()
}
pub fn need_read(&self) {
self.0.need_read()
}
pub fn need_write(&self) {
self.0.need_write()
}
} }
impl std::io::Read for AsyncMessageStream { impl std::io::Read for AsyncMessageStream {

Просмотреть файл

@ -3,7 +3,7 @@
// This program is made available under an ISC-style license. See the // This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details. // accompanying file LICENSE for details.
use iovec::unix as iovec; use iovec::unix;
use iovec::IoVec; use iovec::IoVec;
use libc; use libc;
use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::io::{AsRawFd, RawFd};
@ -68,7 +68,7 @@ pub fn recv_msg_with_flags(
cmsg: &mut [u8], cmsg: &mut [u8],
flags: libc::c_int, flags: libc::c_int,
) -> io::Result<(usize, usize, libc::c_int)> { ) -> io::Result<(usize, usize, libc::c_int)> {
let slice = iovec::as_os_slice_mut(bufs); let slice = unix::as_os_slice_mut(bufs);
let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len()); let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
let (control, controllen) = if cmsg.is_empty() { let (control, controllen) = if cmsg.is_empty() {
(ptr::null_mut(), 0) (ptr::null_mut(), 0)
@ -84,9 +84,7 @@ pub fn recv_msg_with_flags(
msghdr.msg_control = control; msghdr.msg_control = control;
msghdr.msg_controllen = controllen as _; msghdr.msg_controllen = controllen as _;
let n = try!(cvt_r(|| unsafe { let n = cvt_r(|| unsafe { libc::recvmsg(socket, &mut msghdr as *mut _, flags) })?;
libc::recvmsg(socket, &mut msghdr as *mut _, flags)
}));
let controllen = msghdr.msg_controllen as usize; let controllen = msghdr.msg_controllen as usize;
Ok((n, controllen, msghdr.msg_flags)) Ok((n, controllen, msghdr.msg_flags))
@ -98,7 +96,7 @@ pub fn send_msg_with_flags(
cmsg: &[u8], cmsg: &[u8],
flags: libc::c_int, flags: libc::c_int,
) -> io::Result<usize> { ) -> io::Result<usize> {
let slice = iovec::as_os_slice(bufs); let slice = unix::as_os_slice(bufs);
let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len()); let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
let (control, controllen) = if cmsg.is_empty() { let (control, controllen) = if cmsg.is_empty() {
(ptr::null_mut(), 0) (ptr::null_mut(), 0)

Просмотреть файл

@ -39,22 +39,19 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::rpc::driver::Driver;
use crate::rpc::Handler;
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::{Async, Future, Poll, Sink, Stream}; use futures::{Async, Future, Poll, Sink, Stream};
use rpc::driver::Driver;
use rpc::Handler;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io; use std::io;
use tokio_core::reactor::Handle; use tokio::runtime::current_thread;
mod proxy; mod proxy;
pub use self::proxy::{ClientProxy, Response}; pub use self::proxy::{ClientProxy, Response};
pub fn bind_client<C>( pub fn bind_client<C>(transport: C::Transport) -> proxy::ClientProxy<C::Request, C::Response>
transport: C::Transport,
handle: &Handle,
) -> proxy::ClientProxy<C::Request, C::Response>
where where
C: Client, C: Client,
{ {
@ -62,7 +59,7 @@ where
let fut = { let fut = {
let handler = ClientHandler::<C> { let handler = ClientHandler::<C> {
transport: transport, transport,
requests: rx, requests: rx,
in_flight: VecDeque::with_capacity(32), in_flight: VecDeque::with_capacity(32),
}; };
@ -70,7 +67,7 @@ where
}; };
// Spawn the RPC driver into task // Spawn the RPC driver into task
handle.spawn(Box::new(fut.map_err(|_| ()))); current_thread::spawn(fut.map_err(|_| ()));
tx tx
} }

Просмотреть файл

@ -104,7 +104,7 @@ where
R: fmt::Debug, R: fmt::Debug,
Q: fmt::Debug, Q: fmt::Debug,
{ {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "ClientProxy {{ ... }}") write!(f, "ClientProxy {{ ... }}")
} }
} }
@ -130,7 +130,7 @@ impl<Q> fmt::Debug for Response<Q>
where where
Q: fmt::Debug, Q: fmt::Debug,
{ {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Response {{ ... }}") write!(f, "Response {{ ... }}")
} }
} }

Просмотреть файл

@ -3,8 +3,8 @@
// This program is made available under an ISC-style license. See the // This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details // accompanying file LICENSE for details
use crate::rpc::Handler;
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
use rpc::Handler;
use std::fmt; use std::fmt;
use std::io; use std::io;
@ -29,7 +29,7 @@ where
/// Create a new rpc driver with the given service and transport. /// Create a new rpc driver with the given service and transport.
pub fn new(handler: T) -> Driver<T> { pub fn new(handler: T) -> Driver<T> {
Driver { Driver {
handler: handler, handler,
run: true, run: true,
is_flushed: true, is_flushed: true,
} }
@ -43,8 +43,8 @@ where
/// Process incoming messages off the transport. /// Process incoming messages off the transport.
fn receive_incoming(&mut self) -> io::Result<()> { fn receive_incoming(&mut self) -> io::Result<()> {
while self.run { while self.run {
if let Async::Ready(req) = try!(self.handler.transport().poll()) { if let Async::Ready(req) = self.handler.transport().poll()? {
try!(self.process_incoming(req)); self.process_incoming(req)?;
} else { } else {
break; break;
} }
@ -82,10 +82,10 @@ where
fn send_outgoing(&mut self) -> io::Result<()> { fn send_outgoing(&mut self) -> io::Result<()> {
trace!("send_responses"); trace!("send_responses");
loop { loop {
match try!(self.handler.produce()) { match self.handler.produce()? {
Async::Ready(Some(message)) => { Async::Ready(Some(message)) => {
trace!(" --> got message"); trace!(" --> got message");
try!(self.process_outgoing(message)); self.process_outgoing(message)?;
} }
Async::Ready(None) => { Async::Ready(None) => {
trace!(" --> got None"); trace!(" --> got None");
@ -103,13 +103,13 @@ where
fn process_outgoing(&mut self, message: T::Out) -> io::Result<()> { fn process_outgoing(&mut self, message: T::Out) -> io::Result<()> {
trace!("process_outgoing"); trace!("process_outgoing");
try!(assert_send(&mut self.handler.transport(), message)); assert_send(&mut self.handler.transport(), message)?;
Ok(()) Ok(())
} }
fn flush(&mut self) -> io::Result<()> { fn flush(&mut self) -> io::Result<()> {
self.is_flushed = try!(self.handler.transport().poll_complete()).is_ready(); self.is_flushed = self.handler.transport().poll_complete()?.is_ready();
// TODO: // TODO:
Ok(()) Ok(())
@ -131,13 +131,13 @@ where
trace!("rpc::Driver::tick"); trace!("rpc::Driver::tick");
// First read off data from the socket // First read off data from the socket
try!(self.receive_incoming()); self.receive_incoming()?;
// Handle completed responses // Handle completed responses
try!(self.send_outgoing()); self.send_outgoing()?;
// Try flushing buffered writes // Try flushing buffered writes
try!(self.flush()); self.flush()?;
if self.is_done() { if self.is_done() {
trace!(" --> is done."); trace!(" --> is done.");
@ -150,7 +150,7 @@ where
} }
fn assert_send<S: Sink>(s: &mut S, item: S::SinkItem) -> Result<(), S::SinkError> { fn assert_send<S: Sink>(s: &mut S, item: S::SinkItem) -> Result<(), S::SinkError> {
match try!(s.start_send(item)) { match s.start_send(item)? {
AsyncSink::Ready => Ok(()), AsyncSink::Ready => Ok(()),
AsyncSink::NotReady(_) => panic!( AsyncSink::NotReady(_) => panic!(
"sink reported itself as ready after `poll_ready` but was \ "sink reported itself as ready after `poll_ready` but was \
@ -163,7 +163,7 @@ impl<T> fmt::Debug for Driver<T>
where where
T: Handler + fmt::Debug, T: Handler + fmt::Debug,
{ {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("rpc::Handler") f.debug_struct("rpc::Handler")
.field("handler", &self.handler) .field("handler", &self.handler)
.field("run", &self.run) .field("run", &self.run)

Просмотреть файл

@ -39,29 +39,29 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::rpc::driver::Driver;
use crate::rpc::Handler;
use futures::{Async, Future, Poll, Sink, Stream}; use futures::{Async, Future, Poll, Sink, Stream};
use rpc::driver::Driver;
use rpc::Handler;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io; use std::io;
use tokio_core::reactor::Handle; use tokio::runtime::current_thread;
/// Bind an async I/O object `io` to the `server`. /// Bind an async I/O object `io` to the `server`.
pub fn bind_server<S>(transport: S::Transport, server: S, handle: &Handle) pub fn bind_server<S>(transport: S::Transport, server: S)
where where
S: Server, S: Server,
{ {
let fut = { let fut = {
let handler = ServerHandler { let handler = ServerHandler {
server: server, server,
transport: transport, transport,
in_flight: VecDeque::with_capacity(32), in_flight: VecDeque::with_capacity(32),
}; };
Driver::new(handler) Driver::new(handler)
}; };
// Spawn the RPC driver into task // Spawn the RPC driver into task
handle.spawn(Box::new(fut.map_err(|_| ()))) current_thread::spawn(fut.map_err(|_| ()))
} }
pub trait Server: 'static { pub trait Server: 'static {

Просмотреть файл

@ -3,11 +3,12 @@
// This program is made available under an ISC-style license. See the // This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details. // accompanying file LICENSE for details.
use errors::*; use crate::errors::*;
use memmap::{Mmap, MmapViewSync, Protection}; use memmap::{Mmap, MmapMut, MmapOptions};
use std::cell::UnsafeCell;
use std::fs::{remove_file, File, OpenOptions}; use std::fs::{remove_file, File, OpenOptions};
use std::path::Path; use std::path::Path;
use std::sync::atomic; use std::sync::{atomic, Arc};
pub struct SharedMemReader { pub struct SharedMemReader {
mmap: Mmap, mmap: Mmap,
@ -22,7 +23,7 @@ impl SharedMemReader {
.open(path)?; .open(path)?;
let _ = remove_file(path); let _ = remove_file(path);
file.set_len(size as u64)?; file.set_len(size as u64)?;
let mmap = Mmap::open(&file, Protection::Read)?; let mmap = unsafe { MmapOptions::new().map(&file)? };
assert_eq!(mmap.len(), size); assert_eq!(mmap.len(), size);
Ok((SharedMemReader { mmap }, file)) Ok((SharedMemReader { mmap }, file))
} }
@ -34,10 +35,8 @@ impl SharedMemReader {
// TODO: Track how much is in the shm area. // TODO: Track how much is in the shm area.
if buf.len() <= self.mmap.len() { if buf.len() <= self.mmap.len() {
atomic::fence(atomic::Ordering::Acquire); atomic::fence(atomic::Ordering::Acquire);
unsafe { let len = buf.len();
let len = buf.len(); buf.copy_from_slice(&self.mmap[..len]);
buf.copy_from_slice(&self.mmap.as_slice()[..len]);
}
Ok(()) Ok(())
} else { } else {
bail!("mmap size"); bail!("mmap size");
@ -46,15 +45,15 @@ impl SharedMemReader {
} }
pub struct SharedMemSlice { pub struct SharedMemSlice {
view: MmapViewSync, mmap: Arc<Mmap>,
} }
impl SharedMemSlice { impl SharedMemSlice {
pub fn from(file: &File, size: usize) -> Result<SharedMemSlice> { pub fn from(file: &File, size: usize) -> Result<SharedMemSlice> {
let mmap = Mmap::open(file, Protection::Read)?; let mmap = unsafe { MmapOptions::new().map(file)? };
assert_eq!(mmap.len(), size); assert_eq!(mmap.len(), size);
let view = mmap.into_view_sync(); let mmap = Arc::new(mmap);
Ok(SharedMemSlice { view }) Ok(SharedMemSlice { mmap })
} }
pub fn get_slice(&self, size: usize) -> Result<&[u8]> { pub fn get_slice(&self, size: usize) -> Result<&[u8]> {
@ -62,28 +61,30 @@ impl SharedMemSlice {
return Ok(&[]); return Ok(&[]);
} }
// TODO: Track how much is in the shm area. // TODO: Track how much is in the shm area.
if size <= self.view.len() { if size <= self.mmap.len() {
atomic::fence(atomic::Ordering::Acquire); atomic::fence(atomic::Ordering::Acquire);
let buf = unsafe { &self.view.as_slice()[..size] }; let buf = &self.mmap[..size];
Ok(buf) Ok(buf)
} else { } else {
bail!("mmap size"); bail!("mmap size");
} }
} }
/// Clones the view of the memory map. /// Clones the memory map.
/// ///
/// The underlying memory map is shared, and thus the caller must ensure that the memory /// The underlying memory map is shared, and thus the caller must
/// underlying the view is not illegally aliased. /// ensure that the memory is not illegally aliased.
pub unsafe fn clone_view(&self) -> Self { pub unsafe fn unsafe_clone(&self) -> Self {
SharedMemSlice { SharedMemSlice {
view: self.view.clone(), mmap: self.mmap.clone(),
} }
} }
} }
unsafe impl Send for SharedMemSlice {}
pub struct SharedMemWriter { pub struct SharedMemWriter {
mmap: Mmap, mmap: MmapMut,
} }
impl SharedMemWriter { impl SharedMemWriter {
@ -95,7 +96,7 @@ impl SharedMemWriter {
.open(path)?; .open(path)?;
let _ = remove_file(path); let _ = remove_file(path);
file.set_len(size as u64)?; file.set_len(size as u64)?;
let mmap = Mmap::open(&file, Protection::ReadWrite)?; let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
Ok((SharedMemWriter { mmap }, file)) Ok((SharedMemWriter { mmap }, file))
} }
@ -105,9 +106,7 @@ impl SharedMemWriter {
} }
// TODO: Track how much is in the shm area. // TODO: Track how much is in the shm area.
if buf.len() <= self.mmap.len() { if buf.len() <= self.mmap.len() {
unsafe { self.mmap[..buf.len()].copy_from_slice(buf);
self.mmap.as_mut_slice()[..buf.len()].copy_from_slice(buf);
}
atomic::fence(atomic::Ordering::Release); atomic::fence(atomic::Ordering::Release);
Ok(()) Ok(())
} else { } else {
@ -117,15 +116,15 @@ impl SharedMemWriter {
} }
pub struct SharedMemMutSlice { pub struct SharedMemMutSlice {
view: MmapViewSync, mmap: Arc<UnsafeCell<MmapMut>>,
} }
impl SharedMemMutSlice { impl SharedMemMutSlice {
pub fn from(file: &File, size: usize) -> Result<SharedMemMutSlice> { pub fn from(file: &File, size: usize) -> Result<SharedMemMutSlice> {
let mmap = Mmap::open(file, Protection::ReadWrite)?; let mmap = unsafe { MmapOptions::new().map_mut(file)? };
assert_eq!(mmap.len(), size); assert_eq!(mmap.len(), size);
let view = mmap.into_view_sync(); let mmap = Arc::new(UnsafeCell::new(mmap));
Ok(SharedMemMutSlice { view }) Ok(SharedMemMutSlice { mmap })
} }
pub fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> { pub fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> {
@ -133,8 +132,8 @@ impl SharedMemMutSlice {
return Ok(&mut []); return Ok(&mut []);
} }
// TODO: Track how much is in the shm area. // TODO: Track how much is in the shm area.
if size <= self.view.len() { if size <= self.inner().len() {
let buf = unsafe { &mut self.view.as_mut_slice()[..size] }; let buf = &mut self.inner_mut()[..size];
atomic::fence(atomic::Ordering::Release); atomic::fence(atomic::Ordering::Release);
Ok(buf) Ok(buf)
} else { } else {
@ -142,14 +141,23 @@ impl SharedMemMutSlice {
} }
} }
/// Clones the view of the memory map. /// Clones the memory map.
/// ///
/// The underlying memory map is shared, and thus the caller must /// The underlying memory map is shared, and thus the caller must
/// ensure that the memory underlying the view is not illegally /// ensure that the memory is not illegally aliased.
/// aliased. pub unsafe fn unsafe_clone(&self) -> Self {
pub unsafe fn clone_view(&self) -> Self {
SharedMemMutSlice { SharedMemMutSlice {
view: self.view.clone(), mmap: self.mmap.clone(),
} }
} }
fn inner(&self) -> &MmapMut {
unsafe { &*self.mmap.get() }
}
fn inner_mut(&mut self) -> &mut MmapMut {
unsafe { &mut *self.mmap.get() }
}
} }
unsafe impl Send for SharedMemMutSlice {}

Просмотреть файл

@ -1,175 +0,0 @@
// From https://github.com/alexcrichton/tokio-named-pipes/commit/3a22f8fc9a441b548aec25bd5df3b1e0ab99fabe
// License MIT/Apache-2.0
// Sloppily updated to be compatible with tokio_io
// To be replaced with tokio_named_pipes crate after tokio 0.1 update.
#![cfg(windows)]
extern crate bytes;
extern crate tokio_core;
extern crate mio_named_pipes;
extern crate futures;
use std::ffi::OsStr;
use std::fmt;
use std::io::{self, Read, Write};
use std::os::windows::io::*;
use futures::{Async, Poll};
use bytes::{BufMut, Buf};
#[allow(deprecated)]
use tokio_core::io::Io;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::reactor::{PollEvented, Handle};
pub struct NamedPipe {
io: PollEvented<mio_named_pipes::NamedPipe>,
}
impl NamedPipe {
pub fn new<P: AsRef<OsStr>>(p: P, handle: &Handle) -> io::Result<NamedPipe> {
NamedPipe::_new(p.as_ref(), handle)
}
fn _new(p: &OsStr, handle: &Handle) -> io::Result<NamedPipe> {
let inner = try!(mio_named_pipes::NamedPipe::new(p));
NamedPipe::from_pipe(inner, handle)
}
pub fn from_pipe(pipe: mio_named_pipes::NamedPipe,
handle: &Handle)
-> io::Result<NamedPipe> {
Ok(NamedPipe {
io: try!(PollEvented::new(pipe, handle)),
})
}
pub fn connect(&self) -> io::Result<()> {
self.io.get_ref().connect()
}
pub fn disconnect(&self) -> io::Result<()> {
self.io.get_ref().disconnect()
}
pub fn need_read(&self) {
self.io.need_read()
}
pub fn need_write(&self) {
self.io.need_write()
}
pub fn poll_read(&self) -> Async<()> {
self.io.poll_read()
}
pub fn poll_write(&self) -> Async<()> {
self.io.poll_write()
}
}
impl Read for NamedPipe {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.io.read(buf)
}
}
impl Write for NamedPipe {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.io.flush()
}
}
#[allow(deprecated)]
impl Io for NamedPipe {
fn poll_read(&mut self) -> Async<()> {
<NamedPipe>::poll_read(self)
}
fn poll_write(&mut self) -> Async<()> {
<NamedPipe>::poll_write(self)
}
}
impl AsyncRead for NamedPipe {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if NamedPipe::poll_read(self).is_not_ready() {
return Ok(Async::NotReady)
}
let mut stack_buf = [0u8; 1024];
let bytes_read = self.io.read(&mut stack_buf);
match bytes_read {
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
self.io.need_read();
return Ok(Async::NotReady);
},
Err(e) => Err(e),
Ok(bytes_read) => {
buf.put_slice(&stack_buf[0..bytes_read]);
Ok(Async::Ready(bytes_read))
}
}
}
}
impl AsyncWrite for NamedPipe {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if NamedPipe::poll_write(self).is_not_ready() {
return Ok(Async::NotReady)
}
let bytes_wrt = self.io.write(buf.bytes());
match bytes_wrt {
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
self.io.need_write();
return Ok(Async::NotReady);
},
Err(e) => Err(e),
Ok(bytes_wrt) => {
buf.advance(bytes_wrt);
Ok(Async::Ready(bytes_wrt))
}
}
}
}
impl<'a> Read for &'a NamedPipe {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
(&self.io).read(buf)
}
}
impl<'a> Write for &'a NamedPipe {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
(&self.io).write(buf)
}
fn flush(&mut self) -> io::Result<()> {
(&self.io).flush()
}
}
impl fmt::Debug for NamedPipe {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.io.get_ref().fmt(f)
}
}
impl AsRawHandle for NamedPipe {
fn as_raw_handle(&self) -> RawHandle {
self.io.get_ref().as_raw_handle()
}
}

Просмотреть файл

@ -0,0 +1,363 @@
// Copied from tokio-uds/src/stream.rs revision 25e835c5b7e2cfeb9c22b1fd576844f6814a9477 (tokio-uds 0.2.5)
// License MIT per upstream: https://github.com/tokio-rs/tokio/blob/master/tokio-uds/LICENSE
// - Removed ucred for build simplicity
// - Added clear_{read,write}_ready per: https://github.com/tokio-rs/tokio/pull/1294
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_reactor::{Handle, PollEvented};
use bytes::{Buf, BufMut};
use futures::{Async, Future, Poll};
use iovec::{self, IoVec};
use libc;
use mio::Ready;
use mio_uds;
use std::fmt;
use std::io::{self, Read, Write};
use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::{self, SocketAddr};
use std::path::Path;
/// A structure representing a connected Unix socket.
///
/// This socket can be connected directly with `UnixStream::connect` or accepted
/// from a listener with `UnixListener::incoming`. Additionally, a pair of
/// anonymous Unix sockets can be created with `UnixStream::pair`.
pub struct UnixStream {
io: PollEvented<mio_uds::UnixStream>,
}
/// Future returned by `UnixStream::connect` which will resolve to a
/// `UnixStream` when the stream is connected.
#[derive(Debug)]
pub struct ConnectFuture {
inner: State,
}
#[derive(Debug)]
enum State {
Waiting(UnixStream),
Error(io::Error),
Empty,
}
impl UnixStream {
/// Connects to the socket named by `path`.
///
/// This function will create a new Unix socket and connect to the path
/// specified, associating the returned stream with the default event loop's
/// handle.
pub fn connect<P>(path: P) -> ConnectFuture
where
P: AsRef<Path>,
{
let res = mio_uds::UnixStream::connect(path).map(UnixStream::new);
let inner = match res {
Ok(stream) => State::Waiting(stream),
Err(e) => State::Error(e),
};
ConnectFuture { inner }
}
/// Consumes a `UnixStream` in the standard library and returns a
/// nonblocking `UnixStream` from this crate.
///
/// The returned stream will be associated with the given event loop
/// specified by `handle` and is ready to perform I/O.
pub fn from_std(stream: net::UnixStream, handle: &Handle) -> io::Result<UnixStream> {
let stream = mio_uds::UnixStream::from_stream(stream)?;
let io = PollEvented::new_with_handle(stream, handle)?;
Ok(UnixStream { io })
}
/// Creates an unnamed pair of connected sockets.
///
/// This function will create a pair of interconnected Unix sockets for
/// communicating back and forth between one another. Each socket will
/// be associated with the default event loop's handle.
pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
let (a, b) = mio_uds::UnixStream::pair()?;
let a = UnixStream::new(a);
let b = UnixStream::new(b);
Ok((a, b))
}
pub(crate) fn new(stream: mio_uds::UnixStream) -> UnixStream {
let io = PollEvented::new(stream);
UnixStream { io }
}
/// Test whether this socket is ready to be read or not.
pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
self.io.poll_read_ready(ready)
}
/// Clear read ready state.
pub fn clear_read_ready(&self, ready: mio::Ready) -> io::Result<()> {
self.io.clear_read_ready(ready)
}
/// Test whether this socket is ready to be written to or not.
pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> {
self.io.poll_write_ready()
}
/// Clear write ready state.
pub fn clear_write_ready(&self) -> io::Result<()> {
self.io.clear_write_ready()
}
/// Returns the socket address of the local half of this connection.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().local_addr()
}
/// Returns the socket address of the remote half of this connection.
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().peer_addr()
}
/// Returns the value of the `SO_ERROR` option.
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.io.get_ref().take_error()
}
/// Shuts down the read, write, or both halves of this connection.
///
/// This function will cause all pending and future I/O calls on the
/// specified portions to immediately return with an appropriate value
/// (see the documentation of `Shutdown`).
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.io.get_ref().shutdown(how)
}
}
impl Read for UnixStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.io.read(buf)
}
}
impl Write for UnixStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.io.flush()
}
}
impl AsyncRead for UnixStream {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
<&UnixStream>::read_buf(&mut &*self, buf)
}
}
impl AsyncWrite for UnixStream {
fn shutdown(&mut self) -> Poll<(), io::Error> {
<&UnixStream>::shutdown(&mut &*self)
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
<&UnixStream>::write_buf(&mut &*self, buf)
}
}
impl<'a> Read for &'a UnixStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
(&self.io).read(buf)
}
}
impl<'a> Write for &'a UnixStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
(&self.io).write(buf)
}
fn flush(&mut self) -> io::Result<()> {
(&self.io).flush()
}
}
impl<'a> AsyncRead for &'a UnixStream {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if let Async::NotReady = <UnixStream>::poll_read_ready(self, Ready::readable())? {
return Ok(Async::NotReady);
}
unsafe {
let r = read_ready(buf, self.as_raw_fd());
if r == -1 {
let e = io::Error::last_os_error();
if e.kind() == io::ErrorKind::WouldBlock {
self.io.clear_read_ready(Ready::readable())?;
Ok(Async::NotReady)
} else {
Err(e)
}
} else {
let r = r as usize;
buf.advance_mut(r);
Ok(r.into())
}
}
}
}
impl<'a> AsyncWrite for &'a UnixStream {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if let Async::NotReady = <UnixStream>::poll_write_ready(self)? {
return Ok(Async::NotReady);
}
unsafe {
let r = write_ready(buf, self.as_raw_fd());
if r == -1 {
let e = io::Error::last_os_error();
if e.kind() == io::ErrorKind::WouldBlock {
self.io.clear_write_ready()?;
Ok(Async::NotReady)
} else {
Err(e)
}
} else {
let r = r as usize;
buf.advance(r);
Ok(r.into())
}
}
}
}
impl fmt::Debug for UnixStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.io.get_ref().fmt(f)
}
}
impl AsRawFd for UnixStream {
fn as_raw_fd(&self) -> RawFd {
self.io.get_ref().as_raw_fd()
}
}
impl Future for ConnectFuture {
type Item = UnixStream;
type Error = io::Error;
fn poll(&mut self) -> Poll<UnixStream, io::Error> {
use std::mem;
match self.inner {
State::Waiting(ref mut stream) => {
if let Async::NotReady = stream.io.poll_write_ready()? {
return Ok(Async::NotReady);
}
if let Some(e) = stream.io.get_ref().take_error()? {
return Err(e);
}
}
State::Error(_) => {
let e = match mem::replace(&mut self.inner, State::Empty) {
State::Error(e) => e,
_ => unreachable!(),
};
return Err(e);
}
State::Empty => panic!("can't poll stream twice"),
}
match mem::replace(&mut self.inner, State::Empty) {
State::Waiting(stream) => Ok(Async::Ready(stream)),
_ => unreachable!(),
}
}
}
unsafe fn read_ready<B: BufMut>(buf: &mut B, raw_fd: RawFd) -> isize {
// The `IoVec` type can't have a 0-length size, so we create a bunch
// of dummy versions on the stack with 1 length which we'll quickly
// overwrite.
let b1: &mut [u8] = &mut [0];
let b2: &mut [u8] = &mut [0];
let b3: &mut [u8] = &mut [0];
let b4: &mut [u8] = &mut [0];
let b5: &mut [u8] = &mut [0];
let b6: &mut [u8] = &mut [0];
let b7: &mut [u8] = &mut [0];
let b8: &mut [u8] = &mut [0];
let b9: &mut [u8] = &mut [0];
let b10: &mut [u8] = &mut [0];
let b11: &mut [u8] = &mut [0];
let b12: &mut [u8] = &mut [0];
let b13: &mut [u8] = &mut [0];
let b14: &mut [u8] = &mut [0];
let b15: &mut [u8] = &mut [0];
let b16: &mut [u8] = &mut [0];
let mut bufs: [&mut IoVec; 16] = [
b1.into(),
b2.into(),
b3.into(),
b4.into(),
b5.into(),
b6.into(),
b7.into(),
b8.into(),
b9.into(),
b10.into(),
b11.into(),
b12.into(),
b13.into(),
b14.into(),
b15.into(),
b16.into(),
];
let n = buf.bytes_vec_mut(&mut bufs);
read_ready_vecs(&mut bufs[..n], raw_fd)
}
unsafe fn read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize {
let iovecs = iovec::unix::as_os_slice_mut(bufs);
libc::readv(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
}
unsafe fn write_ready<B: Buf>(buf: &mut B, raw_fd: RawFd) -> isize {
// The `IoVec` type can't have a zero-length size, so create a dummy
// version from a 1-length slice which we'll overwrite with the
// `bytes_vec` method.
static DUMMY: &[u8] = &[0];
let iovec = <&IoVec>::from(DUMMY);
let mut bufs = [
iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec,
iovec, iovec, iovec,
];
let n = buf.bytes_vec(&mut bufs);
write_ready_vecs(&bufs[..n], raw_fd)
}
unsafe fn write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize {
let iovecs = iovec::unix::as_os_slice(bufs);
libc::writev(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
}

Просмотреть файл

@ -6,17 +6,14 @@ authors = [
"Dan Glastonbury <dan.glastonbury@gmail.com>" "Dan Glastonbury <dan.glastonbury@gmail.com>"
] ]
description = "Cubeb Backend for talking to remote cubeb server." description = "Cubeb Backend for talking to remote cubeb server."
edition = "2018"
[dependencies] [dependencies]
audio_thread_priority = "0.15.0"
audioipc = { path="../audioipc" } audioipc = { path="../audioipc" }
cubeb-backend = "0.5.4" cubeb-backend = "0.5.5"
foreign-types = "0.3"
futures = { version="0.1.18", default-features=false, features=["use_std"] } futures = { version="0.1.18", default-features=false, features=["use_std"] }
futures-cpupool = { version="0.1.8", default-features=false } futures-cpupool = { version="0.1.8", default-features=false }
libc = "0.2"
log = "0.4"
tokio-core = "0.1"
tokio-uds = "0.1.7"
audio_thread_priority = "0.15.0"
lazy_static = "1.2.0" lazy_static = "1.2.0"
cfg-if = "0.1.0" log = "0.4"
tokio = "0.1"

Просмотреть файл

@ -3,48 +3,46 @@
// This program is made available under an ISC-style license. See the // This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details // accompanying file LICENSE for details
use assert_not_in_callback; use crate::assert_not_in_callback;
use crate::stream;
#[cfg(target_os = "linux")]
use crate::G_THREAD_POOL;
use crate::{ClientStream, CpuPoolInitParams, CPUPOOL_INIT_PARAMS, G_SERVER_FD};
use audio_thread_priority::promote_current_thread_to_real_time; use audio_thread_priority::promote_current_thread_to_real_time;
use audioipc::codec::LengthDelimitedCodec; use audioipc::codec::LengthDelimitedCodec;
use audioipc::frame::{framed, Framed};
use audioipc::platformhandle_passing::{framed_with_platformhandles, FramedWithPlatformHandles}; use audioipc::platformhandle_passing::{framed_with_platformhandles, FramedWithPlatformHandles};
use audioipc::{core, rpc}; use audioipc::{core, rpc};
use audioipc::{messages, ClientMessage, ServerMessage}; use audioipc::{
messages, messages::DeviceCollectionReq, messages::DeviceCollectionResp, ClientMessage,
ServerMessage,
};
use cubeb_backend::{ use cubeb_backend::{
ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error, Ops, Result, ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error, Ops, Result,
Stream, StreamParams, StreamParamsRef, Stream, StreamParams, StreamParamsRef,
}; };
use futures::Future; use futures::Future;
use futures_cpupool::CpuPool; use futures_cpupool::{CpuFuture, CpuPool};
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use std::os::raw::c_void; use std::os::raw::c_void;
use std::sync::mpsc; use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use std::{fmt, io, mem, ptr}; use std::{fmt, io, mem, ptr};
use stream; use tokio::reactor;
use tokio_core::reactor::{Handle, Remote}; use tokio::runtime::current_thread;
use {ClientStream, CpuPoolInitParams, CPUPOOL_INIT_PARAMS, G_SERVER_FD};
cfg_if! {
if #[cfg(target_os = "linux")] {
use {G_THREAD_POOL};
}
}
struct CubebClient; struct CubebClient;
impl rpc::Client for CubebClient { impl rpc::Client for CubebClient {
type Request = ServerMessage; type Request = ServerMessage;
type Response = ClientMessage; type Response = ClientMessage;
type Transport = FramedWithPlatformHandles<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>; type Transport = FramedWithPlatformHandles<
audioipc::AsyncMessageStream,
LengthDelimitedCodec<Self::Request, Self::Response>,
>;
} }
macro_rules! t(
($e:expr) => (
match $e {
Ok(e) => e,
Err(_) => return Err(Error::default())
}
));
pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream); pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream);
// ClientContext's layout *must* match cubeb.c's `struct cubeb` for the // ClientContext's layout *must* match cubeb.c's `struct cubeb` for the
@ -55,12 +53,16 @@ pub struct ClientContext {
rpc: rpc::ClientProxy<ServerMessage, ClientMessage>, rpc: rpc::ClientProxy<ServerMessage, ClientMessage>,
core: core::CoreThread, core: core::CoreThread,
cpu_pool: CpuPool, cpu_pool: CpuPool,
backend_id: CString,
device_collection_rpc: bool,
input_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
output_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
} }
impl ClientContext { impl ClientContext {
#[doc(hidden)] #[doc(hidden)]
pub fn remote(&self) -> Remote { pub fn handle(&self) -> current_thread::Handle {
self.core.remote() self.core.handle()
} }
#[doc(hidden)] #[doc(hidden)]
@ -75,13 +77,16 @@ impl ClientContext {
} }
// TODO: encapsulate connect, etc inside audioipc. // TODO: encapsulate connect, etc inside audioipc.
fn open_server_stream() -> Result<audioipc::MessageStream> { fn open_server_stream() -> io::Result<audioipc::MessageStream> {
unsafe { unsafe {
if let Some(fd) = G_SERVER_FD { if let Some(fd) = G_SERVER_FD {
return Ok(audioipc::MessageStream::from_raw_fd(fd.as_raw())); return Ok(audioipc::MessageStream::from_raw_fd(fd.as_raw()));
} }
Err(Error::default()) Err(io::Error::new(
io::ErrorKind::Other,
"Failed to get server connection.",
))
} }
} }
@ -110,21 +115,74 @@ fn create_thread_pool(init_params: CpuPoolInitParams) -> CpuPool {
.create() .create()
} }
cfg_if! { #[cfg(target_os = "linux")]
if #[cfg(target_os = "linux")] { fn get_thread_pool(init_params: CpuPoolInitParams) -> CpuPool {
fn get_thread_pool(init_params: CpuPoolInitParams) -> CpuPool { let mut guard = G_THREAD_POOL.lock().unwrap();
let mut guard = G_THREAD_POOL.lock().unwrap(); if guard.is_some() {
if guard.is_some() { // Sandbox is on, and the thread pool was created earlier, before the lockdown.
// Sandbox is on, and the thread pool was created earlier, before the lockdown. guard.take().unwrap()
guard.take().unwrap()
} else {
// Sandbox is off, let's create the pool now, promoting the threads will work.
create_thread_pool(init_params)
}
}
} else { } else {
fn get_thread_pool(init_params: CpuPoolInitParams) -> CpuPool { // Sandbox is off, let's create the pool now, promoting the threads will work.
create_thread_pool(init_params) create_thread_pool(init_params)
}
}
#[cfg(not(target_os = "linux"))]
fn get_thread_pool(init_params: CpuPoolInitParams) -> CpuPool {
create_thread_pool(init_params)
}
#[derive(Default)]
struct DeviceCollectionCallback {
cb: ffi::cubeb_device_collection_changed_callback,
user_ptr: usize,
}
struct DeviceCollectionServer {
input_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
output_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
cpu_pool: CpuPool,
}
impl rpc::Server for DeviceCollectionServer {
type Request = DeviceCollectionReq;
type Response = DeviceCollectionResp;
type Future = CpuFuture<Self::Response, ()>;
type Transport =
Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
fn process(&mut self, req: Self::Request) -> Self::Future {
match req {
DeviceCollectionReq::DeviceChange(device_type) => {
trace!(
"ctx_thread: DeviceChange Callback: device_type={}",
device_type
);
let devtype = cubeb_backend::DeviceType::from_bits_truncate(device_type);
let (input_cb, input_user_ptr) = {
let dcb = self.input_device_callback.lock().unwrap();
(dcb.cb, dcb.user_ptr)
};
let (output_cb, output_user_ptr) = {
let dcb = self.output_device_callback.lock().unwrap();
(dcb.cb, dcb.user_ptr)
};
self.cpu_pool.spawn_fn(move || {
if devtype.contains(cubeb_backend::DeviceType::INPUT) {
unsafe { input_cb.unwrap()(ptr::null_mut(), input_user_ptr as *mut c_void) }
}
if devtype.contains(cubeb_backend::DeviceType::OUTPUT) {
unsafe {
output_cb.unwrap()(ptr::null_mut(), output_user_ptr as *mut c_void)
}
}
Ok(DeviceCollectionResp::DeviceChange)
})
}
} }
} }
} }
@ -133,15 +191,14 @@ impl ContextOps for ClientContext {
fn init(_context_name: Option<&CStr>) -> Result<Context> { fn init(_context_name: Option<&CStr>) -> Result<Context> {
fn bind_and_send_client( fn bind_and_send_client(
stream: audioipc::AsyncMessageStream, stream: audioipc::AsyncMessageStream,
handle: &Handle,
tx_rpc: &mpsc::Sender<rpc::ClientProxy<ServerMessage, ClientMessage>>, tx_rpc: &mpsc::Sender<rpc::ClientProxy<ServerMessage, ClientMessage>>,
) -> Option<()> { ) -> io::Result<()> {
let transport = framed_with_platformhandles(stream, Default::default()); let transport = framed_with_platformhandles(stream, Default::default());
let rpc = rpc::bind_client::<CubebClient>(transport, handle); let rpc = rpc::bind_client::<CubebClient>(transport);
// If send fails then the rx end has closed // If send fails then the rx end has closed
// which is unlikely here. // which is unlikely here.
let _ = tx_rpc.send(rpc); let _ = tx_rpc.send(rpc);
Some(()) Ok(())
} }
assert_not_in_callback(); assert_not_in_callback();
@ -150,43 +207,45 @@ impl ContextOps for ClientContext {
let params = CPUPOOL_INIT_PARAMS.with(|p| p.replace(None).unwrap()); let params = CPUPOOL_INIT_PARAMS.with(|p| p.replace(None).unwrap());
let core = t!(core::spawn_thread("AudioIPC Client RPC", move || { let core = core::spawn_thread("AudioIPC Client RPC", move || {
let handle = core::handle(); let handle = reactor::Handle::default();
register_thread(params.thread_create_callback); register_thread(params.thread_create_callback);
open_server_stream() open_server_stream()
.ok() .and_then(|stream| stream.into_tokio_ipc(&handle))
.and_then(|stream| stream.into_tokio_ipc(&handle).ok()) .and_then(|stream| bind_and_send_client(stream, &tx_rpc))
.and_then(|stream| bind_and_send_client(stream, &handle, &tx_rpc)) })
.ok_or_else(|| { .map_err(|_| Error::default())?;
io::Error::new(
io::ErrorKind::Other,
"Failed to open stream and create rpc.",
)
})
}));
let rpc = t!(rx_rpc.recv()); let rpc = rx_rpc.recv().map_err(|_| Error::default())?;
// Don't let errors bubble from here. Later calls against this context // Don't let errors bubble from here. Later calls against this context
// will return errors the caller expects to handle. // will return errors the caller expects to handle.
let _ = send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected); let _ = send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected);
let pool = get_thread_pool(params); let backend_id = send_recv!(rpc, ContextGetBackendId => ContextBackendId())
.unwrap_or_else(|_| "(remote error)".to_string());
let backend_id = CString::new(backend_id).expect("backend_id query failed");
let cpu_pool = get_thread_pool(params);
let ctx = Box::new(ClientContext { let ctx = Box::new(ClientContext {
_ops: &CLIENT_OPS as *const _, _ops: &CLIENT_OPS as *const _,
rpc: rpc, rpc,
core: core, core,
cpu_pool: pool, cpu_pool,
backend_id,
device_collection_rpc: false,
input_device_callback: Arc::new(Mutex::new(Default::default())),
output_device_callback: Arc::new(Mutex::new(Default::default())),
}); });
Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) }) Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) })
} }
fn backend_id(&mut self) -> &'static CStr { fn backend_id(&mut self) -> &CStr {
assert_not_in_callback(); assert_not_in_callback();
unsafe { CStr::from_ptr(b"remote\0".as_ptr() as *const _) } self.backend_id.as_c_str()
} }
fn max_channel_count(&mut self) -> Result<u32> { fn max_channel_count(&mut self) -> Result<u32> {
@ -264,7 +323,7 @@ impl ContextOps for ClientContext {
input_stream_params: Option<&StreamParamsRef>, input_stream_params: Option<&StreamParamsRef>,
output_device: DeviceId, output_device: DeviceId,
output_stream_params: Option<&StreamParamsRef>, output_stream_params: Option<&StreamParamsRef>,
latency_frame: u32, latency_frames: u32,
// These params aren't sent to the server // These params aren't sent to the server
data_callback: ffi::cubeb_data_callback, data_callback: ffi::cubeb_data_callback,
state_callback: ffi::cubeb_state_callback, state_callback: ffi::cubeb_state_callback,
@ -288,24 +347,75 @@ impl ContextOps for ClientContext {
let output_stream_params = opt_stream_params(output_stream_params); let output_stream_params = opt_stream_params(output_stream_params);
let init_params = messages::StreamInitParams { let init_params = messages::StreamInitParams {
stream_name: stream_name, stream_name,
input_device: input_device as usize, input_device: input_device as usize,
input_stream_params: input_stream_params, input_stream_params,
output_device: output_device as usize, output_device: output_device as usize,
output_stream_params: output_stream_params, output_stream_params,
latency_frames: latency_frame, latency_frames,
}; };
stream::init(self, init_params, data_callback, state_callback, user_ptr) stream::init(self, init_params, data_callback, state_callback, user_ptr)
} }
fn register_device_collection_changed( fn register_device_collection_changed(
&mut self, &mut self,
_dev_type: DeviceType, devtype: DeviceType,
_collection_changed_callback: ffi::cubeb_device_collection_changed_callback, collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
_user_ptr: *mut c_void, user_ptr: *mut c_void,
) -> Result<()> { ) -> Result<()> {
assert_not_in_callback(); assert_not_in_callback();
Err(Error::not_supported())
if !self.device_collection_rpc {
let fds = send_recv!(self.rpc(),
ContextSetupDeviceCollectionCallback =>
ContextSetupDeviceCollectionCallback())?;
let stream =
unsafe { audioipc::MessageStream::from_raw_fd(fds.platform_handles[0].as_raw()) };
// TODO: The lowest comms layer expects exactly 3 PlatformHandles, but we only
// need one here. Drop the dummy handles the other side sent us to discard.
unsafe {
fds.platform_handles[1].into_file();
fds.platform_handles[2].into_file();
}
let server = DeviceCollectionServer {
input_device_callback: self.input_device_callback.clone(),
output_device_callback: self.output_device_callback.clone(),
cpu_pool: self.cpu_pool(),
};
let (wait_tx, wait_rx) = mpsc::channel();
self.handle()
.spawn(futures::future::lazy(move || {
let handle = reactor::Handle::default();
let stream = stream.into_tokio_ipc(&handle).unwrap();
let transport = framed(stream, Default::default());
rpc::bind_server(transport, server);
wait_tx.send(()).unwrap();
Ok(())
}))
.expect("Failed to spawn DeviceCollectionServer");
wait_rx.recv().unwrap();
self.device_collection_rpc = true;
}
if devtype.contains(cubeb_backend::DeviceType::INPUT) {
let mut cb = self.input_device_callback.lock().unwrap();
cb.cb = collection_changed_callback;
cb.user_ptr = user_ptr as usize;
}
if devtype.contains(cubeb_backend::DeviceType::OUTPUT) {
let mut cb = self.output_device_callback.lock().unwrap();
cb.cb = collection_changed_callback;
cb.user_ptr = user_ptr as usize;
}
let enable = collection_changed_callback.is_some();
send_recv!(self.rpc(),
ContextRegisterDeviceCollectionChanged(devtype.bits(), enable) =>
ContextRegisteredDeviceCollectionChanged)
} }
} }
@ -322,7 +432,7 @@ impl Drop for ClientContext {
} }
impl fmt::Debug for ClientContext { impl fmt::Debug for ClientContext {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ClientContext") f.debug_struct("ClientContext")
.field("_ops", &self._ops) .field("_ops", &self._ops)
.field("rpc", &self.rpc) .field("rpc", &self.rpc)

Просмотреть файл

@ -2,45 +2,36 @@
// //
// This program is made available under an ISC-style license. See the // This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details. // accompanying file LICENSE for details.
#![warn(unused_extern_crates)]
extern crate audioipc;
#[macro_use] #[macro_use]
extern crate cubeb_backend; extern crate cubeb_backend;
extern crate foreign_types;
extern crate futures;
extern crate futures_cpupool;
extern crate libc;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate tokio_core;
extern crate tokio_uds;
extern crate audio_thread_priority;
#[macro_use] #[macro_use]
extern crate lazy_static; extern crate lazy_static;
#[macro_use]
extern crate cfg_if;
#[macro_use] #[macro_use]
mod send_recv; mod send_recv;
mod context; mod context;
mod stream; mod stream;
use audioipc::{PlatformHandleType, PlatformHandle}; use crate::context::ClientContext;
use context::ClientContext; use crate::stream::ClientStream;
use cubeb_backend::{capi, ffi}; #[cfg(target_os = "linux")]
use std::os::raw::{c_char, c_int}; use audio_thread_priority::promote_current_thread_to_real_time;
use stream::ClientStream;
use std::sync::{Mutex};
use futures_cpupool::CpuPool;
use audio_thread_priority::RtPriorityHandle; use audio_thread_priority::RtPriorityHandle;
cfg_if! { use audioipc::{PlatformHandle, PlatformHandleType};
if #[cfg(target_os = "linux")] { use cubeb_backend::{capi, ffi};
use std::sync::{Arc, Condvar}; use futures_cpupool::CpuPool;
use std::ffi::CString; #[cfg(target_os = "linux")]
use std::thread; use std::ffi::CString;
use audio_thread_priority::promote_current_thread_to_real_time; use std::os::raw::{c_char, c_int};
} use std::sync::Mutex;
} #[cfg(target_os = "linux")]
use std::sync::{Arc, Condvar};
#[cfg(target_os = "linux")]
use std::thread;
type InitParamsTls = std::cell::RefCell<Option<CpuPoolInitParams>>; type InitParamsTls = std::cell::RefCell<Option<CpuPoolInitParams>>;
@ -105,58 +96,63 @@ where
static mut G_SERVER_FD: Option<PlatformHandle> = None; static mut G_SERVER_FD: Option<PlatformHandle> = None;
cfg_if! { #[cfg(target_os = "linux")]
if #[cfg(target_os = "linux")] { #[no_mangle]
#[no_mangle] pub unsafe extern "C" fn audioipc_init_threads(init_params: *const AudioIpcInitParams) {
pub unsafe extern "C" fn audioipc_init_threads(init_params: *const AudioIpcInitParams) { let thread_create_callback = (*init_params).thread_create_callback;
let thread_create_callback = (*init_params).thread_create_callback;
// It is critical that this function waits until the various threads are created, promoted to // It is critical that this function waits until the various threads are created, promoted to
// real-time, and _then_ return, because the sandbox lockdown happens right after returning // real-time, and _then_ return, because the sandbox lockdown happens right after returning
// from here. // from here.
let pair = Arc::new((Mutex::new((*init_params).pool_size), Condvar::new())); let pair = Arc::new((Mutex::new((*init_params).pool_size), Condvar::new()));
let pair2 = pair.clone(); let pair2 = pair.clone();
let register_thread = move || { let register_thread = move || {
if let Some(func) = thread_create_callback { if let Some(func) = thread_create_callback {
match promote_current_thread_to_real_time(0, 48000) { match promote_current_thread_to_real_time(0, 48000) {
Ok(handle) => { Ok(handle) => {
G_PRIORITY_HANDLES.with(|handles| { G_PRIORITY_HANDLES.with(|handles| {
(handles.borrow_mut()).push(handle); (handles.borrow_mut()).push(handle);
}); });
} }
Err(_) => { Err(_) => {
warn!("Could not promote audio threads to real-time during initialization."); warn!("Could not promote audio threads to real-time during initialization.");
}
}
let thr = thread::current();
let name = CString::new(thr.name().unwrap()).unwrap();
func(name.as_ptr());
let &(ref lock, ref cvar) = &*pair2;
let mut count = lock.lock().unwrap();
*count -= 1;
cvar.notify_one();
} }
};
let mut pool = G_THREAD_POOL.lock().unwrap();
*pool = Some(futures_cpupool::Builder::new()
.name_prefix("AudioIPC")
.after_start(register_thread)
.pool_size((*init_params).pool_size)
.stack_size((*init_params).stack_size)
.create());
let &(ref lock, ref cvar) = &*pair;
let mut count = lock.lock().unwrap();
while *count != 0 {
count = cvar.wait(count).unwrap();
} }
let thr = thread::current();
let name = CString::new(thr.name().unwrap()).unwrap();
func(name.as_ptr());
let &(ref lock, ref cvar) = &*pair2;
let mut count = lock.lock().unwrap();
*count -= 1;
cvar.notify_one();
} }
};
let mut pool = G_THREAD_POOL.lock().unwrap();
*pool = Some(
futures_cpupool::Builder::new()
.name_prefix("AudioIPC")
.after_start(register_thread)
.pool_size((*init_params).pool_size)
.stack_size((*init_params).stack_size)
.create(),
);
let &(ref lock, ref cvar) = &*pair;
let mut count = lock.lock().unwrap();
while *count != 0 {
count = cvar.wait(count).unwrap();
} }
} }
#[cfg(not(target_os = "linux"))]
#[no_mangle]
pub unsafe extern "C" fn audioipc_init_threads(_: *const AudioIpcInitParams) {
unimplemented!();
}
#[no_mangle] #[no_mangle]
/// Entry point from C code. /// Entry point from C code.
pub unsafe extern "C" fn audioipc_client_init( pub unsafe extern "C" fn audioipc_client_init(

Просмотреть файл

@ -3,6 +3,8 @@
// This program is made available under an ISC-style license. See the // This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details // accompanying file LICENSE for details
use crate::ClientContext;
use crate::{assert_not_in_callback, set_in_callback};
use audioipc::codec::LengthDelimitedCodec; use audioipc::codec::LengthDelimitedCodec;
use audioipc::frame::{framed, Framed}; use audioipc::frame::{framed, Framed};
use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage}; use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage};
@ -15,11 +17,8 @@ use std::ffi::CString;
use std::os::raw::c_void; use std::os::raw::c_void;
use std::ptr; use std::ptr;
use std::sync::mpsc; use std::sync::mpsc;
use ClientContext; use std::sync::{Arc, Mutex};
use {assert_not_in_callback, set_in_callback}; use tokio::reactor;
// TODO: Remove and let caller allocate based on cubeb backend requirements.
const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
pub struct Device(ffi::cubeb_device); pub struct Device(ffi::cubeb_device);
@ -46,6 +45,7 @@ pub struct ClientStream<'ctx> {
context: &'ctx ClientContext, context: &'ctx ClientContext,
user_ptr: *mut c_void, user_ptr: *mut c_void,
token: usize, token: usize,
device_change_cb: Arc<Mutex<ffi::cubeb_device_changed_callback>>,
} }
struct CallbackServer { struct CallbackServer {
@ -55,17 +55,23 @@ struct CallbackServer {
state_cb: ffi::cubeb_state_callback, state_cb: ffi::cubeb_state_callback,
user_ptr: usize, user_ptr: usize,
cpu_pool: CpuPool, cpu_pool: CpuPool,
device_change_cb: Arc<Mutex<ffi::cubeb_device_changed_callback>>,
} }
impl rpc::Server for CallbackServer { impl rpc::Server for CallbackServer {
type Request = CallbackReq; type Request = CallbackReq;
type Response = CallbackResp; type Response = CallbackResp;
type Future = CpuFuture<Self::Response, ()>; type Future = CpuFuture<Self::Response, ()>;
type Transport = Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Response, Self::Request>>; type Transport =
Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
fn process(&mut self, req: Self::Request) -> Self::Future { fn process(&mut self, req: Self::Request) -> Self::Future {
match req { match req {
CallbackReq::Data { nframes, input_frame_size, output_frame_size } => { CallbackReq::Data {
nframes,
input_frame_size,
output_frame_size,
} => {
trace!( trace!(
"stream_thread: Data Callback: nframes={} input_fs={} output_fs={}", "stream_thread: Data Callback: nframes={} input_fs={} output_fs={}",
nframes, nframes,
@ -75,11 +81,11 @@ impl rpc::Server for CallbackServer {
// Clone values that need to be moved into the cpu pool thread. // Clone values that need to be moved into the cpu pool thread.
let input_shm = match self.input_shm { let input_shm = match self.input_shm {
Some(ref shm) => unsafe { Some(shm.clone_view()) }, Some(ref shm) => unsafe { Some(shm.unsafe_clone()) },
None => None, None => None,
}; };
let mut output_shm = match self.output_shm { let mut output_shm = match self.output_shm {
Some(ref shm) => unsafe { Some(shm.clone_view()) }, Some(ref shm) => unsafe { Some(shm.unsafe_clone()) },
None => None, None => None,
}; };
let user_ptr = self.user_ptr; let user_ptr = self.user_ptr;
@ -131,6 +137,24 @@ impl rpc::Server for CallbackServer {
Ok(CallbackResp::State) Ok(CallbackResp::State)
}) })
} }
CallbackReq::DeviceChange => {
let cb = self.device_change_cb.clone();
let user_ptr = self.user_ptr;
self.cpu_pool.spawn_fn(move || {
set_in_callback(true);
let cb = cb.lock().unwrap();
if let Some(cb) = *cb {
unsafe {
cb(user_ptr as *mut _);
}
} else {
warn!("DeviceChange received with null callback");
}
set_in_callback(false);
Ok(CallbackResp::DeviceChange)
})
}
} }
} }
} }
@ -149,9 +173,12 @@ impl<'ctx> ClientStream<'ctx> {
let has_output = init_params.output_stream_params.is_some(); let has_output = init_params.output_stream_params.is_some();
let rpc = ctx.rpc(); let rpc = ctx.rpc();
let data = try!(send_recv!(rpc, StreamInit(init_params) => StreamCreated())); let data = send_recv!(rpc, StreamInit(init_params) => StreamCreated())?;
debug!("token = {}, handles = {:?}", data.token, data.platform_handles); debug!(
"token = {}, handles = {:?}",
data.token, data.platform_handles
);
let stm = data.platform_handles[0]; let stm = data.platform_handles[0];
let stream = unsafe { audioipc::MessageStream::from_raw_fd(stm.as_raw()) }; let stream = unsafe { audioipc::MessageStream::from_raw_fd(stm.as_raw()) };
@ -159,7 +186,7 @@ impl<'ctx> ClientStream<'ctx> {
let input = data.platform_handles[1]; let input = data.platform_handles[1];
let input_file = unsafe { input.into_file() }; let input_file = unsafe { input.into_file() };
let input_shm = if has_input { let input_shm = if has_input {
Some(SharedMemSlice::from(&input_file, SHM_AREA_SIZE).unwrap()) Some(SharedMemSlice::from(&input_file, audioipc::SHM_AREA_SIZE).unwrap())
} else { } else {
None None
}; };
@ -167,7 +194,7 @@ impl<'ctx> ClientStream<'ctx> {
let output = data.platform_handles[2]; let output = data.platform_handles[2];
let output_file = unsafe { output.into_file() }; let output_file = unsafe { output.into_file() };
let output_shm = if has_output { let output_shm = if has_output {
Some(SharedMemMutSlice::from(&output_file, SHM_AREA_SIZE).unwrap()) Some(SharedMemMutSlice::from(&output_file, audioipc::SHM_AREA_SIZE).unwrap())
} else { } else {
None None
}; };
@ -176,29 +203,37 @@ impl<'ctx> ClientStream<'ctx> {
let cpu_pool = ctx.cpu_pool(); let cpu_pool = ctx.cpu_pool();
let null_cb: ffi::cubeb_device_changed_callback = None;
let device_change_cb = Arc::new(Mutex::new(null_cb));
let server = CallbackServer { let server = CallbackServer {
input_shm: input_shm, input_shm,
output_shm: output_shm, output_shm,
data_cb: data_callback, data_cb: data_callback,
state_cb: state_callback, state_cb: state_callback,
user_ptr: user_data, user_ptr: user_data,
cpu_pool: cpu_pool, cpu_pool,
device_change_cb: device_change_cb.clone(),
}; };
let (wait_tx, wait_rx) = mpsc::channel(); let (wait_tx, wait_rx) = mpsc::channel();
ctx.remote().spawn(move |handle| { ctx.handle()
let stream = stream.into_tokio_ipc(handle).unwrap(); .spawn(futures::future::lazy(move || {
let transport = framed(stream, Default::default()); let handle = reactor::Handle::default();
rpc::bind_server(transport, server, handle); let stream = stream.into_tokio_ipc(&handle).unwrap();
wait_tx.send(()).unwrap(); let transport = framed(stream, Default::default());
Ok(()) rpc::bind_server(transport, server);
}); wait_tx.send(()).unwrap();
Ok(())
}))
.expect("Failed to spawn CallbackServer");
wait_rx.recv().unwrap(); wait_rx.recv().unwrap();
let stream = Box::into_raw(Box::new(ClientStream { let stream = Box::into_raw(Box::new(ClientStream {
context: ctx, context: ctx,
user_ptr: user_ptr, user_ptr,
token: data.token, token: data.token,
device_change_cb,
})); }));
Ok(unsafe { Stream::from_ptr(stream as *mut _) }) Ok(unsafe { Stream::from_ptr(stream as *mut _) })
} }
@ -277,13 +312,15 @@ impl<'ctx> StreamOps for ClientStream<'ctx> {
} }
} }
// TODO: How do we call this back? On what thread?
fn register_device_changed_callback( fn register_device_changed_callback(
&mut self, &mut self,
_device_changed_callback: ffi::cubeb_device_changed_callback, device_changed_callback: ffi::cubeb_device_changed_callback,
) -> Result<()> { ) -> Result<()> {
assert_not_in_callback(); assert_not_in_callback();
Ok(()) let rpc = self.context.rpc();
let enable = device_changed_callback.is_some();
*self.device_change_cb.lock().unwrap() = device_changed_callback;
send_recv!(rpc, StreamRegisterDeviceChangeCallback(self.token, enable) => StreamRegisterDeviceChangeCallback)
} }
} }
@ -294,13 +331,7 @@ pub fn init(
state_callback: ffi::cubeb_state_callback, state_callback: ffi::cubeb_state_callback,
user_ptr: *mut c_void, user_ptr: *mut c_void,
) -> Result<Stream> { ) -> Result<Stream> {
let stm = try!(ClientStream::init( let stm = ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)?;
ctx,
init_params,
data_callback,
state_callback,
user_ptr
));
debug_assert_eq!(stm.user_ptr(), user_ptr); debug_assert_eq!(stm.user_ptr(), user_ptr);
Ok(stm) Ok(stm)
} }

Просмотреть файл

@ -6,20 +6,17 @@ authors = [
"Dan Glastonbury <dan.glastonbury@gmail.com>" "Dan Glastonbury <dan.glastonbury@gmail.com>"
] ]
description = "Remote cubeb server" description = "Remote cubeb server"
edition = "2018"
[dependencies] [dependencies]
audio_thread_priority = "0.15.0"
audioipc = { path = "../audioipc" } audioipc = { path = "../audioipc" }
cubeb-core = "0.5.4" cubeb-core = "0.5.5"
bytes = "0.4" futures = "0.1.18"
lazycell = "^0.4" lazy_static = "1.2.0"
libc = "0.2"
log = "0.4" log = "0.4"
slab = "0.3.0" slab = "0.3.0"
futures = "0.1.18" tokio = "0.1"
tokio-core = "0.1"
tokio-uds = "0.1.7"
audio_thread_priority = "0.15.0"
lazy_static = "1.2.0"
[dependencies.error-chain] [dependencies.error-chain]
version = "0.11.0" version = "0.11.0"

Просмотреть файл

@ -2,26 +2,16 @@
// //
// This program is made available under an ISC-style license. See the // This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details // accompanying file LICENSE for details
#![warn(unused_extern_crates)]
#[macro_use] #[macro_use]
extern crate error_chain; extern crate error_chain;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate audioipc;
extern crate bytes;
extern crate cubeb_core as cubeb;
extern crate futures;
extern crate lazycell;
extern crate libc;
extern crate slab;
extern crate tokio_core;
extern crate tokio_uds;
extern crate audio_thread_priority;
#[macro_use] #[macro_use]
extern crate lazy_static; extern crate lazy_static;
use audio_thread_priority::promote_current_thread_to_real_time;
use audioipc::core; use audioipc::core;
use audioipc::platformhandle_passing::framed_with_platformhandles; use audioipc::platformhandle_passing::framed_with_platformhandles;
use audioipc::rpc; use audioipc::rpc;
@ -29,11 +19,11 @@ use audioipc::{MessageStream, PlatformHandle, PlatformHandleType};
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::Future; use futures::Future;
use std::error::Error; use std::error::Error;
use std::ffi::{CStr, CString};
use std::os::raw::c_void; use std::os::raw::c_void;
use std::ptr; use std::ptr;
use audio_thread_priority::promote_current_thread_to_real_time;
use std::ffi::{CStr, CString};
use std::sync::Mutex; use std::sync::Mutex;
use tokio::reactor;
mod server; mod server;
@ -56,14 +46,14 @@ pub mod errors {
AudioIPC(::audioipc::errors::Error, ::audioipc::errors::ErrorKind); AudioIPC(::audioipc::errors::Error, ::audioipc::errors::ErrorKind);
} }
foreign_links { foreign_links {
Cubeb(::cubeb::Error); Cubeb(cubeb_core::Error);
Io(::std::io::Error); Io(::std::io::Error);
Canceled(::futures::sync::oneshot::Canceled); Canceled(::futures::sync::oneshot::Canceled);
} }
} }
} }
use errors::*; use crate::errors::*;
struct ServerWrapper { struct ServerWrapper {
core_thread: core::CoreThread, core_thread: core::CoreThread,
@ -73,50 +63,49 @@ struct ServerWrapper {
fn run() -> Result<ServerWrapper> { fn run() -> Result<ServerWrapper> {
trace!("Starting up cubeb audio server event loop thread..."); trace!("Starting up cubeb audio server event loop thread...");
let callback_thread = try!( let callback_thread = core::spawn_thread("AudioIPC Callback RPC", || {
core::spawn_thread("AudioIPC Callback RPC", || { match promote_current_thread_to_real_time(0, 48000) {
match promote_current_thread_to_real_time(0, 48000) { Ok(_) => {}
Ok(_) => { } Err(_) => {
Err(_) => { debug!("Failed to promote audio callback thread to real-time.");
debug!("Failed to promote audio callback thread to real-time.");
}
} }
trace!("Starting up cubeb audio callback event loop thread..."); }
Ok(()) trace!("Starting up cubeb audio callback event loop thread...");
}).or_else(|e| { Ok(())
debug!( })
"Failed to start cubeb audio callback event loop thread: {:?}", .or_else(|e| {
e.description() debug!(
); "Failed to start cubeb audio callback event loop thread: {:?}",
Err(e) e.description()
}) );
); Err(e)
})?;
let core_thread = try!( let core_thread = core::spawn_thread("AudioIPC Server RPC", move || Ok(())).or_else(|e| {
core::spawn_thread("AudioIPC Server RPC", move || Ok(())).or_else(|e| { debug!(
debug!( "Failed to cubeb audio core event loop thread: {:?}",
"Failed to cubeb audio core event loop thread: {:?}", e.description()
e.description() );
); Err(e)
Err(e) })?;
})
);
Ok(ServerWrapper { Ok(ServerWrapper {
core_thread: core_thread, core_thread,
callback_thread: callback_thread, callback_thread,
}) })
} }
#[no_mangle] #[no_mangle]
pub extern "C" fn audioipc_server_start(context_name: *const std::os::raw::c_char, pub unsafe extern "C" fn audioipc_server_start(
backend_name: *const std::os::raw::c_char) -> *mut c_void { context_name: *const std::os::raw::c_char,
backend_name: *const std::os::raw::c_char,
) -> *mut c_void {
let mut params = G_CUBEB_CONTEXT_PARAMS.lock().unwrap(); let mut params = G_CUBEB_CONTEXT_PARAMS.lock().unwrap();
if !context_name.is_null() { if !context_name.is_null() {
params.context_name = unsafe { CStr::from_ptr(context_name) }.to_owned(); params.context_name = CStr::from_ptr(context_name).to_owned();
} }
if !backend_name.is_null() { if !backend_name.is_null() {
let backend_string = unsafe { CStr::from_ptr(backend_name) }.to_owned(); let backend_string = CStr::from_ptr(backend_name).to_owned();
params.backend_name = Some(backend_string); params.backend_name = Some(backend_string);
} }
match run() { match run() {
@ -130,7 +119,7 @@ pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> PlatformHandleTy
let (wait_tx, wait_rx) = oneshot::channel(); let (wait_tx, wait_rx) = oneshot::channel();
let wrapper: &ServerWrapper = unsafe { &*(p as *mut _) }; let wrapper: &ServerWrapper = unsafe { &*(p as *mut _) };
let cb_remote = wrapper.callback_thread.remote(); let core_handle = wrapper.callback_thread.handle();
// We create a connected pair of anonymous IPC endpoints. One side // We create a connected pair of anonymous IPC endpoints. One side
// is registered with the reactor core, the other side is returned // is registered with the reactor core, the other side is returned
@ -139,22 +128,28 @@ pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> PlatformHandleTy
.and_then(|(sock1, sock2)| { .and_then(|(sock1, sock2)| {
// Spawn closure to run on same thread as reactor::Core // Spawn closure to run on same thread as reactor::Core
// via remote handle. // via remote handle.
wrapper.core_thread.remote().spawn(|handle| { wrapper
trace!("Incoming connection"); .core_thread
sock2.into_tokio_ipc(handle) .handle()
.spawn(futures::future::lazy(|| {
trace!("Incoming connection");
let handle = reactor::Handle::default();
sock2.into_tokio_ipc(&handle)
.and_then(|sock| { .and_then(|sock| {
let transport = framed_with_platformhandles(sock, Default::default()); let transport = framed_with_platformhandles(sock, Default::default());
rpc::bind_server(transport, server::CubebServer::new(cb_remote), handle); rpc::bind_server(transport, server::CubebServer::new(core_handle));
Ok(()) Ok(())
}).map_err(|_| ()) }).map_err(|_| ())
// Notify waiting thread that sock2 has been registered. // Notify waiting thread that sock2 has been registered.
.and_then(|_| wait_tx.send(())) .and_then(|_| wait_tx.send(()))
}); }))
.expect("Failed to spawn CubebServer");
// Wait for notification that sock2 has been registered // Wait for notification that sock2 has been registered
// with reactor::Core. // with reactor::Core.
let _ = wait_rx.wait(); let _ = wait_rx.wait();
Ok(PlatformHandle::from(sock1).as_raw()) Ok(PlatformHandle::from(sock1).as_raw())
}).unwrap_or(-1isize as PlatformHandleType) })
.unwrap_or(-1isize as PlatformHandleType)
} }
#[no_mangle] #[no_mangle]

Просмотреть файл

@ -4,19 +4,19 @@
// accompanying file LICENSE for details // accompanying file LICENSE for details
use audioipc; use audioipc;
use audioipc::{MessageStream, PlatformHandle};
use audioipc::codec::LengthDelimitedCodec; use audioipc::codec::LengthDelimitedCodec;
use audioipc::core;
use audioipc::platformhandle_passing::FramedWithPlatformHandles;
use audioipc::frame::{framed, Framed}; use audioipc::frame::{framed, Framed};
use audioipc::messages::{ use audioipc::messages::{
CallbackReq, CallbackResp, ClientMessage, Device, DeviceInfo, ServerMessage, StreamCreate, CallbackReq, CallbackResp, ClientMessage, Device, DeviceCollectionReq, DeviceCollectionResp,
StreamInitParams, StreamParams, DeviceInfo, RegisterDeviceCollectionChanged, ServerMessage, StreamCreate, StreamInitParams,
StreamParams,
}; };
use audioipc::platformhandle_passing::FramedWithPlatformHandles;
use audioipc::rpc; use audioipc::rpc;
use audioipc::shm::{SharedMemReader, SharedMemWriter}; use audioipc::shm::{SharedMemReader, SharedMemWriter};
use cubeb; use audioipc::{MessageStream, PlatformHandle};
use cubeb::ffi; use cubeb_core as cubeb;
use cubeb_core::ffi;
use futures::future::{self, FutureResult}; use futures::future::{self, FutureResult};
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::Future; use futures::Future;
@ -26,25 +26,134 @@ use std::convert::From;
use std::ffi::CStr; use std::ffi::CStr;
use std::mem::{size_of, ManuallyDrop}; use std::mem::{size_of, ManuallyDrop};
use std::os::raw::{c_long, c_void}; use std::os::raw::{c_long, c_void};
use std::rc::Rc;
use std::{panic, slice}; use std::{panic, slice};
use tokio_core::reactor::Remote; use tokio::reactor;
use tokio::runtime::current_thread;
use errors::*; use crate::errors::*;
fn error(error: cubeb::Error) -> ClientMessage { fn error(error: cubeb::Error) -> ClientMessage {
ClientMessage::Error(error.raw_code()) ClientMessage::Error(error.raw_code())
} }
type ContextKey = RefCell<Option<cubeb::Result<cubeb::Context>>>; struct CubebDeviceCollectionManager {
thread_local!(static CONTEXT_KEY: ContextKey = RefCell::new(None)); servers: Vec<Rc<RefCell<CubebServerCallbacks>>>,
devtype: cubeb::DeviceType,
}
impl CubebDeviceCollectionManager {
fn new() -> CubebDeviceCollectionManager {
CubebDeviceCollectionManager {
servers: Vec::new(),
devtype: cubeb::DeviceType::empty(),
}
}
fn register(&mut self, context: &cubeb::Context, server: &Rc<RefCell<CubebServerCallbacks>>) {
if self
.servers
.iter()
.find(|s| Rc::ptr_eq(s, server))
.is_none()
{
self.servers.push(server.clone());
}
self.update(context);
}
fn unregister(&mut self, context: &cubeb::Context, server: &Rc<RefCell<CubebServerCallbacks>>) {
self.servers
.retain(|s| !(Rc::ptr_eq(&s, server) && s.borrow().devtype.is_empty()));
self.update(context);
}
fn update(&mut self, context: &cubeb::Context) {
let mut devtype = cubeb::DeviceType::empty();
for s in &self.servers {
devtype |= s.borrow().devtype;
}
for &dir in &[cubeb::DeviceType::INPUT, cubeb::DeviceType::OUTPUT] {
match (devtype.contains(dir), self.devtype.contains(dir)) {
(true, false) => self.internal_register(context, dir, true),
(false, true) => self.internal_register(context, dir, false),
_ => {}
}
}
}
fn internal_register(
&mut self,
context: &cubeb::Context,
devtype: cubeb::DeviceType,
enable: bool,
) {
let user_ptr = if enable {
self as *const CubebDeviceCollectionManager as *mut c_void
} else {
std::ptr::null_mut()
};
for &(dir, cb) in &[
(
cubeb::DeviceType::INPUT,
device_collection_changed_input_cb_c as _,
),
(
cubeb::DeviceType::OUTPUT,
device_collection_changed_output_cb_c as _,
),
] {
if devtype.contains(dir) {
assert_eq!(self.devtype.contains(dir), !enable);
unsafe {
context
.register_device_collection_changed(
dir,
if enable { Some(cb) } else { None },
user_ptr,
)
.expect("devcol register failed");
}
if enable {
self.devtype.insert(dir);
} else {
self.devtype.remove(dir);
}
}
}
}
// Warning: this is called from an internal cubeb thread, so we must not mutate unprotected shared state.
unsafe fn device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type) {
self.servers.iter().for_each(|server| {
if server
.borrow()
.devtype
.contains(cubeb::DeviceType::from_bits_truncate(device_type))
{
server
.borrow_mut()
.device_collection_changed_callback(device_type)
}
});
}
}
struct CubebContextState {
context: cubeb::Result<cubeb::Context>,
manager: CubebDeviceCollectionManager,
}
type ContextKey = RefCell<Option<CubebContextState>>;
thread_local!(static CONTEXT_KEY:ContextKey = RefCell::new(None));
fn with_local_context<T, F>(f: F) -> T fn with_local_context<T, F>(f: F) -> T
where where
F: FnOnce(&cubeb::Result<cubeb::Context>) -> T, F: FnOnce(&cubeb::Result<cubeb::Context>, &mut CubebDeviceCollectionManager) -> T,
{ {
CONTEXT_KEY.with(|k| { CONTEXT_KEY.with(|k| {
let mut context = k.borrow_mut(); let mut state = k.borrow_mut();
if context.is_none() { if state.is_none() {
let params = super::G_CUBEB_CONTEXT_PARAMS.lock().unwrap(); let params = super::G_CUBEB_CONTEXT_PARAMS.lock().unwrap();
let context_name = Some(params.context_name.as_c_str()); let context_name = Some(params.context_name.as_c_str());
let backend_name = if let Some(ref name) = params.backend_name { let backend_name = if let Some(ref name) = params.backend_name {
@ -52,24 +161,34 @@ where
} else { } else {
None None
}; };
*context = Some(cubeb::Context::init(context_name, backend_name)); let context = cubeb::Context::init(context_name, backend_name);
let manager = CubebDeviceCollectionManager::new();
*state = Some(CubebContextState { context, manager });
} }
f(context.as_ref().unwrap()) let state = state.as_mut().unwrap();
f(&state.context, &mut state.manager)
}) })
} }
// TODO: Remove and let caller allocate based on cubeb backend requirements.
const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
// The size in which the stream slab is grown. // The size in which the stream slab is grown.
const STREAM_CONN_CHUNK_SIZE: usize = 64; const STREAM_CONN_CHUNK_SIZE: usize = 64;
struct DeviceCollectionClient;
impl rpc::Client for DeviceCollectionClient {
type Request = DeviceCollectionReq;
type Response = DeviceCollectionResp;
type Transport =
Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}
struct CallbackClient; struct CallbackClient;
impl rpc::Client for CallbackClient { impl rpc::Client for CallbackClient {
type Request = CallbackReq; type Request = CallbackReq;
type Response = CallbackResp; type Response = CallbackResp;
type Transport = Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>; type Transport =
Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
} }
struct ServerStreamCallbacks { struct ServerStreamCallbacks {
@ -87,17 +206,23 @@ struct ServerStreamCallbacks {
impl ServerStreamCallbacks { impl ServerStreamCallbacks {
fn data_callback(&mut self, input: &[u8], output: &mut [u8], nframes: isize) -> isize { fn data_callback(&mut self, input: &[u8], output: &mut [u8], nframes: isize) -> isize {
trace!("Stream data callback: {} {} {}", nframes, input.len(), output.len()); trace!(
"Stream data callback: {} {} {}",
nframes,
input.len(),
output.len()
);
self.input_shm.write(input).unwrap(); self.input_shm.write(input).unwrap();
let r = self let r = self
.rpc .rpc
.call(CallbackReq::Data { .call(CallbackReq::Data {
nframes: nframes, nframes,
input_frame_size: self.input_frame_size as usize, input_frame_size: self.input_frame_size as usize,
output_frame_size: self.output_frame_size as usize, output_frame_size: self.output_frame_size as usize,
}).wait(); })
.wait();
match r { match r {
Ok(CallbackResp::Data(frames)) => { Ok(CallbackResp::Data(frames)) => {
@ -121,7 +246,18 @@ impl ServerStreamCallbacks {
match r { match r {
Ok(CallbackResp::State) => {} Ok(CallbackResp::State) => {}
_ => { _ => {
debug!("Unexpected message {:?} during callback", r); debug!("Unexpected message {:?} during state callback", r);
}
}
}
fn device_change_callback(&mut self) {
trace!("Stream device change callback");
let r = self.rpc.call(CallbackReq::DeviceChange).wait();
match r {
Ok(CallbackResp::DeviceChange) => {}
_ => {
debug!("Unexpected message {:?} during device change callback", r);
} }
} }
} }
@ -143,38 +279,67 @@ impl Drop for ServerStream {
type StreamSlab = slab::Slab<ServerStream, usize>; type StreamSlab = slab::Slab<ServerStream, usize>;
struct CubebServerCallbacks {
rpc: rpc::ClientProxy<DeviceCollectionReq, DeviceCollectionResp>,
devtype: cubeb::DeviceType,
}
impl CubebServerCallbacks {
fn device_collection_changed_callback(&mut self, device_type: ffi::cubeb_device_type) {
// TODO: Assert device_type is in devtype.
debug!(
"Sending device collection ({:?}) changed event",
device_type
);
let _ = self
.rpc
.call(DeviceCollectionReq::DeviceChange(device_type))
.wait();
}
}
pub struct CubebServer { pub struct CubebServer {
cb_remote: Remote, handle: current_thread::Handle,
streams: StreamSlab, streams: StreamSlab,
remote_pid: Option<u32>, remote_pid: Option<u32>,
cbs: Option<Rc<RefCell<CubebServerCallbacks>>>,
} }
impl rpc::Server for CubebServer { impl rpc::Server for CubebServer {
type Request = ServerMessage; type Request = ServerMessage;
type Response = ClientMessage; type Response = ClientMessage;
type Future = FutureResult<Self::Response, ()>; type Future = FutureResult<Self::Response, ()>;
type Transport = FramedWithPlatformHandles<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Response, Self::Request>>; type Transport = FramedWithPlatformHandles<
audioipc::AsyncMessageStream,
LengthDelimitedCodec<Self::Response, Self::Request>,
>;
fn process(&mut self, req: Self::Request) -> Self::Future { fn process(&mut self, req: Self::Request) -> Self::Future {
let resp = with_local_context(|context| match *context { let resp = with_local_context(|context, manager| match *context {
Err(_) => error(cubeb::Error::error()), Err(_) => error(cubeb::Error::error()),
Ok(ref context) => self.process_msg(context, &req), Ok(ref context) => self.process_msg(context, manager, &req),
}); });
future::ok(resp) future::ok(resp)
} }
} }
impl CubebServer { impl CubebServer {
pub fn new(cb_remote: Remote) -> Self { pub fn new(handle: current_thread::Handle) -> Self {
CubebServer { CubebServer {
cb_remote: cb_remote, handle,
streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE), streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE),
remote_pid: None, remote_pid: None,
cbs: None,
} }
} }
// Process a request coming from the client. // Process a request coming from the client.
fn process_msg(&mut self, context: &cubeb::Context, msg: &ServerMessage) -> ClientMessage { fn process_msg(
&mut self,
context: &cubeb::Context,
manager: &mut CubebDeviceCollectionManager,
msg: &ServerMessage,
) -> ClientMessage {
let resp: ClientMessage = match *msg { let resp: ClientMessage = match *msg {
ServerMessage::ClientConnect(pid) => { ServerMessage::ClientConnect(pid) => {
self.remote_pid = Some(pid); self.remote_pid = Some(pid);
@ -187,7 +352,9 @@ impl CubebServer {
ClientMessage::ClientDisconnected ClientMessage::ClientDisconnected
} }
ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(), ServerMessage::ContextGetBackendId => {
ClientMessage::ContextBackendId(context.backend_id().to_string())
}
ServerMessage::ContextGetMaxChannelCount => context ServerMessage::ContextGetMaxChannelCount => context
.max_channel_count() .max_channel_count()
@ -200,8 +367,8 @@ impl CubebServer {
let params = cubeb::StreamParamsBuilder::new() let params = cubeb::StreamParamsBuilder::new()
.format(format) .format(format)
.rate(u32::from(params.rate)) .rate(params.rate)
.channels(u32::from(params.channels)) .channels(params.channels)
.layout(layout) .layout(layout)
.take(); .take();
@ -221,7 +388,8 @@ impl CubebServer {
.map(|devices| { .map(|devices| {
let v: Vec<DeviceInfo> = devices.iter().map(|i| i.as_ref().into()).collect(); let v: Vec<DeviceInfo> = devices.iter().map(|i| i.as_ref().into()).collect();
ClientMessage::ContextEnumeratedDevices(v) ClientMessage::ContextEnumeratedDevices(v)
}).unwrap_or_else(error), })
.unwrap_or_else(error),
ServerMessage::StreamInit(ref params) => self ServerMessage::StreamInit(ref params) => self
.process_stream_init(context, params) .process_stream_init(context, params)
@ -279,6 +447,76 @@ impl CubebServer {
.current_device() .current_device()
.map(|device| ClientMessage::StreamCurrentDevice(Device::from(device))) .map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
.unwrap_or_else(error), .unwrap_or_else(error),
ServerMessage::StreamRegisterDeviceChangeCallback(stm_tok, enable) => self.streams
[stm_tok]
.stream
.register_device_changed_callback(if enable {
Some(device_change_cb_c)
} else {
None
})
.map(|_| ClientMessage::StreamRegisterDeviceChangeCallback)
.unwrap_or_else(error),
ServerMessage::ContextSetupDeviceCollectionCallback => {
if let Ok((stm1, stm2)) = MessageStream::anonymous_ipc_pair() {
debug!("Created device collection RPC pair: {:?}-{:?}", stm1, stm2);
// This code is currently running on the Client/Server RPC
// handling thread. We need to move the registration of the
// bind_client to the callback RPC handling thread. This is
// done by spawning a future on `handle`.
let (tx, rx) = oneshot::channel();
self.handle
.spawn(futures::future::lazy(move || {
let handle = reactor::Handle::default();
let stream = stm2.into_tokio_ipc(&handle).unwrap();
let transport = framed(stream, Default::default());
let rpc = rpc::bind_client::<DeviceCollectionClient>(transport);
drop(tx.send(rpc));
Ok(())
}))
.expect("Failed to spawn DeviceCollectionClient");
// TODO: The lowest comms layer expects exactly 3 PlatformHandles, but we only
// need one here. Send some dummy handles over for the other side to discard.
let (dummy1, dummy2) =
MessageStream::anonymous_ipc_pair().expect("need dummy IPC pair");
if let Ok(rpc) = rx.wait() {
self.cbs = Some(Rc::new(RefCell::new(CubebServerCallbacks {
rpc,
devtype: cubeb::DeviceType::empty(),
})));
let fds = RegisterDeviceCollectionChanged {
platform_handles: [
PlatformHandle::from(stm1),
PlatformHandle::from(dummy1),
PlatformHandle::from(dummy2),
],
target_pid: self.remote_pid.unwrap(),
};
ClientMessage::ContextSetupDeviceCollectionCallback(fds)
} else {
warn!("Failed to setup RPC client");
error(cubeb::Error::error())
}
} else {
warn!("Failed to create RPC pair");
error(cubeb::Error::error())
}
}
ServerMessage::ContextRegisterDeviceCollectionChanged(device_type, enable) => self
.process_register_device_collection_changed(
context,
manager,
cubeb::DeviceType::from_bits_truncate(device_type),
enable,
)
.unwrap_or_else(error),
}; };
trace!("process_msg: req={:?}, resp={:?}", msg, resp); trace!("process_msg: req={:?}, resp={:?}", msg, resp);
@ -286,6 +524,30 @@ impl CubebServer {
resp resp
} }
fn process_register_device_collection_changed(
&mut self,
context: &cubeb::Context,
manager: &mut CubebDeviceCollectionManager,
devtype: cubeb::DeviceType,
enable: bool,
) -> cubeb::Result<ClientMessage> {
if devtype == cubeb::DeviceType::UNKNOWN {
return Err(cubeb::Error::invalid_parameter());
}
assert!(self.cbs.is_some());
let cbs = self.cbs.as_ref().unwrap();
if enable {
cbs.borrow_mut().devtype.insert(devtype);
manager.register(context, cbs);
} else {
cbs.borrow_mut().devtype.remove(devtype);
manager.unregister(context, cbs);
}
Ok(ClientMessage::ContextRegisteredDeviceCollectionChanged)
}
// Stream init is special, so it's been separated from process_msg. // Stream init is special, so it's been separated from process_msg.
fn process_stream_init( fn process_stream_init(
&mut self, &mut self,
@ -306,7 +568,8 @@ impl CubebServer {
}; };
let channel_count = p.channels as u16; let channel_count = p.channels as u16;
sample_size * channel_count sample_size * channel_count
}).unwrap_or(0u16) })
.unwrap_or(0u16)
} }
// Create the callback handling struct which is attached the cubeb stream. // Create the callback handling struct which is attached the cubeb stream.
@ -316,28 +579,26 @@ impl CubebServer {
let (stm1, stm2) = MessageStream::anonymous_ipc_pair()?; let (stm1, stm2) = MessageStream::anonymous_ipc_pair()?;
debug!("Created callback pair: {:?}-{:?}", stm1, stm2); debug!("Created callback pair: {:?}-{:?}", stm1, stm2);
let (input_shm, input_file) = let (input_shm, input_file) =
SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?; SharedMemWriter::new(&audioipc::get_shm_path("input"), audioipc::SHM_AREA_SIZE)?;
let (output_shm, output_file) = let (output_shm, output_file) =
SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?; SharedMemReader::new(&audioipc::get_shm_path("output"), audioipc::SHM_AREA_SIZE)?;
// This code is currently running on the Client/Server RPC // This code is currently running on the Client/Server RPC
// handling thread. We need to move the registration of the // handling thread. We need to move the registration of the
// bind_client to the callback RPC handling thread. This is // bind_client to the callback RPC handling thread. This is
// done by spawning a future on cb_remote. // done by spawning a future on `handle`.
let id = core::handle().id();
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.cb_remote.spawn(move |handle| { self.handle
// Ensure we're running on a loop different to the one .spawn(futures::future::lazy(move || {
// invoking spawn_fn. let handle = reactor::Handle::default();
assert_ne!(id, handle.id()); let stream = stm2.into_tokio_ipc(&handle).unwrap();
let stream = stm2.into_tokio_ipc(handle).unwrap(); let transport = framed(stream, Default::default());
let transport = framed(stream, Default::default()); let rpc = rpc::bind_client::<CallbackClient>(transport);
let rpc = rpc::bind_client::<CallbackClient>(transport, handle); drop(tx.send(rpc));
drop(tx.send(rpc)); Ok(())
Ok(()) }))
}); .expect("Failed to spawn CallbackClient");
let rpc: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() { let rpc: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
Ok(rpc) => rpc, Ok(rpc) => rpc,
@ -384,7 +645,8 @@ impl CubebServer {
Some(data_cb_c), Some(data_cb_c),
Some(state_cb_c), Some(state_cb_c),
user_ptr, user_ptr,
).and_then(|stream| { )
.and_then(|stream| {
if !self.streams.has_available() { if !self.streams.has_available() {
trace!( trace!(
"server connection ran out of stream slots. reserving {} more.", "server connection ran out of stream slots. reserving {} more.",
@ -401,7 +663,8 @@ impl CubebServer {
.insert(ServerStream { .insert(ServerStream {
stream: ManuallyDrop::new(stream), stream: ManuallyDrop::new(stream),
cbs: ManuallyDrop::new(cbs), cbs: ManuallyDrop::new(cbs),
}).index() })
.index()
} }
None => { None => {
// TODO: Turn into error // TODO: Turn into error
@ -416,9 +679,10 @@ impl CubebServer {
PlatformHandle::from(input_file), PlatformHandle::from(input_file),
PlatformHandle::from(output_file), PlatformHandle::from(output_file),
], ],
target_pid: self.remote_pid.unwrap() target_pid: self.remote_pid.unwrap(),
})) }))
}).map_err(|e| e.into()) })
.map_err(|e| e.into())
} }
} }
} }
@ -436,13 +700,13 @@ unsafe extern "C" fn data_cb_c(
let input = if input_buffer.is_null() { let input = if input_buffer.is_null() {
&[] &[]
} else { } else {
let nbytes = nframes * cbs.input_frame_size as c_long; let nbytes = nframes * c_long::from(cbs.input_frame_size);
slice::from_raw_parts(input_buffer as *const u8, nbytes as usize) slice::from_raw_parts(input_buffer as *const u8, nbytes as usize)
}; };
let output: &mut [u8] = if output_buffer.is_null() { let output: &mut [u8] = if output_buffer.is_null() {
&mut [] &mut []
} else { } else {
let nbytes = nframes * cbs.output_frame_size as c_long; let nbytes = nframes * c_long::from(cbs.output_frame_size);
slice::from_raw_parts_mut(output_buffer as *mut u8, nbytes as usize) slice::from_raw_parts_mut(output_buffer as *mut u8, nbytes as usize)
}; };
cbs.data_callback(input, output, nframes as isize) as c_long cbs.data_callback(input, output, nframes as isize) as c_long
@ -462,3 +726,33 @@ unsafe extern "C" fn state_cb_c(
}); });
ok.expect("State callback panicked"); ok.expect("State callback panicked");
} }
unsafe extern "C" fn device_change_cb_c(user_ptr: *mut c_void) {
let ok = panic::catch_unwind(|| {
let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
cbs.device_change_callback();
});
ok.expect("Device change callback panicked");
}
unsafe extern "C" fn device_collection_changed_input_cb_c(
_: *mut ffi::cubeb,
user_ptr: *mut c_void,
) {
let ok = panic::catch_unwind(|| {
let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager);
manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_INPUT);
});
ok.expect("Collection changed (input) callback panicked");
}
unsafe extern "C" fn device_collection_changed_output_cb_c(
_: *mut ffi::cubeb,
user_ptr: *mut c_void,
) {
let ok = panic::catch_unwind(|| {
let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager);
manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_OUTPUT);
});
ok.expect("Collection changed (output) callback panicked");
}