From a8606b1203bfbfb36a43c586d27d42d2d9ea3869 Mon Sep 17 00:00:00 2001 From: Ted Mielczarek Date: Fri, 27 Jul 2018 14:16:01 -0400 Subject: [PATCH] Make the server send more info in the startup notification message. Currently the server notifies the client launching it with a simple boolean success value. Since we added serde as a dependency long ago we can do better now and send more information, so reuse the `ServerStartup` enum for that purpose by making it serializable. This way the client will be able to print the actual error message encountered by the server during startup. Additionally, we'll return the port the server is using when startup is successful, so that callers could feasibly start a server with `SCCACHE_SERVER_PORT=0`, let the server choose an arbitrary free port, and get the port number from the client's stdout. This would be useful for parallelizing the sccache system tests so that we can run more than one server at a time. --- src/client.rs | 9 ++---- src/commands.rs | 70 ++++++++++++++++++++++------------------------- src/lib.rs | 1 + src/server.rs | 43 +++++++++++++++++++---------- src/test/tests.rs | 5 ++-- src/util.rs | 17 ++++++++++++ 6 files changed, 82 insertions(+), 63 deletions(-) diff --git a/src/client.rs b/src/client.rs index 4cebdd62..cfea387a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -22,9 +22,9 @@ use std::io::{ BufReader, BufWriter, Read, - Write, }; use std::net::TcpStream; +use util; /// A connection to an sccache server. pub struct ServerConnection { @@ -47,12 +47,7 @@ impl ServerConnection { /// Send `request` to the server, read and return a `Response`. pub fn request(&mut self, request: Request) -> Result { trace!("ServerConnection::request"); - let bytes = bincode::serialize(&request, bincode::Infinite)?; - let mut len = [0; 4]; - BigEndian::write_u32(&mut len, bytes.len() as u32); - self.writer.write_all(&len)?; - self.writer.write_all(&bytes)?; - self.writer.flush()?; + util::write_length_prefixed_bincode(&mut self.writer, request)?; trace!("ServerConnection::request: sent request"); self.read_one_response() } diff --git a/src/commands.rs b/src/commands.rs index e394eec9..ba7f3446 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -13,6 +13,8 @@ // limitations under the License. use atty::{self, Stream}; +use bincode; +use byteorder::{ByteOrder, BigEndian}; use client::{ connect_to_server, connect_with_retry, @@ -20,6 +22,7 @@ use client::{ }; use cmdline::{Command, StatsFormat}; use compiler::ColorMode; +use futures::Future; use jobserver::Client; use log::LogLevel::Trace; use mock_command::{ @@ -29,7 +32,7 @@ use mock_command::{ }; use protocol::{Request, Response, CompileResponse, CompileFinished, Compile}; use serde_json; -use server::{self, ServerInfo}; +use server::{self, ServerInfo, ServerStartup}; use std::env; use std::ffi::{OsStr,OsString}; use std::fs::{File, OpenOptions}; @@ -45,6 +48,8 @@ use std::path::{ use std::process; use strip_ansi_escapes::Writer; use tokio_core::reactor::Core; +use tokio_io::AsyncRead; +use tokio_io::io::read_exact; use util::run_input_output; use which::which_in; @@ -56,17 +61,6 @@ pub const DEFAULT_PORT: u16 = 4226; /// The number of milliseconds to wait for server startup. const SERVER_STARTUP_TIMEOUT_MS: u32 = 5000; -// Should this just be a Result? -/// Result of background server startup. -enum ServerStartup { - /// Server started successfully. - Ok, - /// Timed out waiting for server startup. - TimedOut, - /// Server encountered an error. - Err(Error), -} - /// Get the port on which the server should listen. fn get_port() -> u16 { env::var("SCCACHE_SERVER_PORT") @@ -75,16 +69,26 @@ fn get_port() -> u16 { .unwrap_or(DEFAULT_PORT) } +fn read_server_startup_status(server: R) -> impl Future { + // This is an async equivalent of ServerConnection::read_one_response + read_exact(server, [0u8; 4]).map_err(Error::from).and_then(|(server, bytes)| { + let len = BigEndian::read_u32(&bytes); + let data = vec![0; len as usize]; + read_exact(server, data).map_err(Error::from).and_then(|(_server, data)| { + Ok(bincode::deserialize(&data)?) + }) + }) +} + /// Re-execute the current executable as a background server, and wait /// for it to start up. #[cfg(not(windows))] fn run_server_process() -> Result { extern crate tokio_uds; - use futures::{Future, Stream}; + use futures::Stream; use std::time::Duration; use tempdir::TempDir; - use tokio_io::io::read_exact; use tokio_core::reactor::Timeout; trace!("run_server_process"); @@ -101,20 +105,14 @@ fn run_server_process() -> Result { .spawn()?; let startup = listener.incoming().into_future().map_err(|e| e.0); - let startup = startup.and_then(|(socket, _rest)| { + let startup = startup.map_err(Error::from).and_then(|(socket, _rest)| { let (socket, _addr) = socket.unwrap(); // incoming() never returns None - read_exact(socket, [0u8]).map(|(_socket, byte)| { - if byte[0] == 0 { - ServerStartup::Ok - } else { - let err = format!("Server startup failed: {}", byte[0]).into(); - ServerStartup::Err(err) - } - }) + read_server_startup_status(socket) }); let timeout = Duration::from_millis(SERVER_STARTUP_TIMEOUT_MS.into()); - let timeout = Timeout::new(timeout, &handle)?.map(|()| ServerStartup::TimedOut); + let timeout = Timeout::new(timeout, &handle)?.map_err(Error::from) + .map(|()| ServerStartup::TimedOut); match core.run(startup.select(timeout)) { Ok((e, _other)) => Ok(e), Err((e, _other)) => Err(e.into()), @@ -241,14 +239,12 @@ fn redirect_error_log() -> Result<()> { /// Re-execute the current executable as a background server. #[cfg(windows)] fn run_server_process() -> Result { - use futures::Future; use kernel32; use mio_named_pipes::NamedPipe; use std::mem; use std::os::windows::ffi::OsStrExt; use std::ptr; use std::time::Duration; - use tokio_io::io::read_exact; use tokio_core::reactor::{Core, Timeout, PollEvented}; use uuid::Uuid; use winapi::{CREATE_UNICODE_ENVIRONMENT,DETACHED_PROCESS,CREATE_NEW_PROCESS_GROUP}; @@ -325,17 +321,11 @@ fn run_server_process() -> Result { return Err(io::Error::last_os_error().into()) } - let result = read_exact(server, [0u8]).map(|(_socket, byte)| { - if byte[0] == 0 { - ServerStartup::Ok - } else { - let err = format!("Server startup failed: {}", byte[0]).into(); - ServerStartup::Err(err) - } - }); + let result = read_server_startup_status(server); let timeout = Duration::from_millis(SERVER_STARTUP_TIMEOUT_MS.into()); - let timeout = Timeout::new(timeout, &handle)?.map(|()| ServerStartup::TimedOut); + let timeout = Timeout::new(timeout, &handle)?.map_err(Error::from) + .map(|()| ServerStartup::TimedOut); match core.run(result.select(timeout)) { Ok((e, _other)) => Ok(e), Err((e, _other)) => Err(e).chain_err(|| "failed waiting for server to start"), @@ -606,12 +596,16 @@ pub fn run_command(cmd: Command) -> Result { "failed to start server process" })?; match startup { - ServerStartup::Ok => {} + ServerStartup::Ok { port } => { + if port != DEFAULT_PORT { + println!("Listening on port {}", port); + } + } ServerStartup::TimedOut => { bail!("Timed out waiting for server startup") } - ServerStartup::Err(e) => { - return Err(e).chain_err(|| "Server startup error") + ServerStartup::Err { reason } => { + bail!("Server startup failed: {}", reason) } } } diff --git a/src/lib.rs b/src/lib.rs index f78089f4..3978db2e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,6 +65,7 @@ extern crate ring; extern crate redis; extern crate regex; extern crate retry; +extern crate serde; extern crate serde_json; #[macro_use] extern crate serde_derive; diff --git a/src/server.rs b/src/server.rs index 829a6526..0189e549 100644 --- a/src/server.rs +++ b/src/server.rs @@ -60,13 +60,24 @@ use tokio_proto::streaming::pipeline::{Frame, ServerProto, Transport}; use tokio_proto::streaming::{Body, Message}; use tokio_serde_bincode::{ReadBincode, WriteBincode}; use tokio_service::Service; -use util::fmt_duration_as_secs; +use util; //::fmt_duration_as_secs; use errors::*; /// If the server is idle for this many seconds, shut down. const DEFAULT_IDLE_TIMEOUT: u64 = 600; +/// Result of background server startup. +#[derive(Debug, Serialize, Deserialize)] +pub enum ServerStartup { + /// Server started successfully on `port`. + Ok { port: u16 }, + /// Timed out waiting for server startup. + TimedOut, + /// Server encountered an error. + Err { reason: String }, +} + /// Get the time the server should idle for before shutting down. fn get_idle_timeout() -> u64 { // A value of 0 disables idle shutdown entirely. @@ -76,26 +87,24 @@ fn get_idle_timeout() -> u64 { .unwrap_or(DEFAULT_IDLE_TIMEOUT) } -fn notify_server_startup_internal(mut w: W, success: bool) -> io::Result<()> { - let data = [ if success { 0 } else { 1 }; 1]; - w.write_all(&data)?; - Ok(()) +fn notify_server_startup_internal(mut w: W, status: ServerStartup) -> Result<()> { + util::write_length_prefixed_bincode(&mut w, status) } #[cfg(unix)] -fn notify_server_startup(name: &Option, success: bool) -> io::Result<()> { +fn notify_server_startup(name: &Option, status: ServerStartup) -> Result<()> { use std::os::unix::net::UnixStream; let name = match *name { Some(ref s) => s, None => return Ok(()), }; - debug!("notify_server_startup(success: {})", success); + debug!("notify_server_startup({:?})", status); let stream = UnixStream::connect(name)?; - notify_server_startup_internal(stream, success) + notify_server_startup_internal(stream, status) } #[cfg(windows)] -fn notify_server_startup(name: &Option, success: bool) -> io::Result<()> { +fn notify_server_startup(name: &Option, status: ServerStartup) -> Result<()> { use std::fs::OpenOptions; let name = match *name { @@ -103,7 +112,7 @@ fn notify_server_startup(name: &Option, success: bool) -> io::Result<( None => return Ok(()), }; let pipe = try!(OpenOptions::new().write(true).read(true).open(name)); - notify_server_startup_internal(pipe, success) + notify_server_startup_internal(pipe, status) } #[cfg(unix)] @@ -121,7 +130,7 @@ fn get_signal(_status: ExitStatus) -> i32 { /// Spins an event loop handling client connections until a client /// requests a shutdown. pub fn start_server(port: u16) -> Result<()> { - trace!("start_server"); + info!("start_server: port: {}", port); let client = unsafe { Client::new() }; let core = Core::new()?; let pool = CpuPool::new(20); @@ -130,12 +139,16 @@ pub fn start_server(port: u16) -> Result<()> { let notify = env::var_os("SCCACHE_STARTUP_NOTIFY"); match res { Ok(srv) => { - notify_server_startup(¬ify, true)?; + let port = srv.port(); + info!("server started, listening on port {}", port); + notify_server_startup(¬ify, ServerStartup::Ok { port })?; srv.run(future::empty::<(), ()>())?; Ok(()) } Err(e) => { - notify_server_startup(¬ify, false)?; + error!("failed to start server: {}", e); + let reason = e.to_string(); + notify_server_startup(¬ify, ServerStartup::Err { reason })?; Err(e) } } @@ -651,7 +664,7 @@ impl SccacheService Ok(Some(info)) => { debug!("[{}]: Cache write finished in {}", info.object_file_pretty, - fmt_duration_as_secs(&info.duration)); + util::fmt_duration_as_secs(&info.duration)); me.stats.borrow_mut().cache_writes += 1; me.stats.borrow_mut().cache_write_duration += info.duration; } @@ -763,7 +776,7 @@ impl ServerStats { Default::default() }; // name, value, suffix length - $vec.push(($name, fmt_duration_as_secs(&s), 2)); + $vec.push(($name, util::fmt_duration_as_secs(&s), 2)); }}; } diff --git a/src/test/tests.rs b/src/test/tests.rs index a64efae7..005f8266 100644 --- a/src/test/tests.rs +++ b/src/test/tests.rs @@ -262,7 +262,6 @@ fn test_server_port_in_use() { .unwrap(); assert!(!output.status.success()); let s = String::from_utf8_lossy(&output.stderr); - assert!(s.contains("Server startup failed:"), - "Output did not contain 'Failed to start server:':\n========\n{}\n========", - s); + const MSG: &str = "Server startup failed:"; + assert!(s.contains(MSG), "Output did not contain '{}':\n========\n{}\n========", MSG, s); } diff --git a/src/util.rs b/src/util.rs index b028bd17..9a19c7e9 100644 --- a/src/util.rs +++ b/src/util.rs @@ -12,10 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use bincode; +use byteorder::{ByteOrder, BigEndian}; use futures::Future; use futures_cpupool::CpuPool; use mock_command::{CommandChild, RunCommand}; use ring::digest::{SHA512, Context}; +use serde::Serialize; use std::ffi::{OsStr, OsString}; use std::fs::File; use std::hash::Hasher; @@ -154,6 +157,20 @@ pub fn run_input_output(mut command: C, input: Option>) })) } +/// Write `data` to `writer` with bincode serialization, prefixed by a `u32` length. +pub fn write_length_prefixed_bincode(mut writer: W, data: S) -> Result<()> + where W: Write, + S: Serialize, +{ + let bytes = bincode::serialize(&data, bincode::Infinite)?; + let mut len = [0; 4]; + BigEndian::write_u32(&mut len, bytes.len() as u32); + writer.write_all(&len)?; + writer.write_all(&bytes)?; + writer.flush()?; + Ok(()) +} + pub trait OsStrExt { fn starts_with(&self, s: &str) -> bool; fn split_prefix(&self, s: &str) -> Option;