Bug 1781993 - Update AudioIPC macOS branch to 499b955. r=cubeb-reviewers,chunmin

Differential Revision: https://phabricator.services.mozilla.com/D152983
This commit is contained in:
Matthew Gregan 2022-07-28 21:37:45 +00:00
Родитель bda5f100eb
Коммит 1ea9773d11
23 изменённых файлов: 528 добавлений и 469 удалений

Просмотреть файл

@ -60,7 +60,7 @@ rev = "21c26326f5f45f415c49eac4ba5bc41a2f961321"
[source."https://github.com/kinetiknz/audioipc-2"]
git = "https://github.com/kinetiknz/audioipc-2"
replace-with = "vendored-sources"
rev = "85e9839059f4bf8f68130825b8fd02c39a6a51b9"
rev = "499b95580c8b276e52bd9757d735249504202e5c"
[source."https://github.com/jfkthame/mapped_hyph.git"]
git = "https://github.com/jfkthame/mapped_hyph.git"

11
Cargo.lock сгенерированный
Просмотреть файл

@ -318,20 +318,21 @@ dependencies = [
[[package]]
name = "audioipc2"
version = "0.5.0"
source = "git+https://github.com/kinetiknz/audioipc-2?rev=85e9839059f4bf8f68130825b8fd02c39a6a51b9#85e9839059f4bf8f68130825b8fd02c39a6a51b9"
source = "git+https://github.com/kinetiknz/audioipc-2?rev=499b95580c8b276e52bd9757d735249504202e5c#499b95580c8b276e52bd9757d735249504202e5c"
dependencies = [
"arrayvec",
"ashmem",
"audio_thread_priority",
"bincode",
"bytes 0.4.12",
"byteorder",
"bytes 1.1.0",
"cc",
"cubeb",
"error-chain",
"iovec",
"libc",
"log",
"memmap2 0.2.999",
"memmap2 0.5.4",
"mio 0.8.0",
"scopeguard",
"serde",
@ -344,7 +345,7 @@ dependencies = [
[[package]]
name = "audioipc2-client"
version = "0.5.0"
source = "git+https://github.com/kinetiknz/audioipc-2?rev=85e9839059f4bf8f68130825b8fd02c39a6a51b9#85e9839059f4bf8f68130825b8fd02c39a6a51b9"
source = "git+https://github.com/kinetiknz/audioipc-2?rev=499b95580c8b276e52bd9757d735249504202e5c#499b95580c8b276e52bd9757d735249504202e5c"
dependencies = [
"audio_thread_priority",
"audioipc2",
@ -355,7 +356,7 @@ dependencies = [
[[package]]
name = "audioipc2-server"
version = "0.5.0"
source = "git+https://github.com/kinetiknz/audioipc-2?rev=85e9839059f4bf8f68130825b8fd02c39a6a51b9#85e9839059f4bf8f68130825b8fd02c39a6a51b9"
source = "git+https://github.com/kinetiknz/audioipc-2?rev=499b95580c8b276e52bd9757d735249504202e5c#499b95580c8b276e52bd9757d735249504202e5c"
dependencies = [
"audio_thread_priority",
"audioipc2",

Просмотреть файл

@ -1 +1 @@
{"files":{"Cargo.toml":"722e0313d4de90477ae34dbb95ae7eeff27622329c5689bb4ce6bcd2006c6263","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/context.rs":"33231853817615e01fd1a0f67f69a176d19119bbebb7dc417e96ed88dd3a8d34","src/lib.rs":"c4a6797734489280f6b97dd72c9e51a7bd7be4104592eece3929e29d45cbca4a","src/send_recv.rs":"064a657c845762be1dbcbbfc18b3f8a51582eb540def8d2ceecf200184ad4f7a","src/stream.rs":"a6c07796e6fe704cfa6baf8b904e7ffe874d3c884d44d4ed307e668dec25452b"},"package":null}
{"files":{"Cargo.toml":"31dc34fae9951183eaed3511cffe3d830d52ba3c046257454f09a06156d0716b","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/context.rs":"17bdf1dfc8d910b745f94d5bc74121850174c716f8a2eb6d1c4b075d42fa5df5","src/lib.rs":"c4a6797734489280f6b97dd72c9e51a7bd7be4104592eece3929e29d45cbca4a","src/send_recv.rs":"064a657c845762be1dbcbbfc18b3f8a51582eb540def8d2ceecf200184ad4f7a","src/stream.rs":"a6c07796e6fe704cfa6baf8b904e7ffe874d3c884d44d4ed307e668dec25452b"},"package":null}

Просмотреть файл

@ -15,6 +15,6 @@ cubeb-backend = "0.10"
log = "0.4"
[dependencies.audio_thread_priority]
version = "0.26.0"
default_features = false
version = "0.26.1"
default-features = false
features = ["winapi"]

Просмотреть файл

@ -82,7 +82,7 @@ fn promote_thread(rpc: &rpccore::Proxy<ServerMessage, ClientMessage>) {
#[cfg(not(target_os = "linux"))]
fn promote_thread(_rpc: &rpccore::Proxy<ServerMessage, ClientMessage>) {
match promote_current_thread_to_real_time(0, 48000) {
match promote_current_thread_to_real_time(256, 48000) {
Ok(_) => {
info!("Audio thread promoted to real-time.");
}

Просмотреть файл

@ -1 +1 @@
{"files":{"Cargo.toml":"b1b8509154c7230cd0dca49d0dcc1aa6cabf1a02e0abbd62d7f25ccd3e301804","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/lib.rs":"06aff4fd1326aeabb16b01f81a6f3c59c1717ebe96285a063724830cdf30303a","src/server.rs":"e675dfbb82b515a027262dd5291186771f81e5c0eb4842200321f81800d6a8dc"},"package":null}
{"files":{"Cargo.toml":"7feb495b23148ecc83ec7f480aefe19c9804a8900cdb4ceb005c049cdce82428","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/lib.rs":"d9cc7ca311cceb70acbc63b2190d6205094152e582faaad1b4a6061019f5803f","src/server.rs":"00740854f3e4f64cbeabfdb04d1337a2fdb89122464ea64d23fe1272045aee7d"},"package":null}

Просмотреть файл

@ -21,6 +21,6 @@ version = "0.12.0"
default-features = false
[dependencies.audio_thread_priority]
version = "0.26.0"
default_features = false
version = "0.26.1"
default-features = false
features = ["winapi"]

Просмотреть файл

@ -98,7 +98,7 @@ fn init_threads(
None,
move || {
trace!("Starting {} thread", callback_name);
if let Err(e) = promote_current_thread_to_real_time(0, 48000) {
if let Err(e) = promote_current_thread_to_real_time(256, 48000) {
debug!(
"Failed to promote {} thread to real-time: {:?}",
callback_name, e

Просмотреть файл

@ -273,15 +273,12 @@ impl ServerStreamCallbacks {
}
_ => {
debug!("Unexpected message {:?} during data_callback", r);
// TODO: Return a CUBEB_ERROR result here once
// https://github.com/kinetiknz/cubeb/issues/553 is
// fixed.
0
cubeb::ffi::CUBEB_ERROR.try_into().unwrap()
}
}
}
fn state_callback(&mut self, state: cubeb::State) {
fn state_callback(&self, state: cubeb::State) {
trace!("Stream state callback: {:?}", state);
let r = self
.state_callback_rpc
@ -295,7 +292,7 @@ impl ServerStreamCallbacks {
}
}
fn device_change_callback(&mut self) {
fn device_change_callback(&self) {
trace!("Stream device change callback");
let r = self
.device_change_callback_rpc
@ -637,7 +634,7 @@ impl CubebServer {
#[cfg(target_os = "linux")]
ServerMessage::PromoteThreadToRealTime(thread_info) => {
let info = RtPriorityThreadInfo::deserialize(thread_info);
match promote_thread_to_real_time(info, 0, 48000) {
match promote_thread_to_real_time(info, 256, 48000) {
Ok(_) => {
info!("Promotion of content process thread to real-time OK");
}
@ -710,6 +707,7 @@ impl CubebServer {
let out_rate = params.output_stream_params.map(|p| p.rate).unwrap_or(0);
let rate = out_rate.max(in_rate);
// 1s of audio, rounded up to the nearest 64kB.
// Stream latency is capped at 1s in process_stream_init.
(((rate * frame_size) + 0xffff) & !0xffff) as usize
} else {
self.shm_area_size
@ -777,7 +775,26 @@ impl CubebServer {
cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _)
});
let latency = params.latency_frames;
// TODO: Manage stream latency requests with respect to the RT deadlines the callback_thread was configured for.
fn round_up_pow2(v: u32) -> u32 {
debug_assert!(v >= 1);
1 << (32 - (v - 1).leading_zeros())
}
let rate = params
.output_stream_params
.map(|p| p.rate)
.unwrap_or_else(|| params.input_stream_params.map(|p| p.rate).unwrap());
// Note: minimum latency supported by AudioIPC is currently ~5ms. This restriction may be reduced by later IPC improvements.
let min_latency = round_up_pow2(5 * rate / 1000);
// Note: maximum latency is restricted by the SharedMem size.
let max_latency = rate;
let latency = params.latency_frames.max(min_latency).min(max_latency);
trace!(
"stream rate={} latency requested={} calculated={}",
rate,
params.latency_frames,
latency
);
let server_stream = &mut self.streams[stm_tok];
assert!(size_of::<Box<ServerStreamCallbacks>>() == size_of::<usize>());
@ -842,9 +859,7 @@ unsafe extern "C" fn data_cb_c(
};
cbs.data_callback(input, output, nframes as isize) as c_long
});
// TODO: Return a CUBEB_ERROR result here once
// https://github.com/kinetiknz/cubeb/issues/553 is fixed.
ok.unwrap_or(0)
ok.unwrap_or(cubeb::ffi::CUBEB_ERROR as c_long)
}
unsafe extern "C" fn state_cb_c(

Просмотреть файл

@ -1 +1 @@
{"files":{"Cargo.toml":"d89d3cd7345174fef6ac44c488c3ec3fc34bac61af8c1fa60561eae5cbc2608a","benches/serialization.rs":"d56855d868dab6aa22c8b03a61084535351b76c94b68d8b1d20764e352fe473f","build.rs":"3f061cf9a989f63a71c693a543d26f7003e8b643c39c23ea555110252a2c39d2","src/cmsghdr.c":"d7344b3dc15cdce410c68669b848bb81f7fe36362cd3699668cb613fa05180f8","src/codec.rs":"58351c01b0414ec15f29d7dab4693508acfd4d7ca9df575e0eafffe4fe621e8e","src/errors.rs":"67a4a994d0724397657581cde153bdfc05ce86e7efc467f23fafc8f64df80fa4","src/ipccore.rs":"74e91855ebfae112ecf93fd591987d7a9a7a36a7e19b364960e34455016e892d","src/lib.rs":"84d4b3db37309ca6dd735a59270a787049028d048458af514ef9b3aaf6a2dd58","src/messages.rs":"d5db81981851fec20c6b9ff86a97b360b6e8c4fba2106f5afa286cbada303a72","src/rpccore.rs":"9fa24cb6d487b436382e35f82d0809ad2b315ce049ebaa767b4f88d3d5637f2e","src/shm.rs":"1d88f19606899e3e477865d6b84bbe3e272f51618a1c2d57b6dab03a4787cde3","src/sys/mod.rs":"da4412ee630e53a0d3a79d9e18953280818bd58ed3fb3c6abedeeb8a092d3dfc","src/sys/unix/cmsg.rs":"e10e26cdfa92035ccb300dc4f2aef05eb1935833045cffb6b1107acc55889c8e","src/sys/unix/mod.rs":"3a2807c7b87ab5230d73fafd2f6417f6647e6d8ffdad7965d1d71bf511da0bcc","src/sys/unix/msg.rs":"d29d3974c145df8b1b931222f62aa64be0ec165b578f31b8f98555fa4d052b01","src/sys/windows/mod.rs":"50af90f17d9b61045ac009e0f53077f9a645c72c214c400b116c4eca2adce0d7"},"package":null}
{"files":{"Cargo.toml":"4a6aaffaf15fc11d3c41fc399a6e36a1ac9016f0edd592d4ff4059a2092818af","benches/serialization.rs":"d56855d868dab6aa22c8b03a61084535351b76c94b68d8b1d20764e352fe473f","build.rs":"65df9a97c6cdaa3faf72581f04ac289197b0b1797d69d22c1796e957ff1089e2","src/codec.rs":"4e029396765db803201249e90bcf724eb56deed3b2e455822d6673f40550a3e1","src/errors.rs":"67a4a994d0724397657581cde153bdfc05ce86e7efc467f23fafc8f64df80fa4","src/ipccore.rs":"6d33898f5bc61963797d21b44d36a7ee52d3d0cf13a4f19fa2d59373720eead8","src/lib.rs":"9b107cb52081eeea3fa742d30361db70f7138baa423dfe21d37dcf5087afc338","src/messages.rs":"452362da2cace9a0f2e3134c190ecb6a9997f8be4036cde06643e17c6c238240","src/rpccore.rs":"9fa24cb6d487b436382e35f82d0809ad2b315ce049ebaa767b4f88d3d5637f2e","src/shm.rs":"1d88f19606899e3e477865d6b84bbe3e272f51618a1c2d57b6dab03a4787cde3","src/sys/mod.rs":"e6fa1d260abf093e1f7b50185195e2d3aee0eb8c9774c6f253953b5896d838f3","src/sys/unix/cmsg.rs":"8a27a20383c333c5d033e58a546a530e26b964942a4615793d1ca078c65efb75","src/sys/unix/cmsghdr.c":"d7344b3dc15cdce410c68669b848bb81f7fe36362cd3699668cb613fa05180f8","src/sys/unix/mod.rs":"59835f0d5509940078b1820a54f49fc5514adeb3e45e7d21e3ab917431da2e74","src/sys/unix/msg.rs":"c0103cc058aeb890ab7aa023fcd6d3b9a0135d6b28fdecdec446650957210508","src/sys/windows/mod.rs":"7b1288e42b3ce34c7004b9fe3eeb6d9822c55e2688d3c2a40e55db46a2ca5d76"},"package":null}

12
third_party/rust/audioipc2/Cargo.toml поставляемый
Просмотреть файл

@ -11,7 +11,8 @@ edition = "2018"
[dependencies]
bincode = "1.3"
bytes = "0.4"
byteorder = "1"
bytes = "1"
cubeb = "0.10"
log = "0.4"
serde = "1"
@ -24,16 +25,15 @@ scopeguard = "1.1.0"
[target.'cfg(unix)'.dependencies]
iovec = "0.1"
libc = "0.2"
memmap2 = "0.2"
memmap2 = "0.5"
arrayvec = "0.7"
[target.'cfg(target_os = "linux")'.dependencies.audio_thread_priority]
version = "0.26.0"
default_features = false
features = ["winapi"]
version = "0.26.1"
default-features = false
[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["combaseapi", "memoryapi", "objbase"] }
winapi = { version = "0.3", features = ["combaseapi", "handleapi", "memoryapi", "objbase"] }
[target.'cfg(target_os = "android")'.dependencies]
ashmem = "0.1.2"

4
third_party/rust/audioipc2/build.rs поставляемый
Просмотреть файл

@ -1,5 +1,7 @@
fn main() {
if std::env::var_os("CARGO_CFG_UNIX").is_some() {
cc::Build::new().file("src/cmsghdr.c").compile("cmsghdr");
cc::Build::new()
.file("src/sys/unix/cmsghdr.c")
.compile("cmsghdr");
}
}

3
third_party/rust/audioipc2/src/codec.rs поставляемый
Просмотреть файл

@ -6,7 +6,8 @@
//! `Encoder`s and `Decoder`s from items to/from `BytesMut` buffers.
use bincode::{self, Options};
use bytes::{BufMut, ByteOrder, BytesMut, LittleEndian};
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, BytesMut};
use serde::de::DeserializeOwned;
use serde::ser::Serialize;
use std::convert::TryInto;

61
third_party/rust/audioipc2/src/ipccore.rs поставляемый
Просмотреть файл

@ -21,13 +21,6 @@ use crate::{
use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;
#[cfg(windows)]
use crate::duplicate_platform_handle;
#[cfg(unix)]
use crate::sys::cmsg;
#[cfg(unix)]
use crate::PlatformHandle;
const WAKE_TOKEN: Token = Token(!0);
thread_local!(static IN_EVENTLOOP: std::cell::RefCell<Option<thread::ThreadId>> = std::cell::RefCell::new(None));
@ -608,16 +601,24 @@ where
// Repeatedly call `decode` as long as it produces items, passing each produced item to the handler to action.
#[allow(unused_mut)]
while let Some(mut item) = self.codec.decode(&mut inbound.buf)? {
#[cfg(unix)]
item.receive_owned_message_handle(|| {
if let Some(handle) = self.extra_handle.take() {
unsafe { handle.into_raw() }
} else {
let handles = cmsg::decode_handles(&mut inbound.cmsg);
self.extra_handle = handles.get(1).map(|h| PlatformHandle::new(*h));
handles[0]
if item.has_associated_handle() {
// On Unix, dequeue a handle from the connection and update the item's handle.
#[cfg(unix)]
{
let new = inbound
.pop_handle()
.expect("inbound handle expected for item");
unsafe { item.set_local_handle(new.take()) };
}
});
// On Windows, the deserialized item contains the correct handle value, so
// convert it to an owned handle on the item.
#[cfg(windows)]
{
assert!(inbound.pop_handle().is_none());
unsafe { item.set_local_handle() };
}
}
self.handler.consume(item)?;
}
@ -630,21 +631,25 @@ where
// Repeatedly grab outgoing items from the handler, passing each to `encode` for serialization into `outbound`.
while let Some(mut item) = self.handler.produce()? {
item.prepare_send_message_handle(|handle, _target| {
// On Unix, the handle is encoded into a cmsg buffer for out-of-band transport via sendmsg.
#[cfg(unix)]
{
cmsg::encode_handle(&mut outbound.cmsg, handle);
Ok(handle)
}
// On Windows, the handle is transferred by duplicating it into the target remote process during message send.
let handle = if item.has_associated_handle() {
#[allow(unused_mut)]
let mut handle = item.take_handle();
// On Windows, the handle is transferred by duplicating it into the target remote process.
#[cfg(windows)]
unsafe {
duplicate_platform_handle(handle, Some(_target))
item.set_remote_handle(handle.send_to_target()?);
}
})?;
Some(handle)
} else {
None
};
self.codec.encode(item, &mut outbound.buf)?;
if let Some(handle) = handle {
// `outbound` retains ownership of the handle until the associated
// encoded item in `outbound.buf` is sent to the remote process.
outbound.push_handle(handle);
}
}
Ok(())
}
@ -653,8 +658,6 @@ where
struct FramedDriver<T: Handler> {
codec: LengthDelimitedCodec<T::Out, T::In>,
handler: T,
#[cfg(unix)]
extra_handle: Option<PlatformHandle>,
}
impl<T: Handler> FramedDriver<T> {
@ -662,8 +665,6 @@ impl<T: Handler> FramedDriver<T> {
FramedDriver {
codec: Default::default(),
handler,
#[cfg(unix)]
extra_handle: None,
}
}
}

78
third_party/rust/audioipc2/src/lib.rs поставляемый
Просмотреть файл

@ -27,6 +27,8 @@ use std::os::unix::io::IntoRawFd;
#[cfg(windows)]
use std::os::windows::io::IntoRawHandle;
use std::io::Result;
// This must match the definition of
// ipc::FileDescriptor::PlatformHandleType in Gecko.
#[cfg(windows)]
@ -78,7 +80,7 @@ impl PlatformHandle {
}
#[cfg(unix)]
pub fn duplicate(h: PlatformHandleType) -> Result<PlatformHandle, std::io::Error> {
pub fn duplicate(h: PlatformHandleType) -> Result<PlatformHandle> {
unsafe {
let newfd = libc::dup(h);
if !valid_handle(newfd) {
@ -90,7 +92,7 @@ impl PlatformHandle {
#[allow(clippy::missing_safety_doc)]
#[cfg(windows)]
pub unsafe fn duplicate(h: PlatformHandleType) -> Result<PlatformHandle, std::io::Error> {
pub unsafe fn duplicate(h: PlatformHandleType) -> Result<PlatformHandle> {
let dup = duplicate_platform_handle(h, None)?;
Ok(PlatformHandle::new(dup))
}
@ -113,21 +115,19 @@ unsafe fn close_platform_handle(handle: PlatformHandleType) {
}
#[cfg(windows)]
use winapi::shared::minwindef::DWORD;
use winapi::shared::minwindef::{DWORD, FALSE};
#[cfg(windows)]
use winapi::um::{handleapi, processthreadsapi, winnt};
// Duplicate `source_handle`.
// - If `target_pid` is `Some(...)`, `source_handle` is closed.
// - If `target_pid` is `None`, `source_handle` is not closed.
// Duplicate `source_handle` to `target_pid`. Returns the value of the new handle inside the target process.
// If `target_pid` is `None`, `source_handle` is duplicated in the current process.
#[cfg(windows)]
pub(crate) unsafe fn duplicate_platform_handle(
source_handle: PlatformHandleType,
target_pid: Option<DWORD>,
) -> Result<PlatformHandleType, std::io::Error> {
use winapi::shared::minwindef::FALSE;
use winapi::um::{handleapi, processthreadsapi, winnt};
let source = processthreadsapi::GetCurrentProcess();
let (target, close_source) = if let Some(pid) = target_pid {
) -> Result<PlatformHandleType> {
let source_process = processthreadsapi::GetCurrentProcess();
let target_process = if let Some(pid) = target_pid {
let target = processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE, FALSE, pid);
if !valid_handle(target) {
return Err(std::io::Error::new(
@ -135,26 +135,24 @@ pub(crate) unsafe fn duplicate_platform_handle(
"invalid target process",
));
}
(target, true)
Some(target)
} else {
(source, false)
None
};
let mut target_handle = std::ptr::null_mut();
let mut options = winnt::DUPLICATE_SAME_ACCESS;
if close_source {
options |= winnt::DUPLICATE_CLOSE_SOURCE;
}
let ok = handleapi::DuplicateHandle(
source,
source_process,
source_handle,
target,
target_process.unwrap_or(source_process),
&mut target_handle,
0,
FALSE,
options,
winnt::DUPLICATE_SAME_ACCESS,
);
handleapi::CloseHandle(target);
if let Some(target) = target_process {
handleapi::CloseHandle(target);
};
if ok == FALSE {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
@ -164,6 +162,42 @@ pub(crate) unsafe fn duplicate_platform_handle(
Ok(target_handle)
}
// Close `target_handle_to_close` inside target process `target_pid` using a
// special invocation of `DuplicateHandle`. See
// https://docs.microsoft.com/en-us/windows/win32/api/handleapi/nf-handleapi-duplicatehandle#:~:text=Normally%20the%20target,dwOptions%20to%20DUPLICATE_CLOSE_SOURCE.
#[cfg(windows)]
pub(crate) unsafe fn close_target_handle(
target_handle_to_close: PlatformHandleType,
target_pid: DWORD,
) -> Result<()> {
let target_process =
processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE, FALSE, target_pid);
if !valid_handle(target_process) {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"invalid target process",
));
}
let ok = handleapi::DuplicateHandle(
target_process,
target_handle_to_close,
std::ptr::null_mut(),
std::ptr::null_mut(),
0,
FALSE,
winnt::DUPLICATE_CLOSE_SOURCE,
);
handleapi::CloseHandle(target_process);
if ok == FALSE {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"DuplicateHandle failed",
));
}
Ok(())
}
#[cfg(windows)]
pub fn server_platform_init() {
use winapi::shared::winerror;

311
third_party/rust/audioipc2/src/messages.rs поставляемый
Просмотреть файл

@ -5,13 +5,13 @@
use crate::PlatformHandle;
use crate::PlatformHandleType;
use crate::INVALID_HANDLE_VALUE;
#[cfg(target_os = "linux")]
use audio_thread_priority::RtPriorityThreadInfo;
use cubeb::{self, ffi};
use serde_derive::Deserialize;
use serde_derive::Serialize;
use std::ffi::{CStr, CString};
use std::io;
use std::os::raw::{c_char, c_int, c_uint};
use std::ptr;
@ -295,30 +295,32 @@ pub enum DeviceCollectionResp {
DeviceChange,
}
// Represents a handle in various transitional states during serialization and remoting.
// Represents a platform handle in various transitional states during serialization and remoting.
// The process of serializing and remoting handles and the ownership during various states differs
// between Windows and Unix. SerializableHandle changes during IPC is as follows:
// between Windows and Unix. SerializableHandle changes during IPC as follows:
//
// 1. The initial state `Owned`, with a valid `target_pid`.
// 1. Created in the initial state `Owned`, with a valid `target_pid`.
// 2. Ownership is transferred out for processing during IPC send, becoming `Empty` temporarily.
// See `AssocRawPlatformHandle::take_handle_for_send`.
// 3. Message containing `SerializableHandleValue` is serialized and sent via IPC.
// See `AssociateHandleForMessage::take_handle`.
// - Windows: DuplicateHandle transfers the handle to the remote process.
// This produces a new value in the local process representing the remote handle.
// This value must be sent to the remote, so is recorded as `SerializableValue`.
// - Unix: Handle value (and ownership) is encoded into cmsg buffer via `builder`.
// The handle is converted to a `SerializableValue` for convenience, but is otherwise unused.
// This produces a new handle value in the local process representing the remote handle.
// This value must be sent to the remote, so `AssociateHandleForMessage::set_remote_handle`
// is used to transform the handle into a `SerializableValue`.
// - Unix: sendmsg transfers the handle to the remote process. The handle is left `Empty`.
// (Note: this occurs later, when the serialized message buffer is sent)
// 3. Message containing `SerializableValue` or `Empty` (depending on handle processing in step 2)
// is serialized and sent via IPC.
// 4. Message received and deserialized in target process.
// - Windows: Deserialization converts the `SerializableValue` into `Owned`, ready for use.
// - Windows: `AssociateHandleForMessage::set_local_handle converts the received `SerializableValue` into `Owned`, ready for use.
// - Unix: Handle (with a new value in the target process) is received out-of-band via `recvmsg`
// and converted to `Owned` via `AssocRawPlatformHandle::set_owned_handle`.
// and converted to `Owned` via `AssociateHandleForMessage::set_local_handle`.
#[derive(Debug)]
pub enum SerializableHandle {
// Owned handle, with optional target_pid on sending side.
Owned(PlatformHandle, Option<u32>),
// Transitional IPC states.
SerializableValue(PlatformHandleType),
Empty,
// Transitional IPC states:
SerializableValue(PlatformHandleType), // Windows
Empty, // Unix
}
// PlatformHandle is non-Send and contains a pointer (HANDLE) on Windows.
@ -330,6 +332,7 @@ impl SerializableHandle {
SerializableHandle::Owned(handle, Some(target_pid))
}
// Called on the receiving side to take ownership of the handle.
pub fn take_handle(&mut self) -> PlatformHandle {
match std::mem::replace(self, SerializableHandle::Empty) {
SerializableHandle::Owned(handle, target_pid) => {
@ -340,13 +343,17 @@ impl SerializableHandle {
}
}
unsafe fn take_handle_for_send(&mut self) -> (PlatformHandleType, u32) {
// Called on the sending side to take ownership of the handle for
// handling platform-specific remoting.
fn take_handle_for_send(&mut self) -> RemoteHandle {
match std::mem::replace(self, SerializableHandle::Empty) {
SerializableHandle::Owned(handle, target_pid) => (
handle.into_raw(),
target_pid.expect("need valid target_pid"),
),
_ => panic!("take_handle_with_target called in invalid state"),
SerializableHandle::Owned(handle, target_pid) => unsafe {
RemoteHandle::new(
handle.into_raw(),
target_pid.expect("target process required"),
)
},
_ => panic!("take_handle_for_send called in invalid state"),
}
}
@ -354,6 +361,15 @@ impl SerializableHandle {
SerializableHandle::Owned(PlatformHandle::new(handle), None)
}
#[cfg(windows)]
fn make_owned(&mut self) {
if let SerializableHandle::SerializableValue(handle) = self {
*self = SerializableHandle::new_owned(*handle);
} else {
panic!("make_owned called in invalid state")
}
}
fn new_serializable_value(handle: PlatformHandleType) -> SerializableHandle {
SerializableHandle::SerializableValue(handle)
}
@ -361,6 +377,7 @@ impl SerializableHandle {
fn get_serializable_value(&self) -> PlatformHandleType {
match *self {
SerializableHandle::SerializableValue(handle) => handle,
SerializableHandle::Empty => INVALID_HANDLE_VALUE,
_ => panic!("get_remote_handle called in invalid state"),
}
}
@ -399,104 +416,182 @@ impl serde::de::Visitor<'_> for SerializableHandleVisitor {
where
E: serde::de::Error,
{
let value = value as PlatformHandleType;
Ok(if cfg!(windows) {
SerializableHandle::new_owned(value)
} else {
// On Unix, SerializableHandle becomes owned once `set_owned_handle` is called
// with the new local handle value during `recvmsg`.
SerializableHandle::new_serializable_value(value)
})
Ok(SerializableHandle::new_serializable_value(
value as PlatformHandleType,
))
}
}
pub trait AssociateHandleForMessage {
// Prepare message's handle to be sent.
// `_f` takes the local handle and target process ID, performs any OS-specific work
// required to send the handle, and returns the value of the handle in the remote,
// which the message is then updated with before being serialized by the caller.
fn prepare_send_message_handle<F>(&mut self, _f: F) -> io::Result<()>
where
F: FnOnce(PlatformHandleType, u32) -> io::Result<PlatformHandleType>,
{
Ok(())
// Represents a PlatformHandle in-flight between processes.
// On Unix platforms, this is just a plain owned Handle, closed on drop.
// On Windows, `RemoteHandle` also retains ownership of the `target_handle`
// in the `target` process. Once the handle has been successfully sent
// to the remote, the sender should call `mark_sent()` to relinquish
// ownership of `target_handle` in the remote.
#[derive(Debug)]
pub struct RemoteHandle {
pub(crate) handle: PlatformHandleType,
#[cfg(windows)]
pub(crate) target: u32,
#[cfg(windows)]
pub(crate) target_handle: Option<PlatformHandleType>,
}
impl RemoteHandle {
#[allow(clippy::missing_safety_doc)]
pub unsafe fn new(handle: PlatformHandleType, _target: u32) -> Self {
RemoteHandle {
handle,
#[cfg(windows)]
target: _target,
#[cfg(windows)]
target_handle: None,
}
}
#[cfg(windows)]
pub fn mark_sent(&mut self) {
self.target_handle.take();
}
#[cfg(windows)]
#[allow(clippy::missing_safety_doc)]
pub unsafe fn send_to_target(&mut self) -> std::io::Result<PlatformHandleType> {
let target_handle = crate::duplicate_platform_handle(self.handle, Some(self.target))?;
self.target_handle = Some(target_handle);
Ok(target_handle)
}
// Update the item's handle with the received value, making it a valid owned handle.
// Called on the receiving side after deserialization.
// Implementations must only call `F` for message types expecting a handle.
#[cfg(unix)]
fn receive_owned_message_handle<F>(&mut self, _: F)
where
F: FnOnce() -> PlatformHandleType,
{
#[allow(clippy::missing_safety_doc)]
pub unsafe fn take(self) -> PlatformHandleType {
let h = self.handle;
std::mem::forget(self);
h
}
}
impl Drop for RemoteHandle {
fn drop(&mut self) {
unsafe {
crate::close_platform_handle(self.handle);
}
#[cfg(windows)]
unsafe {
if let Some(target_handle) = self.target_handle {
if let Err(e) = crate::close_target_handle(target_handle, self.target) {
trace!("RemoteHandle failed to close target handle: {:?}", e);
}
}
}
}
}
unsafe impl Send for RemoteHandle {}
pub trait AssociateHandleForMessage {
// True if this item has an associated handle attached for remoting.
fn has_associated_handle(&self) -> bool {
false
}
// Take ownership of the associated handle, leaving the item's
// associated handle empty.
fn take_handle(&mut self) -> RemoteHandle {
panic!("take_handle called on item without associated handle");
}
#[allow(clippy::missing_safety_doc)]
// Replace an empty associated handle with a non-owning serializable value
// indicating the value of the handle in the remote process.
#[cfg(windows)]
unsafe fn set_remote_handle(&mut self, _: PlatformHandleType) {
panic!("set_remote_handle called on item without associated handle");
}
#[allow(clippy::missing_safety_doc)]
// Replace a serialized associated handle value with an owned local handle.
#[cfg(windows)]
unsafe fn set_local_handle(&mut self) {
panic!("set_local_handle called on item without associated handle");
}
#[allow(clippy::missing_safety_doc)]
// Replace an empty associated handle with an owned local handle.
#[cfg(unix)]
unsafe fn set_local_handle(&mut self, _: PlatformHandleType) {
panic!("set_local_handle called on item without associated handle");
}
}
impl AssociateHandleForMessage for ClientMessage {
fn has_associated_handle(&self) -> bool {
matches!(
*self,
ClientMessage::StreamCreated(_)
| ClientMessage::StreamInitialized(_)
| ClientMessage::ContextSetupDeviceCollectionCallback(_)
)
}
fn take_handle(&mut self) -> RemoteHandle {
match *self {
ClientMessage::StreamCreated(ref mut data) => data.shm_handle.take_handle_for_send(),
ClientMessage::StreamInitialized(ref mut data) => data.take_handle_for_send(),
ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => {
data.platform_handle.take_handle_for_send()
}
_ => panic!("take_handle called on item without associated handle"),
}
}
#[cfg(windows)]
unsafe fn set_remote_handle(&mut self, handle: PlatformHandleType) {
match *self {
ClientMessage::StreamCreated(ref mut data) => {
data.shm_handle = SerializableHandle::new_serializable_value(handle);
}
ClientMessage::StreamInitialized(ref mut data) => {
*data = SerializableHandle::new_serializable_value(handle);
}
ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => {
data.platform_handle = SerializableHandle::new_serializable_value(handle);
}
_ => panic!("set_remote_handle called on item without associated handle"),
}
}
#[cfg(windows)]
unsafe fn set_local_handle(&mut self) {
match *self {
ClientMessage::StreamCreated(ref mut data) => data.shm_handle.make_owned(),
ClientMessage::StreamInitialized(ref mut data) => data.make_owned(),
ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => {
data.platform_handle.make_owned()
}
_ => panic!("set_local_handle called on item without associated handle"),
}
}
#[cfg(unix)]
unsafe fn set_local_handle(&mut self, handle: PlatformHandleType) {
match *self {
ClientMessage::StreamCreated(ref mut data) => {
data.shm_handle = SerializableHandle::new_owned(handle);
}
ClientMessage::StreamInitialized(ref mut data) => {
*data = SerializableHandle::new_owned(handle);
}
ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => {
data.platform_handle = SerializableHandle::new_owned(handle);
}
_ => panic!("set_local_handle called on item without associated handle"),
}
}
}
impl AssociateHandleForMessage for ServerMessage {}
impl AssociateHandleForMessage for ClientMessage {
fn prepare_send_message_handle<F>(&mut self, f: F) -> io::Result<()>
where
F: FnOnce(PlatformHandleType, u32) -> io::Result<PlatformHandleType>,
{
unsafe {
match *self {
ClientMessage::StreamCreated(ref mut data) => {
let handle = data.shm_handle.take_handle_for_send();
data.shm_handle =
SerializableHandle::new_serializable_value(f(handle.0, handle.1)?);
trace!(
"StreamCreated handle: {:?} remote_handle: {:?}",
handle,
data.shm_handle
);
}
ClientMessage::StreamInitialized(ref mut data) => {
let handle = data.take_handle_for_send();
*data = SerializableHandle::new_serializable_value(f(handle.0, handle.1)?);
trace!(
"StreamInitialized handle: {:?} remote_handle: {:?}",
handle,
data
);
}
ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => {
let handle = data.platform_handle.take_handle_for_send();
data.platform_handle =
SerializableHandle::new_serializable_value(f(handle.0, handle.1)?);
trace!(
"ContextSetupDeviceCollectionCallback handle: {:?} remote_handle: {:?}",
handle,
data.platform_handle
);
}
_ => {}
}
}
Ok(())
}
#[cfg(unix)]
fn receive_owned_message_handle<F>(&mut self, f: F)
where
F: FnOnce() -> PlatformHandleType,
{
match *self {
ClientMessage::StreamCreated(ref mut data) => {
data.shm_handle = SerializableHandle::new_owned(f());
}
ClientMessage::StreamInitialized(ref mut data) => {
*data = SerializableHandle::new_owned(f());
}
ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => {
data.platform_handle = SerializableHandle::new_owned(f());
}
_ => {}
}
}
}
impl AssociateHandleForMessage for DeviceCollectionReq {}
impl AssociateHandleForMessage for DeviceCollectionResp {}

41
third_party/rust/audioipc2/src/sys/mod.rs поставляемый
Просмотреть файл

@ -3,12 +3,15 @@
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
use std::io::Result;
use std::{collections::VecDeque, io::Result};
use bytes::BytesMut;
use mio::{event::Source, Interest, Registry, Token};
#[cfg(unix)]
mod unix;
use crate::messages::RemoteHandle;
#[cfg(unix)]
pub use self::unix::*;
@ -19,15 +22,45 @@ pub use self::windows::*;
impl Source for Pipe {
fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> Result<()> {
self.0.register(registry, token, interests)
self.io.register(registry, token, interests)
}
fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest) -> Result<()> {
self.0.reregister(registry, token, interests)
self.io.reregister(registry, token, interests)
}
fn deregister(&mut self, registry: &Registry) -> Result<()> {
self.0.deregister(registry)
self.io.deregister(registry)
}
}
const HANDLE_QUEUE_LIMIT: usize = 16;
#[derive(Debug)]
pub struct ConnectionBuffer {
pub buf: BytesMut,
handles: VecDeque<RemoteHandle>,
}
impl ConnectionBuffer {
pub fn with_capacity(cap: usize) -> Self {
ConnectionBuffer {
buf: BytesMut::with_capacity(cap),
handles: VecDeque::with_capacity(HANDLE_QUEUE_LIMIT),
}
}
pub fn is_empty(&self) -> bool {
self.buf.is_empty()
}
pub fn push_handle(&mut self, handle: RemoteHandle) {
assert!(self.handles.len() < self.handles.capacity());
self.handles.push_back(handle)
}
pub fn pop_handle(&mut self) -> Option<RemoteHandle> {
self.handles.pop_front()
}
}

Просмотреть файл

@ -3,173 +3,12 @@
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details
use bytes::{BufMut, Bytes, BytesMut};
use crate::sys::HANDLE_QUEUE_LIMIT;
use bytes::{BufMut, BytesMut};
use libc::{self, cmsghdr};
use std::convert::TryInto;
use std::mem::size_of;
use std::os::unix::io::RawFd;
use std::{convert, mem, ops, slice};
#[derive(Clone, Debug)]
struct Fds {
fds: Bytes,
}
impl convert::AsRef<[RawFd]> for Fds {
fn as_ref(&self) -> &[RawFd] {
let n = self.fds.len() / mem::size_of::<RawFd>();
unsafe { slice::from_raw_parts(self.fds.as_ptr() as *const _, n) }
}
}
impl ops::Deref for Fds {
type Target = [RawFd];
#[inline]
fn deref(&self) -> &[RawFd] {
self.as_ref()
}
}
struct ControlMsgIter {
control: Bytes,
}
pub fn encode_handle(cmsg: &mut BytesMut, handle: RawFd) {
// TODO: Rework builder to commit directly to outbound buffer.
match builder(cmsg).rights(&[handle]).finish() {
Ok(handle_bytes) => cmsg.extend_from_slice(&handle_bytes),
Err(e) => debug!("cmsg::builder failed: {:?}", e),
}
}
// Decode one cmsghdr containing handle(s), and adjust the `cmsg` buffer cursor past
// the decoded handle(s).
// Note: ideally this would be a single handle, but due to buffering multiple
// sendmsgs can coalesce into a single recvmsg. On some (64-bit) systems, the
// minimum alignment of the cmsghdr buffer provides capacity for 2 handles, so
// this code must expect 1 or 2 handles per decode call.
pub fn decode_handles(cmsg: &mut BytesMut) -> arrayvec::ArrayVec<RawFd, 2> {
let mut fds = arrayvec::ArrayVec::<RawFd, 2>::new();
let b = cmsg.split_to(space(size_of::<i32>())).freeze();
let fd = iterator(b).next().unwrap();
assert!(fd.len() == 1 || fd.len() == 2);
fds.try_extend_from_slice(&fd).unwrap();
fds
}
fn iterator(c: Bytes) -> ControlMsgIter {
ControlMsgIter { control: c }
}
impl Iterator for ControlMsgIter {
type Item = Fds;
fn next(&mut self) -> Option<Self::Item> {
loop {
let control = self.control.clone();
let cmsghdr_len = len(0);
if control.len() < cmsghdr_len {
// No more entries---not enough data in `control` for a
// complete message.
return None;
}
let cmsg: &cmsghdr = unsafe { &*(control.as_ptr() as *const _) };
// The offset to the next cmsghdr in control. This must be
// aligned to a boundary that matches the type used to
// represent the length of the message.
let cmsg_len = cmsg.cmsg_len as usize;
let cmsg_space = space(cmsg_len - cmsghdr_len);
self.control = if cmsg_space > control.len() {
// No more entries---not enough data in `control` for a
// complete message.
Bytes::new()
} else {
control.slice_from(cmsg_space)
};
match (cmsg.cmsg_level, cmsg.cmsg_type) {
(libc::SOL_SOCKET, libc::SCM_RIGHTS) => {
trace!("Found SCM_RIGHTS...");
return Some(Fds {
fds: control.slice(cmsghdr_len, cmsg_len as _),
});
}
(level, kind) => {
trace!("Skipping cmsg level, {}, type={}...", level, kind);
}
}
}
}
}
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
enum Error {
/// Not enough space in storage to insert control mesage.
NoSpace,
}
#[must_use]
struct ControlMsgBuilder {
result: Result<BytesMut, Error>,
}
fn builder(buf: &mut BytesMut) -> ControlMsgBuilder {
let buf = aligned(buf);
ControlMsgBuilder { result: Ok(buf) }
}
impl ControlMsgBuilder {
fn msg(mut self, level: libc::c_int, kind: libc::c_int, msg: &[u8]) -> Self {
self.result = self.result.and_then(|mut cmsg| {
let cmsg_space = space(msg.len());
if cmsg.remaining_mut() < cmsg_space {
return Err(Error::NoSpace);
}
// Some definitions of cmsghdr contain padding. Rather
// than try to keep an up-to-date #cfg list to handle
// that, just use a pre-zeroed struct to fill out any
// fields we don't care about.
let zeroed = unsafe { mem::zeroed() };
#[allow(clippy::needless_update)]
// `cmsg_len` is `usize` on some platforms, `u32` on others.
#[allow(clippy::useless_conversion)]
let cmsghdr = cmsghdr {
cmsg_len: len(msg.len()).try_into().unwrap(),
cmsg_level: level,
cmsg_type: kind,
..zeroed
};
unsafe {
let cmsghdr_ptr = cmsg.bytes_mut().as_mut_ptr();
std::ptr::copy_nonoverlapping(
&cmsghdr as *const _ as *const _,
cmsghdr_ptr,
mem::size_of::<cmsghdr>(),
);
let cmsg_data_ptr = libc::CMSG_DATA(cmsghdr_ptr as _);
std::ptr::copy_nonoverlapping(msg.as_ptr(), cmsg_data_ptr, msg.len());
cmsg.advance_mut(cmsg_space);
}
Ok(cmsg)
});
self
}
pub fn rights(self, fds: &[RawFd]) -> Self {
self.msg(libc::SOL_SOCKET, libc::SCM_RIGHTS, fds.as_bytes())
}
pub fn finish(self) -> Result<Bytes, Error> {
self.result.map(|mut cmsg| cmsg.take().freeze())
}
}
use std::{mem, slice};
trait AsBytes {
fn as_bytes(&self) -> &[u8];
@ -183,16 +22,76 @@ impl<'a, T: Sized> AsBytes for &'a [T] {
}
}
fn aligned(buf: &BytesMut) -> BytesMut {
let mut aligned_buf = buf.clone();
aligned_buf.reserve(buf.remaining_mut());
let cmsghdr_align = mem::align_of::<cmsghdr>();
let n = unsafe { aligned_buf.bytes_mut().as_ptr() } as usize & (cmsghdr_align - 1);
if n != 0 {
unsafe { aligned_buf.advance_mut(n) };
drop(aligned_buf.take());
// Encode `handles` into a cmsghdr in `buf`.
pub fn encode_handles(cmsg: &mut BytesMut, handles: &[RawFd]) {
assert!(handles.len() <= HANDLE_QUEUE_LIMIT);
let msg = handles.as_bytes();
let cmsg_space = space(msg.len());
assert!(cmsg.remaining_mut() >= cmsg_space);
// Some definitions of cmsghdr contain padding. Rather
// than try to keep an up-to-date #cfg list to handle
// that, just use a pre-zeroed struct to fill out any
// fields we don't care about.
let zeroed = unsafe { mem::zeroed() };
#[allow(clippy::needless_update)]
// `cmsg_len` is `usize` on some platforms, `u32` on others.
#[allow(clippy::useless_conversion)]
let cmsghdr = cmsghdr {
cmsg_len: len(msg.len()).try_into().unwrap(),
cmsg_level: libc::SOL_SOCKET,
cmsg_type: libc::SCM_RIGHTS,
..zeroed
};
unsafe {
let cmsghdr_ptr = cmsg.chunk_mut().as_mut_ptr();
std::ptr::copy_nonoverlapping(
&cmsghdr as *const _ as *const _,
cmsghdr_ptr,
mem::size_of::<cmsghdr>(),
);
let cmsg_data_ptr = libc::CMSG_DATA(cmsghdr_ptr as _);
std::ptr::copy_nonoverlapping(msg.as_ptr(), cmsg_data_ptr, msg.len());
cmsg.advance_mut(cmsg_space);
}
aligned_buf
}
// Decode `buf` containing a cmsghdr with one or more handle(s).
pub fn decode_handles(buf: &mut BytesMut) -> arrayvec::ArrayVec<RawFd, HANDLE_QUEUE_LIMIT> {
let mut fds = arrayvec::ArrayVec::<RawFd, HANDLE_QUEUE_LIMIT>::new();
let cmsghdr_len = len(0);
if buf.len() < cmsghdr_len {
// No more entries---not enough data in `buf` for a
// complete message.
return fds;
}
let cmsg: &cmsghdr = unsafe { &*(buf.as_ptr() as *const _) };
let cmsg_len = cmsg.cmsg_len as usize;
match (cmsg.cmsg_level, cmsg.cmsg_type) {
(libc::SOL_SOCKET, libc::SCM_RIGHTS) => {
trace!("Found SCM_RIGHTS...");
let slice = &buf[cmsghdr_len..cmsg_len];
let slice = unsafe {
slice::from_raw_parts(
slice.as_ptr() as *const _,
slice.len() / mem::size_of::<i32>(),
)
};
fds.try_extend_from_slice(slice).unwrap();
}
(level, kind) => {
trace!("Skipping cmsg level, {}, type={}...", level, kind);
}
}
assert!(fds.len() <= HANDLE_QUEUE_LIMIT);
fds
}
fn len(len: usize) -> usize {

Просмотреть файл

116
third_party/rust/audioipc2/src/sys/unix/mod.rs поставляемый
Просмотреть файл

@ -4,36 +4,50 @@
// accompanying file LICENSE for details
use std::io::Result;
use std::os::unix::prelude::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::prelude::{AsRawFd, FromRawFd};
use bytes::{BufMut, BytesMut};
use bytes::{Buf, BufMut, BytesMut};
use iovec::IoVec;
use mio::net::UnixStream;
use crate::{close_platform_handle, PlatformHandle};
use crate::PlatformHandle;
use super::{RecvMsg, SendMsg};
use super::{ConnectionBuffer, RecvMsg, SendMsg, HANDLE_QUEUE_LIMIT};
pub mod cmsg;
mod msg;
pub struct Pipe(pub UnixStream);
pub struct Pipe {
pub(crate) io: UnixStream,
cmsg: BytesMut,
}
impl Pipe {
fn new(io: UnixStream) -> Self {
Pipe {
io,
cmsg: BytesMut::with_capacity(cmsg::space(
std::mem::size_of::<i32>() * HANDLE_QUEUE_LIMIT,
)),
}
}
}
// Create a connected "pipe" pair. The `Pipe` is the server end,
// the `PlatformHandle` is the client end to be remoted.
pub fn make_pipe_pair() -> Result<(Pipe, PlatformHandle)> {
let (server, client) = UnixStream::pair()?;
Ok((Pipe(server), PlatformHandle::from(client)))
Ok((Pipe::new(server), PlatformHandle::from(client)))
}
impl Pipe {
#[allow(clippy::missing_safety_doc)]
pub unsafe fn from_raw_handle(handle: crate::PlatformHandle) -> Pipe {
Pipe(UnixStream::from_raw_fd(handle.into_raw()))
Pipe::new(UnixStream::from_raw_fd(handle.into_raw()))
}
pub fn shutdown(&mut self) -> Result<()> {
self.0.shutdown(std::net::Shutdown::Both)
self.io.shutdown(std::net::Shutdown::Both)
}
}
@ -42,7 +56,6 @@ impl RecvMsg for Pipe {
// the `ConnectionBuffer` members has been adjusted appropriate by the caller.
fn recv_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> {
assert!(buf.buf.remaining_mut() > 0);
assert!(buf.cmsg.remaining_mut() > 0);
// TODO: MSG_CMSG_CLOEXEC not portable.
// TODO: MSG_NOSIGNAL not portable; macOS can set socket option SO_NOSIGPIPE instead.
#[cfg(target_os = "linux")]
@ -50,14 +63,27 @@ impl RecvMsg for Pipe {
#[cfg(not(target_os = "linux"))]
let flags = 0;
let r = unsafe {
let mut iovec = [<&mut IoVec>::from(buf.buf.bytes_mut())];
msg::recv_msg_with_flags(self.0.as_raw_fd(), &mut iovec, buf.cmsg.bytes_mut(), flags)
let chunk = buf.buf.chunk_mut();
let slice = std::slice::from_raw_parts_mut(chunk.as_mut_ptr(), chunk.len());
let mut iovec = [<&mut IoVec>::from(slice)];
msg::recv_msg_with_flags(
self.io.as_raw_fd(),
&mut iovec,
self.cmsg.chunk_mut(),
flags,
)
};
match r {
Ok((n, cmsg_n, msg_flags)) => unsafe {
trace!("recv_msg_with_flags flags={}", msg_flags);
buf.buf.advance_mut(n);
buf.cmsg.advance_mut(cmsg_n);
self.cmsg.advance_mut(cmsg_n);
let handles = cmsg::decode_handles(&mut self.cmsg);
self.cmsg.clear();
let unused = 0;
for h in handles {
buf.push_handle(super::RemoteHandle::new(h, unused));
}
Ok(n)
},
Err(e) => Err(e),
@ -70,6 +96,13 @@ impl SendMsg for Pipe {
// `ConnectionBuffer` members based on the size of the successful send operation.
fn send_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> {
assert!(!buf.buf.is_empty());
if !buf.handles.is_empty() {
let mut handles = [-1i32; HANDLE_QUEUE_LIMIT];
for (i, h) in buf.handles.iter().enumerate() {
handles[i] = h.handle;
}
cmsg::encode_handles(&mut self.cmsg, &handles[..buf.handles.len()]);
}
let r = {
// TODO: MSG_NOSIGNAL not portable; macOS can set socket option SO_NOSIGPIPE instead.
#[cfg(target_os = "linux")]
@ -77,68 +110,17 @@ impl SendMsg for Pipe {
#[cfg(not(target_os = "linux"))]
let flags = 0;
let iovec = [<&IoVec>::from(&buf.buf[..buf.buf.len()])];
msg::send_msg_with_flags(
self.0.as_raw_fd(),
&iovec,
&buf.cmsg[..buf.cmsg.len()],
flags,
)
msg::send_msg_with_flags(self.io.as_raw_fd(), &iovec, &self.cmsg, flags)
};
match r {
Ok(n) => {
buf.buf.advance(n);
// Close sent fds.
close_fds(&mut buf.cmsg);
// Discard sent handles.
while buf.handles.pop_front().is_some() {}
self.cmsg.clear();
Ok(n)
}
Err(e) => Err(e),
}
}
}
// Platform-specific wrapper around `BytesMut`.
// `cmsg` is a secondary buffer used for sending/receiving
// fds via `sendmsg`/`recvmsg` on a Unix Domain Socket.
#[derive(Debug)]
pub struct ConnectionBuffer {
pub buf: BytesMut,
pub cmsg: BytesMut,
}
impl ConnectionBuffer {
pub fn with_capacity(cap: usize) -> Self {
ConnectionBuffer {
buf: BytesMut::with_capacity(cap),
cmsg: BytesMut::with_capacity(cmsg::space(std::mem::size_of::<RawFd>())),
}
}
pub fn is_empty(&self) -> bool {
self.buf.is_empty() && self.cmsg.is_empty()
}
}
// Close any unprocessed fds in cmsg buffer.
impl Drop for ConnectionBuffer {
fn drop(&mut self) {
if !self.cmsg.is_empty() {
debug!(
"ConnectionBuffer dropped with {} bytes in cmsg",
self.cmsg.len()
);
close_fds(&mut self.cmsg);
}
}
}
fn close_fds(cmsg: &mut BytesMut) {
while !cmsg.is_empty() {
let fds = cmsg::decode_handles(cmsg);
for fd in fds {
unsafe {
close_platform_handle(fd);
}
}
}
assert!(cmsg.is_empty());
}

Просмотреть файл

@ -3,6 +3,7 @@
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details.
use bytes::buf::UninitSlice;
use iovec::unix;
use iovec::IoVec;
use std::os::unix::io::RawFd;
@ -29,15 +30,15 @@ fn cvt_r<F: FnMut() -> libc::ssize_t>(mut f: F) -> io::Result<usize> {
pub(crate) fn recv_msg_with_flags(
socket: RawFd,
bufs: &mut [&mut IoVec],
cmsg: &mut [u8],
cmsg: &mut UninitSlice,
flags: libc::c_int,
) -> io::Result<(usize, usize, libc::c_int)> {
let slice = unix::as_os_slice_mut(bufs);
let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
let (control, controllen) = if cmsg.is_empty() {
let (control, controllen) = if cmsg.len() == 0 {
(ptr::null_mut(), 0)
} else {
(cmsg.as_ptr() as *mut _, cmsg.len())
(cmsg.as_mut_ptr() as _, cmsg.len())
};
let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };

Просмотреть файл

@ -12,15 +12,17 @@ use std::{
use std::io::Result;
use bytes::{BufMut, BytesMut};
use bytes::{Buf, BufMut};
use mio::windows::NamedPipe;
use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
use crate::PlatformHandle;
use super::{RecvMsg, SendMsg};
use super::{ConnectionBuffer, RecvMsg, SendMsg};
pub struct Pipe(pub NamedPipe);
pub struct Pipe {
pub(crate) io: NamedPipe,
}
// Create a connected "pipe" pair. The `Pipe` is the server end,
// the `PlatformHandle` is the client end to be remoted.
@ -37,7 +39,7 @@ pub fn make_pipe_pair() -> Result<(Pipe, PlatformHandle)> {
PlatformHandle::from(file)
};
Ok((Pipe(server), client))
Ok((Pipe::new(server), client))
}
static PIPE_ID: AtomicUsize = AtomicUsize::new(0);
@ -49,13 +51,17 @@ fn get_pipe_name() -> String {
}
impl Pipe {
pub fn new(io: NamedPipe) -> Self {
Self { io }
}
#[allow(clippy::missing_safety_doc)]
pub unsafe fn from_raw_handle(handle: crate::PlatformHandle) -> Pipe {
Pipe(NamedPipe::from_raw_handle(handle.into_raw()))
Pipe::new(NamedPipe::from_raw_handle(handle.into_raw()))
}
pub fn shutdown(&mut self) -> Result<()> {
self.0.disconnect()
self.io.disconnect()
}
}
@ -64,7 +70,11 @@ impl RecvMsg for Pipe {
// the `ConnectionBuffer` members has been adjusted appropriate by the caller.
fn recv_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> {
assert!(buf.buf.remaining_mut() > 0);
let r = unsafe { self.0.read(buf.buf.bytes_mut()) };
let r = unsafe {
let chunk = buf.buf.chunk_mut();
let slice = std::slice::from_raw_parts_mut(chunk.as_mut_ptr(), chunk.len());
self.io.read(slice)
};
match r {
Ok(n) => unsafe {
buf.buf.advance_mut(n);
@ -80,28 +90,13 @@ impl SendMsg for Pipe {
// `ConnectionBuffer` members based on the size of the successful send operation.
fn send_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> {
assert!(!buf.buf.is_empty());
let r = self.0.write(&buf.buf[..buf.buf.len()]);
let r = self.io.write(&buf.buf[..buf.buf.len()]);
if let Ok(n) = r {
buf.buf.advance(n);
while let Some(mut handle) = buf.pop_handle() {
handle.mark_sent()
}
}
r
}
}
// Platform-specific wrapper around `BytesMut`.
#[derive(Debug)]
pub struct ConnectionBuffer {
pub buf: BytesMut,
}
impl ConnectionBuffer {
pub fn with_capacity(cap: usize) -> Self {
ConnectionBuffer {
buf: BytesMut::with_capacity(cap),
}
}
pub fn is_empty(&self) -> bool {
self.buf.is_empty()
}
}

Просмотреть файл

@ -25,8 +25,8 @@ webrender_bindings = { path = "../../../../gfx/webrender_bindings" }
cubeb-coreaudio = { git = "https://github.com/mozilla/cubeb-coreaudio-rs", rev = "44eca95823bb57e964cf7b6d9791ed2ccb4b2108", optional = true }
cubeb-pulse = { git = "https://github.com/mozilla/cubeb-pulse-rs", rev="1f1fe1e08e01a9a534ec7f079702a583a0899ce7", optional = true, features=["pulse-dlopen"] }
cubeb-sys = { version = "0.10", optional = true, features=["gecko-in-tree"] }
audioipc2-client = { git = "https://github.com/kinetiknz/audioipc-2", rev = "85e9839059f4bf8f68130825b8fd02c39a6a51b9", optional = true } # macos (v2) branch
audioipc2-server = { git = "https://github.com/kinetiknz/audioipc-2", rev = "85e9839059f4bf8f68130825b8fd02c39a6a51b9", optional = true } # macos (v2) branch
audioipc2-client = { git = "https://github.com/kinetiknz/audioipc-2", rev = "499b95580c8b276e52bd9757d735249504202e5c", optional = true } # macos (v2) branch
audioipc2-server = { git = "https://github.com/kinetiknz/audioipc-2", rev = "499b95580c8b276e52bd9757d735249504202e5c", optional = true } # macos (v2) branch
audioipc-client = { git = "https://github.com/mozilla/audioipc", rev = "6848974a4a91ebd3e18333c9a96a421d619a2dc8", optional = true }
audioipc-server = { git = "https://github.com/mozilla/audioipc", rev = "6848974a4a91ebd3e18333c9a96a421d619a2dc8", optional = true }
# Force tokio-reactor on an old version to avoid new dependencies of newer versions.