diff --git a/Cargo.lock b/Cargo.lock index c046caa0..90b8adbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -184,8 +184,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d3a45e77e34375a7923b1e8febb049bb011f064714a8e17a1a616fef01da13d" dependencies = [ "proc-macro2", - "quote 1.0.8", - "syn 1.0.58", + "quote", + "syn", ] [[package]] @@ -262,17 +262,6 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" -[[package]] -name = "bincode" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e103c8b299b28a9c6990458b7013dc4a8356a9b854c51b9883241f5866fac36e" -dependencies = [ - "byteorder", - "num-traits 0.1.43", - "serde", -] - [[package]] name = "bincode" version = "1.3.1" @@ -401,12 +390,6 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" -[[package]] -name = "case" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e88b166b48e29667f5443df64df3c61dc07dc2b1a0b0d231800e07f09a33ecc1" - [[package]] name = "cc" version = "1.0.66" @@ -436,7 +419,7 @@ checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" dependencies = [ "libc", "num-integer", - "num-traits 0.2.14", + "num-traits", "time", "winapi 0.3.9", ] @@ -683,17 +666,6 @@ dependencies = [ "libc", ] -[[package]] -name = "derive-error" -version = "0.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "629f1bb3abce791912ca85a24676fff54464f7deb122906adabc90fb96e876d3" -dependencies = [ - "case", - "quote 0.3.15", - "syn 0.11.11", -] - [[package]] name = "difference" version = "2.0.0" @@ -812,8 +784,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa4da3c766cd7a0db8242e326e9e4e081edd567072893ed320008189715366a4" dependencies = [ "proc-macro2", - "quote 1.0.8", - "syn 1.0.58", + "quote", + "syn", "synstructure", ] @@ -856,7 +828,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4" dependencies = [ - "num-traits 0.2.14", + "num-traits", ] [[package]] @@ -1000,8 +972,8 @@ checksum = "0f8719ca0e1f3c5e34f3efe4570ef2c0610ca6da85ae7990d472e9cbfba13664" dependencies = [ "proc-macro-hack", "proc-macro2", - "quote 1.0.8", - "syn 1.0.58", + "quote", + "syn", ] [[package]] @@ -1698,7 +1670,7 @@ checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304" dependencies = [ "autocfg 1.0.1", "num-integer", - "num-traits 0.2.14", + "num-traits", ] [[package]] @@ -1708,16 +1680,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" dependencies = [ "autocfg 1.0.1", - "num-traits 0.2.14", -] - -[[package]] -name = "num-traits" -version = "0.1.43" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92e5113e9fd4cc14ded8e499429f396a20f98c772a47cc8622a736e1ec843c31" -dependencies = [ - "num-traits 0.2.14", + "num-traits", ] [[package]] @@ -1890,6 +1853,26 @@ dependencies = [ "unicase 1.4.2", ] +[[package]] +name = "pin-project" +version = "0.4.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "918192b5c59119d51e0cd221f4d49dde9112824ba717369e903c97d076083d0f" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "0.4.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be26700300be6d9d23264c73211d8190e755b6b5ca7a1b28230025511b52a5e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.1.11" @@ -1980,7 +1963,7 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71" dependencies = [ - "unicode-xid 0.2.1", + "unicode-xid", ] [[package]] @@ -2011,12 +1994,6 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" -[[package]] -name = "quote" -version = "0.3.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" - [[package]] name = "quote" version = "1.0.8" @@ -2411,9 +2388,10 @@ dependencies = [ "assert_cmd", "atty", "base64 0.13.0", - "bincode 1.3.1", + "bincode", "blake3", "byteorder", + "bytes 0.5.6", "cc", "chrono", "clap", @@ -2464,15 +2442,16 @@ dependencies = [ "syslog", "tar", "tempfile", + "tokio 0.2.24", "tokio-compat", "tokio-io", "tokio-named-pipes", "tokio-process", "tokio-reactor", - "tokio-serde-bincode", - "tokio-tcp", + "tokio-serde", "tokio-timer", "tokio-uds", + "tokio-util", "toml", "tower", "untrusted", @@ -2570,8 +2549,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c84d3526699cd55261af4b941e4e725444df67aa4f9e6a3564f18030d12672df" dependencies = [ "proc-macro2", - "quote 1.0.8", - "syn 1.0.58", + "quote", + "syn", ] [[package]] @@ -2646,7 +2625,7 @@ checksum = "692ca13de57ce0613a363c8c2f1de925adebc81b04c923ac60c5488bb44abe4b" dependencies = [ "chrono", "num-bigint", - "num-traits 0.2.14", + "num-traits", ] [[package]] @@ -2727,17 +2706,6 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2" -[[package]] -name = "syn" -version = "0.11.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad" -dependencies = [ - "quote 0.3.15", - "synom", - "unicode-xid 0.0.4", -] - [[package]] name = "syn" version = "1.0.58" @@ -2745,17 +2713,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc60a3d73ea6594cd712d830cc1f0390fd71542d8c8cd24e70cc54cdfd5e05d5" dependencies = [ "proc-macro2", - "quote 1.0.8", - "unicode-xid 0.2.1", -] - -[[package]] -name = "synom" -version = "0.11.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6" -dependencies = [ - "unicode-xid 0.0.4", + "quote", + "unicode-xid", ] [[package]] @@ -2765,9 +2724,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701" dependencies = [ "proc-macro2", - "quote 1.0.8", - "syn 1.0.58", - "unicode-xid 0.2.1", + "quote", + "syn", + "unicode-xid", ] [[package]] @@ -2863,8 +2822,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9be73a2caec27583d0046ef3796c3794f868a5bc813db689eed00c7631275cd1" dependencies = [ "proc-macro2", - "quote 1.0.8", - "syn 1.0.58", + "quote", + "syn", ] [[package]] @@ -3103,26 +3062,13 @@ dependencies = [ [[package]] name = "tokio-serde" -version = "0.1.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "894168193c4f80862a2244ff953b69145a9961a9efba39500e0970b083d0649c" +checksum = "ebdd897b01021779294eb09bb3b52b6e11b0747f9f7e333a84bef532b656de99" dependencies = [ - "bytes 0.4.12", - "futures 0.1.30", -] - -[[package]] -name = "tokio-serde-bincode" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02e35c8d60a5e87cfb30dd562a309e56f8a6d36617b0a76c87f04d5466607ca8" -dependencies = [ - "bincode 0.8.0", - "bytes 0.4.12", - "derive-error", - "futures 0.1.30", - "serde", - "tokio-serde", + "bytes 0.5.6", + "futures 0.3.9", + "pin-project", ] [[package]] @@ -3253,11 +3199,11 @@ dependencies = [ [[package]] name = "tower" -version = "0.1.1" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc72f33b6a72c75c9df0037afce313018bae845f0ec7fdb9201b8768427a917f" +checksum = "fd3169017c090b7a28fce80abaad0ab4f5566423677c9331bb320af7e49cfe62" dependencies = [ - "futures 0.1.30", + "futures-core", "tower-buffer", "tower-discover", "tower-layer", @@ -3271,13 +3217,13 @@ dependencies = [ [[package]] name = "tower-buffer" -version = "0.1.2" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c7b83e1ccf5b23dd109dd6ae2c07b8e2beec7a51a21f29da2dba576317370e0" +checksum = "c4887dc2a65d464c8b9b66e0e4d51c2fd6cf5b3373afc72805b0a60bce00446a" dependencies = [ - "futures 0.1.30", - "tokio-executor", - "tokio-sync", + "futures-core", + "pin-project", + "tokio 0.2.24", "tower-layer", "tower-service", "tracing", @@ -3285,91 +3231,101 @@ dependencies = [ [[package]] name = "tower-discover" -version = "0.1.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73a7632286f78164d65d18fd0e570307acde9362489aa5c8c53e6315cc2bde47" +checksum = "0f6b5000c3c54d269cc695dff28136bb33d08cbf1df2c48129e143ab65bf3c2a" dependencies = [ - "futures 0.1.30", + "futures-core", + "pin-project", "tower-service", ] [[package]] name = "tower-layer" -version = "0.1.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ddf07e10c07dcc8f41da6de036dc66def1a85b70eb8a385159e3908bb258328" +checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" + +[[package]] +name = "tower-limit" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92c3040c5dbed68abffaa0d4517ac1a454cd741044f33ab0eefab6b8d1361404" dependencies = [ - "futures 0.1.30", + "futures-core", + "pin-project", + "tokio 0.2.24", + "tower-layer", + "tower-load", "tower-service", ] [[package]] -name = "tower-limit" -version = "0.1.3" +name = "tower-load" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c21ba835a08fd54b63cd91ae0548a7b6e2a91075147dfa3dc8e1a940c1b6f18f" +checksum = "8cc79fc3afd07492b7966d7efa7c6c50f8ed58d768a6075dd7ae6591c5d2017b" dependencies = [ - "futures 0.1.30", - "tokio-sync", - "tokio-timer", - "tower-layer", + "futures-core", + "log 0.4.11", + "pin-project", + "tokio 0.2.24", + "tower-discover", "tower-service", - "tracing", ] [[package]] name = "tower-load-shed" -version = "0.1.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04fbaf5bfb63d84204db87b9b2aeec61549613f2bbb8706dcc36f5f3ea8cd769" +checksum = "9f021e23900173dc315feb4b6922510dae3e79c689b74c089112066c11f0ae4e" dependencies = [ - "futures 0.1.30", + "futures-core", + "pin-project", "tower-layer", "tower-service", ] [[package]] name = "tower-retry" -version = "0.1.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09e80588125061f276ed2a7b0939988b411e570a2dbb2965b1382ef4f71036f7" +checksum = "e6727956aaa2f8957d4d9232b308fe8e4e65d99db30f42b225646e86c9b6a952" dependencies = [ - "futures 0.1.30", - "tokio-timer", + "futures-core", + "pin-project", + "tokio 0.2.24", "tower-layer", "tower-service", ] [[package]] name = "tower-service" -version = "0.2.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cc0c98637d23732f8de6dfd16494c9f1559c3b9e20b4a46462c8f9b9e827bfa" -dependencies = [ - "futures 0.1.30", -] +checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tower-timeout" -version = "0.1.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c06bbc2fbd056f810940a8c6f0cc194557d36da3c22999a755a7a6612447da9" +checksum = "127b8924b357be938823eaaec0608c482d40add25609481027b96198b2e4b31e" dependencies = [ - "futures 0.1.30", - "tokio-timer", + "pin-project", + "tokio 0.2.24", "tower-layer", "tower-service", ] [[package]] name = "tower-util" -version = "0.1.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4792342fac093db5d2558655055a89a04ca909663467a4310c7739d9f8b64698" +checksum = "d1093c19826d33807c72511e68f73b4a0469a3f22c2bd5f7d5212178b4b89674" dependencies = [ - "futures 0.1.30", - "tokio-io", - "tower-layer", + "futures-core", + "futures-util", + "pin-project", "tower-service", ] @@ -3393,8 +3349,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80e0ccfc3378da0cce270c946b676a376943f5cd16aeba64568e7939806f4ada" dependencies = [ "proc-macro2", - "quote 1.0.8", - "syn 1.0.58", + "quote", + "syn", ] [[package]] @@ -3484,12 +3440,6 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" -[[package]] -name = "unicode-xid" -version = "0.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc" - [[package]] name = "unicode-xid" version = "0.2.1" @@ -3679,8 +3629,8 @@ dependencies = [ "lazy_static", "log 0.4.11", "proc-macro2", - "quote 1.0.8", - "syn 1.0.58", + "quote", + "syn", "wasm-bindgen-shared", ] @@ -3702,7 +3652,7 @@ version = "0.2.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a6ac8995ead1f084a8dea1e65f194d0973800c7f571f6edd70adf06ecf77084" dependencies = [ - "quote 1.0.8", + "quote", "wasm-bindgen-macro-support", ] @@ -3713,8 +3663,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a48c72f299d80557c7c62e37e7225369ecc0c963964059509fbafe917c7549" dependencies = [ "proc-macro2", - "quote 1.0.8", - "syn 1.0.58", + "quote", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/Cargo.toml b/Cargo.toml index 13cbf696..d97aa144 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ base64 = "0.13" bincode = "1" blake3 = "0.3" byteorder = "1.0" +bytes = "0.5" chrono = { version = "0.4", optional = true } clap = "2.23.0" counted-array = "0.1" @@ -69,12 +70,13 @@ serde_json = "1.0" strip-ansi-escapes = "0.1" tar = "0.4" tempfile = "3" +tokio = { version = "0.2", features = ["tcp"] } tokio-compat = "0.1" tokio-io = "0.1" tokio-process = "0.2" -tokio-serde-bincode = "0.1" -tower = "0.1" -tokio-tcp = "0.1" +tokio-serde = "0.6" +tokio-util = { version = "0.3", features = ["codec"] } +tower = "0.3" tokio-timer = "0.2" toml = "0.5" untrusted = { version = "0.7", optional = true } diff --git a/src/errors.rs b/src/errors.rs index bc33f963..bb90d06c 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -67,6 +67,7 @@ pub type Result = anyhow::Result; pub type SFuture = Box>; pub type SFutureSend = Box + Send>; +pub type SFutureStd = Box>>; pub trait FutureContext { fn fcontext(self, context: C) -> SFuture @@ -105,7 +106,7 @@ macro_rules! ftry { ($e:expr) => { match $e { Ok(v) => v, - Err(e) => return Box::new($crate::futures::future::err(e.into())) as SFuture<_>, + Err(e) => return Box::new(futures::future::err(e.into())) as SFuture<_>, } }; } @@ -115,7 +116,7 @@ macro_rules! ftry_send { ($e:expr) => { match $e { Ok(v) => v, - Err(e) => return Box::new($crate::futures::future::err(e)) as SFutureSend<_>, + Err(e) => return Box::new(futures::future::err(e)) as SFutureSend<_>, } }; } diff --git a/src/lib.rs b/src/lib.rs index 40602c07..36328aa4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,8 +20,6 @@ extern crate clap; #[macro_use] extern crate counted_array; -#[macro_use] -extern crate futures; #[cfg(feature = "jsonwebtoken")] use jsonwebtoken as jwt; #[macro_use] diff --git a/src/server.rs b/src/server.rs index 7364c3e8..56f709bb 100644 --- a/src/server.rs +++ b/src/server.rs @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -// For tokio_io::codec::length_delimited::Framed; -#![allow(deprecated)] - use crate::cache::{storage_from_config, Storage}; use crate::compiler::{ get_compiler_info, CacheControl, CompileResult, Compiler, CompilerArguments, CompilerHasher, @@ -30,11 +27,11 @@ use crate::protocol::{Compile, CompileFinished, CompileResponse, Request, Respon use crate::util; #[cfg(feature = "dist-client")] use anyhow::Context as _; +use bytes::{buf::ext::BufMutExt, Bytes, BytesMut}; use filetime::FileTime; -use futures::sync::mpsc; -use futures::{future, stream, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream}; -use futures_03::compat::Compat; +use futures::Future as _; use futures_03::executor::ThreadPool; +use futures_03::{channel::mpsc, compat::*, future, prelude::*, stream}; use number_prefix::NumberPrefix; use std::cell::RefCell; use std::collections::HashMap; @@ -42,6 +39,7 @@ use std::env; use std::ffi::{OsStr, OsString}; use std::fs::metadata; use std::io::{self, Write}; +use std::marker::Unpin; #[cfg(feature = "dist-client")] use std::mem; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; @@ -52,17 +50,18 @@ use std::rc::Rc; use std::sync::Arc; #[cfg(feature = "dist-client")] use std::sync::Mutex; -use std::task::{Context, Waker}; +use std::task::{Context, Poll, Waker}; use std::time::Duration; use std::time::Instant; use std::u64; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + net::TcpListener, + time::{self, delay_for, Delay}, +}; use tokio_compat::runtime::current_thread::Runtime; -use tokio_io::codec::length_delimited; -use tokio_io::codec::length_delimited::Framed; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_serde_bincode::{ReadBincode, WriteBincode}; -use tokio_tcp::TcpListener; -use tokio_timer::{Delay, Timeout}; +use tokio_serde::Framed; +use tokio_util::codec::{length_delimited, LengthDelimitedCodec}; use tower::Service; use crate::errors::*; @@ -410,7 +409,7 @@ pub fn start_server(config: &Config, port: u16) -> Result<()> { let port = srv.port(); info!("server started, listening on port {}", port); notify_server_startup(¬ify, ServerStartup::Ok { port })?; - srv.run(future::empty::<(), ()>())?; + srv.run(future::pending::<()>())?; Ok(()) } Err(e) => { @@ -442,13 +441,13 @@ impl SccacheServer { pub fn new( port: u16, pool: ThreadPool, - runtime: Runtime, + mut runtime: Runtime, client: Client, dist_client: DistClientContainer, storage: Arc, ) -> Result> { let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port); - let listener = TcpListener::bind(&SocketAddr::V4(addr))?; + let listener = runtime.block_on_std(TcpListener::bind(&SocketAddr::V4(addr)))?; // Prepare the service which we'll use to service all incoming TCP // connections. @@ -505,13 +504,9 @@ impl SccacheServer { where F: Future, { - self._run(Box::new(shutdown.then(|_| Ok(())))) - } - - fn _run<'a>(self, shutdown: Box + 'a>) -> io::Result<()> { let SccacheServer { mut runtime, - listener, + mut listener, rx, service, timeout, @@ -520,14 +515,20 @@ impl SccacheServer { // Create our "server future" which will simply handle all incoming // connections in separate tasks. - let server = listener.incoming().for_each(move |socket| { - trace!("incoming connection"); - tokio_compat::runtime::current_thread::TaskExecutor::current() - .spawn_local(Box::new(service.clone().bind(socket).map_err(|err| { - error!("{}", err); - }))) - .unwrap(); - Ok(()) + let server = listener.incoming().try_for_each(move |socket| { + let service = service.clone(); + async move { + trace!("incoming connection"); + tokio_compat::runtime::current_thread::TaskExecutor::current() + .spawn_local(Box::new( + Box::pin(service.bind(socket).map_err(|err| { + error!("{}", err); + })) + .compat(), + )) + .unwrap(); + Ok(()) + } }); // Right now there's a whole bunch of ways to shut down this server for @@ -542,35 +543,32 @@ impl SccacheServer { // inactivity, and this is then select'd with the `shutdown` future // passed to this function. - let shutdown = shutdown.map(|a| { + let shutdown = shutdown.map(|_| { info!("shutting down due to explicit signal"); - a }); let mut futures = vec![ - Box::new(server) as Box>, - Box::new( - shutdown - .map_err(|()| io::Error::new(io::ErrorKind::Other, "shutdown signal failed")), - ), + Box::pin(server) as Pin>>, + Box::pin(shutdown.map(Ok)), ]; - let shutdown_idle = ShutdownOrInactive { - rx, - timeout: if timeout != Duration::new(0, 0) { - Some(Delay::new(Instant::now() + timeout)) - } else { - None - }, - timeout_dur: timeout, - }; - futures.push(Box::new(shutdown_idle.map(|a| { + futures.push(Box::pin(async { + ShutdownOrInactive { + rx, + timeout: if timeout != Duration::new(0, 0) { + Some(delay_for(timeout)) + } else { + None + }, + timeout_dur: timeout, + } + .await; info!("shutting down due to being idle or request"); - a - }))); + Ok(()) + })); - let server = future::select_all(futures); - runtime.block_on(server).map_err(|p| p.0)?; + let server = future::select_all(futures).map(|t| t.0); + runtime.block_on_std(server)?; info!( "moving into the shutdown phase now, waiting at most 10 seconds \ @@ -585,14 +583,13 @@ impl SccacheServer { // Note that we cap the amount of time this can take, however, as we // don't want to wait *too* long. runtime - .block_on(Timeout::new(Compat::new(wait), Duration::new(30, 0))) - .map_err(|e| { - if e.is_inner() { - e.into_inner().unwrap() - } else { - io::Error::new(io::ErrorKind::Other, e) - } - })?; + .block_on_std(async { + time::timeout(Duration::new(30, 0), wait) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + .unwrap_or_else(|e| Err(io::Error::new(io::ErrorKind::Other, e))) + }) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; info!("ok, fully shutting down now"); @@ -685,13 +682,13 @@ pub enum ServerMessage { Shutdown, } -impl Service for SccacheService +impl Service for Arc> where C: CommandCreatorSync + 'static, { type Response = SccacheResponse; type Error = Error; - type Future = SFuture; + type Future = Pin>>>; fn call(&mut self, req: SccacheRequest) -> Self::Future { trace!("handle_client"); @@ -701,44 +698,60 @@ where // that every message is received. drop(self.tx.clone().start_send(ServerMessage::Request)); - let res: SFuture = match req.into_inner() { - Request::Compile(compile) => { - debug!("handle_client: compile"); - self.stats.borrow_mut().compile_requests += 1; - return self.handle_compile(compile); + let self_ = self.clone(); + Box::pin(async move { + match req.into_inner() { + Request::Compile(compile) => { + debug!("handle_client: compile"); + self_.stats.borrow_mut().compile_requests += 1; + self_.handle_compile(compile).await + } + Request::GetStats => { + debug!("handle_client: get_stats"); + self_ + .get_info() + .await + .map(|i| Response::Stats(Box::new(i))) + .map(Message::WithoutBody) + } + Request::DistStatus => { + debug!("handle_client: dist_status"); + self_ + .get_dist_status() + .await + .map(Response::DistStatus) + .map(Message::WithoutBody) + } + Request::ZeroStats => { + debug!("handle_client: zero_stats"); + self_.zero_stats(); + self_ + .get_info() + .await + .map(|i| Response::Stats(Box::new(i))) + .map(Message::WithoutBody) + } + Request::Shutdown => { + debug!("handle_client: shutdown"); + let mut tx = self_.tx.clone(); + future::try_join( + async { + let _ = tx.send(ServerMessage::Shutdown).await; + Ok(()) + }, + self_.get_info(), + ) + .await + .map(move |(_, info)| { + Message::WithoutBody(Response::ShuttingDown(Box::new(info))) + }) + } } - Request::GetStats => { - debug!("handle_client: get_stats"); - Box::new(self.get_info().map(|i| Response::Stats(Box::new(i)))) - } - Request::DistStatus => { - debug!("handle_client: dist_status"); - Box::new(self.get_dist_status().map(Response::DistStatus)) - } - Request::ZeroStats => { - debug!("handle_client: zero_stats"); - self.zero_stats(); - Box::new(self.get_info().map(|i| Response::Stats(Box::new(i)))) - } - Request::Shutdown => { - debug!("handle_client: shutdown"); - let future = self - .tx - .clone() - .send(ServerMessage::Shutdown) - .then(|_| Ok(())); - let info_future = self.get_info(); - return Box::new(future.join(info_future).map(move |(_, info)| { - Message::WithoutBody(Response::ShuttingDown(Box::new(info))) - })); - } - }; - - Box::new(res.map(Message::WithoutBody)) + }) } - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } } @@ -767,9 +780,9 @@ where } } - fn bind(mut self, socket: T) -> impl Future + fn bind(self, socket: T) -> impl Future> where - T: AsyncRead + AsyncWrite + 'static, + T: AsyncRead + AsyncWrite + Unpin + 'static, { let mut builder = length_delimited::Builder::new(); if let Ok(max_frame_length_str) = env::var("SCCACHE_MAX_FRAME_LENGTH") { @@ -782,56 +795,53 @@ where let io = builder.new_framed(socket); let (sink, stream) = SccacheTransport { - inner: WriteBincode::new(ReadBincode::new(io)), + inner: Framed::new(io.sink_err_into().err_into(), BincodeCodec), } .split(); - let sink = sink.sink_from_err::(); + let sink = sink.sink_err_into::(); + let mut self_ = Arc::new(self); stream - .from_err::() - .and_then(move |input| self.call(input)) - .and_then(|message| { - let f: Box> = match message { - Message::WithoutBody(message) => Box::new(stream::once(Ok(Frame::Message { - message, - body: false, - }))), - Message::WithBody(message, body) => Box::new( - stream::once(Ok(Frame::Message { - message, - body: true, - })) - .chain(Compat::new(body).map(|chunk| Frame::Body { chunk: Some(chunk) })) - .chain(stream::once(Ok(Frame::Body { chunk: None }))), + .err_into::() + .and_then(move |input| self_.call(input)) + .and_then(|message| async move { + let f: Pin>> = match message { + Message::WithoutBody(message) => { + Box::pin(stream::once(async { Ok(Frame::Message { message }) })) + } + Message::WithBody(message, body) => Box::pin( + stream::once(async { Ok(Frame::Message { message }) }) + .chain(body.map_ok(|chunk| Frame::Body { chunk: Some(chunk) })) + .chain(stream::once(async { Ok(Frame::Body { chunk: None }) })), ), }; - Ok(f.from_err::()) + Ok(f.err_into::()) }) - .flatten() + .try_flatten() .forward(sink) - .map(|_| ()) + .map_ok(|_| ()) } /// Get dist status. - fn get_dist_status(&self) -> SFuture { - f_ok(self.dist_client.get_status()) + async fn get_dist_status(&self) -> Result { + Ok(self.dist_client.get_status()) } /// Get info and stats about the cache. - fn get_info(&self) -> SFuture { + async fn get_info(&self) -> Result { let stats = self.stats.borrow().clone(); let cache_location = self.storage.location(); - Box::new( - self.storage - .current_size() - .join(self.storage.max_size()) - .map(move |(cache_size, max_cache_size)| ServerInfo { - stats, - cache_location, - cache_size, - max_cache_size, - }), + future::try_join( + self.storage.current_size().compat(), + self.storage.max_size().compat(), ) + .await + .map(move |(cache_size, max_cache_size)| ServerInfo { + stats, + cache_location, + cache_size, + max_cache_size, + }) } /// Zero stats about the cache. @@ -844,27 +854,25 @@ where /// This will handle a compile request entirely, generating a response with /// the inital information and an optional body which will eventually /// contain the results of the compilation. - fn handle_compile(&self, compile: Compile) -> SFuture { + async fn handle_compile(&self, compile: Compile) -> Result { let exe = compile.exe; let cmd = compile.args; let cwd: PathBuf = compile.cwd.into(); let env_vars = compile.env_vars; let me = self.clone(); - Box::new( - self.compiler_info(exe.into(), cwd.clone(), &env_vars) - .map(move |info| me.check_compiler(info, cmd, cwd, env_vars)), - ) + let info = self.compiler_info(exe.into(), cwd.clone(), &env_vars).await; + Ok(me.check_compiler(info, cmd, cwd, env_vars)) } /// Look up compiler info from the cache for the compiler `path`. /// If not cached, determine the compiler type and cache the result. - fn compiler_info( + async fn compiler_info( &self, path: PathBuf, cwd: PathBuf, env: &[(OsString, OsString)], - ) -> SFuture>>> { + ) -> Result>> { trace!("compiler_info"); let me = self.clone(); @@ -876,152 +884,127 @@ where let path1 = path.clone(); let env = env.to_vec(); - let resolve_w_proxy = { + let res: Option<(PathBuf, FileTime)> = { let compiler_proxies_borrow = self.compiler_proxies.borrow(); if let Some((compiler_proxy, _filetime)) = compiler_proxies_borrow.get(&path) { - let fut = compiler_proxy.resolve_proxied_executable( - self.creator.clone(), - cwd.clone(), - env.as_slice(), - ); - Box::new(fut.then(|res: Result<_>| Ok(res.ok()))) + let fut = compiler_proxy + .resolve_proxied_executable(self.creator.clone(), cwd.clone(), env.as_slice()) + .compat(); + Box::pin(fut.map(|res: Result<_>| res.ok())) as Pin>> } else { - f_ok(None) + Box::pin(async { None }) + } + } + .await; + + // use the supplied compiler path as fallback, lookup its modification time too + + let (resolved_compiler_path, mtime) = match res { + Some(x) => x, // TODO resolve the path right away + None => { + // fallback to using the path directly + metadata(&path2) + .map(|attr| FileTime::from_last_modification_time(&attr)) + .ok() + .map(move |filetime| (path2.clone(), filetime)) + .expect("Must contain sane data, otherwise mtime is not avail") } }; - // use the supplied compiler path as fallback, lookup its modification time too - let w_fallback = resolve_w_proxy.then(move |res: Result>| { - let opt = match res { - Ok(Some(x)) => Some(x), // TODO resolve the path right away - _ => { - // fallback to using the path directly - metadata(&path2) + let dist_info = match me1.dist_client.get_client() { + Ok(Some(ref client)) => { + if let Some(archive) = client.get_custom_toolchain(&resolved_compiler_path) { + match metadata(&archive) .map(|attr| FileTime::from_last_modification_time(&attr)) - .ok() - .map(move |filetime| (path2, filetime)) + { + Ok(mtime) => Some((archive, mtime)), + _ => None, + } + } else { + None } - }; - f_ok(opt) - }); + } + _ => None, + }; - let lookup_compiler = w_fallback.and_then(move |opt: Option<(PathBuf, FileTime)>| { - let (resolved_compiler_path, mtime) = - opt.expect("Must contain sane data, otherwise mtime is not avail"); + let opt = match me1.compilers.borrow().get(&resolved_compiler_path) { + // It's a hit only if the mtime and dist archive data matches. + Some(&Some(ref entry)) => { + if entry.mtime == mtime && entry.dist_info == dist_info { + Some(entry.compiler.clone()) + } else { + None + } + } + _ => None, + }; - let dist_info = match me1.dist_client.get_client() { - Ok(Some(ref client)) => { - if let Some(archive) = client.get_custom_toolchain(&resolved_compiler_path) { - match metadata(&archive) - .map(|attr| FileTime::from_last_modification_time(&attr)) - { - Ok(mtime) => Some((archive, mtime)), - _ => None, + match opt { + Some(info) => { + trace!("compiler_info cache hit"); + Ok(info) + } + None => { + trace!("compiler_info cache miss"); + // Check the compiler type and return the result when + // finished. This generally involves invoking the compiler, + // so do it asynchronously. + + // the compiler path might be compiler proxy, so it is important to use + // `path` (or its clone `path1`) to resolve using that one, not using `resolved_compiler_path` + let info: Result<(Box>, Option>>)> = + get_compiler_info::( + me.creator.clone(), + &path1, + &cwd, + env.as_slice(), + &me.pool, + dist_info.clone().map(|(p, _)| p), + ) + .compat() + .await; + + match info { + Ok((ref c, ref proxy)) => { + // register the proxy for this compiler, so it will be used directly from now on + // and the true/resolved compiler will create table hits in the hash map + // based on the resolved path + if let Some(proxy) = proxy { + trace!( + "Inserting new path proxy {:?} @ {:?} -> {:?}", + &path, + &cwd, + resolved_compiler_path + ); + let proxy: Box> = proxy.box_clone(); + me.compiler_proxies + .borrow_mut() + .insert(path, (proxy, mtime.clone())); } - } else { - None - } - } - _ => None, - }; + // TODO add some safety checks in case a proxy exists, that the initial `path` is not + // TODO the same as the resolved compiler binary - let opt = match me1.compilers.borrow().get(&resolved_compiler_path) { - // It's a hit only if the mtime and dist archive data matches. - Some(&Some(ref entry)) => { - if entry.mtime == mtime && entry.dist_info == dist_info { - Some(entry.compiler.clone()) - } else { - None - } - } - _ => None, - }; - f_ok((resolved_compiler_path, mtime, opt, dist_info)) - }); - - let obtain = lookup_compiler.and_then( - move |(resolved_compiler_path, mtime, opt, dist_info): ( - PathBuf, - FileTime, - Option>>, - Option<(PathBuf, FileTime)>, - )| { - match opt { - Some(info) => { - trace!("compiler_info cache hit"); - f_ok(Ok(info)) - } - None => { - trace!("compiler_info cache miss"); - // Check the compiler type and return the result when - // finished. This generally involves invoking the compiler, - // so do it asynchronously. - - // the compiler path might be compiler proxy, so it is important to use - // `path` (or its clone `path1`) to resolve using that one, not using `resolved_compiler_path` - let x = get_compiler_info::( - me.creator.clone(), - &path1, - &cwd, - env.as_slice(), - &me.pool, - dist_info.clone().map(|(p, _)| p), + // cache + let map_info = CompilerCacheEntry::new(c.clone(), mtime, dist_info); + trace!( + "Inserting POSSIBLY PROXIED cache map info for {:?}", + &resolved_compiler_path ); - - Box::new(x.then( - move |info: Result<( - Box>, - Option>>, - )>| { - match info { - Ok((ref c, ref proxy)) => { - // register the proxy for this compiler, so it will be used directly from now on - // and the true/resolved compiler will create table hits in the hash map - // based on the resolved path - if let Some(proxy) = proxy { - trace!( - "Inserting new path proxy {:?} @ {:?} -> {:?}", - &path, - &cwd, - resolved_compiler_path - ); - let proxy: Box> = - proxy.box_clone(); - me.compiler_proxies - .borrow_mut() - .insert(path, (proxy, mtime)); - } - // TODO add some safety checks in case a proxy exists, that the initial `path` is not - // TODO the same as the resolved compiler binary - - // cache - let map_info = - CompilerCacheEntry::new(c.clone(), mtime, dist_info); - trace!( - "Inserting POSSIBLY PROXIED cache map info for {:?}", - &resolved_compiler_path - ); - me.compilers - .borrow_mut() - .insert(resolved_compiler_path, Some(map_info)); - } - Err(_) => { - trace!("Inserting PLAIN cache map info for {:?}", &path); - me.compilers.borrow_mut().insert(path, None); - } - } - // drop the proxy information, response is compiler only - let r: Result>> = info.map(|info| info.0); - f_ok(r) - }, - )) + me.compilers + .borrow_mut() + .insert(resolved_compiler_path, Some(map_info)); + } + Err(_) => { + trace!("Inserting PLAIN cache map info for {:?}", &path); + me.compilers.borrow_mut().insert(path, None); } } - }, - ); - - Box::new(obtain) + // drop the proxy information, response is compiler only + let r: Result>> = info.map(|info| info.0); + r + } + } } /// Check that we can handle and cache `cmd` when run with `compiler`. @@ -1089,7 +1072,7 @@ where arguments: Vec, cwd: PathBuf, env_vars: Vec<(OsString, OsString)>, - tx: mpsc::Sender>, + mut tx: mpsc::Sender>, ) { let force_recache = env_vars .iter() @@ -1101,27 +1084,30 @@ where }; let out_pretty = hasher.output_pretty().into_owned(); let color_mode = hasher.color_mode(); - let result = hasher.get_cached_or_compile( - self.dist_client.get_client(), - self.creator.clone(), - self.storage.clone(), - arguments, - cwd, - env_vars, - cache_control, - self.pool.clone(), - ); + let result = hasher + .get_cached_or_compile( + self.dist_client.get_client(), + self.creator.clone(), + self.storage.clone(), + arguments, + cwd, + env_vars, + cache_control, + self.pool.clone(), + ) + .compat(); let me = self.clone(); let kind = compiler.kind(); - let task = result.then(move |result| { + let task = async move { + let result = result.await; let mut cache_write = None; - let mut stats = me.stats.borrow_mut(); let mut res = CompileFinished { color_mode, ..Default::default() }; match result { Ok((compiled, out)) => { + let mut stats = me.stats.borrow_mut(); match compiled { CompileResult::Error => { stats.cache_errors.increment(&kind); @@ -1155,7 +1141,7 @@ where } stats.cache_misses.increment(&kind); stats.cache_read_miss_duration += duration; - cache_write = Some(future); + cache_write = Some(future.compat()); } CompileResult::NotCacheable => { stats.cache_misses.increment(&kind); @@ -1179,6 +1165,7 @@ where res.stderr = stderr; } Err(err) => { + let mut stats = me.stats.borrow_mut(); match err.downcast::() { Ok(ProcessError(output)) => { debug!("Compilation failed: {:?}", output); @@ -1219,36 +1206,36 @@ where } } }; - let send = tx.send(Ok(Response::CompileFinished(res))); + let send = Box::pin(async move { tx.send(Ok(Response::CompileFinished(res))).await }); let me = me.clone(); - let cache_write = cache_write.then(move |result| { - match result { - Err(e) => { - debug!("Error executing cache write: {}", e); - me.stats.borrow_mut().cache_write_errors += 1; + let cache_write = async { + if let Some(cache_write) = cache_write { + match cache_write.await { + Err(e) => { + debug!("Error executing cache write: {}", e); + me.stats.borrow_mut().cache_write_errors += 1; + } + //TODO: save cache stats! + Ok(info) => { + debug!( + "[{}]: Cache write finished in {}", + info.object_file_pretty, + util::fmt_duration_as_secs(&info.duration) + ); + me.stats.borrow_mut().cache_writes += 1; + me.stats.borrow_mut().cache_write_duration += info.duration; + } } - //TODO: save cache stats! - Ok(Some(info)) => { - debug!( - "[{}]: Cache write finished in {}", - info.object_file_pretty, - util::fmt_duration_as_secs(&info.duration) - ); - me.stats.borrow_mut().cache_writes += 1; - me.stats.borrow_mut().cache_write_duration += info.duration; - } - - Ok(None) => {} } Ok(()) - }); + }; - send.join(cache_write).then(|_| Ok(())) - }); + future::try_join(send, cache_write).map(|_| Ok(())).await + }; tokio_compat::runtime::current_thread::TaskExecutor::current() - .spawn_local(Box::new(task)) + .spawn_local(Box::new(Box::pin(task).compat())) .unwrap(); } } @@ -1555,7 +1542,7 @@ impl ServerInfo { enum Frame { Body { chunk: Option }, - Message { message: R, body: bool }, + Message { message: R }, } struct Body { @@ -1573,12 +1560,9 @@ impl futures_03::Stream for Body { type Item = Result; fn poll_next( mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, + cx: &mut Context<'_>, ) -> std::task::Poll> { - match Pin::new(&mut self.receiver).poll().unwrap() { - Async::Ready(item) => std::task::Poll::Ready(item), - Async::NotReady => std::task::Poll::Pending, - } + Pin::new(&mut self.receiver).poll_next(cx) } } @@ -1596,6 +1580,32 @@ impl Message { } } +struct BincodeCodec; +impl tokio_serde::Serializer for BincodeCodec +where + T: serde::Serialize, +{ + type Error = Error; + + fn serialize(self: Pin<&mut Self>, item: &T) -> std::result::Result { + let mut bytes = BytesMut::new(); + bincode::serialize_into((&mut bytes).writer(), item)?; + Ok(bytes.freeze()) + } +} + +impl tokio_serde::Deserializer for BincodeCodec +where + T: serde::de::DeserializeOwned, +{ + type Error = Error; + + fn deserialize(self: Pin<&mut Self>, buf: &BytesMut) -> std::result::Result { + let ret = bincode::deserialize(buf)?; + Ok(ret) + } +} + /// Implementation of `Stream + Sink` that tokio-proto is expecting /// /// This type is composed of a few layers: @@ -1611,51 +1621,53 @@ impl Message { /// `Sink` implementation to switch from `BytesMut` to `Response` meaning that /// all `Response` types pushed in will be converted to `BytesMut` and pushed /// below. -struct SccacheTransport { - inner: WriteBincode, Request>, Response>, +struct SccacheTransport { + inner: Framed< + futures_03::stream::ErrInto< + futures_03::sink::SinkErrInto< + tokio_util::codec::Framed, + Bytes, + Error, + >, + Error, + >, + Request, + Response, + BincodeCodec, + >, } -impl Stream for SccacheTransport { - type Item = Message>; - type Error = io::Error; +impl Stream for SccacheTransport { + type Item = Result>>; - fn poll(&mut self) -> Poll, io::Error> { - let msg = try_ready!(self.inner.poll().map_err(|e| { - error!("SccacheTransport::poll failed: {}", e); - io::Error::new(io::ErrorKind::Other, e) - })); - Ok(msg.map(Message::WithoutBody).into()) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner) + .poll_next(cx) + .map(|r| r.map(|s| s.map(Message::WithoutBody))) } } -impl Sink for SccacheTransport { - type SinkItem = Frame; - type SinkError = io::Error; +impl Sink> for SccacheTransport { + type Error = Error; - fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_ready(cx) + } + + fn start_send(mut self: Pin<&mut Self>, item: Frame) -> Result<()> { match item { - Frame::Message { message, body } => match self.inner.start_send(message)? { - AsyncSink::Ready => Ok(AsyncSink::Ready), - AsyncSink::NotReady(message) => { - Ok(AsyncSink::NotReady(Frame::Message { message, body })) - } - }, - Frame::Body { chunk: Some(chunk) } => match self.inner.start_send(chunk)? { - AsyncSink::Ready => Ok(AsyncSink::Ready), - AsyncSink::NotReady(chunk) => { - Ok(AsyncSink::NotReady(Frame::Body { chunk: Some(chunk) })) - } - }, - Frame::Body { chunk: None } => Ok(AsyncSink::Ready), + Frame::Message { message } => Pin::new(&mut self.inner).start_send(message), + Frame::Body { chunk: Some(chunk) } => Pin::new(&mut self.inner).start_send(chunk), + Frame::Body { chunk: None } => Ok(()), } } - fn poll_complete(&mut self) -> Poll<(), io::Error> { - self.inner.poll_complete() + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) } - fn close(&mut self) -> Poll<(), io::Error> { - self.inner.close() + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_close(cx) } } @@ -1666,29 +1678,26 @@ struct ShutdownOrInactive { } impl Future for ShutdownOrInactive { - type Item = (); - type Error = io::Error; + type Output = (); - fn poll(&mut self) -> Poll<(), io::Error> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { loop { - match self.rx.poll().unwrap() { - Async::NotReady => break, + match Pin::new(&mut self.rx).poll_next(cx) { + Poll::Pending => break, // Shutdown received! - Async::Ready(Some(ServerMessage::Shutdown)) => return Ok(().into()), - Async::Ready(Some(ServerMessage::Request)) => { + Poll::Ready(Some(ServerMessage::Shutdown)) => return Poll::Ready(()), + Poll::Ready(Some(ServerMessage::Request)) => { if self.timeout_dur != Duration::new(0, 0) { - self.timeout = Some(Delay::new(Instant::now() + self.timeout_dur)); + self.timeout = Some(delay_for(self.timeout_dur)); } } // All services have shut down, in theory this isn't possible... - Async::Ready(None) => return Ok(().into()), + Poll::Ready(None) => return Poll::Ready(()), } } match self.timeout { - None => Ok(Async::NotReady), - Some(ref mut timeout) => timeout - .poll() - .map_err(|err| io::Error::new(io::ErrorKind::Other, err)), + None => Poll::Pending, + Some(ref mut timeout) => Pin::new(timeout).poll(cx), } } } diff --git a/src/test/tests.rs b/src/test/tests.rs index a48ed853..74e3d470 100644 --- a/src/test/tests.rs +++ b/src/test/tests.rs @@ -20,6 +20,7 @@ use crate::mock_command::*; use crate::server::{DistClientContainer, SccacheServer, ServerMessage}; use crate::test::utils::*; use futures::sync::oneshot::{self, Sender}; +use futures_03::compat::*; use futures_03::executor::ThreadPool; use std::fs::File; use std::io::{Cursor, Write}; @@ -92,7 +93,7 @@ where let port = srv.port(); let creator = srv.command_creator().clone(); tx.send((port, creator)).unwrap(); - srv.run(shutdown_rx).unwrap(); + srv.run(shutdown_rx.compat()).unwrap(); }); let (port, creator) = rx.recv().unwrap(); (port, shutdown_tx, creator, handle)