Bug 1590249 - Update audioipc to 86d49ddf. r=chunmin

Differential Revision: https://phabricator.services.mozilla.com/D60615

--HG--
extra : moz-landing-system : lando
This commit is contained in:
Matthew Gregan 2020-01-25 21:31:32 +00:00
Родитель def1199352
Коммит 02e4693897
17 изменённых файлов: 302 добавлений и 129 удалений

Просмотреть файл

@ -1 +1 @@
# Cubeb Audio Remoting Prototype
# Cubeb Audio Remoting Prototype

Просмотреть файл

@ -5,4 +5,4 @@ Makefile.in build files for the Mozilla build system.
The audioipc-2 git repository is: https://github.com/djg/audioipc-2.git
The git commit ID used was 8af8083a9a6179bc9fe041c9e5059cea0a0e6fe0 (2019-10-22 14:11:23 +1300)
The git commit ID used was 86d49ddfca8b016a4b60b0b3fef76052194b8aa3 (2020-01-25 20:43:03 +1300)

Просмотреть файл

@ -19,7 +19,7 @@ serde = "1.*.*"
serde_derive = "1.*.*"
tokio = "0.1"
tokio-io = "0.1"
audio_thread_priority = "0.20.2"
audio_thread_priority = "0.21"
[target.'cfg(unix)'.dependencies]
iovec = "0.1"
@ -29,9 +29,9 @@ mio-uds = "0.6.7"
tokio-reactor = "0.1"
[target.'cfg(windows)'.dependencies]
mio = "0.6.19"
miow = "0.3.3"
mio-named-pipes = { git = "https://github.com/alexcrichton/mio-named-pipes" }
tokio-named-pipes = { git = "https://github.com/NikVolf/tokio-named-pipes", branch = "stable" }
winapi = { version = "0.3.6", features = ["combaseapi", "objbase"] }
[dependencies.error-chain]

Просмотреть файл

