Bug 1403048 - Update media/audioipc to b5559d28. r=kamidphish

--HG--
extra : rebase_source : aeb3f0b9a45729fa21248983d34ce956e98a9b79
This commit is contained in:
Matthew Gregan 2017-09-26 15:49:26 +13:00
Родитель 9d9610f6a3
Коммит 6e690e73d3
15 изменённых файлов: 1266 добавлений и 551 удалений

Просмотреть файл

@ -8,13 +8,14 @@ authors = [
description = "Remote Cubeb IPC" description = "Remote Cubeb IPC"
[dependencies] [dependencies]
cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
bincode = "0.8"
bytes = "0.4"
error-chain = "0.10.0" error-chain = "0.10.0"
libc = "0.2"
log = "^0.3.6" log = "^0.3.6"
memmap = "0.5.2"
mio = "0.6.7"
mio-uds = "0.6.4"
serde = "1.*.*" serde = "1.*.*"
serde_derive = "1.*.*" serde_derive = "1.*.*"
bincode = "0.8"
libc = "0.2"
mio = "0.6.7"
cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
byteorder = "1"
memmap = "0.5.2"

Просмотреть файл

@ -0,0 +1,153 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
//! Various async helpers modelled after futures-rs and tokio-io.
use {RecvFd, SendFd};
use bytes::{Buf, BufMut};
use mio_uds;
use std::io as std_io;
use std::os::unix::io::RawFd;
use std::os::unix::net;
/// A convenience macro for working with `io::Result<T>` from the
/// `std::io::Read` and `std::io::Write` traits.
///
/// This macro takes `io::Result<T>` as input, and returns `T` as the output. If
/// the input type is of the `Err` variant, then `Async::NotReady` is returned if
/// it indicates `WouldBlock` or otherwise `Err` is returned.
#[macro_export]
macro_rules! try_nb {
($e:expr) => (match $e {
Ok(t) => t,
Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => {
return Ok(Async::NotReady)
}
Err(e) => return Err(e.into()),
})
}
/////////////////////////////////////////////////////////////////////////////////////////
// Async support - Handle EWOULDBLOCK/EAGAIN from non-blocking I/O operations.
/// Return type for async methods, indicates whether the operation was
/// ready or not.
///
/// * `Ok(Async::Ready(t))` means that the operation has completed successfully.
/// * `Ok(Async::NotReady)` means that the underlying system is not ready to handle operation.
/// * `Err(e)` means that the operation has completed with the given error `e`.
pub type AsyncResult<T, E> = Result<Async<T>, E>;
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum Async<T> {
/// Represents that a value is immediately ready.
Ready(T),
/// Represents that a value is not ready yet, but may be so later.
NotReady
}
impl<T> Async<T> {
pub fn is_ready(&self) -> bool {
match *self {
Async::Ready(_) => true,
Async::NotReady => false,
}
}
pub fn is_not_ready(&self) -> bool {
!self.is_ready()
}
}
/// Return type for an async attempt to send a value.
///
/// * `Ok(AsyncSend::Ready)` means that the operation has completed successfully.
/// * `Ok(AsyncSend::NotReady(t))` means that the underlying system is not ready to handle
/// send. returns the value that tried to be sent in `t`.
/// * `Err(e)` means that operation has completed with the given error `e`.
pub type AsyncSendResult<T, E> = Result<AsyncSend<T>, E>;
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum AsyncSend<T> {
Ready,
NotReady(T)
}
pub trait AsyncRecvFd: RecvFd {
unsafe fn prepare_uninitialized_buffer(&self, bytes: &mut [u8]) -> bool {
for byte in bytes.iter_mut() {
*byte = 0;
}
true
}
/// Pull some bytes from this source into the specified `Buf`, returning
/// how many bytes were read.
///
/// The `buf` provided will have bytes read into it and the internal cursor
/// will be advanced if any bytes were read. Note that this method typically
/// will not reallocate the buffer provided.
fn recv_buf_fd<B>(&mut self, buf: &mut B) -> AsyncResult<(usize, Option<RawFd>), std_io::Error>
where
Self: Sized,
B: BufMut,
{
if !buf.has_remaining_mut() {
return Ok(Async::Ready((0, None)));
}
unsafe {
let (n, fd) = {
let bytes = buf.bytes_mut();
self.prepare_uninitialized_buffer(bytes);
try_nb!(self.recv_fd(bytes))
};
buf.advance_mut(n);
Ok(Async::Ready((n, fd)))
}
}
}
impl AsyncRecvFd for net::UnixStream {}
impl AsyncRecvFd for mio_uds::UnixStream {}
/// A trait for writable objects which operated in an async fashion.
///
/// This trait inherits from `std::io::Write` and indicates that an I/O object is
/// **nonblocking**, meaning that it will return an error instead of blocking
/// when bytes cannot currently be written, but hasn't closed. Specifically
/// this means that the `write` function for types that implement this trait
/// can have a few return values:
///
/// * `Ok(n)` means that `n` bytes of data was immediately written .
/// * `Err(e) if e.kind() == ErrorKind::WouldBlock` means that no data was
/// written from the buffer provided. The I/O object is not currently
/// writable but may become writable in the future.
/// * `Err(e)` for other errors are standard I/O errors coming from the
/// underlying object.
pub trait AsyncSendFd: SendFd {
/// Write a `Buf` into this value, returning how many bytes were written.
///
/// Note that this method will advance the `buf` provided automatically by
/// the number of bytes written.
fn send_buf_fd<B>(&mut self, buf: &mut B, fd: Option<RawFd>) -> AsyncResult<usize, std_io::Error>
where
Self: Sized,
B: Buf,
{
if !buf.has_remaining() {
return Ok(Async::Ready(0));
}
let n = try_nb!(self.send_fd(buf.bytes(), fd));
buf.advance(n);
Ok(Async::Ready(n))
}
}
impl AsyncSendFd for net::UnixStream {}
impl AsyncSendFd for mio_uds::UnixStream {}

Просмотреть файл

@ -0,0 +1,141 @@
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
//! `Encoder`s and `Decoder`s from items to/from `BytesMut` buffers.
use bincode::{self, Bounded, deserialize, serialize_into, serialized_size};
use bytes::{Buf, BufMut, BytesMut, LittleEndian};
use serde::de::DeserializeOwned;
use serde::ser::Serialize;
use std::io as std_io;
use std::io::Cursor;
use std::mem;
////////////////////////////////////////////////////////////////////////////////
// Split buffer into size delimited frames - This appears more complicated than
// might be necessary due to handling the possibility of messages being split
// across reads.
#[derive(Debug)]
enum FrameState {
Head,
Data(usize)
}
#[derive(Debug)]
pub struct Decoder {
state: FrameState
}
impl Decoder {
pub fn new() -> Self {
Decoder {
state: FrameState::Head
}
}
fn decode_head(&mut self, src: &mut BytesMut) -> std_io::Result<Option<usize>> {
let head_size = mem::size_of::<u16>();
if src.len() < head_size {
// Not enough data
return Ok(None);
}
let n = {
let mut src = Cursor::new(&mut *src);
// match endianess
let n = src.get_uint::<LittleEndian>(head_size);
if n > u64::from(u16::max_value()) {
return Err(std_io::Error::new(
std_io::ErrorKind::InvalidData,
"frame size too big"
));
}
// The check above ensures there is no overflow
n as usize
};
// Consume the length field
let _ = src.split_to(head_size);
Ok(Some(n))
}
fn decode_data(&self, n: usize, src: &mut BytesMut) -> std_io::Result<Option<BytesMut>> {
// At this point, the buffer has already had the required capacity
// reserved. All there is to do is read.
if src.len() < n {
return Ok(None);
}
Ok(Some(src.split_to(n)))
}
pub fn split_frame(&mut self, src: &mut BytesMut) -> std_io::Result<Option<BytesMut>> {
let n = match self.state {
FrameState::Head => {
match try!(self.decode_head(src)) {
Some(n) => {
self.state = FrameState::Data(n);
// Ensure that the buffer has enough space to read the
// incoming payload
src.reserve(n);
n
},
None => return Ok(None),
}
},
FrameState::Data(n) => n,
};
match try!(self.decode_data(n, src)) {
Some(data) => {
// Update the decode state
self.state = FrameState::Head;
// Make sure the buffer has enough space to read the next head
src.reserve(mem::size_of::<u16>());
Ok(Some(data))
},
None => Ok(None),
}
}
pub fn decode<ITEM: DeserializeOwned>(&mut self, src: &mut BytesMut) -> Result<Option<ITEM>, bincode::Error> {
match try!(self.split_frame(src)) {
Some(buf) => deserialize::<ITEM>(buf.as_ref()).and_then(|t| Ok(Some(t))),
None => Ok(None),
}
}
}
impl Default for Decoder {
fn default() -> Self {
Self::new()
}
}
pub fn encode<ITEM: Serialize>(dst: &mut BytesMut, item: &ITEM) -> Result<(), bincode::Error> {
let head_len = mem::size_of::<u16>() as u64;
let item_len = serialized_size(item);
if head_len + item_len > u64::from(u16::max_value()) {
return Err(Box::new(bincode::ErrorKind::IoError(std_io::Error::new(
std_io::ErrorKind::InvalidInput,
"frame too big"
))));
}
let n = (head_len + item_len) as usize;
dst.reserve(n);
dst.put_u16::<LittleEndian>(item_len as u16);
serialize_into(&mut dst.writer(), item, Bounded(item_len))
}

Просмотреть файл

@ -1,26 +1,19 @@
use bincode::{self, deserialize, serialize}; use {AutoCloseFd, RecvFd, SendFd};
use async::{Async, AsyncRecvFd};
use bytes::{BufMut, BytesMut};
use codec::{Decoder, encode};
use errors::*; use errors::*;
use msg;
use mio::{Poll, PollOpt, Ready, Token}; use mio::{Poll, PollOpt, Ready, Token};
use mio::event::Evented; use mio::event::Evented;
use mio::unix::EventedFd; use mio::unix::EventedFd;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::ser::Serialize; use serde::ser::Serialize;
use std::collections::VecDeque;
use std::fmt::Debug; use std::fmt::Debug;
use std::io::{self, Read}; use std::io::{self, Read};
use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net; use std::os::unix::net;
use std::os::unix::prelude::*; use std::os::unix::prelude::*;
use libc;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
pub trait RecvFd {
fn recv_fd(&mut self, bytes: &mut [u8]) -> io::Result<(usize, Option<RawFd>)>;
}
pub trait SendFd {
fn send_fd<FD: IntoRawFd>(&mut self, bytes: &[u8], fd: Option<FD>) -> io::Result<(usize)>;
}
// Because of the trait implementation rules in Rust, this needs to be // Because of the trait implementation rules in Rust, this needs to be
// a wrapper class to allow implementation of a trait from another // a wrapper class to allow implementation of a trait from another
@ -30,14 +23,23 @@ pub trait SendFd {
#[derive(Debug)] #[derive(Debug)]
pub struct Connection { pub struct Connection {
stream: net::UnixStream stream: net::UnixStream,
recv_buffer: BytesMut,
recv_fd: VecDeque<AutoCloseFd>,
send_buffer: BytesMut,
decoder: Decoder
} }
impl Connection { impl Connection {
pub fn new(stream: net::UnixStream) -> Connection { pub fn new(stream: net::UnixStream) -> Connection {
info!("Create new connection"); info!("Create new connection");
stream.set_nonblocking(false).unwrap();
Connection { Connection {
stream: stream stream: stream,
recv_buffer: BytesMut::with_capacity(1024),
recv_fd: VecDeque::new(),
send_buffer: BytesMut::with_capacity(1024),
decoder: Decoder::new()
} }
} }
@ -60,49 +62,72 @@ impl Connection {
/// ``` /// ```
pub fn pair() -> io::Result<(Connection, Connection)> { pub fn pair() -> io::Result<(Connection, Connection)> {
let (s1, s2) = net::UnixStream::pair()?; let (s1, s2) = net::UnixStream::pair()?;
Ok(( Ok((Connection::new(s1), Connection::new(s2)))
Connection { }
stream: s1
}, pub fn take_fd(&mut self) -> Option<RawFd> {
Connection { self.recv_fd.pop_front().map(|fd| fd.into_raw_fd())
stream: s2
}
))
} }
pub fn receive<RT>(&mut self) -> Result<RT> pub fn receive<RT>(&mut self) -> Result<RT>
where where
RT: DeserializeOwned + Debug, RT: DeserializeOwned + Debug,
{ {
match self.receive_with_fd() { self.receive_with_fd()
Ok((r, None)) => Ok(r),
Ok((_, Some(_))) => panic!("unexpected fd received"),
Err(e) => Err(e),
}
} }
pub fn receive_with_fd<RT>(&mut self) -> Result<(RT, Option<RawFd>)> pub fn receive_with_fd<RT>(&mut self) -> Result<RT>
where where
RT: DeserializeOwned + Debug, RT: DeserializeOwned + Debug,
{ {
// TODO: Check deserialize_from and serialize_into. trace!("received_with_fd...");
let mut encoded = vec![0; 32 * 1024]; // TODO: Get max size from bincode, or at least assert. loop {
// TODO: Read until block, EOF, or error. trace!(" recv_buffer = {:?}", self.recv_buffer);
// TODO: Switch back to recv_fd. if !self.recv_buffer.is_empty() {
match self.stream.recv_fd(&mut encoded) { let r = self.decoder.decode(&mut self.recv_buffer);
Ok((0, _)) => Err(ErrorKind::Disconnected.into()), trace!("receive {:?}", r);
// TODO: Handle partial read?
Ok((n, fd)) => {
let r = deserialize(&encoded[..n]);
debug!("receive {:?}", r);
match r { match r {
Ok(r) => Ok((r, fd)), Ok(Some(r)) => return Ok(r),
Err(e) => Err(e).chain_err(|| "Failed to deserialize message"), Ok(None) => {
/* Buffer doesn't contain enough data for a complete
* message, so need to enter recv_buf_fd to get more. */
},
Err(e) => return Err(e).chain_err(|| "Failed to deserialize message"),
} }
}, }
// TODO: Handle dropped message.
// Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => panic!("wouldblock"), // Otherwise, try to read more data and try again. Make sure we've
_ => bail!("socket write"), // got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
// The decoder.decode should have reserved an amount for
// the next bit it needs to read. Check that we reserved
// enough space for, at least the 2 byte size prefix.
assert!(self.recv_buffer.remaining_mut() > 2);
// TODO: Read until block, EOF, or error.
// TODO: Switch back to recv_fd.
match self.stream.recv_buf_fd(&mut self.recv_buffer) {
Ok(Async::Ready((0, _))) => return Err(ErrorKind::Disconnected.into()),
// TODO: Handle partial read?
Ok(Async::Ready((_, fd))) => {
trace!(
" recv_buf_fd: recv_buffer: {:?}, recv_fd: {:?}, fd: {:?}",
self.recv_buffer,
self.recv_fd,
fd
);
if let Some(fd) = fd {
self.recv_fd.push_back(
unsafe { AutoCloseFd::from_raw_fd(fd) }
);
}
},
Ok(Async::NotReady) => bail!("Socket should be blocking."),
// TODO: Handle dropped message.
// Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => panic!("wouldblock"),
_ => bail!("socket write"),
}
} }
} }
@ -118,9 +143,11 @@ impl Connection {
ST: Serialize + Debug, ST: Serialize + Debug,
FD: IntoRawFd + Debug, FD: IntoRawFd + Debug,
{ {
let encoded: Vec<u8> = serialize(&msg, bincode::Infinite)?; trace!("send_with_fd {:?}, {:?}", msg, fd_to_send);
info!("send_with_fd {:?}, {:?}", msg, fd_to_send); try!(encode(&mut self.send_buffer, &msg));
self.stream.send_fd(&encoded, fd_to_send).chain_err( let fd_to_send = fd_to_send.map(|fd| fd.into_raw_fd());
let send = self.send_buffer.take().freeze();
self.stream.send_fd(send.as_ref(), fd_to_send).chain_err(
|| "Failed to send message with fd" || "Failed to send message with fd"
) )
} }
@ -153,14 +180,6 @@ impl<'a> Read for &'a Connection {
} }
} }
impl RecvFd for net::UnixStream {
fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
let length = self.read_u32::<LittleEndian>()?;
msg::recvmsg(self.as_raw_fd(), &mut buf_to_recv[..length as usize])
}
}
impl RecvFd for Connection { impl RecvFd for Connection {
fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> { fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
self.stream.recv_fd(buf_to_recv) self.stream.recv_fd(buf_to_recv)
@ -169,9 +188,7 @@ impl RecvFd for Connection {
impl FromRawFd for Connection { impl FromRawFd for Connection {
unsafe fn from_raw_fd(fd: RawFd) -> Connection { unsafe fn from_raw_fd(fd: RawFd) -> Connection {
Connection { Connection::new(net::UnixStream::from_raw_fd(fd))
stream: net::UnixStream::from_raw_fd(fd)
}
} }
} }
@ -181,19 +198,8 @@ impl IntoRawFd for Connection {
} }
} }
impl SendFd for net::UnixStream {
fn send_fd<FD: IntoRawFd>(&mut self, buf_to_send: &[u8], fd_to_send: Option<FD>) -> io::Result<usize> {
self.write_u32::<LittleEndian>(buf_to_send.len() as u32)?;
let fd_to_send = fd_to_send.map(|fd| fd.into_raw_fd());
let r = msg::sendmsg(self.as_raw_fd(), buf_to_send, fd_to_send);
fd_to_send.map(|fd| unsafe { libc::close(fd) });
r
}
}
impl SendFd for Connection { impl SendFd for Connection {
fn send_fd<FD: IntoRawFd>(&mut self, buf_to_send: &[u8], fd_to_send: Option<FD>) -> io::Result<usize> { fn send_fd(&mut self, buf_to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
self.stream.send_fd(buf_to_send, fd_to_send) self.stream.send_fd(buf_to_send, fd_to_send)
} }
} }

Просмотреть файл

@ -13,18 +13,18 @@ extern crate log;
#[macro_use] #[macro_use]
extern crate serde_derive; extern crate serde_derive;
extern crate serde;
extern crate bincode; extern crate bincode;
extern crate bytes;
extern crate mio;
extern crate cubeb_core; extern crate cubeb_core;
extern crate libc; extern crate libc;
extern crate byteorder;
extern crate memmap; extern crate memmap;
extern crate mio;
extern crate mio_uds;
extern crate serde;
pub mod async;
pub mod codec;
mod connection; mod connection;
pub mod errors; pub mod errors;
pub mod messages; pub mod messages;
@ -33,9 +33,89 @@ pub mod shm;
pub use connection::*; pub use connection::*;
pub use messages::{ClientMessage, ServerMessage}; pub use messages::{ClientMessage, ServerMessage};
use std::env::temp_dir; use std::env::temp_dir;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::os::unix::net;
use std::path::PathBuf; use std::path::PathBuf;
// Extend sys::os::unix::net::UnixStream to support sending and receiving a single file desc.
// We can extend UnixStream by using traits, eliminating the need to introduce a new wrapped
// UnixStream type.
pub trait RecvFd {
fn recv_fd(&mut self, bytes: &mut [u8]) -> io::Result<(usize, Option<RawFd>)>;
}
pub trait SendFd {
fn send_fd(&mut self, bytes: &[u8], fd: Option<RawFd>) -> io::Result<(usize)>;
}
impl RecvFd for net::UnixStream {
fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
msg::recvmsg(self.as_raw_fd(), buf_to_recv)
}
}
impl RecvFd for mio_uds::UnixStream {
fn recv_fd(&mut self, buf_to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
msg::recvmsg(self.as_raw_fd(), buf_to_recv)
}
}
impl SendFd for net::UnixStream {
fn send_fd(&mut self, buf_to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
msg::sendmsg(self.as_raw_fd(), buf_to_send, fd_to_send)
}
}
impl SendFd for mio_uds::UnixStream {
fn send_fd(&mut self, buf_to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
msg::sendmsg(self.as_raw_fd(), buf_to_send, fd_to_send)
}
}
////////////////////////////////////////////////////////////////////////////////
#[derive(Debug)]
pub struct AutoCloseFd(RawFd);
impl Drop for AutoCloseFd {
fn drop(&mut self) {
unsafe {
libc::close(self.0);
}
}
}
impl FromRawFd for AutoCloseFd {
unsafe fn from_raw_fd(fd: RawFd) -> Self {
AutoCloseFd(fd)
}
}
impl IntoRawFd for AutoCloseFd {
fn into_raw_fd(self) -> RawFd {
let fd = self.0;
::std::mem::forget(self);
fd
}
}
impl AsRawFd for AutoCloseFd {
fn as_raw_fd(&self) -> RawFd {
self.0
}
}
impl<'a> AsRawFd for &'a AutoCloseFd {
fn as_raw_fd(&self) -> RawFd {
self.0
}
}
////////////////////////////////////////////////////////////////////////////////
fn get_temp_path(name: &str) -> PathBuf { fn get_temp_path(name: &str) -> PathBuf {
let mut path = temp_dir(); let mut path = temp_dir();
path.push(name); path.push(name);

Просмотреть файл

@ -126,7 +126,7 @@ pub struct StreamParams {
impl<'a> From<&'a ffi::cubeb_stream_params> for StreamParams { impl<'a> From<&'a ffi::cubeb_stream_params> for StreamParams {
fn from(params: &'a ffi::cubeb_stream_params) -> Self { fn from(params: &'a ffi::cubeb_stream_params) -> Self {
assert!(params.channels <= u8::max_value() as u32); assert!(params.channels <= u32::from(u8::max_value()));
StreamParams { StreamParams {
format: params.format, format: params.format,
@ -141,8 +141,8 @@ impl<'a> From<&'a StreamParams> for ffi::cubeb_stream_params {
fn from(params: &StreamParams) -> Self { fn from(params: &StreamParams) -> Self {
ffi::cubeb_stream_params { ffi::cubeb_stream_params {
format: params.format, format: params.format,
rate: params.rate as u32, rate: u32::from(params.rate),
channels: params.channels as u32, channels: u32::from(params.channels),
layout: params.layout layout: params.layout
} }
} }

Просмотреть файл

@ -1,27 +1,36 @@
use libc; use libc;
use std::io; use std::io;
use std::mem; use std::mem;
use std::ptr;
use std::os::unix::io::RawFd; use std::os::unix::io::RawFd;
use std; use std::ptr;
// Note: The following fields must be laid out together, the OS expects them // Note: The following fields must be laid out together, the OS expects them
// to be part of a single allocation. // to be part of a single allocation.
#[repr(C)] #[repr(C)]
struct CmsgSpace { struct CmsgSpace {
cmsghdr: libc::cmsghdr, cmsghdr: libc::cmsghdr,
#[cfg(not(target_os = "macos"))]
__padding: [usize; 0],
data: libc::c_int, data: libc::c_int,
} }
unsafe fn sendmsg_retry(fd: libc::c_int, msg: *const libc::msghdr, flags: libc::c_int) -> libc::ssize_t { #[cfg(not(target_os = "macos"))]
loop { fn cmsg_align(len: usize) -> usize {
let r = libc::sendmsg(fd, msg, flags); let align_bytes = mem::size_of::<usize>() - 1;
if r == -1 && io::Error::last_os_error().raw_os_error().unwrap() == libc::EAGAIN { (len + align_bytes) & !align_bytes
std::thread::yield_now(); }
continue;
} #[cfg(target_os = "macos")]
return r; fn cmsg_align(len: usize) -> usize {
} len
}
fn cmsg_space() -> usize {
mem::size_of::<CmsgSpace>()
}
fn cmsg_len() -> usize {
cmsg_align(mem::size_of::<libc::cmsghdr>()) + mem::size_of::<libc::c_int>()
} }
pub fn sendmsg(fd: RawFd, to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> { pub fn sendmsg(fd: RawFd, to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Result<usize> {
@ -33,7 +42,7 @@ pub fn sendmsg(fd: RawFd, to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Resu
msghdr.msg_iovlen = 1; msghdr.msg_iovlen = 1;
if fd_to_send.is_some() { if fd_to_send.is_some() {
msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _; msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _;
msghdr.msg_controllen = mem::size_of::<CmsgSpace>() as _; msghdr.msg_controllen = cmsg_space() as _;
} }
iovec.iov_base = if to_send.is_empty() { iovec.iov_base = if to_send.is_empty() {
@ -44,13 +53,13 @@ pub fn sendmsg(fd: RawFd, to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Resu
}; };
iovec.iov_len = to_send.len(); iovec.iov_len = to_send.len();
cmsg.cmsghdr.cmsg_len = msghdr.msg_controllen; cmsg.cmsghdr.cmsg_len = cmsg_len() as _;
cmsg.cmsghdr.cmsg_level = libc::SOL_SOCKET; cmsg.cmsghdr.cmsg_level = libc::SOL_SOCKET;
cmsg.cmsghdr.cmsg_type = libc::SCM_RIGHTS; cmsg.cmsghdr.cmsg_type = libc::SCM_RIGHTS;
cmsg.data = fd_to_send.unwrap_or(-1); cmsg.data = fd_to_send.unwrap_or(-1);
let result = unsafe { sendmsg_retry(fd, &msghdr, 0) }; let result = unsafe { libc::sendmsg(fd, &msghdr, 0) };
if result >= 0 { if result >= 0 {
Ok(result as usize) Ok(result as usize)
} else { } else {
@ -58,17 +67,6 @@ pub fn sendmsg(fd: RawFd, to_send: &[u8], fd_to_send: Option<RawFd>) -> io::Resu
} }
} }
unsafe fn recvmsg_retry(fd: libc::c_int, msg: *mut libc::msghdr, flags: libc::c_int) -> libc::ssize_t {
loop {
let r = libc::recvmsg(fd, msg, flags);
if r == -1 && io::Error::last_os_error().raw_os_error().unwrap() == libc::EAGAIN {
std::thread::yield_now();
continue;
}
return r;
}
}
pub fn recvmsg(fd: RawFd, to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> { pub fn recvmsg(fd: RawFd, to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd>)> {
let mut msghdr: libc::msghdr = unsafe { mem::zeroed() }; let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
let mut iovec: libc::iovec = unsafe { mem::zeroed() }; let mut iovec: libc::iovec = unsafe { mem::zeroed() };
@ -77,7 +75,7 @@ pub fn recvmsg(fd: RawFd, to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd
msghdr.msg_iov = &mut iovec as *mut _; msghdr.msg_iov = &mut iovec as *mut _;
msghdr.msg_iovlen = 1; msghdr.msg_iovlen = 1;
msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _; msghdr.msg_control = &mut cmsg.cmsghdr as *mut _ as *mut _;
msghdr.msg_controllen = mem::size_of::<CmsgSpace>() as _; msghdr.msg_controllen = cmsg_space() as _;
iovec.iov_base = if to_recv.is_empty() { iovec.iov_base = if to_recv.is_empty() {
// Empty Vecs have a non-null pointer. // Empty Vecs have a non-null pointer.
@ -87,10 +85,10 @@ pub fn recvmsg(fd: RawFd, to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd
}; };
iovec.iov_len = to_recv.len(); iovec.iov_len = to_recv.len();
let result = unsafe { recvmsg_retry(fd, &mut msghdr, 0) }; let result = unsafe { libc::recvmsg(fd, &mut msghdr, 0) };
if result >= 0 { if result >= 0 {
let fd = if msghdr.msg_controllen == mem::size_of::<CmsgSpace>() as _ && let fd = if msghdr.msg_controllen == cmsg_space() as _ &&
cmsg.cmsghdr.cmsg_len == mem::size_of::<CmsgSpace>() as _ && cmsg.cmsghdr.cmsg_len == cmsg_len() as _ &&
cmsg.cmsghdr.cmsg_level == libc::SOL_SOCKET && cmsg.cmsghdr.cmsg_level == libc::SOL_SOCKET &&
cmsg.cmsghdr.cmsg_type == libc::SCM_RIGHTS { cmsg.cmsghdr.cmsg_type == libc::SCM_RIGHTS {
Some(cmsg.data) Some(cmsg.data)
@ -103,3 +101,61 @@ pub fn recvmsg(fd: RawFd, to_recv: &mut [u8]) -> io::Result<(usize, Option<RawFd
Err(io::Error::last_os_error()) Err(io::Error::last_os_error())
} }
} }
#[cfg(test)]
mod tests {
use libc;
use std::mem;
use std::os::unix::net::UnixStream;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
use std::io::{Read, Write};
use super::{cmsg_len, cmsg_space, sendmsg, recvmsg};
#[test]
fn portable_sizes() {
if cfg!(all(target_os = "linux", target_pointer_width = "64")) {
assert_eq!(mem::size_of::<libc::cmsghdr>(), 16);
assert_eq!(cmsg_len(), 20);
assert_eq!(cmsg_space(), 24);
} else if cfg!(all(target_os = "linux", target_pointer_width = "32")) {
assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
assert_eq!(cmsg_len(), 16);
assert_eq!(cmsg_space(), 16);
} else if cfg!(target_os = "macos") {
assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
assert_eq!(cmsg_len(), 16);
assert_eq!(cmsg_space(), 16);
} else if cfg!(target_pointer_width = "64") {
assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
assert_eq!(cmsg_len(), 20);
assert_eq!(cmsg_space(), 24);
} else {
assert_eq!(mem::size_of::<libc::cmsghdr>(), 12);
assert_eq!(cmsg_len(), 16);
assert_eq!(cmsg_space(), 16);
}
}
#[test]
fn fd_passing() {
let (tx, rx) = UnixStream::pair().unwrap();
let (send_tx, mut send_rx) = UnixStream::pair().unwrap();
let fd = send_tx.into_raw_fd();
assert_eq!(sendmsg(tx.as_raw_fd(), b"a", Some(fd)).unwrap(), 1);
unsafe { libc::close(fd) };
let mut buf = [0u8];
let (got, fd) = recvmsg(rx.as_raw_fd(), &mut buf).unwrap();
assert_eq!(got, 1);
assert_eq!(&buf, b"a");
let mut send_tx = unsafe { UnixStream::from_raw_fd(fd.unwrap()) };
assert_eq!(send_tx.write(b"b").unwrap(), 1);
let mut buf = [0u8];
assert_eq!(send_rx.read(&mut buf).unwrap(), 1);
assert_eq!(&buf, b"b");
}
}

Просмотреть файл

@ -6,6 +6,6 @@ description = "Cubeb Backend for talking to remote cubeb server."
[dependencies] [dependencies]
audioipc = { path="../audioipc" } audioipc = { path="../audioipc" }
cubeb-core = { path="../../cubeb-rs/cubeb-core" }
cubeb-backend = { path="../../cubeb-rs/cubeb-backend" } cubeb-backend = { path="../../cubeb-rs/cubeb-backend" }
cubeb-core = { path="../../cubeb-rs/cubeb-core" }
log = "^0.3.6" log = "^0.3.6"

Просмотреть файл

@ -4,9 +4,10 @@
// accompanying file LICENSE for details // accompanying file LICENSE for details
use ClientStream; use ClientStream;
use assert_not_in_callback;
use audioipc::{self, ClientMessage, Connection, ServerMessage, messages}; use audioipc::{self, ClientMessage, Connection, ServerMessage, messages};
use cubeb_backend::{Context, Ops}; use cubeb_backend::{Context, Ops};
use cubeb_core::{DeviceId, DeviceType, Error, Result, StreamParams, ffi}; use cubeb_core::{DeviceId, DeviceType, Error, ErrorCode, Result, StreamParams, ffi};
use cubeb_core::binding::Binding; use cubeb_core::binding::Binding;
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use std::mem; use std::mem;
@ -33,13 +34,14 @@ pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream);
impl ClientContext { impl ClientContext {
#[doc(hidden)] #[doc(hidden)]
pub fn conn(&self) -> MutexGuard<Connection> { pub fn connection(&self) -> MutexGuard<Connection> {
self.connection.lock().unwrap() self.connection.lock().unwrap()
} }
} }
impl Context for ClientContext { impl Context for ClientContext {
fn init(_context_name: Option<&CStr>) -> Result<*mut ffi::cubeb> { fn init(_context_name: Option<&CStr>) -> Result<*mut ffi::cubeb> {
assert_not_in_callback();
// TODO: encapsulate connect, etc inside audioipc. // TODO: encapsulate connect, etc inside audioipc.
let stream = t!(UnixStream::connect(audioipc::get_uds_path())); let stream = t!(UnixStream::connect(audioipc::get_uds_path()));
let ctx = Box::new(ClientContext { let ctx = Box::new(ClientContext {
@ -50,29 +52,44 @@ impl Context for ClientContext {
} }
fn backend_id(&self) -> &'static CStr { fn backend_id(&self) -> &'static CStr {
// HACK: This is called reentrantly from Gecko's AudioStream::DataCallback.
//assert_not_in_callback();
unsafe { CStr::from_ptr(b"remote\0".as_ptr() as *const _) } unsafe { CStr::from_ptr(b"remote\0".as_ptr() as *const _) }
} }
fn max_channel_count(&self) -> Result<u32> { fn max_channel_count(&self) -> Result<u32> {
send_recv!(self.conn(), ContextGetMaxChannelCount => ContextMaxChannelCount()) // HACK: This needs to be reentrant as MSG calls it from within data_callback.
//assert_not_in_callback();
//let mut conn = self.connection();
//send_recv!(conn, ContextGetMaxChannelCount => ContextMaxChannelCount())
warn!("Context::max_channel_count lying about result until reentrancy issues resolved.");
Ok(2)
} }
fn min_latency(&self, params: &StreamParams) -> Result<u32> { fn min_latency(&self, params: &StreamParams) -> Result<u32> {
assert_not_in_callback();
let params = messages::StreamParams::from(unsafe { &*params.raw() }); let params = messages::StreamParams::from(unsafe { &*params.raw() });
send_recv!(self.conn(), ContextGetMinLatency(params) => ContextMinLatency()) let mut conn = self.connection();
send_recv!(conn, ContextGetMinLatency(params) => ContextMinLatency())
} }
fn preferred_sample_rate(&self) -> Result<u32> { fn preferred_sample_rate(&self) -> Result<u32> {
send_recv!(self.conn(), ContextGetPreferredSampleRate => ContextPreferredSampleRate()) assert_not_in_callback();
let mut conn = self.connection();
send_recv!(conn, ContextGetPreferredSampleRate => ContextPreferredSampleRate())
} }
fn preferred_channel_layout(&self) -> Result<ffi::cubeb_channel_layout> { fn preferred_channel_layout(&self) -> Result<ffi::cubeb_channel_layout> {
send_recv!(self.conn(), ContextGetPreferredChannelLayout => ContextPreferredChannelLayout()) assert_not_in_callback();
let mut conn = self.connection();
send_recv!(conn, ContextGetPreferredChannelLayout => ContextPreferredChannelLayout())
} }
fn enumerate_devices(&self, devtype: DeviceType) -> Result<ffi::cubeb_device_collection> { fn enumerate_devices(&self, devtype: DeviceType) -> Result<ffi::cubeb_device_collection> {
assert_not_in_callback();
let mut conn = self.connection();
let v: Vec<ffi::cubeb_device_info> = let v: Vec<ffi::cubeb_device_info> =
match send_recv!(self.conn(), ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices()) { match send_recv!(conn, ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices()) {
Ok(mut v) => v.drain(..).map(|i| i.into()).collect(), Ok(mut v) => v.drain(..).map(|i| i.into()).collect(),
Err(e) => return Err(e), Err(e) => return Err(e),
}; };
@ -88,6 +105,7 @@ impl Context for ClientContext {
} }
fn device_collection_destroy(&self, collection: *mut ffi::cubeb_device_collection) { fn device_collection_destroy(&self, collection: *mut ffi::cubeb_device_collection) {
assert_not_in_callback();
unsafe { unsafe {
let coll = &*collection; let coll = &*collection;
let mut devices = Vec::from_raw_parts( let mut devices = Vec::from_raw_parts(
@ -95,7 +113,7 @@ impl Context for ClientContext {
coll.count, coll.count,
coll.count coll.count
); );
for dev in devices.iter_mut() { for dev in &mut devices {
if !dev.device_id.is_null() { if !dev.device_id.is_null() {
let _ = CString::from_raw(dev.device_id as *mut _); let _ = CString::from_raw(dev.device_id as *mut _);
} }
@ -125,6 +143,7 @@ impl Context for ClientContext {
state_callback: ffi::cubeb_state_callback, state_callback: ffi::cubeb_state_callback,
user_ptr: *mut c_void, user_ptr: *mut c_void,
) -> Result<*mut ffi::cubeb_stream> { ) -> Result<*mut ffi::cubeb_stream> {
assert_not_in_callback();
fn opt_stream_params(p: Option<&ffi::cubeb_stream_params>) -> Option<messages::StreamParams> { fn opt_stream_params(p: Option<&ffi::cubeb_stream_params>) -> Option<messages::StreamParams> {
match p { match p {
@ -149,7 +168,7 @@ impl Context for ClientContext {
output_stream_params: output_stream_params, output_stream_params: output_stream_params,
latency_frames: latency_frame latency_frames: latency_frame
}; };
stream::init(&self, init_params, data_callback, state_callback, user_ptr) stream::init(self, init_params, data_callback, state_callback, user_ptr)
} }
fn register_device_collection_changed( fn register_device_collection_changed(
@ -158,13 +177,24 @@ impl Context for ClientContext {
_collection_changed_callback: ffi::cubeb_device_collection_changed_callback, _collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
_user_ptr: *mut c_void, _user_ptr: *mut c_void,
) -> Result<()> { ) -> Result<()> {
assert_not_in_callback();
Ok(()) Ok(())
} }
} }
impl Drop for ClientContext { impl Drop for ClientContext {
fn drop(&mut self) { fn drop(&mut self) {
let mut conn = self.connection();
info!("ClientContext drop..."); info!("ClientContext drop...");
let _: Result<()> = send_recv!(self.conn(), ClientDisconnect => ClientDisconnected); let r = conn.send(ServerMessage::ClientDisconnect);
if r.is_err() {
debug!("ClientContext::Drop send error={:?}", r);
} else {
let r = conn.receive();
if let Ok(ClientMessage::ClientDisconnected) = r {
} else {
debug!("ClientContext::Drop receive error={:?}", r);
}
}
} }
} }

Просмотреть файл

@ -21,6 +21,21 @@ use cubeb_core::ffi;
use std::os::raw::{c_char, c_int}; use std::os::raw::{c_char, c_int};
use stream::ClientStream; use stream::ClientStream;
thread_local!(static IN_CALLBACK: std::cell::RefCell<bool> = std::cell::RefCell::new(false));
fn set_in_callback(in_callback: bool) {
IN_CALLBACK.with(|b| {
assert_eq!(*b.borrow(), !in_callback);
*b.borrow_mut() = in_callback;
});
}
fn assert_not_in_callback() {
IN_CALLBACK.with(|b| {
assert_eq!(*b.borrow(), false);
});
}
#[no_mangle] #[no_mangle]
/// Entry point from C code. /// Entry point from C code.
pub unsafe extern "C" fn audioipc_client_init(c: *mut *mut ffi::cubeb, context_name: *const c_char) -> c_int { pub unsafe extern "C" fn audioipc_client_init(c: *mut *mut ffi::cubeb, context_name: *const c_char) -> c_int {

Просмотреть файл

@ -17,26 +17,36 @@ macro_rules! send_recv {
send_recv!(__recv $conn, $rmsg __result) send_recv!(__recv $conn, $rmsg __result)
}}; }};
// //
(__send $conn:expr, $smsg:ident) => ( (__send $conn:expr, $smsg:ident) => ({
$conn.send(ServerMessage::$smsg) let r = $conn.send(ServerMessage::$smsg);
.unwrap(); if r.is_err() {
); debug!("send error - got={:?}", r);
(__send $conn:expr, $smsg:ident, $($a:expr),*) => ( return Err(ErrorCode::Error.into());
$conn.send(ServerMessage::$smsg($($a),*)) }
.unwrap(); });
); (__send $conn:expr, $smsg:ident, $($a:expr),*) => ({
(__recv $conn:expr, $rmsg:ident) => ( let r = $conn.send(ServerMessage::$smsg($($a),*));
if let ClientMessage::$rmsg = $conn.receive().unwrap() { if r.is_err() {
debug!("send error - got={:?}", r);
return Err(ErrorCode::Error.into());
}
});
(__recv $conn:expr, $rmsg:ident) => ({
let r = $conn.receive().unwrap();
if let ClientMessage::$rmsg = r {
Ok(()) Ok(())
} else { } else {
panic!("wrong message received"); debug!("receive error - got={:?}", r);
Err(ErrorCode::Error.into())
} }
); });
(__recv $conn:expr, $rmsg:ident __result) => ( (__recv $conn:expr, $rmsg:ident __result) => ({
if let ClientMessage::$rmsg(v) = $conn.receive().unwrap() { let r = $conn.receive().unwrap();
if let ClientMessage::$rmsg(v) = r {
Ok(v) Ok(v)
} else { } else {
panic!("wrong message received"); debug!("receive error - got={:?}", r);
Err(ErrorCode::Error.into())
} }
) })
} }

Просмотреть файл

@ -4,14 +4,15 @@
// accompanying file LICENSE for details // accompanying file LICENSE for details
use ClientContext; use ClientContext;
use {set_in_callback, assert_not_in_callback};
use audioipc::{ClientMessage, Connection, ServerMessage, messages}; use audioipc::{ClientMessage, Connection, ServerMessage, messages};
use audioipc::shm::{SharedMemSlice, SharedMemMutSlice}; use audioipc::shm::{SharedMemMutSlice, SharedMemSlice};
use cubeb_backend::Stream; use cubeb_backend::Stream;
use cubeb_core::{ErrorCode, Result, ffi}; use cubeb_core::{ErrorCode, Result, ffi};
use std::ffi::CString; use std::ffi::CString;
use std::fs::File;
use std::os::raw::c_void; use std::os::raw::c_void;
use std::os::unix::io::FromRawFd; use std::os::unix::io::FromRawFd;
use std::fs::File;
use std::ptr; use std::ptr;
use std::thread; use std::thread;
@ -27,7 +28,7 @@ pub struct ClientStream<'ctx> {
fn stream_thread( fn stream_thread(
mut conn: Connection, mut conn: Connection,
input_shm: SharedMemSlice, input_shm: &SharedMemSlice,
mut output_shm: SharedMemMutSlice, mut output_shm: SharedMemMutSlice,
data_cb: ffi::cubeb_data_callback, data_cb: ffi::cubeb_data_callback,
state_cb: ffi::cubeb_state_callback, state_cb: ffi::cubeb_state_callback,
@ -38,7 +39,7 @@ fn stream_thread(
Ok(r) => r, Ok(r) => r,
Err(e) => { Err(e) => {
debug!("stream_thread: Failed to receive message: {:?}", e); debug!("stream_thread: Failed to receive message: {:?}", e);
continue; return;
}, },
}; };
@ -48,14 +49,21 @@ fn stream_thread(
return; return;
}, },
ClientMessage::StreamDataCallback(nframes, frame_size) => { ClientMessage::StreamDataCallback(nframes, frame_size) => {
info!( trace!(
"stream_thread: Data Callback: nframes={} frame_size={}", "stream_thread: Data Callback: nframes={} frame_size={}",
nframes, nframes,
frame_size frame_size
); );
// TODO: This is proof-of-concept. Make it better. // TODO: This is proof-of-concept. Make it better.
let input_ptr: *const u8 = input_shm.get_slice(nframes as usize * frame_size).unwrap().as_ptr(); let input_ptr: *const u8 = input_shm
let output_ptr: *mut u8 = output_shm.get_mut_slice(nframes as usize * frame_size).unwrap().as_mut_ptr(); .get_slice(nframes as usize * frame_size)
.unwrap()
.as_ptr();
let output_ptr: *mut u8 = output_shm
.get_mut_slice(nframes as usize * frame_size)
.unwrap()
.as_mut_ptr();
set_in_callback(true);
let nframes = data_cb( let nframes = data_cb(
ptr::null_mut(), ptr::null_mut(),
user_ptr as *mut c_void, user_ptr as *mut c_void,
@ -63,11 +71,18 @@ fn stream_thread(
output_ptr as *mut _, output_ptr as *mut _,
nframes as _ nframes as _
); );
conn.send(ServerMessage::StreamDataCallback(nframes as isize)).unwrap(); set_in_callback(false);
let r = conn.send(ServerMessage::StreamDataCallback(nframes as isize));
if r.is_err() {
debug!("stream_thread: Failed to send StreamDataCallback: {:?}", r);
return;
}
}, },
ClientMessage::StreamStateCallback(state) => { ClientMessage::StreamStateCallback(state) => {
info!("stream_thread: State Callback: {:?}", state); info!("stream_thread: State Callback: {:?}", state);
set_in_callback(true);
state_cb(ptr::null_mut(), user_ptr as *mut _, state); state_cb(ptr::null_mut(), user_ptr as *mut _, state);
set_in_callback(false);
}, },
m => { m => {
info!("Unexpected ClientMessage: {:?}", m); info!("Unexpected ClientMessage: {:?}", m);
@ -84,25 +99,30 @@ impl<'ctx> ClientStream<'ctx> {
state_callback: ffi::cubeb_state_callback, state_callback: ffi::cubeb_state_callback,
user_ptr: *mut c_void, user_ptr: *mut c_void,
) -> Result<*mut ffi::cubeb_stream> { ) -> Result<*mut ffi::cubeb_stream> {
assert_not_in_callback();
let mut conn = ctx.connection();
ctx.conn() let r = conn.send(ServerMessage::StreamInit(init_params));
.send(ServerMessage::StreamInit(init_params)) if r.is_err() {
.unwrap(); debug!("ClientStream::init: Failed to send StreamInit: {:?}", r);
return Err(ErrorCode::Error.into());
}
let r = match ctx.conn().receive_with_fd::<ClientMessage>() { let r = match conn.receive_with_fd::<ClientMessage>() {
Ok(r) => r, Ok(r) => r,
Err(_) => return Err(ErrorCode::Error.into()), Err(_) => return Err(ErrorCode::Error.into()),
}; };
let (token, conn) = match r { let (token, conn2) = match r {
(ClientMessage::StreamCreated(tok), Some(fd)) => (tok, unsafe { ClientMessage::StreamCreated(tok) => {
Connection::from_raw_fd(fd) let fd = conn.take_fd();
}), if fd.is_none() {
(ClientMessage::StreamCreated(_), None) => { debug!("Missing fd!");
debug!("Missing fd!"); return Err(ErrorCode::Error.into());
return Err(ErrorCode::Error.into()); }
(tok, unsafe { Connection::from_raw_fd(fd.unwrap()) })
}, },
(m, _) => { m => {
debug!("Unexpected message: {:?}", m); debug!("Unexpected message: {:?}", m);
return Err(ErrorCode::Error.into()); return Err(ErrorCode::Error.into());
}, },
@ -111,45 +131,60 @@ impl<'ctx> ClientStream<'ctx> {
// TODO: It'd be nicer to receive these two fds as part of // TODO: It'd be nicer to receive these two fds as part of
// StreamCreated, but that requires changing sendmsg/recvmsg to // StreamCreated, but that requires changing sendmsg/recvmsg to
// support multiple fds. // support multiple fds.
let r = match ctx.conn().receive_with_fd::<ClientMessage>() { let r = match conn.receive_with_fd::<ClientMessage>() {
Ok(r) => r, Ok(r) => r,
Err(_) => return Err(ErrorCode::Error.into()), Err(_) => return Err(ErrorCode::Error.into()),
}; };
let input_file = match r { let input_file = match r {
(ClientMessage::StreamCreatedInputShm, Some(fd)) => unsafe { ClientMessage::StreamCreatedInputShm => {
File::from_raw_fd(fd) let fd = conn.take_fd();
if fd.is_none() {
debug!("Missing fd!");
return Err(ErrorCode::Error.into());
}
unsafe { File::from_raw_fd(fd.unwrap()) }
}, },
(m, _) => { m => {
debug!("Unexpected message: {:?}", m); debug!("Unexpected message: {:?}", m);
return Err(ErrorCode::Error.into()); return Err(ErrorCode::Error.into());
}, },
}; };
let input_shm = SharedMemSlice::from(input_file, let input_shm = SharedMemSlice::from(input_file, SHM_AREA_SIZE).unwrap();
SHM_AREA_SIZE).unwrap();
let r = match ctx.conn().receive_with_fd::<ClientMessage>() { let r = match conn.receive_with_fd::<ClientMessage>() {
Ok(r) => r, Ok(r) => r,
Err(_) => return Err(ErrorCode::Error.into()), Err(_) => return Err(ErrorCode::Error.into()),
}; };
let output_file = match r { let output_file = match r {
(ClientMessage::StreamCreatedOutputShm, Some(fd)) => unsafe { ClientMessage::StreamCreatedOutputShm => {
File::from_raw_fd(fd) let fd = conn.take_fd();
if fd.is_none() {
debug!("Missing fd!");
return Err(ErrorCode::Error.into());
}
unsafe { File::from_raw_fd(fd.unwrap()) }
}, },
(m, _) => { m => {
debug!("Unexpected message: {:?}", m); debug!("Unexpected message: {:?}", m);
return Err(ErrorCode::Error.into()); return Err(ErrorCode::Error.into());
}, },
}; };
let output_shm = SharedMemMutSlice::from(output_file, let output_shm = SharedMemMutSlice::from(output_file, SHM_AREA_SIZE).unwrap();
SHM_AREA_SIZE).unwrap();
let user_data = user_ptr as usize; let user_data = user_ptr as usize;
let join_handle = thread::spawn(move || { let join_handle = thread::spawn(move || {
stream_thread(conn, input_shm, output_shm, data_callback, state_callback, user_data) stream_thread(
conn2,
&input_shm,
output_shm,
data_callback,
state_callback,
user_data
)
}); });
Ok(Box::into_raw(Box::new(ClientStream { Ok(Box::into_raw(Box::new(ClientStream {
@ -162,48 +197,76 @@ impl<'ctx> ClientStream<'ctx> {
impl<'ctx> Drop for ClientStream<'ctx> { impl<'ctx> Drop for ClientStream<'ctx> {
fn drop(&mut self) { fn drop(&mut self) {
let _: Result<()> = send_recv!(self.context.conn(), StreamDestroy(self.token) => StreamDestroyed); let mut conn = self.context.connection();
let r = conn.send(ServerMessage::StreamDestroy(self.token));
if r.is_err() {
debug!("ClientStream::Drop send error={:?}", r);
} else {
let r = conn.receive();
if let Ok(ClientMessage::StreamDestroyed) = r {
} else {
debug!("ClientStream::Drop receive error={:?}", r);
}
}
// XXX: This is guaranteed to wait forever if the send failed.
self.join_handle.take().unwrap().join().unwrap(); self.join_handle.take().unwrap().join().unwrap();
} }
} }
impl<'ctx> Stream for ClientStream<'ctx> { impl<'ctx> Stream for ClientStream<'ctx> {
fn start(&self) -> Result<()> { fn start(&self) -> Result<()> {
send_recv!(self.context.conn(), StreamStart(self.token) => StreamStarted) assert_not_in_callback();
let mut conn = self.context.connection();
send_recv!(conn, StreamStart(self.token) => StreamStarted)
} }
fn stop(&self) -> Result<()> { fn stop(&self) -> Result<()> {
send_recv!(self.context.conn(), StreamStop(self.token) => StreamStopped) assert_not_in_callback();
let mut conn = self.context.connection();
send_recv!(conn, StreamStop(self.token) => StreamStopped)
} }
fn reset_default_device(&self) -> Result<()> { fn reset_default_device(&self) -> Result<()> {
send_recv!(self.context.conn(), StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset) assert_not_in_callback();
let mut conn = self.context.connection();
send_recv!(conn, StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset)
} }
fn position(&self) -> Result<u64> { fn position(&self) -> Result<u64> {
send_recv!(self.context.conn(), StreamGetPosition(self.token) => StreamPosition()) assert_not_in_callback();
let mut conn = self.context.connection();
send_recv!(conn, StreamGetPosition(self.token) => StreamPosition())
} }
fn latency(&self) -> Result<u32> { fn latency(&self) -> Result<u32> {
send_recv!(self.context.conn(), StreamGetLatency(self.token) => StreamLatency()) assert_not_in_callback();
let mut conn = self.context.connection();
send_recv!(conn, StreamGetLatency(self.token) => StreamLatency())
} }
fn set_volume(&self, volume: f32) -> Result<()> { fn set_volume(&self, volume: f32) -> Result<()> {
send_recv!(self.context.conn(), StreamSetVolume(self.token, volume) => StreamVolumeSet) assert_not_in_callback();
let mut conn = self.context.connection();
send_recv!(conn, StreamSetVolume(self.token, volume) => StreamVolumeSet)
} }
fn set_panning(&self, panning: f32) -> Result<()> { fn set_panning(&self, panning: f32) -> Result<()> {
send_recv!(self.context.conn(), StreamSetPanning(self.token, panning) => StreamPanningSet) assert_not_in_callback();
let mut conn = self.context.connection();
send_recv!(conn, StreamSetPanning(self.token, panning) => StreamPanningSet)
} }
fn current_device(&self) -> Result<*const ffi::cubeb_device> { fn current_device(&self) -> Result<*const ffi::cubeb_device> {
match send_recv!(self.context.conn(), StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) { assert_not_in_callback();
let mut conn = self.context.connection();
match send_recv!(conn, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
Ok(d) => Ok(Box::into_raw(Box::new(d.into()))), Ok(d) => Ok(Box::into_raw(Box::new(d.into()))),
Err(e) => Err(e), Err(e) => Err(e),
} }
} }
fn device_destroy(&self, device: *const ffi::cubeb_device) -> Result<()> { fn device_destroy(&self, device: *const ffi::cubeb_device) -> Result<()> {
assert_not_in_callback();
// It's all unsafe... // It's all unsafe...
if !device.is_null() { if !device.is_null() {
unsafe { unsafe {
@ -224,6 +287,7 @@ impl<'ctx> Stream for ClientStream<'ctx> {
&self, &self,
_device_changed_callback: ffi::cubeb_device_changed_callback, _device_changed_callback: ffi::cubeb_device_changed_callback,
) -> Result<()> { ) -> Result<()> {
assert_not_in_callback();
Ok(()) Ok(())
} }
} }

Просмотреть файл

@ -6,8 +6,9 @@ description = "Remote cubeb server"
[dependencies] [dependencies]
audioipc = { path = "../audioipc" } audioipc = { path = "../audioipc" }
cubeb = { path = "../../cubeb-rs/cubeb-api" }
cubeb-core = { path = "../../cubeb-rs/cubeb-core" } cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
cubeb = { path = "../../cubeb-rs/cubeb-api" }
bytes = "0.4"
error-chain = "0.10.0" error-chain = "0.10.0"
lazycell = "^0.4" lazycell = "^0.4"
log = "^0.3.6" log = "^0.3.6"

Просмотреть файл

@ -114,10 +114,10 @@ impl Evented for ReceiverCtl {
let _ = set_readiness.set_readiness(Ready::readable()); let _ = set_readiness.set_readiness(Ready::readable());
} }
self.registration.fill(registration).ok().expect( self.registration.fill(registration).expect(
"unexpected state encountered" "unexpected state encountered"
); );
self.inner.set_readiness.fill(set_readiness).ok().expect( self.inner.set_readiness.fill(set_readiness).expect(
"unexpected state encountered" "unexpected state encountered"
); );
@ -136,7 +136,7 @@ impl Evented for ReceiverCtl {
fn deregister(&self, poll: &Poll) -> io::Result<()> { fn deregister(&self, poll: &Poll) -> io::Result<()> {
match self.registration.borrow() { match self.registration.borrow() {
Some(registration) => <Registration as Evented>::deregister(&registration, poll), Some(registration) => <Registration as Evented>::deregister(registration, poll),
None => Err(io::Error::new( None => Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,
"receiver not registered" "receiver not registered"
@ -192,19 +192,19 @@ impl<T> From<io::Error> for TrySendError<T> {
impl<T: Any> error::Error for SendError<T> { impl<T: Any> error::Error for SendError<T> {
fn description(&self) -> &str { fn description(&self) -> &str {
match self { match *self {
&SendError::Io(ref io_err) => io_err.description(), SendError::Io(ref io_err) => io_err.description(),
&SendError::Disconnected(..) => "Disconnected", SendError::Disconnected(..) => "Disconnected",
} }
} }
} }
impl<T: Any> error::Error for TrySendError<T> { impl<T: Any> error::Error for TrySendError<T> {
fn description(&self) -> &str { fn description(&self) -> &str {
match self { match *self {
&TrySendError::Io(ref io_err) => io_err.description(), TrySendError::Io(ref io_err) => io_err.description(),
&TrySendError::Full(..) => "Full", TrySendError::Full(..) => "Full",
&TrySendError::Disconnected(..) => "Disconnected", TrySendError::Disconnected(..) => "Disconnected",
} }
} }
} }
@ -235,17 +235,17 @@ impl<T> fmt::Display for TrySendError<T> {
#[inline] #[inline]
fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result { fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
match e { match *e {
&SendError::Io(ref io_err) => write!(f, "{}", io_err), SendError::Io(ref io_err) => write!(f, "{}", io_err),
&SendError::Disconnected(..) => write!(f, "Disconnected"), SendError::Disconnected(..) => write!(f, "Disconnected"),
} }
} }
#[inline] #[inline]
fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result { fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
match e { match *e {
&TrySendError::Io(ref io_err) => write!(f, "{}", io_err), TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
&TrySendError::Full(..) => write!(f, "Full"), TrySendError::Full(..) => write!(f, "Full"),
&TrySendError::Disconnected(..) => write!(f, "Disconnected"), TrySendError::Disconnected(..) => write!(f, "Disconnected"),
} }
} }

Разница между файлами не показана из-за своего большого размера Загрузить разницу