diff --git a/.cargo/config.in b/.cargo/config.in index 5dad42b6104e..e47e7a69c3c5 100644 --- a/.cargo/config.in +++ b/.cargo/config.in @@ -65,7 +65,7 @@ rev = "21c26326f5f45f415c49eac4ba5bc41a2f961321" [source."https://github.com/kinetiknz/audioipc-2"] git = "https://github.com/kinetiknz/audioipc-2" replace-with = "vendored-sources" -rev = "499b95580c8b276e52bd9757d735249504202e5c" +rev = "ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb" [source."https://github.com/jfkthame/mapped_hyph.git"] git = "https://github.com/jfkthame/mapped_hyph.git" diff --git a/Cargo.lock b/Cargo.lock index 3bd936c28dc7..cde03d2d38b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -318,7 +318,7 @@ dependencies = [ [[package]] name = "audioipc2" version = "0.5.0" -source = "git+https://github.com/kinetiknz/audioipc-2?rev=499b95580c8b276e52bd9757d735249504202e5c#499b95580c8b276e52bd9757d735249504202e5c" +source = "git+https://github.com/kinetiknz/audioipc-2?rev=ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb#ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb" dependencies = [ "arrayvec", "ashmem", @@ -327,6 +327,7 @@ dependencies = [ "byteorder", "bytes 1.2.1", "cc", + "crossbeam-channel", "cubeb", "error-chain", "iovec", @@ -345,7 +346,7 @@ dependencies = [ [[package]] name = "audioipc2-client" version = "0.5.0" -source = "git+https://github.com/kinetiknz/audioipc-2?rev=499b95580c8b276e52bd9757d735249504202e5c#499b95580c8b276e52bd9757d735249504202e5c" +source = "git+https://github.com/kinetiknz/audioipc-2?rev=ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb#ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb" dependencies = [ "audio_thread_priority", "audioipc2", @@ -356,7 +357,7 @@ dependencies = [ [[package]] name = "audioipc2-server" version = "0.5.0" -source = "git+https://github.com/kinetiknz/audioipc-2?rev=499b95580c8b276e52bd9757d735249504202e5c#499b95580c8b276e52bd9757d735249504202e5c" +source = "git+https://github.com/kinetiknz/audioipc-2?rev=ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb#ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb" dependencies = [ "audio_thread_priority", "audioipc2", diff --git a/third_party/rust/audioipc2-client/.cargo-checksum.json b/third_party/rust/audioipc2-client/.cargo-checksum.json index ed7ee847feee..44172d352965 100644 --- a/third_party/rust/audioipc2-client/.cargo-checksum.json +++ b/third_party/rust/audioipc2-client/.cargo-checksum.json @@ -1 +1 @@ -{"files":{"Cargo.toml":"31dc34fae9951183eaed3511cffe3d830d52ba3c046257454f09a06156d0716b","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/context.rs":"17bdf1dfc8d910b745f94d5bc74121850174c716f8a2eb6d1c4b075d42fa5df5","src/lib.rs":"c4a6797734489280f6b97dd72c9e51a7bd7be4104592eece3929e29d45cbca4a","src/send_recv.rs":"064a657c845762be1dbcbbfc18b3f8a51582eb540def8d2ceecf200184ad4f7a","src/stream.rs":"a6c07796e6fe704cfa6baf8b904e7ffe874d3c884d44d4ed307e668dec25452b"},"package":null} \ No newline at end of file +{"files":{"Cargo.toml":"31dc34fae9951183eaed3511cffe3d830d52ba3c046257454f09a06156d0716b","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/context.rs":"4593aa41ee97b72622b572ad80f0e4c939d8be2ea45fed1f5587f5a109a67735","src/lib.rs":"c4a6797734489280f6b97dd72c9e51a7bd7be4104592eece3929e29d45cbca4a","src/send_recv.rs":"859abe75b521eb4297c84b30423814b5b87f3c7741ad16fe72189212e123e1ac","src/stream.rs":"a6c07796e6fe704cfa6baf8b904e7ffe874d3c884d44d4ed307e668dec25452b"},"package":null} \ No newline at end of file diff --git a/third_party/rust/audioipc2-client/src/context.rs b/third_party/rust/audioipc2-client/src/context.rs index 1e3d7fe7cbea..dc22159a257c 100644 --- a/third_party/rust/audioipc2-client/src/context.rs +++ b/third_party/rust/audioipc2-client/src/context.rs @@ -71,8 +71,7 @@ fn promote_thread(rpc: &rpccore::Proxy) { match get_current_thread_info() { Ok(info) => { let bytes = info.serialize(); - // Don't wait for the response, this is on the callback thread, which must not block. - rpc.call(ServerMessage::PromoteThreadToRealTime(bytes)); + let _ = rpc.call(ServerMessage::PromoteThreadToRealTime(bytes)); } Err(_) => { warn!("Could not remotely promote thread to RT."); diff --git a/third_party/rust/audioipc2-client/src/send_recv.rs b/third_party/rust/audioipc2-client/src/send_recv.rs index b0d41aca2ce1..1134c99a4996 100644 --- a/third_party/rust/audioipc2-client/src/send_recv.rs +++ b/third_party/rust/audioipc2-client/src/send_recv.rs @@ -43,7 +43,7 @@ macro_rules! send_recv { $rpc.call(ServerMessage::$smsg($($a),*)) }); (__recv $resp:expr, $rmsg:ident) => ({ - match $resp.wait() { + match $resp { Ok(ClientMessage::$rmsg) => Ok(()), Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)), Ok(m) => { @@ -57,7 +57,7 @@ macro_rules! send_recv { } }); (__recv $resp:expr, $rmsg:ident __result) => ({ - match $resp.wait() { + match $resp { Ok(ClientMessage::$rmsg(v)) => Ok(v), Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)), Ok(m) => { diff --git a/third_party/rust/audioipc2-server/.cargo-checksum.json b/third_party/rust/audioipc2-server/.cargo-checksum.json index 4b28668ef946..c474de6303b9 100644 --- a/third_party/rust/audioipc2-server/.cargo-checksum.json +++ b/third_party/rust/audioipc2-server/.cargo-checksum.json @@ -1 +1 @@ -{"files":{"Cargo.toml":"7feb495b23148ecc83ec7f480aefe19c9804a8900cdb4ceb005c049cdce82428","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/lib.rs":"d9cc7ca311cceb70acbc63b2190d6205094152e582faaad1b4a6061019f5803f","src/server.rs":"00740854f3e4f64cbeabfdb04d1337a2fdb89122464ea64d23fe1272045aee7d"},"package":null} \ No newline at end of file +{"files":{"Cargo.toml":"7feb495b23148ecc83ec7f480aefe19c9804a8900cdb4ceb005c049cdce82428","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/lib.rs":"d9cc7ca311cceb70acbc63b2190d6205094152e582faaad1b4a6061019f5803f","src/server.rs":"362ec34c541e43befb95204795622b5a2da036f8e417d524c64eb6c6550d094b"},"package":null} \ No newline at end of file diff --git a/third_party/rust/audioipc2-server/src/server.rs b/third_party/rust/audioipc2-server/src/server.rs index 9533af22dc64..d715c64aada3 100644 --- a/third_party/rust/audioipc2-server/src/server.rs +++ b/third_party/rust/audioipc2-server/src/server.rs @@ -252,14 +252,11 @@ impl ServerStreamCallbacks { return cubeb::ffi::CUBEB_ERROR.try_into().unwrap(); } - let r = self - .data_callback_rpc - .call(CallbackReq::Data { - nframes, - input_frame_size: self.input_frame_size as usize, - output_frame_size: self.output_frame_size as usize, - }) - .wait(); + let r = self.data_callback_rpc.call(CallbackReq::Data { + nframes, + input_frame_size: self.input_frame_size as usize, + output_frame_size: self.output_frame_size as usize, + }); match r { Ok(CallbackResp::Data(frames)) => { @@ -282,8 +279,7 @@ impl ServerStreamCallbacks { trace!("Stream state callback: {:?}", state); let r = self .state_callback_rpc - .call(CallbackReq::State(state.into())) - .wait(); + .call(CallbackReq::State(state.into())); match r { Ok(CallbackResp::State) => {} _ => { @@ -296,8 +292,7 @@ impl ServerStreamCallbacks { trace!("Stream device change callback"); let r = self .device_change_callback_rpc - .call(CallbackReq::DeviceChange) - .wait(); + .call(CallbackReq::DeviceChange); match r { Ok(CallbackResp::DeviceChange) => {} _ => { @@ -347,8 +342,7 @@ impl DeviceCollectionChangeCallback { ); let _ = self .rpc - .call(DeviceCollectionReq::DeviceChange(device_type)) - .wait(); + .call(DeviceCollectionReq::DeviceChange(device_type)); } } diff --git a/third_party/rust/audioipc2/.cargo-checksum.json b/third_party/rust/audioipc2/.cargo-checksum.json index 82f0db292104..8098377aeaff 100644 --- a/third_party/rust/audioipc2/.cargo-checksum.json +++ b/third_party/rust/audioipc2/.cargo-checksum.json @@ -1 +1 @@ -{"files":{"Cargo.toml":"4a6aaffaf15fc11d3c41fc399a6e36a1ac9016f0edd592d4ff4059a2092818af","benches/serialization.rs":"d56855d868dab6aa22c8b03a61084535351b76c94b68d8b1d20764e352fe473f","build.rs":"65df9a97c6cdaa3faf72581f04ac289197b0b1797d69d22c1796e957ff1089e2","src/codec.rs":"4e029396765db803201249e90bcf724eb56deed3b2e455822d6673f40550a3e1","src/errors.rs":"67a4a994d0724397657581cde153bdfc05ce86e7efc467f23fafc8f64df80fa4","src/ipccore.rs":"6d33898f5bc61963797d21b44d36a7ee52d3d0cf13a4f19fa2d59373720eead8","src/lib.rs":"9b107cb52081eeea3fa742d30361db70f7138baa423dfe21d37dcf5087afc338","src/messages.rs":"452362da2cace9a0f2e3134c190ecb6a9997f8be4036cde06643e17c6c238240","src/rpccore.rs":"9fa24cb6d487b436382e35f82d0809ad2b315ce049ebaa767b4f88d3d5637f2e","src/shm.rs":"1d88f19606899e3e477865d6b84bbe3e272f51618a1c2d57b6dab03a4787cde3","src/sys/mod.rs":"e6fa1d260abf093e1f7b50185195e2d3aee0eb8c9774c6f253953b5896d838f3","src/sys/unix/cmsg.rs":"8a27a20383c333c5d033e58a546a530e26b964942a4615793d1ca078c65efb75","src/sys/unix/cmsghdr.c":"d7344b3dc15cdce410c68669b848bb81f7fe36362cd3699668cb613fa05180f8","src/sys/unix/mod.rs":"59835f0d5509940078b1820a54f49fc5514adeb3e45e7d21e3ab917431da2e74","src/sys/unix/msg.rs":"c0103cc058aeb890ab7aa023fcd6d3b9a0135d6b28fdecdec446650957210508","src/sys/windows/mod.rs":"7b1288e42b3ce34c7004b9fe3eeb6d9822c55e2688d3c2a40e55db46a2ca5d76"},"package":null} \ No newline at end of file +{"files":{"Cargo.toml":"8b2d3abbe023360a24d37b306dbec9e8bd0162025d38ca106ebcc8d7abab4039","benches/serialization.rs":"d56855d868dab6aa22c8b03a61084535351b76c94b68d8b1d20764e352fe473f","build.rs":"65df9a97c6cdaa3faf72581f04ac289197b0b1797d69d22c1796e957ff1089e2","src/codec.rs":"4e029396765db803201249e90bcf724eb56deed3b2e455822d6673f40550a3e1","src/errors.rs":"67a4a994d0724397657581cde153bdfc05ce86e7efc467f23fafc8f64df80fa4","src/ipccore.rs":"eda3629e363124c84d5b826dea03f5551a8adad6c8efbc61b98f7d6572fdfa18","src/lib.rs":"9b107cb52081eeea3fa742d30361db70f7138baa423dfe21d37dcf5087afc338","src/messages.rs":"452362da2cace9a0f2e3134c190ecb6a9997f8be4036cde06643e17c6c238240","src/rpccore.rs":"21568946ca59e0cf1cb0dc6254ebda577a014343438a4fde2556a22e44eea2bf","src/shm.rs":"1d88f19606899e3e477865d6b84bbe3e272f51618a1c2d57b6dab03a4787cde3","src/sys/mod.rs":"e6fa1d260abf093e1f7b50185195e2d3aee0eb8c9774c6f253953b5896d838f3","src/sys/unix/cmsg.rs":"8a27a20383c333c5d033e58a546a530e26b964942a4615793d1ca078c65efb75","src/sys/unix/cmsghdr.c":"d7344b3dc15cdce410c68669b848bb81f7fe36362cd3699668cb613fa05180f8","src/sys/unix/mod.rs":"59835f0d5509940078b1820a54f49fc5514adeb3e45e7d21e3ab917431da2e74","src/sys/unix/msg.rs":"c0103cc058aeb890ab7aa023fcd6d3b9a0135d6b28fdecdec446650957210508","src/sys/windows/mod.rs":"7b1288e42b3ce34c7004b9fe3eeb6d9822c55e2688d3c2a40e55db46a2ca5d76"},"package":null} \ No newline at end of file diff --git a/third_party/rust/audioipc2/Cargo.toml b/third_party/rust/audioipc2/Cargo.toml index 13a92b196ac5..17ef5b94e844 100644 --- a/third_party/rust/audioipc2/Cargo.toml +++ b/third_party/rust/audioipc2/Cargo.toml @@ -21,6 +21,7 @@ serde_bytes = "0.11" mio = { version = "0.8", features = ["os-poll", "net", "os-ext"] } slab = "0.4" scopeguard = "1.1.0" +crossbeam-channel = "0.5" [target.'cfg(unix)'.dependencies] iovec = "0.1" diff --git a/third_party/rust/audioipc2/src/ipccore.rs b/third_party/rust/audioipc2/src/ipccore.rs index bfed10a42a87..6e607db2e89b 100644 --- a/third_party/rust/audioipc2/src/ipccore.rs +++ b/third_party/rust/audioipc2/src/ipccore.rs @@ -7,6 +7,7 @@ use std::io::{self, Result}; use std::sync::{mpsc, Arc}; use std::thread; +use crossbeam_channel::{self, Receiver, Sender}; use mio::{event::Event, Events, Interest, Poll, Registry, Token, Waker}; use slab::Slab; @@ -53,7 +54,7 @@ enum Request { #[derive(Clone, Debug)] pub struct EventLoopHandle { waker: Arc, - requests_tx: mpsc::Sender, + requests_tx: Sender, } impl EventLoopHandle { @@ -140,8 +141,8 @@ struct EventLoop { waker: Arc, name: String, connections: Slab, - requests_rx: mpsc::Receiver, - requests_tx: mpsc::Sender, + requests_rx: Receiver, + requests_tx: Sender, } const EVENT_LOOP_INITIAL_CLIENTS: usize = 64; // Initial client allocation, exceeding this will cause the connection slab to grow. @@ -151,7 +152,7 @@ impl EventLoop { fn new(name: String) -> Result { let poll = Poll::new()?; let waker = Arc::new(Waker::new(poll.registry(), WAKE_TOKEN)?); - let (tx, rx) = mpsc::channel(); + let (tx, rx) = crossbeam_channel::bounded(EVENT_LOOP_INITIAL_CLIENTS); let eventloop = EventLoop { poll, events: Events::with_capacity(EVENT_LOOP_EVENTS_PER_ITERATION), @@ -824,7 +825,7 @@ mod test { // RPC message from client to server. let response = client_proxy.call(TestServerMessage::TestRequest); - let response = response.wait().expect("client response"); + let response = response.expect("client response"); assert_eq!(response, TestClientMessage::TestResponse); // Explicit shutdown. @@ -840,7 +841,7 @@ mod test { // RPC message from client to server. let response = client_proxy.call(TestServerMessage::TestRequest); - let response = response.wait().expect("client response"); + let response = response.expect("client response"); assert_eq!(response, TestClientMessage::TestResponse); // Explicit shutdown. @@ -855,7 +856,7 @@ mod test { drop(server); let response = client_proxy.call(TestServerMessage::TestRequest); - response.wait().expect_err("sending on closed channel"); + response.expect_err("sending on closed channel"); } #[test] @@ -865,7 +866,7 @@ mod test { drop(client); let response = client_proxy.call(TestServerMessage::TestRequest); - response.wait().expect_err("sending on a closed channel"); + response.expect_err("sending on a closed channel"); } #[test] diff --git a/third_party/rust/audioipc2/src/rpccore.rs b/third_party/rust/audioipc2/src/rpccore.rs index f208ba3b1f7d..bb0a0bc7fe01 100644 --- a/third_party/rust/audioipc2/src/rpccore.rs +++ b/third_party/rust/audioipc2/src/rpccore.rs @@ -3,11 +3,14 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details +use std::collections::VecDeque; use std::io::{self, Result}; use std::mem::ManuallyDrop; -use std::{collections::VecDeque, sync::mpsc}; +use std::sync::{Arc, Mutex, Weak}; +use crossbeam_channel::{self, Receiver, Sender}; use mio::Token; +use slab::Slab; use crate::ipccore::EventLoopHandle; @@ -41,21 +44,57 @@ pub trait Server { fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage; } -// RPC Client Proxy implementation. ProxyRequest's Sender is connected to ProxyReceiver's Receiver, -// allowing the ProxyReceiver to wait on a response from the proxy. -type ProxyRequest = (Request, mpsc::Sender); -type ProxyReceiver = mpsc::Receiver>; +// RPC Client Proxy implementation. +type ProxyKey = usize; +type ProxyRequest = (ProxyKey, Request); -// Each RPC Proxy `call` returns a blocking waitable ProxyResponse. -// `wait` produces the response received over RPC from the associated -// Proxy `call`. -pub struct ProxyResponse { - inner: mpsc::Receiver, +// RPC Proxy that may be `clone`d for use by multiple owners/threads. +// A Proxy `call` arranges for the supplied request to be transmitted +// to the associated Server via RPC and blocks awaiting the response +// via `response_rx`. +// A Proxy is associated with the ClientHandler via `handler_tx` to send requests, +// `response_rx` to receive responses, and uses `key` to identify the Proxy with +// the sending side of `response_rx` ClientHandler. +// Each Proxy is registered with the ClientHandler on initialization via the +// ProxyManager and unregistered when dropped. +// A ClientHandler normally lives until the last Proxy is dropped, but if the ClientHandler +// encounters an internal error, `response_tx` will be closed and `proxy_mgr` can +// no longer be upgraded to register new Proxies. +#[derive(Debug)] +pub struct Proxy { + handle: Option<(EventLoopHandle, Token)>, + key: ProxyKey, + response_rx: Receiver, + handler_tx: ManuallyDrop>>, + proxy_mgr: Weak>, } -impl ProxyResponse { - pub fn wait(&self) -> Result { - match self.inner.recv() { +impl Proxy { + fn new( + handler_tx: Sender>, + proxy_mgr: Weak>, + ) -> Self { + let (tx, rx) = crossbeam_channel::bounded(1); + Self { + handle: None, + key: proxy_mgr.upgrade().unwrap().register_proxy(tx), + response_rx: rx, + handler_tx: ManuallyDrop::new(handler_tx), + proxy_mgr, + } + } + + pub fn call(&self, request: Request) -> Result { + match self.handler_tx.send((self.key, request)) { + Ok(_) => self.wake_connection(), + Err(_) => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "proxy send error", + )) + } + } + match self.response_rx.recv() { Ok(resp) => Ok(resp), Err(_) => Err(std::io::Error::new( std::io::ErrorKind::Other, @@ -63,27 +102,6 @@ impl ProxyResponse { )), } } -} - -// RPC Proxy that may be `clone`d for use by multiple owners/threads. -// A Proxy `call` arranges for the supplied request to be transmitted -// to the associated Server via RPC. The response can be retrieved by -// `wait`ing on the returned ProxyResponse. -#[derive(Debug)] -pub struct Proxy { - handle: Option<(EventLoopHandle, Token)>, - tx: ManuallyDrop>>, -} - -impl Proxy { - pub fn call(&self, request: Request) -> ProxyResponse { - let (tx, rx) = mpsc::channel(); - match self.tx.send((request, tx)) { - Ok(_) => self.wake_connection(), - Err(e) => debug!("Proxy::call error={:?}", e), - } - ProxyResponse { inner: rx } - } pub(crate) fn connect_event_loop(&mut self, handle: EventLoopHandle, token: Token) { self.handle = Some((handle, token)); @@ -100,9 +118,13 @@ impl Proxy { impl Clone for Proxy { fn clone(&self) -> Self { - Proxy { + let (tx, rx) = crossbeam_channel::bounded(1); + Self { handle: self.handle.clone(), - tx: self.tx.clone(), + key: self.proxy_mgr.upgrade().unwrap().register_proxy(tx), + response_rx: rx, + handler_tx: self.handler_tx.clone(), + proxy_mgr: self.proxy_mgr.clone(), } } } @@ -110,15 +132,56 @@ impl Clone for Proxy { impl Drop for Proxy { fn drop(&mut self) { trace!("Proxy drop, waking EventLoop"); + if let Some(mgr) = self.proxy_mgr.upgrade() { + mgr.unregister_proxy(self.key) + } // Must drop Sender before waking the connection, otherwise // the wake may be processed before Sender is closed. - unsafe { ManuallyDrop::drop(&mut self.tx) } + unsafe { + ManuallyDrop::drop(&mut self.handler_tx); + } if self.handle.is_some() { self.wake_connection() } } } +const RPC_CLIENT_INITIAL_PROXIES: usize = 32; // Initial proxy pre-allocation per client. + +// Manage the Sender side of a ClientHandler's Proxies. Each Proxy registers itself with +// the manager on initialization. +#[derive(Debug)] +struct ProxyManager { + proxies: Mutex>>, +} + +impl ProxyManager { + fn new() -> Self { + Self { + proxies: Mutex::new(Slab::with_capacity(RPC_CLIENT_INITIAL_PROXIES)), + } + } + + // Register a Proxy's response Sender, returning a unique ID identifying + // the Proxy to the ClientHandler. + fn register_proxy(&self, tx: Sender) -> ProxyKey { + let mut proxies = self.proxies.lock().unwrap(); + let entry = proxies.vacant_entry(); + let key = entry.key(); + entry.insert(tx); + key + } + + fn unregister_proxy(&self, key: ProxyKey) { + let _ = self.proxies.lock().unwrap().remove(key); + } + + // Deliver ClientHandler's Response to the Proxy associated with `key`. + fn deliver(&self, key: ProxyKey, resp: Response) { + let _ = self.proxies.lock().unwrap()[key].send(resp); + } +} + // Client-specific Handler implementation. // The IPC EventLoop Driver calls this to execute client-specific // RPC handling. Serialized messages sent via a Proxy are queued @@ -127,8 +190,26 @@ impl Drop for Proxy { // trigger response completion by sending the response via a channel // connected to a ProxyResponse. pub(crate) struct ClientHandler { - messages: ProxyReceiver, - in_flight: VecDeque>, + messages: Receiver>, + // Proxies hold a Weak to register on initialization. + // When ClientHandler drops, any Proxies blocked on a response will + // error due to the Sender closing. + proxies: Arc>, + in_flight: VecDeque, +} + +impl ClientHandler { + fn new(rx: Receiver>) -> ClientHandler { + ClientHandler:: { + messages: rx, + proxies: Arc::new(ProxyManager::new()), + in_flight: VecDeque::with_capacity(RPC_CLIENT_INITIAL_PROXIES), + } + } + + fn proxy_manager(&self) -> Weak::ClientMessage>> { + Arc::downgrade(&self.proxies) + } } impl Handler for ClientHandler { @@ -137,8 +218,9 @@ impl Handler for ClientHandler { fn consume(&mut self, response: Self::In) -> Result<()> { trace!("ClientHandler::consume"); - if let Some(complete) = self.in_flight.pop_front() { - drop(complete.send(response)); + // `proxy` identifies the waiting Proxy expecting `response`. + if let Some(proxy) = self.in_flight.pop_front() { + self.proxies.deliver(proxy, response); } else { return Err(std::io::Error::new( std::io::ErrorKind::Other, @@ -154,12 +236,12 @@ impl Handler for ClientHandler { // Try to get a new message match self.messages.try_recv() { - Ok((request, response_tx)) => { + Ok((proxy, request)) => { trace!(" --> received request"); - self.in_flight.push_back(response_tx); + self.in_flight.push_back(proxy); Ok(Some(request)) } - Err(mpsc::TryRecvError::Empty) => { + Err(crossbeam_channel::TryRecvError::Empty) => { trace!(" --> no request"); Ok(None) } @@ -173,19 +255,12 @@ impl Handler for ClientHandler { pub(crate) fn make_client( ) -> (ClientHandler, Proxy) { - let (tx, rx) = mpsc::channel(); + let (tx, rx) = crossbeam_channel::bounded(RPC_CLIENT_INITIAL_PROXIES); - let handler = ClientHandler:: { - messages: rx, - in_flight: VecDeque::with_capacity(32), - }; + let handler = ClientHandler::new(rx); + let proxy_mgr = handler.proxy_manager(); - let proxy = Proxy { - handle: None, - tx: ManuallyDrop::new(tx), - }; - - (handler, proxy) + (handler, Proxy::new(tx, proxy_mgr)) } // Server-specific Handler implementation. @@ -226,9 +301,11 @@ impl Handler for ServerHandler { } } +const RPC_SERVER_INITIAL_CLIENTS: usize = 32; // Initial client allocation per server. + pub(crate) fn make_server(server: S) -> ServerHandler { ServerHandler:: { server, - in_flight: VecDeque::with_capacity(32), + in_flight: VecDeque::with_capacity(RPC_SERVER_INITIAL_CLIENTS), } } diff --git a/toolkit/library/rust/shared/Cargo.toml b/toolkit/library/rust/shared/Cargo.toml index 994e545f8a58..e025de9e3eb3 100644 --- a/toolkit/library/rust/shared/Cargo.toml +++ b/toolkit/library/rust/shared/Cargo.toml @@ -25,8 +25,8 @@ webrender_bindings = { path = "../../../../gfx/webrender_bindings" } cubeb-coreaudio = { git = "https://github.com/mozilla/cubeb-coreaudio-rs", rev = "44eca95823bb57e964cf7b6d9791ed2ccb4b2108", optional = true } cubeb-pulse = { git = "https://github.com/mozilla/cubeb-pulse-rs", rev="1f1fe1e08e01a9a534ec7f079702a583a0899ce7", optional = true, features=["pulse-dlopen"] } cubeb-sys = { version = "0.10", optional = true, features=["gecko-in-tree"] } -audioipc2-client = { git = "https://github.com/kinetiknz/audioipc-2", rev = "499b95580c8b276e52bd9757d735249504202e5c", optional = true } # macos (v2) branch -audioipc2-server = { git = "https://github.com/kinetiknz/audioipc-2", rev = "499b95580c8b276e52bd9757d735249504202e5c", optional = true } # macos (v2) branch +audioipc2-client = { git = "https://github.com/kinetiknz/audioipc-2", rev = "ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb", optional = true } # macos (v2) branch +audioipc2-server = { git = "https://github.com/kinetiknz/audioipc-2", rev = "ea7cabf8c9dc051a52ffb6cd7d2564b29b7428eb", optional = true } # macos (v2) branch audioipc-client = { git = "https://github.com/mozilla/audioipc", rev = "fb7a2b12ced3b43e6a268621989c6191d1ed7e39", optional = true } audioipc-server = { git = "https://github.com/mozilla/audioipc", rev = "fb7a2b12ced3b43e6a268621989c6191d1ed7e39", optional = true } # Force tokio-reactor on an old version to avoid new dependencies of newer versions.