Bug 1440538 - P2: Update audioipc to commit 933fb48. r=kinetik

MozReview-Commit-ID: 29VUZKxz3xR

--HG--
extra : rebase_source : 52c8d16aadbd677d22102b9fb989055cb21bb607
This commit is contained in:
Dan Glastonbury 2018-02-13 14:39:06 +10:00
Родитель a006d86faa
Коммит 3855912725
25 изменённых файлов: 449 добавлений и 543 удалений

Просмотреть файл

@ -5,4 +5,4 @@ Makefile.in build files for the Mozilla build system.
The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git
The git commit ID used was d7798606aa590ef402344b7a519a0053725a9805 (2018-01-27 09:07:03 +1000)
The git commit ID used was 933fb48b252a10569ba8d598541577c6f2dc308f (2018-02-21 17:13:04 +1000)

Просмотреть файл

@ -8,7 +8,7 @@ authors = [
description = "Remote Cubeb IPC"
[dependencies]
cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
cubeb = "0.4"
bincode = "0.8"
bytes = "0.4"
# rayon-core in Gecko uses futures 0.1.13

Просмотреть файл

@ -113,11 +113,11 @@ impl AsyncRecvMsg for UnixStream {
cmsg.advance_mut(cmsg_len);
}
Ok((n, flags).into())
},
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.need_read();
Ok(Async::NotReady)
},
}
Err(e) => Err(e),
}
}
@ -139,22 +139,7 @@ impl AsyncSendMsg for UnixStream {
static DUMMY: &[u8] = &[0];
let nom = <&IoVec>::from(DUMMY);
let mut bufs = [
nom,
nom,
nom,
nom,
nom,
nom,
nom,
nom,
nom,
nom,
nom,
nom,
nom,
nom,
nom,
nom,
nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom, nom
];
let n = buf.bytes_vec(&mut bufs);
self.send_msg(&bufs[..n], cmsg.bytes())
@ -163,11 +148,11 @@ impl AsyncSendMsg for UnixStream {
Ok(n) => {
buf.advance(n);
Ok(Async::Ready(n))
},
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.need_write();
Ok(Async::NotReady)
},
}
Err(e) => Err(e),
}
}

Просмотреть файл

@ -10,7 +10,7 @@ use std::os::unix::io::RawFd;
#[derive(Clone, Debug)]
pub struct Fds {
fds: Bytes
fds: Bytes,
}
impl convert::AsRef<[RawFd]> for Fds {
@ -30,13 +30,11 @@ impl ops::Deref for Fds {
}
pub struct ControlMsgIter {
control: Bytes
control: Bytes,
}
pub fn iterator(c: Bytes) -> ControlMsgIter {
ControlMsgIter {
control: c
}
ControlMsgIter { control: c }
}
impl Iterator for ControlMsgIter {
@ -73,12 +71,12 @@ impl Iterator for ControlMsgIter {
(libc::SOL_SOCKET, libc::SCM_RIGHTS) => {
trace!("Found SCM_RIGHTS...");
return Some(Fds {
fds: control.slice(cmsghdr_len, cmsg_len as _)
fds: control.slice(cmsghdr_len, cmsg_len as _),
});
},
}
(level, kind) => {
trace!("Skipping cmsg level, {}, type={}...", level, kind);
},
}
}
}
}
@ -87,19 +85,17 @@ impl Iterator for ControlMsgIter {
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum Error {
/// Not enough space in storage to insert control mesage.
NoSpace
NoSpace,
}
#[must_use]
pub struct ControlMsgBuilder {
result: Result<BytesMut, Error>
result: Result<BytesMut, Error>,
}
pub fn builder(buf: &mut BytesMut) -> ControlMsgBuilder {
let buf = aligned(buf);
ControlMsgBuilder {
result: Ok(buf)
}
ControlMsgBuilder { result: Ok(buf) }
}
impl ControlMsgBuilder {
@ -113,10 +109,12 @@ impl ControlMsgBuilder {
let cmsghdr = cmsghdr {
cmsg_len: cmsg_len as _,
cmsg_level: level,
cmsg_type: kind
cmsg_type: kind,
};
let cmsghdr = unsafe { slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::<cmsghdr>()) };
let cmsghdr = unsafe {
slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::<cmsghdr>())
};
cmsg.put_slice(cmsghdr);
let mut cmsg = try!(align_buf(cmsg));
cmsg.put_slice(msg);

Просмотреть файл

@ -35,8 +35,8 @@ pub trait Codec {
Some(frame) => Ok(frame),
None => Err(io::Error::new(
io::ErrorKind::Other,
"bytes remaining on stream"
))
"bytes remaining on stream",
)),
}
}
@ -53,12 +53,12 @@ pub trait Codec {
pub struct LengthDelimitedCodec<In, Out> {
state: State,
__in: PhantomData<In>,
__out: PhantomData<Out>
__out: PhantomData<Out>,
}
enum State {
Length,
Data(u16)
Data(u16),
}
impl<In, Out> Default for LengthDelimitedCodec<In, Out> {
@ -66,7 +66,7 @@ impl<In, Out> Default for LengthDelimitedCodec<In, Out> {
LengthDelimitedCodec {
state: State::Length,
__in: PhantomData,
__out: PhantomData
__out: PhantomData,
}
}
}
@ -89,7 +89,7 @@ impl<In, Out> LengthDelimitedCodec<In, Out> {
fn decode_data(&mut self, buf: &mut BytesMut, n: u16) -> io::Result<Option<Out>>
where
Out: DeserializeOwned + Debug
Out: DeserializeOwned + Debug,
{
// At this point, the buffer has already had the required capacity
// reserved. All there is to do is read.
@ -103,7 +103,7 @@ impl<In, Out> LengthDelimitedCodec<In, Out> {
trace!("Attempting to decode");
let msg = try!(deserialize::<Out>(buf.as_ref()).map_err(|e| match *e {
bincode::ErrorKind::IoError(e) => e,
_ => io::Error::new(io::ErrorKind::Other, *e)
_ => io::Error::new(io::ErrorKind::Other, *e),
}));
trace!("... Decoded {:?}", msg);
@ -114,7 +114,7 @@ impl<In, Out> LengthDelimitedCodec<In, Out> {
impl<In, Out> Codec for LengthDelimitedCodec<In, Out>
where
In: Serialize + Debug,
Out: DeserializeOwned + Debug
Out: DeserializeOwned + Debug,
{
type In = In;
type Out = Out;
@ -131,11 +131,11 @@ where
buf.reserve(n as usize);
n
},
None => return Ok(None)
}
None => return Ok(None),
}
},
State::Data(n) => n
}
State::Data(n) => n,
};
match try!(self.decode_data(buf, n)) {
@ -147,8 +147,8 @@ where
buf.reserve(2);
Ok(Some(data))
},
None => Ok(None)
}
None => Ok(None),
}
}
@ -158,7 +158,7 @@ where
if encoded_len > 8 * 1024 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"encoded message too big"
"encoded message too big",
));
}
@ -171,7 +171,7 @@ where
{
match *e {
bincode::ErrorKind::IoError(e) => return Err(e),
_ => return Err(io::Error::new(io::ErrorKind::Other, *e))
_ => return Err(io::Error::new(io::ErrorKind::Other, *e)),
}
}

Просмотреть файл

@ -16,7 +16,7 @@ pub fn handle() -> Handle {
pub fn spawn<F>(f: F)
where
F: Future<Item = (), Error = ()> + 'static
F: Future<Item = (), Error = ()> + 'static,
{
HANDLE.with(|handle| handle.spawn(f))
}
@ -24,19 +24,19 @@ where
pub fn spawn_fn<F, R>(f: F)
where
F: FnOnce() -> R + 'static,
R: IntoFuture<Item = (), Error = ()> + 'static
R: IntoFuture<Item = (), Error = ()> + 'static,
{
HANDLE.with(|handle| handle.spawn_fn(f))
}
struct Inner {
join: thread::JoinHandle<()>,
shutdown: oneshot::Sender<()>
shutdown: oneshot::Sender<()>,
}
pub struct CoreThread {
inner: Option<Inner>,
remote: Remote
remote: Remote,
}
impl CoreThread {
@ -65,7 +65,7 @@ impl fmt::Debug for CoreThread {
pub fn spawn_thread<S, F>(name: S, f: F) -> io::Result<CoreThread>
where
S: Into<String>,
F: FnOnce() -> io::Result<()> + Send + 'static
F: FnOnce() -> io::Result<()> + Send + 'static,
{
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let (remote_tx, remote_rx) = mpsc::channel::<Remote>();
@ -85,18 +85,16 @@ where
trace!("thread shutdown...");
}));
let remote = try!(remote_rx.recv().or_else(|_| {
Err(io::Error::new(
io::ErrorKind::Other,
"Failed to receive remote handle from spawned thread"
))
}));
let remote = try!(remote_rx.recv().or_else(|_| Err(io::Error::new(
io::ErrorKind::Other,
"Failed to receive remote handle from spawned thread"
))));
Ok(CoreThread {
inner: Some(Inner {
join: join,
shutdown: shutdown_tx
shutdown: shutdown_tx,
}),
remote: remote
remote: remote,
})
}

