refactor: Convert server.rs to use async await

This commit is contained in:
Markus Westerlind 2020-04-29 15:48:55 +02:00 коммит произвёл Sylvestre Ledru
Родитель 72755c6942
Коммит 905299fa65
6 изменённых файлов: 484 добавлений и 523 удалений

272
Cargo.lock сгенерированный
Просмотреть файл

@ -184,8 +184,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d3a45e77e34375a7923b1e8febb049bb011f064714a8e17a1a616fef01da13d"
dependencies = [
"proc-macro2",
"quote 1.0.8",
"syn 1.0.58",
"quote",
"syn",
]
[[package]]
@ -262,17 +262,6 @@ version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bincode"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e103c8b299b28a9c6990458b7013dc4a8356a9b854c51b9883241f5866fac36e"
dependencies = [
"byteorder",
"num-traits 0.1.43",
"serde",
]
[[package]]
name = "bincode"
version = "1.3.1"
@ -401,12 +390,6 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba"
[[package]]
name = "case"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e88b166b48e29667f5443df64df3c61dc07dc2b1a0b0d231800e07f09a33ecc1"
[[package]]
name = "cc"
version = "1.0.66"
@ -436,7 +419,7 @@ checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",
"num-traits 0.2.14",
"num-traits",
"time",
"winapi 0.3.9",
]
@ -683,17 +666,6 @@ dependencies = [
"libc",
]
[[package]]
name = "derive-error"
version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "629f1bb3abce791912ca85a24676fff54464f7deb122906adabc90fb96e876d3"
dependencies = [
"case",
"quote 0.3.15",
"syn 0.11.11",
]
[[package]]
name = "difference"
version = "2.0.0"
@ -812,8 +784,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa4da3c766cd7a0db8242e326e9e4e081edd567072893ed320008189715366a4"
dependencies = [
"proc-macro2",
"quote 1.0.8",
"syn 1.0.58",
"quote",
"syn",
"synstructure",
]
@ -856,7 +828,7 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4"
dependencies = [
"num-traits 0.2.14",
"num-traits",
]
[[package]]
@ -1000,8 +972,8 @@ checksum = "0f8719ca0e1f3c5e34f3efe4570ef2c0610ca6da85ae7990d472e9cbfba13664"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote 1.0.8",
"syn 1.0.58",
"quote",
"syn",
]
[[package]]
@ -1698,7 +1670,7 @@ checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304"
dependencies = [
"autocfg 1.0.1",
"num-integer",
"num-traits 0.2.14",
"num-traits",
]
[[package]]
@ -1708,16 +1680,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg 1.0.1",
"num-traits 0.2.14",
]
[[package]]
name = "num-traits"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92e5113e9fd4cc14ded8e499429f396a20f98c772a47cc8622a736e1ec843c31"
dependencies = [
"num-traits 0.2.14",
"num-traits",
]
[[package]]
@ -1890,6 +1853,26 @@ dependencies = [
"unicase 1.4.2",
]
[[package]]
name = "pin-project"
version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "918192b5c59119d51e0cd221f4d49dde9112824ba717369e903c97d076083d0f"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3be26700300be6d9d23264c73211d8190e755b6b5ca7a1b28230025511b52a5e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.1.11"
@ -1980,7 +1963,7 @@ version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71"
dependencies = [
"unicode-xid 0.2.1",
"unicode-xid",
]
[[package]]
@ -2011,12 +1994,6 @@ version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quote"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a"
[[package]]
name = "quote"
version = "1.0.8"
@ -2411,9 +2388,10 @@ dependencies = [
"assert_cmd",
"atty",
"base64 0.13.0",
"bincode 1.3.1",
"bincode",
"blake3",
"byteorder",
"bytes 0.5.6",
"cc",
"chrono",
"clap",
@ -2464,15 +2442,16 @@ dependencies = [
"syslog",
"tar",
"tempfile",
"tokio 0.2.24",
"tokio-compat",
"tokio-io",
"tokio-named-pipes",
"tokio-process",
"tokio-reactor",
"tokio-serde-bincode",
"tokio-tcp",
"tokio-serde",
"tokio-timer",
"tokio-uds",
"tokio-util",
"toml",
"tower",
"untrusted",
@ -2570,8 +2549,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c84d3526699cd55261af4b941e4e725444df67aa4f9e6a3564f18030d12672df"
dependencies = [
"proc-macro2",
"quote 1.0.8",
"syn 1.0.58",
"quote",
"syn",
]
[[package]]
@ -2646,7 +2625,7 @@ checksum = "692ca13de57ce0613a363c8c2f1de925adebc81b04c923ac60c5488bb44abe4b"
dependencies = [
"chrono",
"num-bigint",
"num-traits 0.2.14",
"num-traits",
]
[[package]]
@ -2727,17 +2706,6 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2"
[[package]]
name = "syn"
version = "0.11.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad"
dependencies = [
"quote 0.3.15",
"synom",
"unicode-xid 0.0.4",
]
[[package]]
name = "syn"
version = "1.0.58"
@ -2745,17 +2713,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc60a3d73ea6594cd712d830cc1f0390fd71542d8c8cd24e70cc54cdfd5e05d5"
dependencies = [
"proc-macro2",
"quote 1.0.8",
"unicode-xid 0.2.1",
]
[[package]]
name = "synom"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6"
dependencies = [
"unicode-xid 0.0.4",
"quote",
"unicode-xid",
]
[[package]]
@ -2765,9 +2724,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701"
dependencies = [
"proc-macro2",
"quote 1.0.8",
"syn 1.0.58",
"unicode-xid 0.2.1",
"quote",
"syn",
"unicode-xid",
]
[[package]]
@ -2863,8 +2822,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9be73a2caec27583d0046ef3796c3794f868a5bc813db689eed00c7631275cd1"
dependencies = [
"proc-macro2",
"quote 1.0.8",
"syn 1.0.58",
"quote",
"syn",
]
[[package]]
@ -3103,26 +3062,13 @@ dependencies = [
[[package]]
name = "tokio-serde"
version = "0.1.0"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "894168193c4f80862a2244ff953b69145a9961a9efba39500e0970b083d0649c"
checksum = "ebdd897b01021779294eb09bb3b52b6e11b0747f9f7e333a84bef532b656de99"
dependencies = [
"bytes 0.4.12",
"futures 0.1.30",
]
[[package]]
name = "tokio-serde-bincode"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02e35c8d60a5e87cfb30dd562a309e56f8a6d36617b0a76c87f04d5466607ca8"
dependencies = [
"bincode 0.8.0",
"bytes 0.4.12",
"derive-error",
"futures 0.1.30",
"serde",
"tokio-serde",
"bytes 0.5.6",
"futures 0.3.9",
"pin-project",
]
[[package]]
@ -3253,11 +3199,11 @@ dependencies = [
[[package]]
name = "tower"
version = "0.1.1"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc72f33b6a72c75c9df0037afce313018bae845f0ec7fdb9201b8768427a917f"
checksum = "fd3169017c090b7a28fce80abaad0ab4f5566423677c9331bb320af7e49cfe62"
dependencies = [
"futures 0.1.30",
"futures-core",
"tower-buffer",
"tower-discover",
"tower-layer",
@ -3271,13 +3217,13 @@ dependencies = [
[[package]]
name = "tower-buffer"
version = "0.1.2"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c7b83e1ccf5b23dd109dd6ae2c07b8e2beec7a51a21f29da2dba576317370e0"
checksum = "c4887dc2a65d464c8b9b66e0e4d51c2fd6cf5b3373afc72805b0a60bce00446a"
dependencies = [
"futures 0.1.30",
"tokio-executor",
"tokio-sync",
"futures-core",
"pin-project",
"tokio 0.2.24",
"tower-layer",
"tower-service",
"tracing",
@ -3285,91 +3231,101 @@ dependencies = [
[[package]]
name = "tower-discover"
version = "0.1.0"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73a7632286f78164d65d18fd0e570307acde9362489aa5c8c53e6315cc2bde47"
checksum = "0f6b5000c3c54d269cc695dff28136bb33d08cbf1df2c48129e143ab65bf3c2a"
dependencies = [
"futures 0.1.30",
"futures-core",
"pin-project",
"tower-service",
]
[[package]]
name = "tower-layer"
version = "0.1.0"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ddf07e10c07dcc8f41da6de036dc66def1a85b70eb8a385159e3908bb258328"
checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62"
[[package]]
name = "tower-limit"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92c3040c5dbed68abffaa0d4517ac1a454cd741044f33ab0eefab6b8d1361404"
dependencies = [
"futures 0.1.30",
"futures-core",
"pin-project",
"tokio 0.2.24",
"tower-layer",
"tower-load",
"tower-service",
]
[[package]]
name = "tower-limit"
version = "0.1.3"
name = "tower-load"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c21ba835a08fd54b63cd91ae0548a7b6e2a91075147dfa3dc8e1a940c1b6f18f"
checksum = "8cc79fc3afd07492b7966d7efa7c6c50f8ed58d768a6075dd7ae6591c5d2017b"
dependencies = [
"futures 0.1.30",
"tokio-sync",
"tokio-timer",
"tower-layer",
"futures-core",
"log 0.4.11",
"pin-project",
"tokio 0.2.24",
"tower-discover",
"tower-service",
"tracing",
]
[[package]]
name = "tower-load-shed"
version = "0.1.0"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04fbaf5bfb63d84204db87b9b2aeec61549613f2bbb8706dcc36f5f3ea8cd769"
checksum = "9f021e23900173dc315feb4b6922510dae3e79c689b74c089112066c11f0ae4e"
dependencies = [
"futures 0.1.30",
"futures-core",
"pin-project",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-retry"
version = "0.1.0"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09e80588125061f276ed2a7b0939988b411e570a2dbb2965b1382ef4f71036f7"
checksum = "e6727956aaa2f8957d4d9232b308fe8e4e65d99db30f42b225646e86c9b6a952"
dependencies = [
"futures 0.1.30",
"tokio-timer",
"futures-core",
"pin-project",
"tokio 0.2.24",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-service"
version = "0.2.0"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cc0c98637d23732f8de6dfd16494c9f1559c3b9e20b4a46462c8f9b9e827bfa"
dependencies = [
"futures 0.1.30",
]
checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tower-timeout"
version = "0.1.1"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c06bbc2fbd056f810940a8c6f0cc194557d36da3c22999a755a7a6612447da9"
checksum = "127b8924b357be938823eaaec0608c482d40add25609481027b96198b2e4b31e"
dependencies = [
"futures 0.1.30",
"tokio-timer",
"pin-project",
"tokio 0.2.24",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-util"
version = "0.1.0"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4792342fac093db5d2558655055a89a04ca909663467a4310c7739d9f8b64698"
checksum = "d1093c19826d33807c72511e68f73b4a0469a3f22c2bd5f7d5212178b4b89674"
dependencies = [
"futures 0.1.30",
"tokio-io",
"tower-layer",
"futures-core",
"futures-util",
"pin-project",
"tower-service",
]
@ -3393,8 +3349,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80e0ccfc3378da0cce270c946b676a376943f5cd16aeba64568e7939806f4ada"
dependencies = [
"proc-macro2",
"quote 1.0.8",
"syn 1.0.58",
"quote",
"syn",
]
[[package]]
@ -3484,12 +3440,6 @@ version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3"
[[package]]
name = "unicode-xid"
version = "0.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc"
[[package]]
name = "unicode-xid"
version = "0.2.1"
@ -3679,8 +3629,8 @@ dependencies = [
"lazy_static",
"log 0.4.11",
"proc-macro2",
"quote 1.0.8",
"syn 1.0.58",
"quote",
"syn",
"wasm-bindgen-shared",
]
@ -3702,7 +3652,7 @@ version = "0.2.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6ac8995ead1f084a8dea1e65f194d0973800c7f571f6edd70adf06ecf77084"
dependencies = [
"quote 1.0.8",
"quote",
"wasm-bindgen-macro-support",
]
@ -3713,8 +3663,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a48c72f299d80557c7c62e37e7225369ecc0c963964059509fbafe917c7549"
dependencies = [
"proc-macro2",
"quote 1.0.8",
"syn 1.0.58",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]

Просмотреть файл

@ -29,6 +29,7 @@ base64 = "0.13"
bincode = "1"
blake3 = "0.3"
byteorder = "1.0"
bytes = "0.5"
chrono = { version = "0.4", optional = true }
clap = "2.23.0"
counted-array = "0.1"
@ -69,12 +70,13 @@ serde_json = "1.0"
strip-ansi-escapes = "0.1"
tar = "0.4"
tempfile = "3"
tokio = { version = "0.2", features = ["tcp"] }
tokio-compat = "0.1"
tokio-io = "0.1"
tokio-process = "0.2"
tokio-serde-bincode = "0.1"
tower = "0.1"
tokio-tcp = "0.1"
tokio-serde = "0.6"
tokio-util = { version = "0.3", features = ["codec"] }
tower = "0.3"
tokio-timer = "0.2"
toml = "0.5"
untrusted = { version = "0.7", optional = true }

Просмотреть файл

@ -67,6 +67,7 @@ pub type Result<T> = anyhow::Result<T>;
pub type SFuture<T> = Box<dyn Future<Item = T, Error = Error>>;
pub type SFutureSend<T> = Box<dyn Future<Item = T, Error = Error> + Send>;
pub type SFutureStd<T> = Box<dyn std::future::Future<Output = Result<T>>>;
pub trait FutureContext<T> {
fn fcontext<C>(self, context: C) -> SFuture<T>
@ -105,7 +106,7 @@ macro_rules! ftry {
($e:expr) => {
match $e {
Ok(v) => v,
Err(e) => return Box::new($crate::futures::future::err(e.into())) as SFuture<_>,
Err(e) => return Box::new(futures::future::err(e.into())) as SFuture<_>,
}
};
}
@ -115,7 +116,7 @@ macro_rules! ftry_send {
($e:expr) => {
match $e {
Ok(v) => v,
Err(e) => return Box::new($crate::futures::future::err(e)) as SFutureSend<_>,
Err(e) => return Box::new(futures::future::err(e)) as SFutureSend<_>,
}
};
}

Просмотреть файл

@ -20,8 +20,6 @@
extern crate clap;
#[macro_use]
extern crate counted_array;
#[macro_use]
extern crate futures;
#[cfg(feature = "jsonwebtoken")]
use jsonwebtoken as jwt;
#[macro_use]

Просмотреть файл

@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// For tokio_io::codec::length_delimited::Framed;
#![allow(deprecated)]
use crate::cache::{storage_from_config, Storage};
use crate::compiler::{
get_compiler_info, CacheControl, CompileResult, Compiler, CompilerArguments, CompilerHasher,
@ -30,11 +27,11 @@ use crate::protocol::{Compile, CompileFinished, CompileResponse, Request, Respon
use crate::util;
#[cfg(feature = "dist-client")]
use anyhow::Context as _;
use bytes::{buf::ext::BufMutExt, Bytes, BytesMut};
use filetime::FileTime;
use futures::sync::mpsc;
use futures::{future, stream, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
use futures_03::compat::Compat;
use futures::Future as _;
use futures_03::executor::ThreadPool;
use futures_03::{channel::mpsc, compat::*, future, prelude::*, stream};
use number_prefix::NumberPrefix;
use std::cell::RefCell;
use std::collections::HashMap;
@ -42,6 +39,7 @@ use std::env;
use std::ffi::{OsStr, OsString};
use std::fs::metadata;
use std::io::{self, Write};
use std::marker::Unpin;
#[cfg(feature = "dist-client")]
use std::mem;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
@ -52,17 +50,18 @@ use std::rc::Rc;
use std::sync::Arc;
#[cfg(feature = "dist-client")]
use std::sync::Mutex;
use std::task::{Context, Waker};
use std::task::{Context, Poll, Waker};
use std::time::Duration;
use std::time::Instant;
use std::u64;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpListener,
time::{self, delay_for, Delay},
};
use tokio_compat::runtime::current_thread::Runtime;
use tokio_io::codec::length_delimited;
use tokio_io::codec::length_delimited::Framed;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_serde_bincode::{ReadBincode, WriteBincode};
use tokio_tcp::TcpListener;
use tokio_timer::{Delay, Timeout};
use tokio_serde::Framed;
use tokio_util::codec::{length_delimited, LengthDelimitedCodec};
use tower::Service;
use crate::errors::*;
@ -410,7 +409,7 @@ pub fn start_server(config: &Config, port: u16) -> Result<()> {
let port = srv.port();
info!("server started, listening on port {}", port);
notify_server_startup(&notify, ServerStartup::Ok { port })?;
srv.run(future::empty::<(), ()>())?;
srv.run(future::pending::<()>())?;
Ok(())
}
Err(e) => {
@ -442,13 +441,13 @@ impl<C: CommandCreatorSync> SccacheServer<C> {
pub fn new(
port: u16,
pool: ThreadPool,
runtime: Runtime,
mut runtime: Runtime,
client: Client,
dist_client: DistClientContainer,
storage: Arc<dyn Storage>,
) -> Result<SccacheServer<C>> {
let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port);
let listener = TcpListener::bind(&SocketAddr::V4(addr))?;
let listener = runtime.block_on_std(TcpListener::bind(&SocketAddr::V4(addr)))?;
// Prepare the service which we'll use to service all incoming TCP
// connections.
@ -505,13 +504,9 @@ impl<C: CommandCreatorSync> SccacheServer<C> {
where
F: Future,
{
self._run(Box::new(shutdown.then(|_| Ok(()))))
}
fn _run<'a>(self, shutdown: Box<dyn Future<Item = (), Error = ()> + 'a>) -> io::Result<()> {
let SccacheServer {
mut runtime,
listener,
mut listener,
rx,
service,
timeout,
@ -520,14 +515,20 @@ impl<C: CommandCreatorSync> SccacheServer<C> {
// Create our "server future" which will simply handle all incoming
// connections in separate tasks.
let server = listener.incoming().for_each(move |socket| {
trace!("incoming connection");
tokio_compat::runtime::current_thread::TaskExecutor::current()
.spawn_local(Box::new(service.clone().bind(socket).map_err(|err| {
error!("{}", err);
})))
.unwrap();
Ok(())
let server = listener.incoming().try_for_each(move |socket| {
let service = service.clone();
async move {
trace!("incoming connection");
tokio_compat::runtime::current_thread::TaskExecutor::current()
.spawn_local(Box::new(
Box::pin(service.bind(socket).map_err(|err| {
error!("{}", err);
}))
.compat(),
))
.unwrap();
Ok(())
}
});
// Right now there's a whole bunch of ways to shut down this server for
@ -542,35 +543,32 @@ impl<C: CommandCreatorSync> SccacheServer<C> {
// inactivity, and this is then select'd with the `shutdown` future
// passed to this function.
let shutdown = shutdown.map(|a| {
let shutdown = shutdown.map(|_| {
info!("shutting down due to explicit signal");
a
});
let mut futures = vec![
Box::new(server) as Box<dyn Future<Item = _, Error = _>>,
Box::new(
shutdown
.map_err(|()| io::Error::new(io::ErrorKind::Other, "shutdown signal failed")),
),
Box::pin(server) as Pin<Box<dyn Future<Output = _>>>,
Box::pin(shutdown.map(Ok)),
];
let shutdown_idle = ShutdownOrInactive {
rx,
timeout: if timeout != Duration::new(0, 0) {
Some(Delay::new(Instant::now() + timeout))
} else {
None
},
timeout_dur: timeout,
};
futures.push(Box::new(shutdown_idle.map(|a| {
futures.push(Box::pin(async {
ShutdownOrInactive {
rx,
timeout: if timeout != Duration::new(0, 0) {
Some(delay_for(timeout))
} else {
None
},
timeout_dur: timeout,
}
.await;
info!("shutting down due to being idle or request");
a
})));
Ok(())
}));
let server = future::select_all(futures);
runtime.block_on(server).map_err(|p| p.0)?;
let server = future::select_all(futures).map(|t| t.0);
runtime.block_on_std(server)?;
info!(
"moving into the shutdown phase now, waiting at most 10 seconds \
@ -585,14 +583,13 @@ impl<C: CommandCreatorSync> SccacheServer<C> {
// Note that we cap the amount of time this can take, however, as we
// don't want to wait *too* long.
runtime
.block_on(Timeout::new(Compat::new(wait), Duration::new(30, 0)))
.map_err(|e| {
if e.is_inner() {
e.into_inner().unwrap()
} else {
io::Error::new(io::ErrorKind::Other, e)
}
})?;
.block_on_std(async {
time::timeout(Duration::new(30, 0), wait)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
.unwrap_or_else(|e| Err(io::Error::new(io::ErrorKind::Other, e)))
})
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
info!("ok, fully shutting down now");
@ -685,13 +682,13 @@ pub enum ServerMessage {
Shutdown,
}
impl<C> Service<SccacheRequest> for SccacheService<C>
impl<C> Service<SccacheRequest> for Arc<SccacheService<C>>
where
C: CommandCreatorSync + 'static,
{
type Response = SccacheResponse;
type Error = Error;
type Future = SFuture<Self::Response>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response>>>>;
fn call(&mut self, req: SccacheRequest) -> Self::Future {
trace!("handle_client");
@ -701,44 +698,60 @@ where
// that every message is received.
drop(self.tx.clone().start_send(ServerMessage::Request));
let res: SFuture<Response> = match req.into_inner() {
Request::Compile(compile) => {
debug!("handle_client: compile");
self.stats.borrow_mut().compile_requests += 1;
return self.handle_compile(compile);
let self_ = self.clone();
Box::pin(async move {
match req.into_inner() {
Request::Compile(compile) => {
debug!("handle_client: compile");
self_.stats.borrow_mut().compile_requests += 1;
self_.handle_compile(compile).await
}
Request::GetStats => {
debug!("handle_client: get_stats");
self_
.get_info()
.await
.map(|i| Response::Stats(Box::new(i)))
.map(Message::WithoutBody)
}
Request::DistStatus => {
debug!("handle_client: dist_status");
self_
.get_dist_status()
.await
.map(Response::DistStatus)
.map(Message::WithoutBody)
}
Request::ZeroStats => {
debug!("handle_client: zero_stats");
self_.zero_stats();
self_
.get_info()
.await
.map(|i| Response::Stats(Box::new(i)))
.map(Message::WithoutBody)
}
Request::Shutdown => {
debug!("handle_client: shutdown");
let mut tx = self_.tx.clone();
future::try_join(
async {
let _ = tx.send(ServerMessage::Shutdown).await;
Ok(())
},
self_.get_info(),
)
.await
.map(move |(_, info)| {
Message::WithoutBody(Response::ShuttingDown(Box::new(info)))
})
}
}
Request::GetStats => {
debug!("handle_client: get_stats");
Box::new(self.get_info().map(|i| Response::Stats(Box::new(i))))
}
Request::DistStatus => {
debug!("handle_client: dist_status");
Box::new(self.get_dist_status().map(Response::DistStatus))
}
Request::ZeroStats => {
debug!("handle_client: zero_stats");
self.zero_stats();
Box::new(self.get_info().map(|i| Response::Stats(Box::new(i))))
}
Request::Shutdown => {
debug!("handle_client: shutdown");
let future = self
.tx
.clone()
.send(ServerMessage::Shutdown)
.then(|_| Ok(()));
let info_future = self.get_info();
return Box::new(future.join(info_future).map(move |(_, info)| {
Message::WithoutBody(Response::ShuttingDown(Box::new(info)))
}));
}
};
Box::new(res.map(Message::WithoutBody))
})
}
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}
}
@ -767,9 +780,9 @@ where
}
}
fn bind<T>(mut self, socket: T) -> impl Future<Item = (), Error = Error>
fn bind<T>(self, socket: T) -> impl Future<Output = Result<()>>
where
T: AsyncRead + AsyncWrite + 'static,
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
let mut builder = length_delimited::Builder::new();
if let Ok(max_frame_length_str) = env::var("SCCACHE_MAX_FRAME_LENGTH") {
@ -782,56 +795,53 @@ where
let io = builder.new_framed(socket);
let (sink, stream) = SccacheTransport {
inner: WriteBincode::new(ReadBincode::new(io)),
inner: Framed::new(io.sink_err_into().err_into(), BincodeCodec),
}
.split();
let sink = sink.sink_from_err::<Error>();
let sink = sink.sink_err_into::<Error>();
let mut self_ = Arc::new(self);
stream
.from_err::<Error>()
.and_then(move |input| self.call(input))
.and_then(|message| {
let f: Box<dyn Stream<Item = _, Error = _>> = match message {
Message::WithoutBody(message) => Box::new(stream::once(Ok(Frame::Message {
message,
body: false,
}))),
Message::WithBody(message, body) => Box::new(
stream::once(Ok(Frame::Message {
message,
body: true,
}))
.chain(Compat::new(body).map(|chunk| Frame::Body { chunk: Some(chunk) }))
.chain(stream::once(Ok(Frame::Body { chunk: None }))),
.err_into::<Error>()
.and_then(move |input| self_.call(input))
.and_then(|message| async move {
let f: Pin<Box<dyn Stream<Item = _>>> = match message {
Message::WithoutBody(message) => {
Box::pin(stream::once(async { Ok(Frame::Message { message }) }))
}
Message::WithBody(message, body) => Box::pin(
stream::once(async { Ok(Frame::Message { message }) })
.chain(body.map_ok(|chunk| Frame::Body { chunk: Some(chunk) }))
.chain(stream::once(async { Ok(Frame::Body { chunk: None }) })),
),
};
Ok(f.from_err::<Error>())
Ok(f.err_into::<Error>())
})
.flatten()
.try_flatten()
.forward(sink)
.map(|_| ())
.map_ok(|_| ())
}
/// Get dist status.
fn get_dist_status(&self) -> SFuture<DistInfo> {
f_ok(self.dist_client.get_status())
async fn get_dist_status(&self) -> Result<DistInfo> {
Ok(self.dist_client.get_status())
}
/// Get info and stats about the cache.
fn get_info(&self) -> SFuture<ServerInfo> {
async fn get_info(&self) -> Result<ServerInfo> {
let stats = self.stats.borrow().clone();
let cache_location = self.storage.location();
Box::new(
self.storage
.current_size()
.join(self.storage.max_size())
.map(move |(cache_size, max_cache_size)| ServerInfo {
stats,
cache_location,
cache_size,
max_cache_size,
}),
future::try_join(
self.storage.current_size().compat(),
self.storage.max_size().compat(),
)
.await
.map(move |(cache_size, max_cache_size)| ServerInfo {
stats,
cache_location,
cache_size,
max_cache_size,
})
}
/// Zero stats about the cache.
@ -844,27 +854,25 @@ where
/// This will handle a compile request entirely, generating a response with
/// the inital information and an optional body which will eventually
/// contain the results of the compilation.
fn handle_compile(&self, compile: Compile) -> SFuture<SccacheResponse> {
async fn handle_compile(&self, compile: Compile) -> Result<SccacheResponse> {
let exe = compile.exe;
let cmd = compile.args;
let cwd: PathBuf = compile.cwd.into();
let env_vars = compile.env_vars;
let me = self.clone();
Box::new(
self.compiler_info(exe.into(), cwd.clone(), &env_vars)
.map(move |info| me.check_compiler(info, cmd, cwd, env_vars)),
)
let info = self.compiler_info(exe.into(), cwd.clone(), &env_vars).await;
Ok(me.check_compiler(info, cmd, cwd, env_vars))
}
/// Look up compiler info from the cache for the compiler `path`.
/// If not cached, determine the compiler type and cache the result.
fn compiler_info(
async fn compiler_info(
&self,
path: PathBuf,
cwd: PathBuf,
env: &[(OsString, OsString)],
) -> SFuture<Result<Box<dyn Compiler<C>>>> {
) -> Result<Box<dyn Compiler<C>>> {
trace!("compiler_info");
let me = self.clone();
@ -876,152 +884,127 @@ where
let path1 = path.clone();
let env = env.to_vec();
let resolve_w_proxy = {
let res: Option<(PathBuf, FileTime)> = {
let compiler_proxies_borrow = self.compiler_proxies.borrow();
if let Some((compiler_proxy, _filetime)) = compiler_proxies_borrow.get(&path) {
let fut = compiler_proxy.resolve_proxied_executable(
self.creator.clone(),
cwd.clone(),
env.as_slice(),
);
Box::new(fut.then(|res: Result<_>| Ok(res.ok())))
let fut = compiler_proxy
.resolve_proxied_executable(self.creator.clone(), cwd.clone(), env.as_slice())
.compat();
Box::pin(fut.map(|res: Result<_>| res.ok())) as Pin<Box<dyn Future<Output = _>>>
} else {
f_ok(None)
Box::pin(async { None })
}
}
.await;
// use the supplied compiler path as fallback, lookup its modification time too
let (resolved_compiler_path, mtime) = match res {
Some(x) => x, // TODO resolve the path right away
None => {
// fallback to using the path directly
metadata(&path2)
.map(|attr| FileTime::from_last_modification_time(&attr))
.ok()
.map(move |filetime| (path2.clone(), filetime))
.expect("Must contain sane data, otherwise mtime is not avail")
}
};
// use the supplied compiler path as fallback, lookup its modification time too
let w_fallback = resolve_w_proxy.then(move |res: Result<Option<(PathBuf, FileTime)>>| {
let opt = match res {
Ok(Some(x)) => Some(x), // TODO resolve the path right away
_ => {
// fallback to using the path directly
metadata(&path2)
let dist_info = match me1.dist_client.get_client() {
Ok(Some(ref client)) => {
if let Some(archive) = client.get_custom_toolchain(&resolved_compiler_path) {
match metadata(&archive)
.map(|attr| FileTime::from_last_modification_time(&attr))
.ok()
.map(move |filetime| (path2, filetime))
{
Ok(mtime) => Some((archive, mtime)),
_ => None,
}
} else {
None
}
};
f_ok(opt)
});
}
_ => None,
};
let lookup_compiler = w_fallback.and_then(move |opt: Option<(PathBuf, FileTime)>| {
let (resolved_compiler_path, mtime) =
opt.expect("Must contain sane data, otherwise mtime is not avail");
let opt = match me1.compilers.borrow().get(&resolved_compiler_path) {
// It's a hit only if the mtime and dist archive data matches.
Some(&Some(ref entry)) => {
if entry.mtime == mtime && entry.dist_info == dist_info {
Some(entry.compiler.clone())
} else {
None
}
}
_ => None,
};
let dist_info = match me1.dist_client.get_client() {
Ok(Some(ref client)) => {
if let Some(archive) = client.get_custom_toolchain(&resolved_compiler_path) {
match metadata(&archive)
.map(|attr| FileTime::from_last_modification_time(&attr))
{
Ok(mtime) => Some((archive, mtime)),
_ => None,
match opt {
Some(info) => {
trace!("compiler_info cache hit");
Ok(info)
}
None => {
trace!("compiler_info cache miss");
// Check the compiler type and return the result when
// finished. This generally involves invoking the compiler,
// so do it asynchronously.
// the compiler path might be compiler proxy, so it is important to use
// `path` (or its clone `path1`) to resolve using that one, not using `resolved_compiler_path`
let info: Result<(Box<dyn Compiler<C>>, Option<Box<dyn CompilerProxy<C>>>)> =
get_compiler_info::<C>(
me.creator.clone(),
&path1,
&cwd,
env.as_slice(),
&me.pool,
dist_info.clone().map(|(p, _)| p),
)
.compat()
.await;
match info {
Ok((ref c, ref proxy)) => {
// register the proxy for this compiler, so it will be used directly from now on
// and the true/resolved compiler will create table hits in the hash map
// based on the resolved path
if let Some(proxy) = proxy {
trace!(
"Inserting new path proxy {:?} @ {:?} -> {:?}",
&path,
&cwd,
resolved_compiler_path
);
let proxy: Box<dyn CompilerProxy<C>> = proxy.box_clone();
me.compiler_proxies
.borrow_mut()
.insert(path, (proxy, mtime.clone()));
}
} else {
None
}
}
_ => None,
};
// TODO add some safety checks in case a proxy exists, that the initial `path` is not
// TODO the same as the resolved compiler binary
let opt = match me1.compilers.borrow().get(&resolved_compiler_path) {
// It's a hit only if the mtime and dist archive data matches.
Some(&Some(ref entry)) => {
if entry.mtime == mtime && entry.dist_info == dist_info {
Some(entry.compiler.clone())
} else {
None
}
}
_ => None,
};
f_ok((resolved_compiler_path, mtime, opt, dist_info))
});
let obtain = lookup_compiler.and_then(
move |(resolved_compiler_path, mtime, opt, dist_info): (
PathBuf,
FileTime,
Option<Box<dyn Compiler<C>>>,
Option<(PathBuf, FileTime)>,
)| {
match opt {
Some(info) => {
trace!("compiler_info cache hit");
f_ok(Ok(info))
}
None => {
trace!("compiler_info cache miss");
// Check the compiler type and return the result when
// finished. This generally involves invoking the compiler,
// so do it asynchronously.
// the compiler path might be compiler proxy, so it is important to use
// `path` (or its clone `path1`) to resolve using that one, not using `resolved_compiler_path`
let x = get_compiler_info::<C>(
me.creator.clone(),
&path1,
&cwd,
env.as_slice(),
&me.pool,
dist_info.clone().map(|(p, _)| p),
// cache
let map_info = CompilerCacheEntry::new(c.clone(), mtime, dist_info);
trace!(
"Inserting POSSIBLY PROXIED cache map info for {:?}",
&resolved_compiler_path
);
Box::new(x.then(
move |info: Result<(
Box<dyn Compiler<C>>,
Option<Box<dyn CompilerProxy<C>>>,
)>| {
match info {
Ok((ref c, ref proxy)) => {
// register the proxy for this compiler, so it will be used directly from now on
// and the true/resolved compiler will create table hits in the hash map
// based on the resolved path
if let Some(proxy) = proxy {
trace!(
"Inserting new path proxy {:?} @ {:?} -> {:?}",
&path,
&cwd,
resolved_compiler_path
);
let proxy: Box<dyn CompilerProxy<C>> =
proxy.box_clone();
me.compiler_proxies
.borrow_mut()
.insert(path, (proxy, mtime));
}
// TODO add some safety checks in case a proxy exists, that the initial `path` is not
// TODO the same as the resolved compiler binary
// cache
let map_info =
CompilerCacheEntry::new(c.clone(), mtime, dist_info);
trace!(
"Inserting POSSIBLY PROXIED cache map info for {:?}",
&resolved_compiler_path
);
me.compilers
.borrow_mut()
.insert(resolved_compiler_path, Some(map_info));
}
Err(_) => {
trace!("Inserting PLAIN cache map info for {:?}", &path);
me.compilers.borrow_mut().insert(path, None);
}
}
// drop the proxy information, response is compiler only
let r: Result<Box<dyn Compiler<C>>> = info.map(|info| info.0);
f_ok(r)
},
))
me.compilers
.borrow_mut()
.insert(resolved_compiler_path, Some(map_info));
}
Err(_) => {
trace!("Inserting PLAIN cache map info for {:?}", &path);
me.compilers.borrow_mut().insert(path, None);
}
}
},
);
Box::new(obtain)
// drop the proxy information, response is compiler only
let r: Result<Box<dyn Compiler<C>>> = info.map(|info| info.0);
r
}
}
}
/// Check that we can handle and cache `cmd` when run with `compiler`.
@ -1089,7 +1072,7 @@ where
arguments: Vec<OsString>,
cwd: PathBuf,
env_vars: Vec<(OsString, OsString)>,
tx: mpsc::Sender<Result<Response>>,
mut tx: mpsc::Sender<Result<Response>>,
) {
let force_recache = env_vars
.iter()
@ -1101,27 +1084,30 @@ where
};
let out_pretty = hasher.output_pretty().into_owned();
let color_mode = hasher.color_mode();
let result = hasher.get_cached_or_compile(
self.dist_client.get_client(),
self.creator.clone(),
self.storage.clone(),
arguments,
cwd,
env_vars,
cache_control,
self.pool.clone(),
);
let result = hasher
.get_cached_or_compile(
self.dist_client.get_client(),
self.creator.clone(),
self.storage.clone(),
arguments,
cwd,
env_vars,
cache_control,
self.pool.clone(),
)
.compat();
let me = self.clone();
let kind = compiler.kind();
let task = result.then(move |result| {
let task = async move {
let result = result.await;
let mut cache_write = None;
let mut stats = me.stats.borrow_mut();
let mut res = CompileFinished {
color_mode,
..Default::default()
};
match result {
Ok((compiled, out)) => {
let mut stats = me.stats.borrow_mut();
match compiled {
CompileResult::Error => {
stats.cache_errors.increment(&kind);
@ -1155,7 +1141,7 @@ where
}
stats.cache_misses.increment(&kind);
stats.cache_read_miss_duration += duration;
cache_write = Some(future);
cache_write = Some(future.compat());
}
CompileResult::NotCacheable => {
stats.cache_misses.increment(&kind);
@ -1179,6 +1165,7 @@ where
res.stderr = stderr;
}
Err(err) => {
let mut stats = me.stats.borrow_mut();
match err.downcast::<ProcessError>() {
Ok(ProcessError(output)) => {
debug!("Compilation failed: {:?}", output);
@ -1219,36 +1206,36 @@ where
}
}
};
let send = tx.send(Ok(Response::CompileFinished(res)));
let send = Box::pin(async move { tx.send(Ok(Response::CompileFinished(res))).await });
let me = me.clone();
let cache_write = cache_write.then(move |result| {
match result {
Err(e) => {
debug!("Error executing cache write: {}", e);
me.stats.borrow_mut().cache_write_errors += 1;
let cache_write = async {
if let Some(cache_write) = cache_write {
match cache_write.await {
Err(e) => {
debug!("Error executing cache write: {}", e);
me.stats.borrow_mut().cache_write_errors += 1;
}
//TODO: save cache stats!
Ok(info) => {
debug!(
"[{}]: Cache write finished in {}",
info.object_file_pretty,
util::fmt_duration_as_secs(&info.duration)
);
me.stats.borrow_mut().cache_writes += 1;
me.stats.borrow_mut().cache_write_duration += info.duration;
}
}
//TODO: save cache stats!
Ok(Some(info)) => {
debug!(
"[{}]: Cache write finished in {}",
info.object_file_pretty,
util::fmt_duration_as_secs(&info.duration)
);
me.stats.borrow_mut().cache_writes += 1;
me.stats.borrow_mut().cache_write_duration += info.duration;
}
Ok(None) => {}
}
Ok(())
});
};
send.join(cache_write).then(|_| Ok(()))
});
future::try_join(send, cache_write).map(|_| Ok(())).await
};
tokio_compat::runtime::current_thread::TaskExecutor::current()
.spawn_local(Box::new(task))
.spawn_local(Box::new(Box::pin(task).compat()))
.unwrap();
}
}
@ -1555,7 +1542,7 @@ impl ServerInfo {
enum Frame<R, R1> {
Body { chunk: Option<R1> },
Message { message: R, body: bool },
Message { message: R },
}
struct Body<R> {
@ -1573,12 +1560,9 @@ impl<R> futures_03::Stream for Body<R> {
type Item = Result<R>;
fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
cx: &mut Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match Pin::new(&mut self.receiver).poll().unwrap() {
Async::Ready(item) => std::task::Poll::Ready(item),
Async::NotReady => std::task::Poll::Pending,
}
Pin::new(&mut self.receiver).poll_next(cx)
}
}
@ -1596,6 +1580,32 @@ impl<R, B> Message<R, B> {
}
}
/// Stateless codec that (de)serializes messages with `bincode`.
///
/// Used as the codec type parameter of the `tokio_serde` `Framed` transport in
/// `SccacheTransport` below; the actual behavior lives in the
/// `tokio_serde::Serializer`/`Deserializer` impls that follow.
struct BincodeCodec;
/// Encode any serde-serializable value into a single frame of bytes.
impl<T> tokio_serde::Serializer<T> for BincodeCodec
where
    T: serde::Serialize,
{
    type Error = Error;

    fn serialize(self: Pin<&mut Self>, item: &T) -> std::result::Result<Bytes, Self::Error> {
        // Serialize straight into a growable buffer through its `Write`
        // adapter, then freeze it into an immutable `Bytes` handle for the
        // framing layer. `?` converts the bincode error into this crate's
        // `Error` type.
        let mut buf = BytesMut::new();
        bincode::serialize_into((&mut buf).writer(), item)?;
        Ok(buf.freeze())
    }
}
/// Decode a received frame back into a serde-deserializable value.
impl<T> tokio_serde::Deserializer<T> for BincodeCodec
where
    T: serde::de::DeserializeOwned,
{
    type Error = Error;

    fn deserialize(self: Pin<&mut Self>, buf: &BytesMut) -> std::result::Result<T, Self::Error> {
        // `?` converts any bincode decode error into this crate's `Error`.
        Ok(bincode::deserialize(buf)?)
    }
}
/// Implementation of `Stream + Sink` that tokio-proto is expecting
///
/// This type is composed of a few layers:
@ -1611,51 +1621,53 @@ impl<R, B> Message<R, B> {
/// `Sink` implementation to switch from `BytesMut` to `Response` meaning that
/// all `Response` types pushed in will be converted to `BytesMut` and pushed
/// below.
struct SccacheTransport<I: AsyncRead + AsyncWrite> {
inner: WriteBincode<ReadBincode<Framed<I>, Request>, Response>,
struct SccacheTransport<I: AsyncRead + AsyncWrite + Unpin> {
    // Transport layering, innermost first:
    //   1. `tokio_util::codec::Framed<I, LengthDelimitedCodec>` — splits the raw
    //      byte stream `I` into length-prefixed frames.
    //   2. `SinkErrInto` / `ErrInto` — convert the codec's errors into this
    //      crate's `Error` type on the sink and stream sides respectively.
    //   3. The outer `Framed` (tokio_serde's, given the `BincodeCodec` type
    //      parameter) — decodes each inbound frame into a `Request` and encodes
    //      each outbound `Response`, using `BincodeCodec` above.
    inner: Framed<
        futures_03::stream::ErrInto<
            futures_03::sink::SinkErrInto<
                tokio_util::codec::Framed<I, LengthDelimitedCodec>,
                Bytes,
                Error,
            >,
            Error,
        >,
        Request,
        Response,
        BincodeCodec,
    >,
}
impl<I: AsyncRead + AsyncWrite> Stream for SccacheTransport<I> {
type Item = Message<Request, Body<()>>;
type Error = io::Error;
impl<I: AsyncRead + AsyncWrite + Unpin> Stream for SccacheTransport<I> {
type Item = Result<Message<Request, Body<()>>>;
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
let msg = try_ready!(self.inner.poll().map_err(|e| {
error!("SccacheTransport::poll failed: {}", e);
io::Error::new(io::ErrorKind::Other, e)
}));
Ok(msg.map(Message::WithoutBody).into())
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner)
.poll_next(cx)
.map(|r| r.map(|s| s.map(Message::WithoutBody)))
}
}
impl<I: AsyncRead + AsyncWrite> Sink for SccacheTransport<I> {
type SinkItem = Frame<Response, Response>;
type SinkError = io::Error;
impl<I: AsyncRead + AsyncWrite + Unpin> Sink<Frame<Response, Response>> for SccacheTransport<I> {
type Error = Error;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, io::Error> {
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut self.inner).poll_ready(cx)
}
fn start_send(mut self: Pin<&mut Self>, item: Frame<Response, Response>) -> Result<()> {
match item {
Frame::Message { message, body } => match self.inner.start_send(message)? {
AsyncSink::Ready => Ok(AsyncSink::Ready),
AsyncSink::NotReady(message) => {
Ok(AsyncSink::NotReady(Frame::Message { message, body }))
}
},
Frame::Body { chunk: Some(chunk) } => match self.inner.start_send(chunk)? {
AsyncSink::Ready => Ok(AsyncSink::Ready),
AsyncSink::NotReady(chunk) => {
Ok(AsyncSink::NotReady(Frame::Body { chunk: Some(chunk) }))
}
},
Frame::Body { chunk: None } => Ok(AsyncSink::Ready),
Frame::Message { message } => Pin::new(&mut self.inner).start_send(message),
Frame::Body { chunk: Some(chunk) } => Pin::new(&mut self.inner).start_send(chunk),
Frame::Body { chunk: None } => Ok(()),
}
}
fn poll_complete(&mut self) -> Poll<(), io::Error> {
self.inner.poll_complete()
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}
fn close(&mut self) -> Poll<(), io::Error> {
self.inner.close()
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut self.inner).poll_close(cx)
}
}
@ -1666,29 +1678,26 @@ struct ShutdownOrInactive {
}
impl Future for ShutdownOrInactive {
type Item = ();
type Error = io::Error;
type Output = ();
fn poll(&mut self) -> Poll<(), io::Error> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
loop {
match self.rx.poll().unwrap() {
Async::NotReady => break,
match Pin::new(&mut self.rx).poll_next(cx) {
Poll::Pending => break,
// Shutdown received!
Async::Ready(Some(ServerMessage::Shutdown)) => return Ok(().into()),
Async::Ready(Some(ServerMessage::Request)) => {
Poll::Ready(Some(ServerMessage::Shutdown)) => return Poll::Ready(()),
Poll::Ready(Some(ServerMessage::Request)) => {
if self.timeout_dur != Duration::new(0, 0) {
self.timeout = Some(Delay::new(Instant::now() + self.timeout_dur));
self.timeout = Some(delay_for(self.timeout_dur));
}
}
// All services have shut down, in theory this isn't possible...
Async::Ready(None) => return Ok(().into()),
Poll::Ready(None) => return Poll::Ready(()),
}
}
match self.timeout {
None => Ok(Async::NotReady),
Some(ref mut timeout) => timeout
.poll()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err)),
None => Poll::Pending,
Some(ref mut timeout) => Pin::new(timeout).poll(cx),
}
}
}

Просмотреть файл

@ -20,6 +20,7 @@ use crate::mock_command::*;
use crate::server::{DistClientContainer, SccacheServer, ServerMessage};
use crate::test::utils::*;
use futures::sync::oneshot::{self, Sender};
use futures_03::compat::*;
use futures_03::executor::ThreadPool;
use std::fs::File;
use std::io::{Cursor, Write};
@ -92,7 +93,7 @@ where
let port = srv.port();
let creator = srv.command_creator().clone();
tx.send((port, creator)).unwrap();
srv.run(shutdown_rx).unwrap();
srv.run(shutdown_rx.compat()).unwrap();
});
let (port, creator) = rx.recv().unwrap();
(port, shutdown_tx, creator, handle)