Make the server send more info in the startup notification message.

Currently the server notifies the client launching it with a simple boolean
success value. Since we added serde as a dependency long ago we can do
better now and send more information, so reuse the `ServerStartup` enum
for that purpose by making it serializable. This way the client will be
able to print the actual error message encountered by the server during
startup. Additionally, we'll return the port the server is using when
startup is successful, so that callers could feasibly start a server with
`SCCACHE_SERVER_PORT=0`, let the server choose an arbitrary free port, and
get the port number from the client's stdout. This would be useful for
parallelizing the sccache system tests so that we can run more than one
server at a time.
This commit is contained in:
Ted Mielczarek 2018-07-27 14:16:01 -04:00
Родитель 8e21796623
Коммит a8606b1203
6 изменённых файлов: 82 добавлений и 63 удалений

Просмотреть файл

@ -22,9 +22,9 @@ use std::io::{
BufReader,
BufWriter,
Read,
Write,
};
use std::net::TcpStream;
use util;
/// A connection to an sccache server.
pub struct ServerConnection {
@ -47,12 +47,7 @@ impl ServerConnection {
/// Send `request` to the server, read and return a `Response`.
pub fn request(&mut self, request: Request) -> Result<Response> {
trace!("ServerConnection::request");
let bytes = bincode::serialize(&request, bincode::Infinite)?;
let mut len = [0; 4];
BigEndian::write_u32(&mut len, bytes.len() as u32);
self.writer.write_all(&len)?;
self.writer.write_all(&bytes)?;
self.writer.flush()?;
util::write_length_prefixed_bincode(&mut self.writer, request)?;
trace!("ServerConnection::request: sent request");
self.read_one_response()
}

Просмотреть файл

@ -13,6 +13,8 @@
// limitations under the License.
use atty::{self, Stream};
use bincode;
use byteorder::{ByteOrder, BigEndian};
use client::{
connect_to_server,
connect_with_retry,
@ -20,6 +22,7 @@ use client::{
};
use cmdline::{Command, StatsFormat};
use compiler::ColorMode;
use futures::Future;
use jobserver::Client;
use log::LogLevel::Trace;
use mock_command::{
@ -29,7 +32,7 @@ use mock_command::{
};
use protocol::{Request, Response, CompileResponse, CompileFinished, Compile};
use serde_json;
use server::{self, ServerInfo};
use server::{self, ServerInfo, ServerStartup};
use std::env;
use std::ffi::{OsStr,OsString};
use std::fs::{File, OpenOptions};
@ -45,6 +48,8 @@ use std::path::{
use std::process;
use strip_ansi_escapes::Writer;
use tokio_core::reactor::Core;
use tokio_io::AsyncRead;
use tokio_io::io::read_exact;
use util::run_input_output;
use which::which_in;
@ -56,17 +61,6 @@ pub const DEFAULT_PORT: u16 = 4226;
/// The number of milliseconds to wait for server startup.
const SERVER_STARTUP_TIMEOUT_MS: u32 = 5000;
// Should this just be a Result?
/// Result of background server startup.
enum ServerStartup {
/// Server started successfully.
Ok,
/// Timed out waiting for server startup.
TimedOut,
/// Server encountered an error.
Err(Error),
}
/// Get the port on which the server should listen.
fn get_port() -> u16 {
env::var("SCCACHE_SERVER_PORT")
@ -75,16 +69,26 @@ fn get_port() -> u16 {
.unwrap_or(DEFAULT_PORT)
}
fn read_server_startup_status<R: AsyncRead>(server: R) -> impl Future<Item=ServerStartup, Error=Error> {
// This is an async equivalent of ServerConnection::read_one_response
read_exact(server, [0u8; 4]).map_err(Error::from).and_then(|(server, bytes)| {
let len = BigEndian::read_u32(&bytes);
let data = vec![0; len as usize];
read_exact(server, data).map_err(Error::from).and_then(|(_server, data)| {
Ok(bincode::deserialize(&data)?)
})
})
}
/// Re-execute the current executable as a background server, and wait
/// for it to start up.
#[cfg(not(windows))]
fn run_server_process() -> Result<ServerStartup> {
extern crate tokio_uds;
use futures::{Future, Stream};
use futures::Stream;
use std::time::Duration;
use tempdir::TempDir;
use tokio_io::io::read_exact;
use tokio_core::reactor::Timeout;
trace!("run_server_process");
@ -101,20 +105,14 @@ fn run_server_process() -> Result<ServerStartup> {
.spawn()?;
let startup = listener.incoming().into_future().map_err(|e| e.0);
let startup = startup.and_then(|(socket, _rest)| {
let startup = startup.map_err(Error::from).and_then(|(socket, _rest)| {
let (socket, _addr) = socket.unwrap(); // incoming() never returns None
read_exact(socket, [0u8]).map(|(_socket, byte)| {
if byte[0] == 0 {
ServerStartup::Ok
} else {
let err = format!("Server startup failed: {}", byte[0]).into();
ServerStartup::Err(err)
}
})
read_server_startup_status(socket)
});
let timeout = Duration::from_millis(SERVER_STARTUP_TIMEOUT_MS.into());
let timeout = Timeout::new(timeout, &handle)?.map(|()| ServerStartup::TimedOut);
let timeout = Timeout::new(timeout, &handle)?.map_err(Error::from)
.map(|()| ServerStartup::TimedOut);
match core.run(startup.select(timeout)) {
Ok((e, _other)) => Ok(e),
Err((e, _other)) => Err(e.into()),
@ -241,14 +239,12 @@ fn redirect_error_log() -> Result<()> {
/// Re-execute the current executable as a background server.
#[cfg(windows)]
fn run_server_process() -> Result<ServerStartup> {
use futures::Future;
use kernel32;
use mio_named_pipes::NamedPipe;
use std::mem;
use std::os::windows::ffi::OsStrExt;
use std::ptr;
use std::time::Duration;
use tokio_io::io::read_exact;
use tokio_core::reactor::{Core, Timeout, PollEvented};
use uuid::Uuid;
use winapi::{CREATE_UNICODE_ENVIRONMENT,DETACHED_PROCESS,CREATE_NEW_PROCESS_GROUP};
@ -325,17 +321,11 @@ fn run_server_process() -> Result<ServerStartup> {
return Err(io::Error::last_os_error().into())
}
let result = read_exact(server, [0u8]).map(|(_socket, byte)| {
if byte[0] == 0 {
ServerStartup::Ok
} else {
let err = format!("Server startup failed: {}", byte[0]).into();
ServerStartup::Err(err)
}
});
let result = read_server_startup_status(server);
let timeout = Duration::from_millis(SERVER_STARTUP_TIMEOUT_MS.into());
let timeout = Timeout::new(timeout, &handle)?.map(|()| ServerStartup::TimedOut);
let timeout = Timeout::new(timeout, &handle)?.map_err(Error::from)
.map(|()| ServerStartup::TimedOut);
match core.run(result.select(timeout)) {
Ok((e, _other)) => Ok(e),
Err((e, _other)) => Err(e).chain_err(|| "failed waiting for server to start"),
@ -606,12 +596,16 @@ pub fn run_command(cmd: Command) -> Result<i32> {
"failed to start server process"
})?;
match startup {
ServerStartup::Ok => {}
ServerStartup::Ok { port } => {
if port != DEFAULT_PORT {
println!("Listening on port {}", port);
}
}
ServerStartup::TimedOut => {
bail!("Timed out waiting for server startup")
}
ServerStartup::Err(e) => {
return Err(e).chain_err(|| "Server startup error")
ServerStartup::Err { reason } => {
bail!("Server startup failed: {}", reason)
}
}
}

Просмотреть файл

@ -65,6 +65,7 @@ extern crate ring;
extern crate redis;
extern crate regex;
extern crate retry;
extern crate serde;
extern crate serde_json;
#[macro_use]
extern crate serde_derive;

Просмотреть файл

@ -60,13 +60,24 @@ use tokio_proto::streaming::pipeline::{Frame, ServerProto, Transport};
use tokio_proto::streaming::{Body, Message};
use tokio_serde_bincode::{ReadBincode, WriteBincode};
use tokio_service::Service;
use util::fmt_duration_as_secs;
use util; //::fmt_duration_as_secs;
use errors::*;
/// If the server is idle for this many seconds, shut down.
const DEFAULT_IDLE_TIMEOUT: u64 = 600;
/// Result of background server startup.
#[derive(Debug, Serialize, Deserialize)]
pub enum ServerStartup {
/// Server started successfully on `port`.
Ok { port: u16 },
/// Timed out waiting for server startup.
TimedOut,
/// Server encountered an error.
Err { reason: String },
}
/// Get the time the server should idle for before shutting down.
fn get_idle_timeout() -> u64 {
// A value of 0 disables idle shutdown entirely.
@ -76,26 +87,24 @@ fn get_idle_timeout() -> u64 {
.unwrap_or(DEFAULT_IDLE_TIMEOUT)
}
fn notify_server_startup_internal<W: Write>(mut w: W, success: bool) -> io::Result<()> {
let data = [ if success { 0 } else { 1 }; 1];
w.write_all(&data)?;
Ok(())
fn notify_server_startup_internal<W: Write>(mut w: W, status: ServerStartup) -> Result<()> {
util::write_length_prefixed_bincode(&mut w, status)
}
#[cfg(unix)]
fn notify_server_startup(name: &Option<OsString>, success: bool) -> io::Result<()> {
fn notify_server_startup(name: &Option<OsString>, status: ServerStartup) -> Result<()> {
use std::os::unix::net::UnixStream;
let name = match *name {
Some(ref s) => s,
None => return Ok(()),
};
debug!("notify_server_startup(success: {})", success);
debug!("notify_server_startup({:?})", status);
let stream = UnixStream::connect(name)?;
notify_server_startup_internal(stream, success)
notify_server_startup_internal(stream, status)
}
#[cfg(windows)]
fn notify_server_startup(name: &Option<OsString>, success: bool) -> io::Result<()> {
fn notify_server_startup(name: &Option<OsString>, status: ServerStartup) -> Result<()> {
use std::fs::OpenOptions;
let name = match *name {
@ -103,7 +112,7 @@ fn notify_server_startup(name: &Option<OsString>, success: bool) -> io::Result<(
None => return Ok(()),
};
let pipe = try!(OpenOptions::new().write(true).read(true).open(name));
notify_server_startup_internal(pipe, success)
notify_server_startup_internal(pipe, status)
}
#[cfg(unix)]
@ -121,7 +130,7 @@ fn get_signal(_status: ExitStatus) -> i32 {
/// Spins an event loop handling client connections until a client
/// requests a shutdown.
pub fn start_server(port: u16) -> Result<()> {
trace!("start_server");
info!("start_server: port: {}", port);
let client = unsafe { Client::new() };
let core = Core::new()?;
let pool = CpuPool::new(20);
@ -130,12 +139,16 @@ pub fn start_server(port: u16) -> Result<()> {
let notify = env::var_os("SCCACHE_STARTUP_NOTIFY");
match res {
Ok(srv) => {
notify_server_startup(&notify, true)?;
let port = srv.port();
info!("server started, listening on port {}", port);
notify_server_startup(&notify, ServerStartup::Ok { port })?;
srv.run(future::empty::<(), ()>())?;
Ok(())
}
Err(e) => {
notify_server_startup(&notify, false)?;
error!("failed to start server: {}", e);
let reason = e.to_string();
notify_server_startup(&notify, ServerStartup::Err { reason })?;
Err(e)
}
}
@ -651,7 +664,7 @@ impl<C> SccacheService<C>
Ok(Some(info)) => {
debug!("[{}]: Cache write finished in {}",
info.object_file_pretty,
fmt_duration_as_secs(&info.duration));
util::fmt_duration_as_secs(&info.duration));
me.stats.borrow_mut().cache_writes += 1;
me.stats.borrow_mut().cache_write_duration += info.duration;
}
@ -763,7 +776,7 @@ impl ServerStats {
Default::default()
};
// name, value, suffix length
$vec.push(($name, fmt_duration_as_secs(&s), 2));
$vec.push(($name, util::fmt_duration_as_secs(&s), 2));
}};
}

Просмотреть файл

@ -262,7 +262,6 @@ fn test_server_port_in_use() {
.unwrap();
assert!(!output.status.success());
let s = String::from_utf8_lossy(&output.stderr);
assert!(s.contains("Server startup failed:"),
"Output did not contain 'Failed to start server:':\n========\n{}\n========",
s);
const MSG: &str = "Server startup failed:";
assert!(s.contains(MSG), "Output did not contain '{}':\n========\n{}\n========", MSG, s);
}

Просмотреть файл

@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use bincode;
use byteorder::{ByteOrder, BigEndian};
use futures::Future;
use futures_cpupool::CpuPool;
use mock_command::{CommandChild, RunCommand};
use ring::digest::{SHA512, Context};
use serde::Serialize;
use std::ffi::{OsStr, OsString};
use std::fs::File;
use std::hash::Hasher;
@ -154,6 +157,20 @@ pub fn run_input_output<C>(mut command: C, input: Option<Vec<u8>>)
}))
}
/// Write `data` to `writer` with bincode serialization, prefixed by a `u32` length.
pub fn write_length_prefixed_bincode<W, S>(mut writer: W, data: S) -> Result<()>
where W: Write,
S: Serialize,
{
let bytes = bincode::serialize(&data, bincode::Infinite)?;
let mut len = [0; 4];
BigEndian::write_u32(&mut len, bytes.len() as u32);
writer.write_all(&len)?;
writer.write_all(&bytes)?;
writer.flush()?;
Ok(())
}
pub trait OsStrExt {
fn starts_with(&self, s: &str) -> bool;
fn split_prefix(&self, s: &str) -> Option<OsString>;