Просмотреть файл

@ -1,5 +1,5 @@
use bincode;
use cubeb_core;
use cubeb;
use std;
error_chain! {
@ -7,7 +7,7 @@ error_chain! {
foreign_links {
Bincode(bincode::Error);
Io(std::io::Error);
Cubeb(cubeb_core::Error);
Cubeb(cubeb::Error);
}
// Replace bail!(str) with explicit errors.

Просмотреть файл

@ -20,7 +20,7 @@ const FDS_CAPACITY: usize = 16;
struct IncomingFds {
cmsg: BytesMut,
recv_fds: Option<cmsg::ControlMsgIter>
recv_fds: Option<cmsg::ControlMsgIter>,
}
impl IncomingFds {
@ -28,7 +28,7 @@ impl IncomingFds {
let capacity = c * cmsg::space(mem::size_of::<[RawFd; 3]>());
IncomingFds {
cmsg: BytesMut::with_capacity(capacity),
recv_fds: None
recv_fds: None,
}
}
@ -60,7 +60,7 @@ impl IncomingFds {
#[derive(Debug)]
struct Frame {
msgs: Bytes,
fds: Option<Bytes>
fds: Option<Bytes>,
}
/// A unified `Stream` and `Sink` interface over an I/O object, using
@ -76,12 +76,12 @@ pub struct FramedWithFds<A, C> {
// Sink
frames: VecDeque<Frame>,
write_buf: BytesMut,
outgoing_fds: BytesMut
outgoing_fds: BytesMut,
}
impl<A, C> FramedWithFds<A, C>
where
A: AsyncSendMsg
A: AsyncSendMsg,
{
// If there is a buffered frame, try to write it to `A`
fn do_write(&mut self) -> Poll<(), io::Error> {
@ -102,10 +102,10 @@ where
let mut msgs = frame.msgs.clone().into_buf();
let mut fds = match frame.fds {
Some(ref fds) => fds.clone(),
None => Bytes::new()
None => Bytes::new(),
}.into_buf();
try_ready!(self.io.send_msg_buf(&mut msgs, &fds))
},
}
_ => {
// No pending frames.
return Ok(().into());
@ -137,8 +137,8 @@ where
self.frames.push_front(frame);
break;
}
},
_ => panic!()
}
_ => panic!(),
}
}
debug!("process {} frames", processed);
@ -158,10 +158,7 @@ where
let msgs = self.write_buf.take().freeze();
trace!("set_frame: msgs={:?} fds={:?}", msgs, fds);
self.frames.push_back(Frame {
msgs,
fds
});
self.frames.push_back(Frame { msgs, fds });
}
}
@ -169,7 +166,7 @@ impl<A, C> Stream for FramedWithFds<A, C>
where
A: AsyncRecvMsg,
C: Codec,
C::Out: AssocRawFd
C::Out: AssocRawFd,
{
type Item = C::Out;
type Error = io::Error;
@ -226,15 +223,12 @@ impl<A, C> Sink for FramedWithFds<A, C>
where
A: AsyncSendMsg,
C: Codec,
C::In: AssocRawFd + fmt::Debug
C::In: AssocRawFd + fmt::Debug,
{
type SinkItem = C::In;
type SinkError = io::Error;
fn start_send(
&mut self,
item: Self::SinkItem
) -> StartSend<Self::SinkItem, Self::SinkError> {
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
trace!("start_send: item={:?}", item);
// If the buffer is already over BACKPRESSURE_THRESHOLD,
@ -295,8 +289,8 @@ pub fn framed_with_fds<A, C>(io: A, codec: C) -> FramedWithFds<A, C> {
frames: VecDeque::new(),
write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
outgoing_fds: BytesMut::with_capacity(
FDS_CAPACITY * cmsg::space(mem::size_of::<[RawFd; 3]>())
)
FDS_CAPACITY * cmsg::space(mem::size_of::<[RawFd; 3]>()),
),
}
}
@ -307,7 +301,7 @@ fn write_zero() -> io::Error {
fn clone_into_array<A, T>(slice: &[T]) -> A
where
A: Sized + Default + AsMut<[T]>,
T: Clone
T: Clone,
{
let mut a = Default::default();
<A as AsMut<[T]>>::as_mut(&mut a).clone_from_slice(slice);

Просмотреть файл

@ -21,7 +21,7 @@ pub struct Framed<A, C> {
write_buf: BytesMut,
frame: Option<<Bytes as IntoBuf>::Buf>,
is_readable: bool,
eof: bool
eof: bool,
}
impl<A, C> Framed<A, C>
@ -159,6 +159,6 @@ pub fn framed<A, C>(io: A, codec: C) -> Framed<A, C> {
write_buf: BytesMut::with_capacity(INITIAL_CAPACITY),
frame: None,
is_readable: false,
eof: false
eof: false,
}
}

Просмотреть файл

@ -3,7 +3,6 @@
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
#![allow(dead_code)] // TODO: Remove.
#![recursion_limit = "1024"]
#[macro_use]
extern crate error_chain;
@ -16,15 +15,15 @@ extern crate serde_derive;
extern crate bincode;
extern crate bytes;
extern crate cubeb_core;
extern crate cubeb;
#[macro_use]
extern crate futures;
extern crate iovec;
extern crate libc;
extern crate memmap;
extern crate serde;
#[macro_use]
extern crate scoped_tls;
extern crate serde;
extern crate tokio_core;
#[macro_use]
extern crate tokio_io;
@ -43,11 +42,9 @@ mod msg;
pub mod shm;
use iovec::IoVec;
#[cfg(target_os = "linux")]
use libc::MSG_CMSG_CLOEXEC;
pub use messages::{ClientMessage, ServerMessage};
use std::env::temp_dir;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
@ -59,7 +56,11 @@ const MSG_CMSG_CLOEXEC: libc::c_int = 0;
// We can extend UnixStream by using traits, eliminating the need to introduce a new wrapped
// UnixStream type.
pub trait RecvMsg {
fn recv_msg(&mut self, iov: &mut [&mut IoVec], cmsg: &mut [u8]) -> io::Result<(usize, usize, i32)>;
fn recv_msg(
&mut self,
iov: &mut [&mut IoVec],
cmsg: &mut [u8],
) -> io::Result<(usize, usize, i32)>;
}
pub trait SendMsg {
@ -67,7 +68,11 @@ pub trait SendMsg {
}
impl<T: AsRawFd> RecvMsg for T {
fn recv_msg(&mut self, iov: &mut [&mut IoVec], cmsg: &mut [u8]) -> io::Result<(usize, usize, i32)> {
fn recv_msg(
&mut self,
iov: &mut [&mut IoVec],
cmsg: &mut [u8],
) -> io::Result<(usize, usize, i32)> {
msg::recv_msg_with_flags(self.as_raw_fd(), iov, cmsg, MSG_CMSG_CLOEXEC)
}
}

Просмотреть файл

@ -3,23 +3,23 @@
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
use cubeb_core::{self, ffi};
use cubeb::{self, ffi};
use std::ffi::{CStr, CString};
use std::os::raw::c_char;
use std::os::raw::{c_char, c_int, c_uint};
use std::os::unix::io::RawFd;
use std::ptr;
#[derive(Debug, Serialize, Deserialize)]
pub struct Device {
pub output_name: Option<Vec<u8>>,
pub input_name: Option<Vec<u8>>
pub input_name: Option<Vec<u8>>,
}
impl<'a> From<cubeb_core::Device<'a>> for Device {
fn from(info: cubeb_core::Device) -> Self {
impl<'a> From<&'a cubeb::DeviceRef> for Device {
fn from(info: &'a cubeb::DeviceRef) -> Self {
Self {
output_name: info.output_name_bytes().map(|s| s.to_vec()),
input_name: info.input_name_bytes().map(|s| s.to_vec())
input_name: info.input_name_bytes().map(|s| s.to_vec()),
}
}
}
@ -28,7 +28,7 @@ impl From<ffi::cubeb_device> for Device {
fn from(info: ffi::cubeb_device) -> Self {
Self {
output_name: dup_str(info.output_name),
input_name: dup_str(info.input_name)
input_name: dup_str(info.input_name),
}
}
}
@ -37,7 +37,7 @@ impl From<Device> for ffi::cubeb_device {
fn from(info: Device) -> Self {
Self {
output_name: opt_str(info.output_name),
input_name: opt_str(info.input_name)
input_name: opt_str(info.input_name),
}
}
}
@ -62,11 +62,12 @@ pub struct DeviceInfo {
pub min_rate: u32,
pub latency_lo: u32,
pub latency_hi: u32
pub latency_hi: u32,
}
impl<'a> From<&'a ffi::cubeb_device_info> for DeviceInfo {
fn from(info: &'a ffi::cubeb_device_info) -> Self {
impl<'a> From<&'a cubeb::DeviceInfoRef> for DeviceInfo {
fn from(info: &'a cubeb::DeviceInfoRef) -> Self {
let info = unsafe { &*info.as_ptr() };
DeviceInfo {
devid: info.devid as _,
device_id: dup_str(info.device_id),
@ -86,7 +87,7 @@ impl<'a> From<&'a ffi::cubeb_device_info> for DeviceInfo {
min_rate: info.min_rate,
latency_lo: info.latency_lo,
latency_hi: info.latency_hi
latency_hi: info.latency_hi,
}
}
}
@ -112,43 +113,23 @@ impl From<DeviceInfo> for ffi::cubeb_device_info {
min_rate: info.min_rate,
latency_lo: info.latency_lo,
latency_hi: info.latency_hi
latency_hi: info.latency_hi,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
pub struct StreamParams {
pub format: u32,
pub rate: u16,
pub channels: u8,
pub layout: i32,
pub prefs: i32
pub format: ffi::cubeb_sample_format,
pub rate: c_uint,
pub channels: c_uint,
pub layout: ffi::cubeb_channel_layout,
pub prefs: ffi::cubeb_stream_prefs,
}
impl<'a> From<&'a ffi::cubeb_stream_params> for StreamParams {
fn from(params: &'a ffi::cubeb_stream_params) -> Self {
assert!(params.channels <= u32::from(u8::max_value()));
StreamParams {
format: params.format,
rate: params.rate as u16,
channels: params.channels as u8,
layout: params.layout,
prefs: params.prefs
}
}
}
impl<'a> From<&'a StreamParams> for ffi::cubeb_stream_params {
fn from(params: &StreamParams) -> Self {
ffi::cubeb_stream_params {
format: params.format,
rate: u32::from(params.rate),
channels: u32::from(params.channels),
layout: params.layout,
prefs: params.prefs
}
impl<'a> From<&'a cubeb::StreamParamsRef> for StreamParams {
fn from(x: &cubeb::StreamParamsRef) -> StreamParams {
unsafe { *(x.as_ptr() as *mut StreamParams) }
}
}
@ -159,7 +140,7 @@ pub struct StreamInitParams {
pub input_stream_params: Option<StreamParams>,
pub output_device: usize,
pub output_stream_params: Option<StreamParams>,
pub latency_frames: u32
pub latency_frames: u32,
}
fn dup_str(s: *const c_char) -> Option<Vec<u8>> {
@ -171,23 +152,23 @@ fn dup_str(s: *const c_char) -> Option<Vec<u8>> {
}
}
fn opt_str(v: Option<Vec<u8>>) -> *const c_char {
fn opt_str(v: Option<Vec<u8>>) -> *mut c_char {
match v {
Some(v) => match CString::new(v) {
Ok(s) => s.into_raw(),
Err(_) => {
debug!("Failed to convert bytes to CString");
ptr::null()
ptr::null_mut()
}
},
None => ptr::null()
None => ptr::null_mut(),
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamCreate {
pub token: usize,
pub fds: [RawFd; 3]
pub fds: [RawFd; 3],
}
// Client -> Server messages.
@ -215,7 +196,7 @@ pub enum ServerMessage {
StreamGetLatency(usize),
StreamSetVolume(usize, f32),
StreamSetPanning(usize, f32),
StreamGetCurrentDevice(usize)
StreamGetCurrentDevice(usize),
}
// Server -> Client messages.
@ -244,19 +225,19 @@ pub enum ClientMessage {
StreamPanningSet,
StreamCurrentDevice(Device),
Error(ffi::cubeb_error_code)
Error(c_int),
}
#[derive(Debug, Deserialize, Serialize)]
pub enum CallbackReq {
Data(isize, usize),
State(ffi::cubeb_state)
State(ffi::cubeb_state),
}
#[derive(Debug, Deserialize, Serialize)]
pub enum CallbackResp {
Data(isize),
State
State,
}
pub trait AssocRawFd {
@ -265,7 +246,7 @@ pub trait AssocRawFd {
}
fn take_fd<F>(&mut self, _: F)
where
F: FnOnce() -> Option<[RawFd; 3]>
F: FnOnce() -> Option<[RawFd; 3]>,
{
}
}
@ -275,16 +256,48 @@ impl AssocRawFd for ClientMessage {
fn fd(&self) -> Option<[RawFd; 3]> {
match *self {
ClientMessage::StreamCreated(ref data) => Some(data.fds),
_ => None
_ => None,
}
}
fn take_fd<F>(&mut self, f: F)
where
F: FnOnce() -> Option<[RawFd; 3]>
F: FnOnce() -> Option<[RawFd; 3]>,
{
if let ClientMessage::StreamCreated(ref mut data) = *self {
data.fds = f().unwrap();
}
}
}
#[cfg(test)]
mod test {
use super::StreamParams;
use cubeb::ffi;
use std::mem;
#[test]
fn stream_params_size_check() {
assert_eq!(
mem::size_of::<StreamParams>(),
mem::size_of::<ffi::cubeb_stream_params>()
)
}
#[test]
fn stream_params_from() {
let mut raw = ffi::cubeb_stream_params::default();
raw.format = ffi::CUBEB_SAMPLE_FLOAT32BE;
raw.rate = 96_000;
raw.channels = 32;
raw.layout = ffi::CUBEB_LAYOUT_3F1_LFE;
raw.prefs = ffi::CUBEB_STREAM_PREF_LOOPBACK;
let wrapped = ::cubeb::StreamParams::from(raw);
let params = StreamParams::from(wrapped.as_ref());
assert_eq!(params.format, raw.format);
assert_eq!(params.rate, raw.rate);
assert_eq!(params.channels, raw.channels);
assert_eq!(params.layout, raw.layout);
assert_eq!(params.prefs, raw.prefs);
}
}

Просмотреть файл

@ -16,8 +16,8 @@ fn cvt(r: libc::ssize_t) -> io::Result<usize> {
fn cvt_r<F: FnMut() -> libc::ssize_t>(mut f: F) -> io::Result<usize> {
loop {
match cvt(f()) {
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {},
other => return other
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
other => return other,
}
}
}
@ -26,7 +26,7 @@ pub fn recv_msg_with_flags(
socket: RawFd,
bufs: &mut [&mut IoVec],
cmsg: &mut [u8],
flags: libc::c_int
flags: libc::c_int,
) -> io::Result<(usize, usize, libc::c_int)> {
let slice = iovec::as_os_slice_mut(bufs);
let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
@ -56,7 +56,7 @@ pub fn send_msg_with_flags(
socket: RawFd,
bufs: &[&IoVec],
cmsg: &[u8],
flags: libc::c_int
flags: libc::c_int,
) -> io::Result<usize> {
let slice = iovec::as_os_slice(bufs);
let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
@ -74,7 +74,5 @@ pub fn send_msg_with_flags(
msghdr.msg_control = control;
msghdr.msg_controllen = controllen as _;
cvt_r(|| unsafe {
libc::sendmsg(socket, &msghdr as *const _, flags)
})
cvt_r(|| unsafe { libc::sendmsg(socket, &msghdr as *const _, flags) })
}

Просмотреть файл

@ -53,10 +53,10 @@ pub use self::proxy::{ClientProxy, Response};
pub fn bind_client<C>(
transport: C::Transport,
handle: &Handle
handle: &Handle,
) -> proxy::ClientProxy<C::Request, C::Response>
where
C: Client
C: Client,
{
let (tx, rx) = proxy::channel();
@ -64,7 +64,7 @@ where
let handler = ClientHandler::<C> {
transport: transport,
requests: rx,
in_flight: VecDeque::with_capacity(32)
in_flight: VecDeque::with_capacity(32),
};
Driver::new(handler)
};
@ -92,16 +92,16 @@ pub trait Client: 'static {
struct ClientHandler<C>
where
C: Client
C: Client,
{
transport: C::Transport,
requests: proxy::Receiver<C::Request, C::Response>,
in_flight: VecDeque<oneshot::Sender<C::Response>>
in_flight: VecDeque<oneshot::Sender<C::Response>>,
}
impl<C> Handler for ClientHandler<C>
where
C: Client
C: Client,
{
type In = C::Response;
type Out = C::Request;
@ -118,7 +118,7 @@ where
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
"request / response mismatch"
"request / response mismatch",
));
}
@ -138,16 +138,16 @@ where
self.in_flight.push_back(complete);
Ok(Some(request).into())
},
}
Ok(Async::Ready(None)) => {
trace!(" --> client dropped");
Ok(None.into())
},
}
Ok(Async::NotReady) => {
trace!(" --> not ready");
Ok(Async::NotReady)
},
Err(_) => unreachable!()
}
Err(_) => unreachable!(),
}
}

Просмотреть файл

@ -57,17 +57,17 @@ pub type Receiver<R, Q> = mpsc::UnboundedReceiver<Request<R, Q>>;
/// Response future returned from a client
pub struct Response<Q> {
inner: oneshot::Receiver<Q>
inner: oneshot::Receiver<Q>,
}
pub struct ClientProxy<R, Q> {
tx: mpsc::UnboundedSender<Request<R, Q>>
tx: mpsc::UnboundedSender<Request<R, Q>>,
}
impl<R, Q> Clone for ClientProxy<R, Q> {
fn clone(&self) -> Self {
ClientProxy {
tx: self.tx.clone()
tx: self.tx.clone(),
}
}
}
@ -78,9 +78,7 @@ pub fn channel<R, Q>() -> (ClientProxy<R, Q>, Receiver<R, Q>) {
// Wrap the `tx` part in ClientProxy so the rpc call interface
// can be implemented.
let client = ClientProxy {
tx
};
let client = ClientProxy { tx };
(client, rx)
}
@ -97,16 +95,14 @@ impl<R, Q> ClientProxy<R, Q> {
// into a BrokenPipe, which conveys the proper error.
let _ = self.tx.send((request, tx));
Response {
inner: rx
}
Response { inner: rx }
}
}
impl<R, Q> fmt::Debug for ClientProxy<R, Q>
where
R: fmt::Debug,
Q: fmt::Debug
Q: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "ClientProxy {{ ... }}")
@ -132,7 +128,7 @@ impl<Q> Future for Response<Q> {
impl<Q> fmt::Debug for Response<Q>
where
Q: fmt::Debug
Q: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Response {{ ... }}")

Просмотреть файл

@ -10,7 +10,7 @@ use std::io;
pub struct Driver<T>
where
T: Handler
T: Handler,
{
// Glue
handler: T,
@ -19,19 +19,19 @@ where
run: bool,
// True when the transport is fully flushed
is_flushed: bool
is_flushed: bool,
}
impl<T> Driver<T>
where
T: Handler
T: Handler,
{
/// Create a new rpc driver with the given service and transport.
pub fn new(handler: T) -> Driver<T> {
Driver {
handler: handler,
run: true,
is_flushed: true
is_flushed: true,
}
}
@ -65,7 +65,7 @@ where
// TODO: Should handler be infalliable?
panic!("unimplemented error handling: {:?}", e);
}
},
}
None => {
trace!("received None");
// At this point, we just return. This works
@ -86,14 +86,14 @@ where
Async::Ready(Some(message)) => {
trace!(" --> got message");
try!(self.process_outgoing(message));
},
}
Async::Ready(None) => {
trace!(" --> got None");
// The service is done with the connection.
break;
},
}
// Nothing to dispatch
Async::NotReady => break
Async::NotReady => break,
}
}
@ -121,7 +121,7 @@ where
impl<T> Future for Driver<T>
where
T: Handler
T: Handler,
{
type Item = ();
type Error = io::Error;
@ -153,13 +153,13 @@ fn assert_send<S: Sink>(s: &mut S, item: S::SinkItem) -> Result<(), S::SinkError
AsyncSink::NotReady(_) => panic!(
"sink reported itself as ready after `poll_ready` but was \
then unable to accept a message"
)
),
}
}
impl<T> fmt::Debug for Driver<T>
where
T: Handler + fmt::Debug
T: Handler + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("rpc::Handler")

Просмотреть файл

@ -49,13 +49,13 @@ use tokio_core::reactor::Handle;
/// Bind an async I/O object `io` to the `server`.
pub fn bind_server<S>(transport: S::Transport, server: S, handle: &Handle)
where
S: Server
S: Server,
{
let fut = {
let handler = ServerHandler {
server: server,
transport: transport,
in_flight: VecDeque::with_capacity(32)
in_flight: VecDeque::with_capacity(32),
};
Driver::new(handler)
};
@ -84,24 +84,23 @@ pub trait Server: 'static {
fn process(&mut self, req: Self::Request) -> Self::Future;
}
////////////////////////////////////////////////////////////////////////////////
struct ServerHandler<S>
where
S: Server
S: Server,
{
// The service handling the connection
server: S,
// The transport responsible for sending/receving messages over the wire
transport: S::Transport,
// FIFO of "in flight" responses to requests.
in_flight: VecDeque<InFlight<S::Future>>
in_flight: VecDeque<InFlight<S::Future>>,
}
impl<S> Handler for ServerHandler<S>
where
S: Server
S: Server,
{
type In = S::Request;
type Out = S::Response;
@ -133,7 +132,7 @@ where
// Is the head of the queue ready?
match self.in_flight.front() {
Some(&InFlight::Done(_)) => {},
Some(&InFlight::Done(_)) => {}
_ => {
trace!(" --> not ready");
return Ok(Async::NotReady);
@ -145,8 +144,8 @@ where
Some(InFlight::Done(res)) => {
trace!(" --> received response");
Ok(Async::Ready(Some(res)))
},
_ => panic!()
}
_ => panic!(),
}
}
@ -160,7 +159,7 @@ where
enum InFlight<F: Future<Error = ()>> {
Active(F),
Done(F::Item)
Done(F::Item),
}
impl<F: Future<Error = ()>> InFlight<F> {
@ -169,9 +168,9 @@ impl<F: Future<Error = ()>> InFlight<F> {
InFlight::Active(ref mut f) => match f.poll() {
Ok(Async::Ready(e)) => e,
Err(_) => unreachable!(),
Ok(Async::NotReady) => return
Ok(Async::NotReady) => return,
},
_ => return
_ => return,
};
*self = InFlight::Done(res);
}

Просмотреть файл

@ -5,7 +5,7 @@ use std::path::Path;
use std::sync::atomic;
pub struct SharedMemReader {
mmap: Mmap
mmap: Mmap,
}
impl SharedMemReader {
@ -19,12 +19,7 @@ impl SharedMemReader {
file.set_len(size as u64)?;
let mmap = Mmap::open(&file, Protection::Read)?;
assert_eq!(mmap.len(), size);
Ok((
SharedMemReader {
mmap
},
file
))
Ok((SharedMemReader { mmap }, file))
}
pub fn read(&self, buf: &mut [u8]) -> Result<()> {
@ -46,7 +41,7 @@ impl SharedMemReader {
}
pub struct SharedMemSlice {
view: MmapViewSync
view: MmapViewSync,
}
impl SharedMemSlice {
@ -54,9 +49,7 @@ impl SharedMemSlice {
let mmap = Mmap::open(file, Protection::Read)?;
assert_eq!(mmap.len(), size);
let view = mmap.into_view_sync();
Ok(SharedMemSlice {
view
})
Ok(SharedMemSlice { view })
}
pub fn get_slice(&self, size: usize) -> Result<&[u8]> {
@ -79,13 +72,13 @@ impl SharedMemSlice {
/// underlying the view is not illegally aliased.
pub unsafe fn clone_view(&self) -> Self {
SharedMemSlice {
view: self.view.clone()
view: self.view.clone(),
}
}
}
pub struct SharedMemWriter {
mmap: Mmap
mmap: Mmap,
}
impl SharedMemWriter {
@ -98,12 +91,7 @@ impl SharedMemWriter {
let _ = remove_file(path);
file.set_len(size as u64)?;
let mmap = Mmap::open(&file, Protection::ReadWrite)?;
Ok((
SharedMemWriter {
mmap
},
file
))
Ok((SharedMemWriter { mmap }, file))
}
pub fn write(&mut self, buf: &[u8]) -> Result<()> {
@ -124,7 +112,7 @@ impl SharedMemWriter {
}
pub struct SharedMemMutSlice {
view: MmapViewSync
view: MmapViewSync,
}
impl SharedMemMutSlice {
@ -132,9 +120,7 @@ impl SharedMemMutSlice {
let mmap = Mmap::open(file, Protection::ReadWrite)?;
assert_eq!(mmap.len(), size);
let view = mmap.into_view_sync();
Ok(SharedMemMutSlice {
view
})
Ok(SharedMemMutSlice { view })
}
pub fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> {
@ -158,7 +144,7 @@ impl SharedMemMutSlice {
/// aliased.
pub unsafe fn clone_view(&self) -> Self {
SharedMemMutSlice {
view: self.view.clone()
view: self.view.clone(),
}
}
}

Просмотреть файл

@ -1,6 +1,6 @@
[package]
name = "audioipc-client"
version = "0.2.0"
version = "0.3.0"
authors = [
"Matthew Gregan <kinetik@flim.org>",
"Dan Glastonbury <dan.glastonbury@gmail.com>"
@ -9,8 +9,8 @@ description = "Cubeb Backend for talking to remote cubeb server."
[dependencies]
audioipc = { path="../audioipc" }
cubeb-backend = { path = "../../cubeb-rs/cubeb-backend" }
cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
cubeb-backend = "0.4"
foreign-types = "0.3"
# rayon-core in Gecko uses futures 0.1.13
futures = { version="=0.1.13", default-features=false, features=["use_std"] }
# futures-cpupool 0.1.5 matches futures 0.1.13

Просмотреть файл

@ -9,13 +9,12 @@ use audioipc::{messages, ClientMessage, ServerMessage};
use audioipc::{core, rpc};
use audioipc::codec::LengthDelimitedCodec;
use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
use cubeb_backend::{Context, Ops};
use cubeb_core::{ffi, DeviceId, DeviceType, Error, Result, StreamParams};
use cubeb_core::binding::Binding;
use cubeb_backend::{ffi, ChannelLayout, Context, ContextOps, DeviceCollectionRef, DeviceId,
DeviceType, Error, Ops, Result, Stream, StreamParams, StreamParamsRef};
use futures::Future;
use futures_cpupool::{self, CpuPool};
use libc;
use std::{fmt, io, mem};
use std::{fmt, io, mem, ptr};
use std::ffi::{CStr, CString};
use std::os::raw::c_void;
use std::os::unix::io::FromRawFd;
@ -30,8 +29,7 @@ struct CubebClient;
impl rpc::Client for CubebClient {
type Request = ServerMessage;
type Response = ClientMessage;
type Transport =
FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
type Transport = FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}
macro_rules! t(
@ -48,7 +46,7 @@ pub struct ClientContext {
_ops: *const Ops,
rpc: rpc::ClientProxy<ServerMessage, ClientMessage>,
core: core::CoreThread,
cpu_pool: CpuPool
cpu_pool: CpuPool,
}
impl ClientContext {
@ -79,12 +77,12 @@ fn open_server_stream() -> Result<net::UnixStream> {
}
}
impl Context for ClientContext {
fn init(_context_name: Option<&CStr>) -> Result<*mut ffi::cubeb> {
impl ContextOps for ClientContext {
fn init(_context_name: Option<&CStr>) -> Result<Context> {
fn bind_and_send_client(
stream: UnixStream,
handle: &Handle,
tx_rpc: &mpsc::Sender<rpc::ClientProxy<ServerMessage, ClientMessage>>
tx_rpc: &mpsc::Sender<rpc::ClientProxy<ServerMessage, ClientMessage>>,
) -> Option<()> {
let transport = framed_with_fds(stream, Default::default());
let rpc = rpc::bind_client::<CubebClient>(transport, handle);
@ -101,13 +99,16 @@ impl Context for ClientContext {
let core = t!(core::spawn_thread("AudioIPC Client RPC", move || {
let handle = core::handle();
open_server_stream().ok()
open_server_stream()
.ok()
.and_then(|stream| UnixStream::from_stream(stream, &handle).ok())
.and_then(|stream| bind_and_send_client(stream, &handle, &tx_rpc))
.ok_or_else(|| io::Error::new(
io::ErrorKind::Other,
"Failed to open stream and create rpc."
))
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"Failed to open stream and create rpc.",
)
})
}));
let rpc = t!(rx_rpc.recv());
@ -120,66 +121,72 @@ impl Context for ClientContext {
_ops: &CLIENT_OPS as *const _,
rpc: rpc,
core: core,
cpu_pool: cpupool
cpu_pool: cpupool,
});
Ok(Box::into_raw(ctx) as *mut _)
Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) })
}
fn backend_id(&self) -> &'static CStr {
fn backend_id(&mut self) -> &'static CStr {
assert_not_in_callback();
unsafe { CStr::from_ptr(b"remote\0".as_ptr() as *const _) }
}
fn max_channel_count(&self) -> Result<u32> {
fn max_channel_count(&mut self) -> Result<u32> {
assert_not_in_callback();
send_recv!(self.rpc(), ContextGetMaxChannelCount => ContextMaxChannelCount())
}
fn min_latency(&self, params: &StreamParams) -> Result<u32> {
fn min_latency(&mut self, params: StreamParams) -> Result<u32> {
assert_not_in_callback();
let params = messages::StreamParams::from(unsafe { &*params.raw() });
let params = messages::StreamParams::from(params.as_ref());
send_recv!(self.rpc(), ContextGetMinLatency(params) => ContextMinLatency())
}
fn preferred_sample_rate(&self) -> Result<u32> {
fn preferred_sample_rate(&mut self) -> Result<u32> {
assert_not_in_callback();
send_recv!(self.rpc(), ContextGetPreferredSampleRate => ContextPreferredSampleRate())
}
fn preferred_channel_layout(&self) -> Result<ffi::cubeb_channel_layout> {
fn preferred_channel_layout(&mut self) -> Result<ChannelLayout> {
assert_not_in_callback();
send_recv!(self.rpc(),
ContextGetPreferredChannelLayout => ContextPreferredChannelLayout())
.map(|l| {
ChannelLayout::from(l)
})
}
fn enumerate_devices(&self, devtype: DeviceType) -> Result<ffi::cubeb_device_collection> {
fn enumerate_devices(
&mut self,
devtype: DeviceType,
collection: &DeviceCollectionRef,
) -> Result<()> {
assert_not_in_callback();
let v: Vec<ffi::cubeb_device_info> = match send_recv!(self.rpc(),
ContextGetDeviceEnumeration(devtype.bits()) =>
ContextEnumeratedDevices())
{
Ok(mut v) => v.drain(..).map(|i| i.into()).collect(),
Err(e) => return Err(e)
};
let vs = v.into_boxed_slice();
let coll = ffi::cubeb_device_collection {
count: vs.len(),
device: vs.as_ptr()
Err(e) => return Err(e),
};
let mut vs = v.into_boxed_slice();
let coll = unsafe { &mut *collection.as_ptr() };
coll.device = vs.as_mut_ptr();
coll.count = vs.len();
// Giving away the memory owned by vs. Don't free it!
// Reclaimed in `device_collection_destroy`.
mem::forget(vs);
Ok(coll)
Ok(())
}
fn device_collection_destroy(&self, collection: *mut ffi::cubeb_device_collection) {
fn device_collection_destroy(&mut self, collection: &mut DeviceCollectionRef) -> Result<()> {
assert_not_in_callback();
unsafe {
let coll = &*collection;
let coll = &mut *collection.as_ptr();
let mut devices = Vec::from_raw_parts(
coll.device as *mut ffi::cubeb_device_info,
coll.count,
coll.count
coll.count,
);
for dev in &mut devices {
if !dev.device_id.is_null() {
@ -195,36 +202,37 @@ impl Context for ClientContext {
let _ = CString::from_raw(dev.friendly_name as *mut _);
}
}
coll.device = ptr::null_mut();
coll.count = 0;
Ok(())
}
}
fn stream_init(
&self,
&mut self,
stream_name: Option<&CStr>,
input_device: DeviceId,
input_stream_params: Option<&ffi::cubeb_stream_params>,
input_stream_params: Option<&StreamParamsRef>,
output_device: DeviceId,
output_stream_params: Option<&ffi::cubeb_stream_params>,
output_stream_params: Option<&StreamParamsRef>,
latency_frame: u32,
// These params aren't sent to the server
data_callback: ffi::cubeb_data_callback,
state_callback: ffi::cubeb_state_callback,
user_ptr: *mut c_void
) -> Result<*mut ffi::cubeb_stream> {
user_ptr: *mut c_void,
) -> Result<Stream> {
assert_not_in_callback();
fn opt_stream_params(
p: Option<&ffi::cubeb_stream_params>
) -> Option<messages::StreamParams> {
fn opt_stream_params(p: Option<&StreamParamsRef>) -> Option<messages::StreamParams> {
match p {
Some(raw) => Some(messages::StreamParams::from(raw)),
None => None
Some(p) => Some(messages::StreamParams::from(p)),
None => None,
}
}
let stream_name = match stream_name {
Some(s) => Some(s.to_bytes().to_vec()),
None => None
None => None,
};
let input_stream_params = opt_stream_params(input_stream_params);
@ -232,20 +240,20 @@ impl Context for ClientContext {
let init_params = messages::StreamInitParams {
stream_name: stream_name,
input_device: input_device.raw() as _,
input_device: input_device as usize,
input_stream_params: input_stream_params,
output_device: output_device.raw() as _,
output_device: output_device as usize,
output_stream_params: output_stream_params,
latency_frames: latency_frame
latency_frames: latency_frame,
};
stream::init(self, init_params, data_callback, state_callback, user_ptr)
}
fn register_device_collection_changed(
&self,
&mut self,
_dev_type: DeviceType,
_collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
_user_ptr: *mut c_void
_user_ptr: *mut c_void,
) -> Result<()> {
assert_not_in_callback();
Ok(())
@ -254,7 +262,7 @@ impl Context for ClientContext {
impl Drop for ClientContext {
fn drop(&mut self) {
info!("ClientContext drop...");
debug!("ClientContext drop...");
let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected);
unsafe {
if super::G_SERVER_FD.is_some() {

Просмотреть файл

@ -6,7 +6,7 @@
extern crate audioipc;
#[macro_use]
extern crate cubeb_backend;
extern crate cubeb_core;
extern crate foreign_types;
extern crate futures;
extern crate futures_cpupool;
extern crate libc;
@ -21,8 +21,7 @@ mod context;
mod stream;
use context::ClientContext;
use cubeb_backend::capi;
use cubeb_core::ffi;
use cubeb_backend::{capi, ffi};
use std::os::raw::{c_char, c_int};
use std::os::unix::io::RawFd;
use stream::ClientStream;
@ -49,7 +48,7 @@ static mut G_SERVER_FD: Option<RawFd> = None;
pub unsafe extern "C" fn audioipc_client_init(
c: *mut *mut ffi::cubeb,
context_name: *const c_char,
server_connection: c_int
server_connection: c_int,
) -> c_int {
// TODO: Windows portability (for fd).
// TODO: Better way to pass extra parameters to Context impl.

Просмотреть файл

@ -1,14 +1,14 @@
use cubeb_core::Error;
use cubeb_core::ffi;
use cubeb_backend::Error;
use std::os::raw::c_int;
#[doc(hidden)]
pub fn _err<E>(e: E) -> Error
where
E: Into<Option<ffi::cubeb_error_code>>
E: Into<Option<c_int>>,
{
match e.into() {
Some(e) => unsafe { Error::from_raw(e) },
None => Error::new()
None => Error::error(),
}
}

Просмотреть файл

@ -10,8 +10,7 @@ use audioipc::frame::{framed, Framed};
use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage};
use audioipc::rpc;
use audioipc::shm::{SharedMemMutSlice, SharedMemSlice};
use cubeb_backend::Stream;
use cubeb_core::{ffi, Result};
use cubeb_backend::{ffi, DeviceRef, Error, Result, Stream, StreamOps};
use futures::Future;
use futures_cpupool::{CpuFuture, CpuPool};
use std::ffi::CString;
@ -26,10 +25,27 @@ use tokio_uds::UnixStream;
// TODO: Remove and let caller allocate based on cubeb backend requirements.
const SHM_AREA_SIZE: usize = 2 * 1024 * 1024;
pub struct Device(ffi::cubeb_device);
impl Drop for Device {
fn drop(&mut self) {
unsafe {
if !self.0.input_name.is_null() {
let _ = CString::from_raw(self.0.input_name as *mut _);
}
if !self.0.output_name.is_null() {
let _ = CString::from_raw(self.0.output_name as *mut _);
}
}
}
}
#[derive(Debug)]
pub struct ClientStream<'ctx> {
// This must be a reference to Context for cubeb, cubeb accesses stream methods via stream->context->ops
// This must be a reference to Context for cubeb, cubeb accesses
// stream methods via stream->context->ops
context: &'ctx ClientContext,
token: usize
token: usize,
}
struct CallbackServer {
@ -38,7 +54,7 @@ struct CallbackServer {
data_cb: ffi::cubeb_data_callback,
state_cb: ffi::cubeb_state_callback,
user_ptr: usize,
cpu_pool: CpuPool
cpu_pool: CpuPool,
}
impl rpc::Server for CallbackServer {
@ -50,17 +66,16 @@ impl rpc::Server for CallbackServer {
fn process(&mut self, req: Self::Request) -> Self::Future {
match req {
CallbackReq::Data(nframes, frame_size) => {
info!(
debug!(
"stream_thread: Data Callback: nframes={} frame_size={}",
nframes,
frame_size
nframes, frame_size
);
// Clone values that need to be moved into the cpu pool thread.
let input_shm = unsafe { self.input_shm.clone_view() };
let mut output_shm = unsafe { self.output_shm.clone_view() };
let user_ptr = self.user_ptr;
let cb = self.data_cb;
let cb = self.data_cb.unwrap();
self.cpu_pool.spawn_fn(move || {
// TODO: This is proof-of-concept. Make it better.
@ -74,25 +89,29 @@ impl rpc::Server for CallbackServer {
.as_mut_ptr();
set_in_callback(true);
let nframes = cb(
ptr::null_mut(),
user_ptr as *mut c_void,
input_ptr as *const _,
output_ptr as *mut _,
nframes as _
);
let nframes = unsafe {
cb(
ptr::null_mut(),
user_ptr as *mut c_void,
input_ptr as *const _,
output_ptr as *mut _,
nframes as _,
)
};
set_in_callback(false);
Ok(CallbackResp::Data(nframes as isize))
})
},
}
CallbackReq::State(state) => {
info!("stream_thread: State Callback: {:?}", state);
debug!("stream_thread: State Callback: {:?}", state);
let user_ptr = self.user_ptr;
let cb = self.state_cb;
let cb = self.state_cb.unwrap();
self.cpu_pool.spawn_fn(move || {
set_in_callback(true);
cb(ptr::null_mut(), user_ptr as *mut _, state);
unsafe {
cb(ptr::null_mut(), user_ptr as *mut _, state);
}
set_in_callback(false);
Ok(CallbackResp::State)
@ -108,8 +127,8 @@ impl<'ctx> ClientStream<'ctx> {
init_params: messages::StreamInitParams,
data_callback: ffi::cubeb_data_callback,
state_callback: ffi::cubeb_state_callback,
user_ptr: *mut c_void
) -> Result<*mut ffi::cubeb_stream> {
user_ptr: *mut c_void,
) -> Result<Stream> {
assert_not_in_callback();
let rpc = ctx.rpc();
@ -138,7 +157,7 @@ impl<'ctx> ClientStream<'ctx> {
data_cb: data_callback,
state_cb: state_callback,
user_ptr: user_data,
cpu_pool: cpu_pool
cpu_pool: cpu_pool,
};
let (wait_tx, wait_rx) = mpsc::channel();
@ -151,10 +170,11 @@ impl<'ctx> ClientStream<'ctx> {
});
wait_rx.recv().unwrap();
Ok(Box::into_raw(Box::new(ClientStream {
let stream = Box::into_raw(Box::new(ClientStream {
context: ctx,
token: data.token
})) as _)
token: data.token,
}));
Ok(unsafe { Stream::from_ptr(stream as *mut _) })
}
}
@ -166,79 +186,75 @@ impl<'ctx> Drop for ClientStream<'ctx> {
}
}
impl<'ctx> Stream for ClientStream<'ctx> {
fn start(&self) -> Result<()> {
impl<'ctx> StreamOps for ClientStream<'ctx> {
fn start(&mut self) -> Result<()> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamStart(self.token) => StreamStarted)
}
fn stop(&self) -> Result<()> {
fn stop(&mut self) -> Result<()> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamStop(self.token) => StreamStopped)
}
fn reset_default_device(&self) -> Result<()> {
fn reset_default_device(&mut self) -> Result<()> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset)
}
fn position(&self) -> Result<u64> {
fn position(&mut self) -> Result<u64> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamGetPosition(self.token) => StreamPosition())
}
fn latency(&self) -> Result<u32> {
fn latency(&mut self) -> Result<u32> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamGetLatency(self.token) => StreamLatency())
}
fn set_volume(&self, volume: f32) -> Result<()> {
fn set_volume(&mut self, volume: f32) -> Result<()> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamSetVolume(self.token, volume) => StreamVolumeSet)
}
fn set_panning(&self, panning: f32) -> Result<()> {
fn set_panning(&mut self, panning: f32) -> Result<()> {
assert_not_in_callback();
let rpc = self.context.rpc();
send_recv!(rpc, StreamSetPanning(self.token, panning) => StreamPanningSet)
}
fn current_device(&self) -> Result<*const ffi::cubeb_device> {
fn current_device(&mut self) -> Result<&DeviceRef> {
assert_not_in_callback();
let rpc = self.context.rpc();
match send_recv!(rpc, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
Ok(d) => Ok(Box::into_raw(Box::new(d.into()))),
Err(e) => Err(e)
Ok(d) => Ok(unsafe { DeviceRef::from_ptr(Box::into_raw(Box::new(d.into()))) }),
Err(e) => Err(e),
}
}
fn device_destroy(&self, device: *const ffi::cubeb_device) -> Result<()> {
fn device_destroy(&mut self, device: &DeviceRef) -> Result<()> {
assert_not_in_callback();
// It's all unsafe...
if !device.is_null() {
if device.as_ptr().is_null() {
Err(Error::error())
} else {
unsafe {
if !(*device).output_name.is_null() {
let _ = CString::from_raw((*device).output_name as *mut _);
}
if !(*device).input_name.is_null() {
let _ = CString::from_raw((*device).input_name as *mut _);
}
let _: Box<ffi::cubeb_device> = Box::from_raw(device as *mut _);
let _: Box<Device> = Box::from_raw(device.as_ptr() as *mut _);
}
Ok(())
}
Ok(())
}
// TODO: How do we call this back? On what thread?
fn register_device_changed_callback(
&self,
_device_changed_callback: ffi::cubeb_device_changed_callback
&mut self,
_device_changed_callback: ffi::cubeb_device_changed_callback,
) -> Result<()> {
assert_not_in_callback();
Ok(())
@ -250,7 +266,7 @@ pub fn init(
init_params: messages::StreamInitParams,
data_callback: ffi::cubeb_data_callback,
state_callback: ffi::cubeb_state_callback,
user_ptr: *mut c_void
) -> Result<*mut ffi::cubeb_stream> {
user_ptr: *mut c_void,
) -> Result<Stream> {
ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)
}

Просмотреть файл

@ -12,49 +12,4 @@ index ede6064..d0a1979 100644
[workspace]
-members = ["audioipc", "client", "server", "ipctest"]
+members = ["audioipc", "client", "server"]
diff --git a/audioipc/Cargo.toml b/audioipc/Cargo.toml
index 669c6ff..308cb5c 100644
--- a/media/audioipc/audioipc/Cargo.toml
+++ b/media/audioipc/audioipc/Cargo.toml
@@ -8,7 +8,7 @@ authors = [
description = "Remote Cubeb IPC"
[dependencies]
-cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" }
+cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
bincode = "0.8"
bytes = "0.4"
# rayon-core in Gecko uses futures 0.1.13
diff --git a/client/Cargo.toml b/client/Cargo.toml
index c81b19a..9e3f8a5 100644
--- a/media/audioipc/client/Cargo.toml
+++ b/media/audioipc/client/Cargo.toml
@@ -9,8 +9,8 @@ description = "Cubeb Backend for talking to remote cubeb server."
[dependencies]
audioipc = { path="../audioipc" }
-cubeb-backend = { git="https://github.com/djg/cubeb-rs", version="^0.2" }
-cubeb-core = { git="https://github.com/djg/cubeb-rs", version="^0.1" }
+cubeb-backend = { path = "../../cubeb-rs/cubeb-backend" }
+cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
# rayon-core in Gecko uses futures 0.1.13
futures = { version="=0.1.13", default-features=false, features=["use_std"] }
# futures-cpupool 0.1.5 matches futures 0.1.13
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 5b79b83..01463be 100644
--- a/media/audioipc/server/Cargo.toml
+++ b/media/audioipc/server/Cargo.toml
@@ -9,8 +9,8 @@ description = "Remote cubeb server"
[dependencies]
audioipc = { path = "../audioipc" }
-cubeb-core = { git = "https://github.com/djg/cubeb-rs", version="^0.1" }
-cubeb = { git = "https://github.com/djg/cubeb-rs", version="^0.3.2" }
+cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
+cubeb = { path = "../../cubeb-rs/cubeb-api" }
bytes = "0.4"
lazycell = "^0.4"
libc = "0.2"
--
2.10.2

Просмотреть файл

@ -9,8 +9,7 @@ description = "Remote cubeb server"
[dependencies]
audioipc = { path = "../audioipc" }
cubeb-core = { path = "../../cubeb-rs/cubeb-core" }
cubeb = { path = "../../cubeb-rs/cubeb-api" }
cubeb = "0.4"
bytes = "0.4"
lazycell = "^0.4"
libc = "0.2"

Просмотреть файл

@ -7,7 +7,6 @@ extern crate log;
extern crate audioipc;
extern crate bytes;
extern crate cubeb;
extern crate cubeb_core;
extern crate futures;
extern crate lazycell;
extern crate libc;
@ -19,12 +18,10 @@ use audioipc::codec::LengthDelimitedCodec;
use audioipc::core;
use audioipc::fd_passing::{framed_with_fds, FramedWithFds};
use audioipc::frame::{framed, Framed};
use audioipc::messages::{CallbackReq, CallbackResp, ClientMessage, DeviceInfo, ServerMessage,
StreamCreate, StreamInitParams, StreamParams};
use audioipc::messages::{CallbackReq, CallbackResp, ClientMessage, Device, DeviceInfo,
ServerMessage, StreamCreate, StreamInitParams, StreamParams};
use audioipc::rpc;
use audioipc::shm::{SharedMemReader, SharedMemWriter};
use cubeb_core::binding::Binding;
use cubeb_core::ffi;
use futures::Future;
use futures::future::{self, FutureResult};
use futures::sync::oneshot;
@ -44,7 +41,7 @@ pub mod errors {
AudioIPC(::audioipc::errors::Error, ::audioipc::errors::ErrorKind);
}
foreign_links {
Cubeb(::cubeb_core::Error);
Cubeb(::cubeb::Error);
Io(::std::io::Error);
Canceled(::futures::sync::oneshot::Canceled);
}
@ -53,16 +50,17 @@ pub mod errors {
use errors::*;
thread_local!(static CONTEXT_KEY: RefCell<Option<cubeb::Result<cubeb::Context>>> = RefCell::new(None));
type ContextKey = RefCell<Option<cubeb::Result<cubeb::Context>>>;
thread_local!(static CONTEXT_KEY:ContextKey = RefCell::new(None));
fn with_local_context<T, F>(f: F) -> T
where
F: FnOnce(&cubeb::Result<cubeb::Context>) -> T
F: FnOnce(&cubeb::Result<cubeb::Context>) -> T,
{
CONTEXT_KEY.with(|k| {
let mut context = k.borrow_mut();
if context.is_none() {
*context = Some(cubeb::Context::init("AudioIPC Server", None));
*context = Some(cubeb::init("AudioIPC Server"));
}
f(context.as_ref().unwrap())
})
@ -82,97 +80,23 @@ impl rpc::Client for CallbackClient {
type Transport = Framed<UnixStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}
// TODO: this should forward to the client.
struct Callback {
/// Size of input frame in bytes
input_frame_size: u16,
/// Size of output frame in bytes
output_frame_size: u16,
input_shm: SharedMemWriter,
output_shm: SharedMemReader,
rpc: rpc::ClientProxy<CallbackReq, CallbackResp>
}
impl cubeb::StreamCallback for Callback {
type Frame = u8;
fn data_callback(&mut self, input: &[u8], output: &mut [u8]) -> isize {
trace!("Stream data callback: {} {}", input.len(), output.len());
// len is of input and output is frame len. Turn these into the real lengths.
let real_input = unsafe {
let size_bytes = input.len() * self.input_frame_size as usize;
slice::from_raw_parts(input.as_ptr(), size_bytes)
};
let real_output = unsafe {
let size_bytes = output.len() * self.output_frame_size as usize;
trace!("Resize output to {}", size_bytes);
slice::from_raw_parts_mut(output.as_mut_ptr(), size_bytes)
};
self.input_shm.write(real_input).unwrap();
let r = self.rpc
.call(CallbackReq::Data(
output.len() as isize,
self.output_frame_size as usize
))
.wait();
match r {
Ok(CallbackResp::Data(cb_result)) => if cb_result >= 0 {
let len = cb_result as usize * self.output_frame_size as usize;
self.output_shm.read(&mut real_output[..len]).unwrap();
cb_result
} else {
cb_result
},
_ => {
debug!("Unexpected message {:?} during data_callback", r);
-1
}
}
}
fn state_callback(&mut self, state: cubeb::State) {
info!("Stream state callback: {:?}", state);
// TODO: Share this conversion with the same code in cubeb-rs?
let state = match state {
cubeb::State::Started => ffi::CUBEB_STATE_STARTED,
cubeb::State::Stopped => ffi::CUBEB_STATE_STOPPED,
cubeb::State::Drained => ffi::CUBEB_STATE_DRAINED,
cubeb::State::Error => ffi::CUBEB_STATE_ERROR
};
let r = self.rpc.call(CallbackReq::State(state)).wait();
match r {
Ok(CallbackResp::State) => {},
_ => {
debug!("Unexpected message {:?} during callback", r);
}
};
}
}
type StreamSlab = slab::Slab<cubeb::Stream<Callback>, usize>;
type StreamSlab = slab::Slab<cubeb::Stream<u8>, usize>;
pub struct CubebServer {
cb_remote: Remote,
streams: StreamSlab
streams: StreamSlab,
}
impl rpc::Server for CubebServer {
type Request = ServerMessage;
type Response = ClientMessage;
type Future = FutureResult<Self::Response, ()>;
type Transport =
FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
type Transport = FramedWithFds<UnixStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
fn process(&mut self, req: Self::Request) -> Self::Future {
let resp = with_local_context(|context| match *context {
Err(_) => error(cubeb::Error::new()),
Ok(ref context) => self.process_msg(context, &req)
Err(_) => error(cubeb::Error::error()),
Ok(ref context) => self.process_msg(context, &req),
});
future::ok(resp)
}
@ -182,7 +106,7 @@ impl CubebServer {
pub fn new(cb_remote: Remote) -> Self {
CubebServer {
cb_remote: cb_remote,
streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE)
streams: StreamSlab::with_capacity(STREAM_CONN_CHUNK_SIZE),
}
}
@ -195,7 +119,7 @@ impl CubebServer {
// TODO:
//self.connection.client_disconnect();
ClientMessage::ClientDisconnected
},
}
ServerMessage::ContextGetBackendId => ClientMessage::ContextBackendId(),
@ -219,7 +143,7 @@ impl CubebServer {
.min_latency(&params)
.map(ClientMessage::ContextMinLatency)
.unwrap_or_else(error)
},
}
ServerMessage::ContextGetPreferredSampleRate => context
.preferred_sample_rate()
@ -234,33 +158,33 @@ impl CubebServer {
ServerMessage::ContextGetDeviceEnumeration(device_type) => context
.enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
.map(|devices| {
let v: Vec<DeviceInfo> = devices.iter().map(|i| i.raw().into()).collect();
let v: Vec<DeviceInfo> = devices.iter().map(|i| i.as_ref().into()).collect();
ClientMessage::ContextEnumeratedDevices(v)
})
.unwrap_or_else(error),
ServerMessage::StreamInit(ref params) => self.process_stream_init(context, params)
.unwrap_or_else(|_| error(cubeb::Error::new())),
.unwrap_or_else(|_| error(cubeb::Error::error())),
ServerMessage::StreamDestroy(stm_tok) => {
self.streams.remove(stm_tok);
ClientMessage::StreamDestroyed
},
}
ServerMessage::StreamStart(stm_tok) => {
let _ = self.streams[stm_tok].start();
ClientMessage::StreamStarted
},
}
ServerMessage::StreamStop(stm_tok) => {
let _ = self.streams[stm_tok].stop();
ClientMessage::StreamStopped
},
}
ServerMessage::StreamResetDefaultDevice(stm_tok) => {
let _ = self.streams[stm_tok].reset_default_device();
ClientMessage::StreamDefaultDeviceReset
},
}
ServerMessage::StreamGetPosition(stm_tok) => self.streams[stm_tok]
.position()
@ -284,8 +208,8 @@ impl CubebServer {
ServerMessage::StreamGetCurrentDevice(stm_tok) => self.streams[stm_tok]
.current_device()
.map(|device| ClientMessage::StreamCurrentDevice(device.into()))
.unwrap_or_else(error)
.map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
.unwrap_or_else(error),
};
debug!("process_msg: req={:?}, resp={:?}", msg, resp);
@ -297,63 +221,37 @@ impl CubebServer {
fn process_stream_init(
&mut self,
context: &cubeb::Context,
params: &StreamInitParams
params: &StreamInitParams,
) -> Result<ClientMessage> {
fn opt_stream_params(params: Option<&StreamParams>) -> Option<cubeb::StreamParams> {
params.and_then(|p| {
let raw = ffi::cubeb_stream_params::from(p);
Some(unsafe { cubeb::StreamParams::from_raw(&raw as *const _) })
})
}
fn frame_size_in_bytes(params: Option<cubeb::StreamParams>) -> u16 {
fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 {
params
.map(|p| {
let sample_size = match p.format() {
let format = p.format.into();
let sample_size = match format {
cubeb::SampleFormat::S16LE
| cubeb::SampleFormat::S16BE
| cubeb::SampleFormat::S16NE => 2,
cubeb::SampleFormat::Float32LE
| cubeb::SampleFormat::Float32BE
| cubeb::SampleFormat::Float32NE => 4
| cubeb::SampleFormat::Float32NE => 4,
};
let channel_count = p.channels() as u16;
let channel_count = p.channels as u16;
sample_size * channel_count
})
.unwrap_or(0u16)
}
// TODO: Yuck!
let input_device =
unsafe { cubeb::DeviceId::from_raw(params.input_device as *const _) };
let output_device =
unsafe { cubeb::DeviceId::from_raw(params.output_device as *const _) };
let input_device = params.input_device as *const _;
let output_device = params.output_device as *const _;
let latency = params.latency_frames;
let mut builder = cubeb::StreamInitOptionsBuilder::new();
builder
.input_device(input_device)
.output_device(output_device)
.latency(latency);
if let Some(ref stream_name) = params.stream_name {
builder.stream_name(stream_name);
}
let input_stream_params = opt_stream_params(params.input_stream_params.as_ref());
if let Some(ref isp) = input_stream_params {
builder.input_stream_param(isp);
}
let output_stream_params = opt_stream_params(params.output_stream_params.as_ref());
if let Some(ref osp) = output_stream_params {
builder.output_stream_param(osp);
}
let params = builder.take();
let input_frame_size = frame_size_in_bytes(input_stream_params);
let output_frame_size = frame_size_in_bytes(output_stream_params);
let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());
let (stm1, stm2) = net::UnixStream::pair()?;
info!("Created callback pair: {:?}-{:?}", stm1, stm2);
let (input_shm, input_file) =
debug!("Created callback pair: {:?}-{:?}", stm1, stm2);
let (mut input_shm, input_file) =
SharedMemWriter::new(&audioipc::get_shm_path("input"), SHM_AREA_SIZE)?;
let (output_shm, output_file) =
SharedMemReader::new(&audioipc::get_shm_path("output"), SHM_AREA_SIZE)?;
@ -377,22 +275,81 @@ impl CubebServer {
Ok(())
});
let rpc: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
let rpc_data: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
Ok(rpc) => rpc,
Err(_) => bail!("Failed to create callback rpc.")
Err(_) => bail!("Failed to create callback rpc."),
};
let rpc_state = rpc_data.clone();
context
.stream_init(
&params,
Callback {
input_frame_size: input_frame_size,
output_frame_size: output_frame_size,
input_shm: input_shm,
output_shm: output_shm,
rpc: rpc
let mut builder = cubeb::StreamBuilder::new();
if let Some(ref stream_name) = params.stream_name {
builder.name(stream_name.clone());
}
if let Some(ref isp) = params.input_stream_params {
let input_stream_params =
unsafe { cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _) };
builder.input(input_device, input_stream_params);
}
if let Some(ref osp) = params.output_stream_params {
let output_stream_params =
unsafe { cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _) };
builder.output(output_device, output_stream_params);
}
builder
.latency(latency)
.data_callback(move |input, output| {
trace!("Stream data callback: {} {}", input.len(), output.len());
// len is of input and output is frame len. Turn these into the real lengths.
let real_input = unsafe {
let nbytes = input.len() * input_frame_size as usize;
slice::from_raw_parts(input.as_ptr(), nbytes)
};
input_shm.write(real_input).unwrap();
let r = rpc_data
.call(CallbackReq::Data(
output.len() as isize,
output_frame_size as usize,
))
.wait();
match r {
Ok(CallbackResp::Data(frames)) => {
if frames >= 0 {
let nbytes = frames as usize * output_frame_size as usize;
let real_output = unsafe {
trace!("Resize output to {}", nbytes);
slice::from_raw_parts_mut(output.as_mut_ptr(), nbytes)
};
output_shm.read(&mut real_output[..nbytes]).unwrap();
}
frames
}
_ => {
debug!("Unexpected message {:?} during data_callback", r);
-1
}
}
)
})
.state_callback(move |state| {
trace!("Stream state callback: {:?}", state);
let r = rpc_state.call(CallbackReq::State(state.into())).wait();
match r {
Ok(CallbackResp::State) => {}
_ => {
debug!("Unexpected message {:?} during callback", r);
}
}
});
builder
.init(context)
.and_then(|stream| {
if !self.streams.has_available() {
trace!(
@ -407,7 +364,7 @@ impl CubebServer {
debug!("Registering stream {:?}", entry.index(),);
entry.insert(stream).index()
},
}
None => {
// TODO: Turn into error
panic!("Failed to insert stream into slab. No entries")
@ -420,7 +377,7 @@ impl CubebServer {
stm1.into_raw_fd(),
input_file.into_raw_fd(),
output_file.into_raw_fd(),
]
],
}))
})
.map_err(|e| e.into())
@ -429,7 +386,7 @@ impl CubebServer {
struct ServerWrapper {
core_thread: core::CoreThread,
callback_thread: core::CoreThread
callback_thread: core::CoreThread,
}
fn run() -> Result<ServerWrapper> {
@ -460,7 +417,7 @@ fn run() -> Result<ServerWrapper> {
Ok(ServerWrapper {
core_thread: core_thread,
callback_thread: callback_thread
callback_thread: callback_thread,
})
}
@ -468,7 +425,7 @@ fn run() -> Result<ServerWrapper> {
pub extern "C" fn audioipc_server_start() -> *mut c_void {
match run() {
Ok(server) => Box::into_raw(Box::new(server)) as *mut _,
Err(_) => ptr::null_mut() as *mut _
Err(_) => ptr::null_mut() as *mut _,
}
}