@ -5,6 +5,7 @@
use bytes::{BufMut, Bytes, BytesMut};
use libc::{self, cmsghdr};
use std::convert::TryInto;
use std::os::unix::io::RawFd;
use std::{convert, mem, ops, slice};
@ -40,12 +41,10 @@ pub fn iterator(c: Bytes) -> ControlMsgIter {
impl Iterator for ControlMsgIter {
type Item = Fds;
// This follows the logic in __cmsg_nxthdr from glibc
// /usr/include/bits/socket.h
fn next(&mut self) -> Option<Self::Item> {
loop {
let control = self.control.clone();
let cmsghdr_len = align(mem::size_of::<cmsghdr>());
let cmsghdr_len = len(0);
if control.len() < cmsghdr_len {
// No more entries---not enough data in `control` for a
@ -57,14 +56,14 @@ impl Iterator for ControlMsgIter {
// The offset to the next cmsghdr in control. This must be
// aligned to a boundary that matches the type used to
// represent the length of the message.
let cmsg_len = cmsg.cmsg_len;
let next_cmsghdr = align(cmsg_len as _);
self.control = if next_cmsghdr > control.len() {
let cmsg_len = cmsg.cmsg_len as usize;
let cmsg_space = space(cmsg_len - cmsghdr_len);
self.control = if cmsg_space > control.len() {
// No more entries---not enough data in `control` for a
// complete message.
Bytes::new()
} else {
control.slice_from(next_cmsghdr)
control.slice_from(cmsg_space)
};
match (cmsg.cmsg_level, cmsg.cmsg_type) {
@ -100,9 +99,9 @@ pub fn builder(buf: &mut BytesMut) -> ControlMsgBuilder {
impl ControlMsgBuilder {
fn msg(mut self, level: libc::c_int, kind: libc::c_int, msg: &[u8]) -> Self {
self.result = self.result.and_then(align_buf).and_then(|mut cmsg| {
let cmsg_len = len(msg.len());
if cmsg.remaining_mut() < cmsg_len {
self.result = self.result.and_then(|mut cmsg| {
let cmsg_space = space(msg.len());
if cmsg.remaining_mut() < cmsg_space {
return Err(Error::NoSpace);
}
@ -113,18 +112,19 @@ impl ControlMsgBuilder {
let zeroed = unsafe { mem::zeroed() };
#[allow(clippy::needless_update)]
let cmsghdr = cmsghdr {
cmsg_len: cmsg_len as _,
cmsg_len: len(msg.len()).try_into().unwrap(),
cmsg_level: level,
cmsg_type: kind,
..zeroed
};
let cmsghdr = unsafe {
slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::<cmsghdr>())
};
cmsg.put_slice(cmsghdr);
let mut cmsg = align_buf(cmsg)?;
cmsg.put_slice(msg);
unsafe {
let cmsghdr_ptr = cmsg.bytes_mut().as_mut_ptr();
std::ptr::copy_nonoverlapping(&cmsghdr as *const _ as *const _, cmsghdr_ptr, mem::size_of::<cmsghdr>());
let cmsg_data_ptr = libc::CMSG_DATA(cmsghdr_ptr as _);
std::ptr::copy_nonoverlapping(msg.as_ptr(), cmsg_data_ptr, msg.len());
cmsg.advance_mut(cmsg_space);
}
Ok(cmsg)
});
@ -141,7 +141,7 @@ impl ControlMsgBuilder {
}
}
pub trait AsBytes {
trait AsBytes {
fn as_bytes(&self) -> &[u8];
}
@ -165,28 +165,14 @@ fn aligned(buf: &BytesMut) -> BytesMut {
aligned_buf
}
fn align_buf(mut cmsg: BytesMut) -> Result<BytesMut, Error> {
let offset = unsafe { cmsg.bytes_mut().as_ptr() } as usize;
let adjust = align(offset) - offset;
if cmsg.remaining_mut() < adjust {
return Err(Error::NoSpace);
fn len(len: usize) -> usize {
unsafe {
libc::CMSG_LEN(len.try_into().unwrap()) as usize
}
for _ in 0..adjust {
cmsg.put_u8(0);
}
Ok(cmsg)
}
fn align(len: usize) -> usize {
let cmsghdr_align = mem::align_of::<cmsghdr>();
(len + cmsghdr_align - 1) & !(cmsghdr_align - 1)
}
pub fn len(len: usize) -> usize {
align(mem::size_of::<cmsghdr>()) + len
}
pub fn space(len: usize) -> usize {
align(mem::size_of::<cmsghdr>()) + align(len)
unsafe {
libc::CMSG_SPACE(len.try_into().unwrap()) as usize
}
}

Просмотреть файл

@ -41,6 +41,9 @@ pub mod shm;
#[cfg(unix)]
mod tokio_uds_stream;
#[cfg(windows)]
mod tokio_named_pipes;
pub use crate::messages::{ClientMessage, ServerMessage};
use std::env::temp_dir;
use std::path::PathBuf;
@ -53,6 +56,8 @@ use std::os::unix::io::{FromRawFd, IntoRawFd};
#[cfg(windows)]
use std::os::windows::io::{FromRawHandle, IntoRawHandle};
use std::cell::RefCell;
// This must match the definition of
// ipc::FileDescriptor::PlatformHandleType in Gecko.
#[cfg(windows)]
@ -61,11 +66,19 @@ pub type PlatformHandleType = std::os::windows::raw::HANDLE;
pub type PlatformHandleType = libc::c_int;
// This stands in for RawFd/RawHandle.
#[derive(Copy, Clone, Debug)]
pub struct PlatformHandle(PlatformHandleType);
#[derive(Clone, Debug)]
pub struct PlatformHandle(RefCell<Inner>);
#[derive(Clone, Debug)]
struct Inner {
handle: PlatformHandleType,
owned: bool,
}
unsafe impl Send for PlatformHandle {}
pub const INVALID_HANDLE_VALUE: PlatformHandleType = -1isize as PlatformHandleType;
// Custom serialization to treat HANDLEs as i64. This is not valid in
// general, but after sending the HANDLE value to a remote process we
// use it to create a valid HANDLE via DuplicateHandle.
@ -76,7 +89,8 @@ impl serde::Serialize for PlatformHandle {
where
S: serde::Serializer,
{
serializer.serialize_i64(self.0 as i64)
let h = self.0.borrow();
serializer.serialize_i64(h.handle as i64)
}
}
@ -92,7 +106,8 @@ impl<'de> serde::de::Visitor<'de> for PlatformHandleVisitor {
where
E: serde::de::Error,
{
Ok(PlatformHandle::new(value as PlatformHandleType))
let owned = cfg!(windows);
Ok(PlatformHandle::new(value as PlatformHandleType, owned))
}
}
@ -112,49 +127,54 @@ fn valid_handle(handle: PlatformHandleType) -> bool {
#[cfg(windows)]
fn valid_handle(handle: PlatformHandleType) -> bool {
const INVALID_HANDLE_VALUE: PlatformHandleType = -1isize as PlatformHandleType;
const NULL_HANDLE_VALUE: PlatformHandleType = 0isize as PlatformHandleType;
handle != INVALID_HANDLE_VALUE && handle != NULL_HANDLE_VALUE
}
impl PlatformHandle {
pub fn new(raw: PlatformHandleType) -> PlatformHandle {
PlatformHandle(raw)
}
pub fn try_new(raw: PlatformHandleType) -> Option<PlatformHandle> {
if !valid_handle(raw) {
return None;
}
Some(PlatformHandle::new(raw))
pub fn new(raw: PlatformHandleType, owned: bool) -> PlatformHandle {
assert!(valid_handle(raw));
let inner = Inner {
handle: raw,
owned: owned,
};
PlatformHandle(RefCell::new(inner))
}
#[cfg(windows)]
pub fn from<T: IntoRawHandle>(from: T) -> PlatformHandle {
PlatformHandle::new(from.into_raw_handle())
PlatformHandle::new(from.into_raw_handle(), true)
}
#[cfg(unix)]
pub fn from<T: IntoRawFd>(from: T) -> PlatformHandle {
PlatformHandle::new(from.into_raw_fd())
PlatformHandle::new(from.into_raw_fd(), true)
}
#[cfg(windows)]
pub unsafe fn into_file(self) -> std::fs::File {
std::fs::File::from_raw_handle(self.0)
pub unsafe fn into_file(&self) -> std::fs::File {
std::fs::File::from_raw_handle(self.into_raw())
}
#[cfg(unix)]
pub unsafe fn into_file(self) -> std::fs::File {
std::fs::File::from_raw_fd(self.0)
pub unsafe fn into_file(&self) -> std::fs::File {
std::fs::File::from_raw_fd(self.into_raw())
}
pub fn as_raw(self) -> PlatformHandleType {
self.0
pub unsafe fn into_raw(&self) -> PlatformHandleType {
let mut h = self.0.borrow_mut();
assert!(h.owned);
h.owned = false;
h.handle
}
}
pub unsafe fn close(self) {
close_platformhandle(self.0);
impl Drop for PlatformHandle {
fn drop(&mut self) {
let inner = self.0.borrow();
if inner.owned {
unsafe { close_platformhandle(inner.handle) }
}
}
}

Просмотреть файл

@ -292,24 +292,26 @@ impl AssocRawPlatformHandle for ServerMessage {}
impl AssocRawPlatformHandle for ClientMessage {
fn platform_handles(&self) -> Option<([PlatformHandleType; 3], u32)> {
match *self {
ClientMessage::StreamCreated(ref data) => Some((
[
data.platform_handles[0].as_raw(),
data.platform_handles[1].as_raw(),
data.platform_handles[2].as_raw(),
],
data.target_pid,
)),
ClientMessage::ContextSetupDeviceCollectionCallback(ref data) => Some((
[
data.platform_handles[0].as_raw(),
data.platform_handles[1].as_raw(),
data.platform_handles[2].as_raw(),
],
data.target_pid,
)),
_ => None,
unsafe {
match *self {
ClientMessage::StreamCreated(ref data) => Some((
[
data.platform_handles[0].into_raw(),
data.platform_handles[1].into_raw(),
data.platform_handles[2].into_raw(),
],
data.target_pid,
)),
ClientMessage::ContextSetupDeviceCollectionCallback(ref data) => Some((
[
data.platform_handles[0].into_raw(),
data.platform_handles[1].into_raw(),
data.platform_handles[2].into_raw(),
],
data.target_pid,
)),
_ => None,
}
}
}
@ -317,22 +319,23 @@ impl AssocRawPlatformHandle for ClientMessage {
where
F: FnOnce() -> Option<[PlatformHandleType; 3]>,
{
let owned = cfg!(unix);
match *self {
ClientMessage::StreamCreated(ref mut data) => {
let handles =
f().expect("platform_handles must be available when processing StreamCreated");
data.platform_handles = [
PlatformHandle::new(handles[0]),
PlatformHandle::new(handles[1]),
PlatformHandle::new(handles[2]),
PlatformHandle::new(handles[0], owned),
PlatformHandle::new(handles[1], owned),
PlatformHandle::new(handles[2], owned),
]
}
ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => {
let handles = f().expect("platform_handles must be available when processing ContextSetupDeviceCollectionCallback");
data.platform_handles = [
PlatformHandle::new(handles[0]),
PlatformHandle::new(handles[1]),
PlatformHandle::new(handles[2]),
PlatformHandle::new(handles[0], owned),
PlatformHandle::new(handles[1], owned),
PlatformHandle::new(handles[2], owned),
]
}
_ => {}

Просмотреть файл

@ -39,6 +39,12 @@ impl MessageStream {
}
}
impl IntoRawFd for MessageStream {
fn into_raw_fd(self) -> RawFd {
self.0.into_raw_fd()
}
}
impl AsyncMessageStream {
fn new(stream: tokio_uds::UnixStream) -> AsyncMessageStream {
AsyncMessageStream(stream)
@ -97,9 +103,3 @@ impl AsRawFd for AsyncMessageStream {
self.0.as_raw_fd()
}
}
impl IntoRawFd for MessageStream {
fn into_raw_fd(self) -> RawFd {
self.0.into_raw_fd()
}
}

Просмотреть файл

@ -8,7 +8,7 @@ use std::os::windows::fs::*;
use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_named_pipes;
use super::tokio_named_pipes;
use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
#[derive(Debug)]
@ -23,8 +23,8 @@ impl MessageStream {
pub fn anonymous_ipc_pair(
) -> std::result::Result<(MessageStream, MessageStream), std::io::Error> {
let pipe_name = get_pipe_name();
let pipe1 = miow::pipe::NamedPipe::new(&pipe_name)?;
let pipe2 = {
let pipe_server = miow::pipe::NamedPipe::new(&pipe_name)?;
let pipe_client = {
let mut opts = std::fs::OpenOptions::new();
opts.read(true)
.write(true)
@ -32,7 +32,7 @@ impl MessageStream {
let file = opts.open(&pipe_name)?;
unsafe { miow::pipe::NamedPipe::from_raw_handle(file.into_raw_handle()) }
};
Ok((MessageStream::new(pipe1), MessageStream::new(pipe2)))
Ok((MessageStream::new(pipe_server), MessageStream::new(pipe_client)))
}
pub unsafe fn from_raw_fd(raw: super::PlatformHandleType) -> MessageStream {
@ -50,6 +50,12 @@ impl MessageStream {
}
}
impl IntoRawHandle for MessageStream {
fn into_raw_handle(self) -> RawHandle {
self.0.into_raw_handle()
}
}
impl AsyncMessageStream {
fn new(stream: tokio_named_pipes::NamedPipe) -> AsyncMessageStream {
AsyncMessageStream(stream)
@ -93,12 +99,6 @@ impl AsRawHandle for AsyncMessageStream {
}
}
impl IntoRawHandle for MessageStream {
fn into_raw_handle(self) -> RawHandle {
self.0.into_raw_handle()
}
}
static PIPE_ID: AtomicUsize = AtomicUsize::new(0);
fn get_pipe_name() -> String {

Просмотреть файл

@ -153,3 +153,10 @@ where
!self.in_flight.is_empty()
}
}
impl<C: Client> Drop for ClientHandler<C> {
fn drop(&mut self) {
let _ = self.transport.close();
self.in_flight.clear();
}
}

Просмотреть файл

@ -155,6 +155,13 @@ where
}
}
impl<S: Server> Drop for ServerHandler<S> {
fn drop(&mut self) {
let _ = self.transport.close();
self.in_flight.clear();
}
}
////////////////////////////////////////////////////////////////////////////////
enum InFlight<F: Future<Error = ()>> {

Просмотреть файл

@ -0,0 +1,153 @@
// Copied from tokio-named-pipes/src/lib.rs revision 49ec1ba8bbc94ab6fc9636af2a00dfb3204080c8 (tokio-named-pipes 0.2.0)
// This file is dual licensed under the MIT and Apache-2.0 per upstream: https://github.com/NikVolf/tokio-named-pipes/blob/stable/LICENSE-MIT and https://github.com/NikVolf/tokio-named-pipes/blob/stable/LICENSE-APACHE
// - Implement AsyncWrite::shutdown
#![cfg(windows)]
use std::ffi::OsStr;
use std::fmt;
use std::io::{Read, Write};
use std::os::windows::io::*;
use futures::{Async, Poll};
use bytes::{BufMut, Buf};
use mio::Ready;
use tokio::reactor::{Handle, PollEvented2};
use tokio::io::{AsyncRead, AsyncWrite};
pub struct NamedPipe {
io: PollEvented2<mio_named_pipes::NamedPipe>,
}
impl NamedPipe {
pub fn new<P: AsRef<OsStr>>(p: P, handle: &Handle) -> std::io::Result<NamedPipe> {
NamedPipe::_new(p.as_ref(), handle)
}
fn _new(p: &OsStr, handle: &Handle) -> std::io::Result<NamedPipe> {
let inner = mio_named_pipes::NamedPipe::new(p)?;
NamedPipe::from_pipe(inner, handle)
}
pub fn from_pipe(pipe: mio_named_pipes::NamedPipe, handle: &Handle)
-> std::io::Result<NamedPipe> {
Ok(NamedPipe {
io: PollEvented2::new_with_handle(pipe, handle)?,
})
}
pub fn connect(&self) -> std::io::Result<()> {
self.io.get_ref().connect()
}
pub fn disconnect(&self) -> std::io::Result<()> {
self.io.get_ref().disconnect()
}
pub fn poll_read_ready_readable(&mut self) -> tokio::io::Result<Async<Ready>> {
self.io.poll_read_ready(Ready::readable())
}
pub fn poll_write_ready(&mut self) -> tokio::io::Result<Async<Ready>> {
self.io.poll_write_ready()
}
fn io_mut(&mut self) -> &mut PollEvented2<mio_named_pipes::NamedPipe> {
&mut self.io
}
}
impl Read for NamedPipe {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.io.read(buf)
}
}
impl Write for NamedPipe {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.io.write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
self.io.flush()
}
}
impl<'a> Read for &'a NamedPipe {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
(&self.io).read(buf)
}
}
impl<'a> Write for &'a NamedPipe {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
(&self.io).write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
(&self.io).flush()
}
}
impl AsyncRead for NamedPipe {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, std::io::Error> {
if let Async::NotReady = self.io.poll_read_ready(Ready::readable())? {
return Ok(Async::NotReady)
}
let mut stack_buf = [0u8; 1024];
let bytes_read = self.io_mut().read(&mut stack_buf);
match bytes_read {
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
self.io_mut().clear_read_ready(Ready::readable())?;
return Ok(Async::NotReady);
},
Err(e) => Err(e),
Ok(bytes_read) => {
buf.put_slice(&stack_buf[0..bytes_read]);
Ok(Async::Ready(bytes_read))
}
}
}
}
impl AsyncWrite for NamedPipe {
fn shutdown(&mut self) -> Poll<(), std::io::Error> {
let _ = self.disconnect();
Ok(().into())
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std::io::Error> {
if let Async::NotReady = self.io.poll_write_ready()? {
return Ok(Async::NotReady)
}
let bytes_wrt = self.io_mut().write(buf.bytes());
match bytes_wrt {
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
self.io_mut().clear_write_ready()?;
return Ok(Async::NotReady);
},
Err(e) => Err(e),
Ok(bytes_wrt) => {
buf.advance(bytes_wrt);
Ok(Async::Ready(bytes_wrt))
}
}
}
}
impl fmt::Debug for NamedPipe {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.io.get_ref().fmt(f)
}
}
impl AsRawHandle for NamedPipe {
fn as_raw_handle(&self) -> RawHandle {
self.io.get_ref().as_raw_handle()
}
}

Просмотреть файл

@ -9,7 +9,7 @@ description = "Cubeb Backend for talking to remote cubeb server."
edition = "2018"
[dependencies]
audio_thread_priority = "0.20.2"
audio_thread_priority = "0.21"
audioipc = { path="../audioipc" }
cubeb-backend = "0.6.0"
futures = { version="0.1.18", default-features=false, features=["use_std"] }

Просмотреть файл

@ -372,7 +372,7 @@ impl ContextOps for ClientContext {
ContextSetupDeviceCollectionCallback())?;
let stream =
unsafe { audioipc::MessageStream::from_raw_fd(fds.platform_handles[0].as_raw()) };
unsafe { audioipc::MessageStream::from_raw_fd(fds.platform_handles[0].into_raw()) };
// TODO: The lowest comms layer expects exactly 3 PlatformHandles, but we only
// need one here. Drop the dummy handles the other side sent us to discard.

Просмотреть файл

@ -178,19 +178,16 @@ impl<'ctx> ClientStream<'ctx> {
data.token, data.platform_handles
);
let stm = data.platform_handles[0];
let stream = unsafe { audioipc::MessageStream::from_raw_fd(stm.as_raw()) };
let stream = unsafe { audioipc::MessageStream::from_raw_fd(data.platform_handles[0].into_raw()) };
let input = data.platform_handles[1];
let input_file = unsafe { input.into_file() };
let input_file = unsafe { data.platform_handles[1].into_file() };
let input_shm = if has_input {
Some(SharedMemSlice::from(&input_file, audioipc::SHM_AREA_SIZE).unwrap())
} else {
None
};
let output = data.platform_handles[2];
let output_file = unsafe { output.into_file() };
let output_file = unsafe { data.platform_handles[2].into_file() };
let output_shm = if has_output {
Some(SharedMemMutSlice::from(&output_file, audioipc::SHM_AREA_SIZE).unwrap())
} else {

Просмотреть файл

@ -9,7 +9,7 @@ description = "Remote cubeb server"
edition = "2018"
[dependencies]
audio_thread_priority = "0.20.2"
audio_thread_priority = "0.21"
audioipc = { path = "../audioipc" }
cubeb-core = "0.6.0"
futures = "0.1.18"

Просмотреть файл

@ -124,7 +124,7 @@ pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> PlatformHandleTy
// is registered with the reactor core, the other side is returned
// to the caller.
MessageStream::anonymous_ipc_pair()
.and_then(|(sock1, sock2)| {
.and_then(|(ipc_server, ipc_client)| {
// Spawn closure to run on same thread as reactor::Core
// via remote handle.
wrapper
@ -133,22 +133,22 @@ pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> PlatformHandleTy
.spawn(futures::future::lazy(|| {
trace!("Incoming connection");
let handle = reactor::Handle::default();
sock2.into_tokio_ipc(&handle)
ipc_server.into_tokio_ipc(&handle)
.and_then(|sock| {
let transport = framed_with_platformhandles(sock, Default::default());
rpc::bind_server(transport, server::CubebServer::new(core_handle));
Ok(())
}).map_err(|_| ())
// Notify waiting thread that sock2 has been registered.
// Notify waiting thread that server has been registered.
.and_then(|_| wait_tx.send(()))
}))
.expect("Failed to spawn CubebServer");
// Wait for notification that sock2 has been registered
// Wait for notification that server has been registered
// with reactor::Core.
let _ = wait_rx.wait();
Ok(PlatformHandle::from(sock1).as_raw())
Ok(unsafe { PlatformHandle::from(ipc_client).into_raw() })
})
.unwrap_or(-1isize as PlatformHandleType)
.unwrap_or(audioipc::INVALID_HANDLE_VALUE)
}
#[no_mangle]

Просмотреть файл

@ -465,8 +465,8 @@ impl CubebServer {
.unwrap_or_else(error),
ServerMessage::ContextSetupDeviceCollectionCallback => {
if let Ok((stm1, stm2)) = MessageStream::anonymous_ipc_pair() {
debug!("Created device collection RPC pair: {:?}-{:?}", stm1, stm2);
if let Ok((ipc_server, ipc_client)) = MessageStream::anonymous_ipc_pair() {
debug!("Created device collection RPC pair: {:?}-{:?}", ipc_server, ipc_client);
// This code is currently running on the Client/Server RPC
// handling thread. We need to move the registration of the
@ -477,7 +477,7 @@ impl CubebServer {
self.handle
.spawn(futures::future::lazy(move || {
let handle = reactor::Handle::default();
let stream = stm2.into_tokio_ipc(&handle).unwrap();
let stream = ipc_server.into_tokio_ipc(&handle).unwrap();
let transport = framed(stream, Default::default());
let rpc = rpc::bind_client::<DeviceCollectionClient>(transport);
drop(tx.send(rpc));
@ -496,7 +496,7 @@ impl CubebServer {
})));
let fds = RegisterDeviceCollectionChanged {
platform_handles: [
PlatformHandle::from(stm1),
PlatformHandle::from(ipc_client),
PlatformHandle::from(dummy1),
PlatformHandle::from(dummy2),
],
@ -595,8 +595,8 @@ impl CubebServer {
let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());
let (stm1, stm2) = MessageStream::anonymous_ipc_pair()?;
debug!("Created callback pair: {:?}-{:?}", stm1, stm2);
let (ipc_server, ipc_client) = MessageStream::anonymous_ipc_pair()?;
debug!("Created callback pair: {:?}-{:?}", ipc_server, ipc_client);
let mut shm_path = audioipc::get_shm_path();
shm_path.set_extension("input");
let (input_shm, input_file) = SharedMemWriter::new(&shm_path, audioipc::SHM_AREA_SIZE)?;
@ -612,7 +612,7 @@ impl CubebServer {
self.handle
.spawn(futures::future::lazy(move || {
let handle = reactor::Handle::default();
let stream = stm2.into_tokio_ipc(&handle).unwrap();
let stream = ipc_server.into_tokio_ipc(&handle).unwrap();
let transport = framed(stream, Default::default());
let rpc = rpc::bind_client::<CallbackClient>(transport);
drop(tx.send(rpc));
@ -687,7 +687,7 @@ impl CubebServer {
Ok(ClientMessage::StreamCreated(StreamCreate {
token: key,
platform_handles: [
PlatformHandle::from(stm1),
PlatformHandle::from(ipc_client),
PlatformHandle::from(input_file),
PlatformHandle::from(output_file),
],