Bits and pieces for distributed Rust

This commit is contained in:
Aidan Hobson Sayers 2018-04-12 02:25:15 +01:00 коммит произвёл Ted Mielczarek
Родитель 8ac19f2af7
Коммит 85acb04a46
9 изменённых файлов: 417 добавлений и 60 удалений

33
Cargo.lock сгенерированный
Просмотреть файл

@ -261,6 +261,16 @@ dependencies = [
"redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "filetime"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "flate2"
version = "0.2.20"
@ -974,6 +984,7 @@ dependencies = [
"serde_derive 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"strip-ansi-escapes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tar 0.4.15 (registry+https://github.com/rust-lang/crates.io-index)",
"tempdir 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tempfile 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1153,6 +1164,17 @@ name = "take"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "tar"
version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"filetime 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)",
"xattr 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tempdir"
version = "0.3.6"
@ -1517,6 +1539,14 @@ dependencies = [
"winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "xattr"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "zip"
version = "0.2.3"
@ -1563,6 +1593,7 @@ dependencies = [
"checksum error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ff511d5dc435d703f4971bc399647c9bc38e20cb41452e3b9feb4765419ed3f3"
"checksum escargot 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ddeb6acd0f12aeba309aa295bcad6d4d88603406640bf2595c887a7a9684a4bd"
"checksum filetime 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)" = "714653f3e34871534de23771ac7b26e999651a0a228f47beb324dfdf1dd4b10f"
"checksum filetime 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "08530a39af0bd442c40aabb9e854f442a83bd2403feb1ed58fbe982dec2385f3"
"checksum flate2 0.2.20 (registry+https://github.com/rust-lang/crates.io-index)" = "e6234dd4468ae5d1e2dbb06fe2b058696fdc50a339c68a393aefbf00bc81e423"
"checksum float-cmp 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "134a8fa843d80a51a5b77d36d42bc2def9edcb0262c914861d08129fd1926600"
"checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
@ -1661,6 +1692,7 @@ dependencies = [
"checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad"
"checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6"
"checksum take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b157868d8ac1f56b64604539990685fa7611d8fa9e5476cf0c02cf34d32917c5"
"checksum tar 0.4.15 (registry+https://github.com/rust-lang/crates.io-index)" = "6af6b94659f9a571bf769a5b71f54079393585ee0bfdd71b691be22d7d6b1d18"
"checksum tempdir 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "f73eebdb68c14bcb24aef74ea96079830e7fa7b31a6106e42ea7ee887c1e134e"
"checksum tempfile 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "11ce2fe9db64b842314052e2421ac61a73ce41b898dc8e3750398b219c5fc1e0"
"checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096"
@ -1704,4 +1736,5 @@ dependencies = [
"checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
"checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
"checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
"checksum xattr 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "abb373b92de38a4301d66bec009929b4fb83120ea1c4a401be89dbe0b9777443"
"checksum zip 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a8e9988af1aa47bb7ccb1a61fd1261c45f646dda65ea00c6562d6b611403acf9"

Просмотреть файл

@ -15,7 +15,7 @@ appveyor = { repository = "mozilla/sccache" }
[dependencies]
base64 = "0.9.0"
bincode = "0.9"
bincode = "0.9" # TODO: update to 1.0
byteorder = "1.0"
chrono = { version = "0.3", optional = true }
clap = "2.23.0"
@ -46,6 +46,7 @@ serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
strip-ansi-escapes = "0.1"
tar = "0.4"
tempdir = "0.3.4"
tempfile = "2.1.5"
time = "0.1.35"

Просмотреть файл

@ -23,6 +23,8 @@ use std::ffi::OsString;
use std::path::PathBuf;
use which::which_in;
use dist;
arg_enum!{
#[derive(Debug)]
#[allow(non_camel_case_types)]
@ -90,6 +92,23 @@ pub fn get_app<'a, 'b>() -> App<'a, 'b> {
pub fn parse() -> Result<Command> {
trace!("parse");
let cwd = env::current_dir().chain_err(|| "sccache: Couldn't determine current working directory")?;
let start_daemon_worker = match env::var("SCCACHE_START_DAEMON_WORKER") {
Ok(val) => val == "1",
Err(_) => false,
};
if start_daemon_worker {
let builder = dist::SccacheBuilder::new();
let server = dist::SccacheDaemonServer::new(Box::new(builder));
server.start()
}
let start_scheduler = match env::var("SCCACHE_START_SCHEDULER") {
Ok(val) => val == "1",
Err(_) => false,
};
if start_scheduler {
let scheduler = dist::SccacheScheduler::new();
scheduler.start()
}
// The internal start server command is passed in the environment.
let internal_start_server = match env::var("SCCACHE_START_SERVER") {
Ok(val) => val == "1",

Просмотреть файл

@ -14,6 +14,7 @@
use compiler::{Cacheable, ColorMode, Compiler, CompilerArguments, CompilerHasher, CompilerKind,
Compilation, HashResult};
use dist;
use futures::Future;
use futures_cpupool::CpuPool;
use mock_command::CommandCreatorSync;
@ -24,6 +25,7 @@ use std::fmt;
use std::hash::Hash;
use std::path::{Path, PathBuf};
use std::process;
use std::sync::Arc;
use util::{HashToDigest, Digest};
use errors::*;
@ -202,6 +204,7 @@ impl<T, I> CompilerHasher<T> for CCompilerHasher<I>
I: CCompilerImpl,
{
fn generate_hash_key(self: Box<Self>,
_daemon_client: Arc<dist::DaemonClientRequester>,
creator: &T,
cwd: &Path,
env_vars: &[(OsString, OsString)],
@ -252,6 +255,7 @@ impl<T, I> CompilerHasher<T> for CCompilerHasher<I>
executable: executable,
compiler: compiler,
}),
dist_toolchain: f_err("cannot package toolchain for Rust compilers"),
})
}))
}

Просмотреть файл

@ -23,6 +23,7 @@ use compiler::clang::Clang;
use compiler::gcc::GCC;
use compiler::msvc::MSVC;
use compiler::rust::Rust;
use dist::{self, DaemonClientRequester};
use futures::{Future, IntoFuture};
use futures_cpupool::CpuPool;
use mock_command::{
@ -40,7 +41,7 @@ use std::fs;
use std::fs::File;
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::process::{self,Stdio};
use std::process::{self, Stdio};
use std::str;
use std::sync::Arc;
use std::time::{
@ -89,6 +90,7 @@ pub trait CompilerHasher<T>: fmt::Debug + Send + 'static
/// that can be used for cache lookups, as well as any additional
/// information that can be reused for compilation if necessary.
fn generate_hash_key(self: Box<Self>,
daemon_client: Arc<dist::DaemonClientRequester>,
creator: &T,
cwd: &Path,
env_vars: &[(OsString, OsString)],
@ -111,18 +113,19 @@ pub trait CompilerHasher<T>: fmt::Debug + Send + 'static
handle: Handle)
-> SFuture<(CompileResult, process::Output)>
{
let daemon_client = Arc::new(dist::SccacheDaemonClient::new()); // TODO: pass this in from elsewhere
let out_pretty = self.output_pretty().into_owned();
debug!("[{}]: get_cached_or_compile: {:?}", out_pretty, arguments);
let start = Instant::now();
let result = self.generate_hash_key(&creator, &cwd, &env_vars, &pool);
let result = self.generate_hash_key(daemon_client.clone(), &creator, &cwd, &env_vars, &pool);
Box::new(result.then(move |res| -> SFuture<_> {
debug!("[{}]: generate_hash_key took {}", out_pretty, fmt_duration_as_secs(&start.elapsed()));
let (key, compilation) = match res {
let (key, compilation, dist_toolchain) = match res {
Err(Error(ErrorKind::ProcessError(output), _)) => {
return f_ok((CompileResult::Error, output));
}
Err(e) => return f_err(e),
Ok(HashResult { key, compilation }) => (key, compilation),
Ok(HashResult { key, compilation, dist_toolchain }) => (key, compilation, dist_toolchain),
};
trace!("[{}]: Hash key: {}", out_pretty, key);
// If `ForceRecache` is enabled, we won't check the cache.
@ -212,7 +215,17 @@ pub trait CompilerHasher<T>: fmt::Debug + Send + 'static
// Cache miss, so compile it.
let start = Instant::now();
let compile = compilation.compile(&creator, &cwd, &env_vars);
let compile = if let Some(dist_reqs_fut) = compilation.generate_dist_requests(&cwd, &env_vars, dist_toolchain) {
debug!("[{}]: Attempting distributed compilation", out_pretty);
Box::new(dist_reqs_fut.and_then(|(jareq, jreq)|
daemon_client.do_allocation_request(jareq)
.and_then(move |jares| {
daemon_client.do_compile_request(jares, jreq)
}).map(|jres| (Cacheable::No, jres.output.into())) // TODO: allow caching
))
} else {
compilation.compile(&creator, &cwd, &env_vars)
};
Box::new(compile.and_then(move |(cacheable, compiler_result)| {
let duration = start.elapsed();
if !compiler_result.status.success() {
@ -298,6 +311,13 @@ pub trait Compilation<T>
env_vars: &[(OsString, OsString)])
-> SFuture<(Cacheable, process::Output)>;
/// Generate the requests that will be used to perform a distributed compilation
fn generate_dist_requests(&self,
_cwd: &Path,
_env_vars: &[(OsString, OsString)],
_toolchain: SFuture<dist::Toolchain>)
-> Option<SFuture<(dist::JobAllocRequest, dist::JobRequest)>> { None }
/// Returns an iterator over the results of this compilation.
///
/// Each item is a descriptive (and unique) name of the output paired with
@ -311,6 +331,8 @@ pub struct HashResult<T: CommandCreatorSync> {
pub key: String,
/// An object to use for the actual compilation, if necessary.
pub compilation: Box<Compilation<T> + 'static>,
/// A future that may resolve to a packaged toolchain if required.
pub dist_toolchain: SFuture<dist::Toolchain>,
}
/// Possible results of parsing compiler arguments.

Просмотреть файл

@ -15,6 +15,7 @@
use compiler::{Cacheable, ColorMode, Compiler, CompilerArguments, CompilerHasher, CompilerKind,
Compilation, HashResult};
use compiler::args::*;
use dist;
use futures::{Future, future};
use futures_cpupool::CpuPool;
use log::LogLevel::Trace;
@ -30,7 +31,9 @@ use std::io::Read;
use std::iter;
use std::path::{Path, PathBuf};
use std::process::{self, Stdio};
use std::sync::Arc;
use std::time::Instant;
use tar;
use tempdir::TempDir;
use util::{fmt_duration_as_secs, run_input_output, Digest};
use util::{HashToDigest, OsStrExt};
@ -50,6 +53,8 @@ const LIBS_DIR: &str = "bin";
/// State for a discovered rustc installation, captured once so later
/// invocations can be hashed without re-querying the compiler.
pub struct Rust {
/// The path to the rustc executable.
executable: PathBuf,
/// The path to the rustc sysroot.
sysroot: PathBuf,
/// The SHA-1 digests of all the shared libraries in rustc's $sysroot/lib (or /bin on Windows).
compiler_shlibs_digests: Vec<String>,
}
@ -59,6 +64,8 @@ pub struct Rust {
pub struct RustHasher {
/// The path to the rustc executable.
executable: PathBuf,
/// The path to the rustc sysroot.
sysroot: PathBuf,
/// The SHA-1 digests of all the shared libraries in rustc's $sysroot/lib (or /bin on Windows).
compiler_shlibs_digests: Vec<String>,
parsed_args: ParsedArguments,
@ -236,9 +243,10 @@ impl Rust {
.stderr(Stdio::null())
.arg("--print=sysroot");
let output = run_input_output(cmd, None);
let libs = output.and_then(move |output| -> Result<_> {
let sysroot_and_libs = output.and_then(move |output| -> Result<_> {
let outstr = String::from_utf8(output.stdout).chain_err(|| "Error parsing sysroot")?;
let libs_path = Path::new(outstr.trim_right()).join(LIBS_DIR);
let sysroot = PathBuf::from(outstr.trim_right());
let libs_path = sysroot.join(LIBS_DIR);
let mut libs = fs::read_dir(&libs_path).chain_err(|| format!("Failed to list rustc sysroot: `{:?}`", libs_path))?.filter_map(|e| {
e.ok().and_then(|e| {
e.file_type().ok().and_then(|t| {
@ -252,12 +260,13 @@ impl Rust {
})
}).collect::<Vec<_>>();
libs.sort();
Ok(libs)
Ok((sysroot, libs))
});
Box::new(libs.and_then(move |libs| {
Box::new(sysroot_and_libs.and_then(move |(sysroot, libs)| {
hash_all(libs, &pool).map(move |digests| {
Rust {
executable: executable,
sysroot,
compiler_shlibs_digests: digests,
}
})
@ -288,6 +297,7 @@ impl<T> Compiler<T> for Rust
CompilerArguments::Ok(args) => {
CompilerArguments::Ok(Box::new(RustHasher {
executable: self.executable.clone(),
sysroot: self.sysroot.clone(),
compiler_shlibs_digests: self.compiler_shlibs_digests.clone(),
parsed_args: args,
}))
@ -576,6 +586,7 @@ impl<T> CompilerHasher<T> for RustHasher
where T: CommandCreatorSync,
{
fn generate_hash_key(self: Box<Self>,
daemon_client: Arc<dist::DaemonClientRequester>,
creator: &T,
cwd: &Path,
env_vars: &[(OsString, OsString)],
@ -583,7 +594,7 @@ impl<T> CompilerHasher<T> for RustHasher
-> SFuture<HashResult<T>>
{
let me = *self;
let RustHasher { executable, compiler_shlibs_digests, parsed_args: ParsedArguments { arguments, output_dir, externs, staticlibs, crate_name, dep_info, color_mode: _ } } = me;
let RustHasher { executable, sysroot, compiler_shlibs_digests, parsed_args: ParsedArguments { arguments, output_dir, externs, staticlibs, crate_name, dep_info, color_mode: _ } } = me;
trace!("[{}]: generate_hash_key", crate_name);
// `filtered_arguments` omits --emit and --out-dir arguments.
// It's used for invoking rustc with `--emit=dep-info` to get the list of
@ -616,6 +627,7 @@ impl<T> CompilerHasher<T> for RustHasher
let creator = creator.clone();
let cwd = cwd.to_owned();
let env_vars = env_vars.to_vec();
let toolchain_pool = pool.clone();
let hashes = source_hashes.join3(extern_hashes, staticlib_hashes);
Box::new(hashes.and_then(move |(source_hashes, extern_hashes, staticlib_hashes)|
-> SFuture<_> {
@ -690,6 +702,22 @@ impl<T> CompilerHasher<T> for RustHasher
let p = output_dir.join(&dep_info);
outputs.insert(dep_info.to_string_lossy().into_owned(), p);
}
// CPU pool futures are eager, delay until poll is called
let toolchain_future = Box::new(future::lazy(move || {
toolchain_pool.spawn_fn(move || {
let path = daemon_client.toolchain_cache(&mut |f| {
let mut builder = tar::Builder::new(f);
// TODO: attempt to mimic original layout
// TODO: FnBox would remove need for this clone
builder.append_dir_all("", sysroot.clone()).unwrap();
builder.finish().unwrap()
});
future::ok(dist::Toolchain {
docker_img: "ubuntu:16.04".to_owned(),
archive: path,
})
})
}));
HashResult {
key: m.finish(),
compilation: Box::new(RustCompilation {
@ -698,6 +726,7 @@ impl<T> CompilerHasher<T> for RustHasher
outputs: outputs,
crate_name: crate_name,
}),
dist_toolchain: toolchain_future,
}
}))
}))
@ -739,6 +768,29 @@ impl<T> Compilation<T> for RustCompilation
}))
}
/// Build the pair of requests needed for a distributed compile: an
/// allocation request for the scheduler, and the job itself for whichever
/// daemon server the scheduler assigns.
///
/// Note the compiler is invoked at the fixed path `/toolchain/bin/rustc`
/// inside the build container (where the builder mounts the unpacked
/// toolchain archive), not at the local executable path.
fn generate_dist_requests(&self,
cwd: &Path,
env_vars: &[(OsString, OsString)],
toolchain: SFuture<dist::Toolchain>)
-> Option<SFuture<(dist::JobAllocRequest, dist::JobRequest)>> {
// Fix: dropped the unused `self.executable.clone()` — the request
// hard-codes the in-container compiler path below.
let arguments = self.arguments.clone();
let cwd = cwd.to_owned();
let env_vars = env_vars.to_owned();
Some(Box::new(toolchain.map(move |toolchain| (
dist::JobAllocRequest {
toolchain: toolchain.clone(),
},
dist::JobRequest {
executable: PathBuf::from("/toolchain/bin/rustc"),
arguments,
cwd,
env_vars,
toolchain,
}
))))
}
fn outputs<'a>(&'a self) -> Box<Iterator<Item=(&'a str, &'a Path)> + 'a> {
Box::new(self.outputs.iter().map(|(k, v)| (k.as_str(), &**v)))
}

322
src/dist/mod.rs поставляемый
Просмотреть файл

@ -1,52 +1,138 @@
#![allow(non_camel_case_types, unused)]
use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::ffi::OsString;
use std::fs;
use std::io::BufReader;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::path::{Path, PathBuf};
use std::process::{self, Command};
use std::sync::Mutex;
use futures::Future;
use bincode;
use futures::{Future, future};
use tar;
use tokio_core;
use errors::*;
use mock_command::exit_status;
#[cfg(test)]
#[macro_use]
mod test;
// TODO: Clone by assuming immutable/no GC for now
// TODO: make fields non-public?
/// Description of the compiler toolchain a build server needs in order to
/// run a distributed compile: a docker image to execute in, plus an archive
/// of toolchain files for the builder to unpack and mount into it.
#[derive(Clone)]
#[derive(Serialize, Deserialize)]
pub struct Toolchain {
/// Docker image the build runs inside (e.g. "ubuntu:16.04").
pub docker_img: String,
// TODO: this should be an ID the daemon server can request, not a path
/// Local path to a tar archive of the toolchain files.
pub archive: PathBuf,
}
// process::Output is not serialize
/// Serializable stand-in for `std::process::Output`, so compile results can
/// be shipped over the wire with bincode.
#[derive(Serialize, Deserialize)]
pub struct ProcessOutput {
code: Option<i32>, // TODO: extract the extra info from the UnixCommandExt
stdout: Vec<u8>,
stderr: Vec<u8>,
}
impl From<process::Output> for ProcessOutput {
/// Capture the exit code and the buffered stdout/stderr of a finished process.
fn from(o: process::Output) -> Self {
ProcessOutput { code: o.status.code(), stdout: o.stdout, stderr: o.stderr }
}
}
impl From<ProcessOutput> for process::Output {
/// Reconstruct a `process::Output` from the wire representation.
fn from(o: ProcessOutput) -> Self {
// TODO: handle signals, i.e. None code
// NOTE(review): panics if the remote process died from a signal (code is None).
process::Output { status: exit_status(o.code.unwrap()), stdout: o.stdout, stderr: o.stderr }
}
}
/// Identifier for a single distributed compile job.
#[derive(Hash, Eq, PartialEq)]
struct JobId(u64);
/// Identifier for a daemon known to the scheduler.
struct DaemonId(u64);
struct JobRequest;
struct JobResult;
// Fixed TCP ports used by the prototype protocol.
const SCHEDULER_SERVERS_PORT: u16 = 10500; // daemon servers register with the scheduler here
const SCHEDULER_CLIENTS_PORT: u16 = 10501; // daemon clients send allocation requests here
const SERVER_CLIENTS_PORT: u16 = 10502; // daemon clients send compile jobs to a server here
struct JobAllocRequest;
struct JobAllocResult;
// TODO: make these fields not public
struct AllocAssignment;
/// A compile job, sent from a daemon client to a daemon server.
#[derive(Serialize, Deserialize)]
pub struct JobRequest {
/// Compiler to invoke — a path valid inside the build container.
pub executable: PathBuf,
pub arguments: Vec<OsString>,
// TODO: next two can't be sent across the wire like this if coming from Windows
pub cwd: PathBuf,
pub env_vars: Vec<(OsString, OsString)>,
/// Toolchain the server must set up before running the job.
pub toolchain: Toolchain,
}
/// Result of a compile job, returned from daemon server to daemon client.
#[derive(Serialize, Deserialize)]
pub struct JobResult {
pub output: ProcessOutput,
}
struct CompileRequest;
struct CompileResult;
/// Sent by a daemon client to ask the scheduler for a server to run a job on.
#[derive(Serialize, Deserialize)]
pub struct JobAllocRequest {
pub toolchain: Toolchain,
}
/// Scheduler's answer to a `JobAllocRequest`: where to send the job.
#[derive(Serialize, Deserialize)]
pub struct JobAllocResult {
/// Address of the daemon server that will accept the compile request.
addr: SocketAddr,
}
struct BuildRequest;
struct BuildResult;
/// Notification from the scheduler to a daemon server that a job has been
/// assigned to it. Carries no payload yet.
#[derive(Serialize, Deserialize)]
pub struct AllocAssignment;
trait Scheduler {
/// A job as handed to the builder by the daemon server.
#[derive(Serialize, Deserialize)]
pub struct BuildRequest(JobRequest);
/// Output of a build performed by a `BuilderHandler`.
#[derive(Serialize, Deserialize)]
pub struct BuildResult {
output: ProcessOutput,
}
trait SchedulerHandler {
// From DaemonClient
fn allocation_request(&self, JobAllocRequest) -> Box<Future<Item=JobAllocResult, Error=()>>;
fn handle_allocation_request(&self, JobAllocRequest) -> SFuture<JobAllocResult>;
}
/// Outbound requests the scheduler makes to other nodes.
pub trait SchedulerRequester {
// To DaemonServer
/// Tell the daemon server identified by the index that a job is assigned to it.
fn do_allocation_assign(&self, usize, AllocAssignment) -> SFuture<()>;
}
trait DaemonClient {
// From Client
fn compile_request(&self, CompileRequest) -> Box<Future<Item=CompileResult, Error=()>>;
// This protocol is part of non-distributed sccache
trait DaemonClientHandler {
}
/// Outbound requests a daemon client makes to other nodes in the cluster.
pub trait DaemonClientRequester: Send + Sync {
// To Scheduler
/// Ask the scheduler for a daemon server to run a job on.
fn do_allocation_request(&self, JobAllocRequest) -> SFuture<JobAllocResult>;
// To DaemonServer
/// Send the job to the server named in the allocation result.
fn do_compile_request(&self, JobAllocResult, JobRequest) -> SFuture<JobResult>;
// TODO: Really want fnbox here
/// Return the path of the cached toolchain archive, calling `create` with a
/// writable file to populate the cache on a miss.
fn toolchain_cache(&self, create: &mut FnMut(fs::File)) -> PathBuf;
}
trait DaemonServer {
trait DaemonServerHandler {
// From Scheduler
fn allocation_assign(&self, AllocAssignment) -> Box<Future<Item=(), Error=()>>;
fn handle_allocation_assign(&self, AllocAssignment) -> SFuture<()>;
// From DaemonClient
fn compile_request(&self, JobRequest) -> Box<Future<Item=JobResult, Error=()>>;
fn handle_compile_request(&self, JobRequest) -> SFuture<JobResult>;
}
pub trait DaemonServerRequester {
}
trait Builder {
// TODO: this being public is asymmetric
pub trait BuilderHandler {
// From DaemonServer
fn compile_request(&self, BuildRequest) -> Box<Future<Item=BuildResult, Error=()>>;
fn handle_compile_request(&self, BuildRequest) -> SFuture<BuildResult>;
}
pub trait BuilderRequester {
}
enum JobStatus {
@ -60,70 +146,208 @@ enum JobStatus {
JobFailed(DaemonId, JobAllocRequest, JobAllocResult),
}
struct SccacheScheduler {
/// Prototype scheduler: tracks jobs and holds open connections to the
/// registered daemon servers.
pub struct SccacheScheduler {
jobs: HashMap<JobId, JobStatus>,
// Acts as a ring buffer of most recently completed jobs
finished_jobs: VecDeque<JobStatus>,
/// Open connections to registered daemon servers, in registration order.
servers: Vec<TcpStream>,
}
impl SccacheScheduler {
fn new(addr: SocketAddr) -> SccacheScheduler {
SccacheScheduler { jobs: HashMap::new(), finished_jobs: VecDeque::new() }
/// Create a scheduler with no registered servers and no jobs.
pub fn new() -> Self {
SccacheScheduler {
jobs: HashMap::new(),
finished_jobs: VecDeque::new(),
servers: vec![],
}
}
/// Run the scheduler forever: first block until exactly one daemon server
/// registers, then service client allocation requests one at a time.
pub fn start(mut self) -> ! {
let mut core = tokio_core::reactor::Core::new().unwrap();
assert!(self.servers.is_empty());
{
// Blocking accept of a single build server registration.
let listener = TcpListener::bind(("127.0.0.1", SCHEDULER_SERVERS_PORT)).unwrap();
let conn = listener.accept().unwrap().0;
self.servers.push(conn);
assert!(self.servers.len() == 1);
}
loop {
// NOTE(review): the client listener is re-bound on every iteration and
// only one connection is serviced per loop — fine for a prototype.
let listener = TcpListener::bind(("127.0.0.1", SCHEDULER_CLIENTS_PORT)).unwrap();
let conn = listener.accept().unwrap().0;
core.run(future::lazy(|| {
// Read one bincode-framed request, handle it, write one response.
let req = bincode::deserialize_from(&mut &conn, bincode::Infinite).unwrap();
self.handle_allocation_request(req).and_then(|res| {
f_ok(bincode::serialize_into(&mut &conn, &res, bincode::Infinite).unwrap())
})
})).unwrap()
}
}
}
impl Scheduler for SccacheScheduler {
fn allocation_request(&self, req: JobAllocRequest) -> Box<Future<Item=JobAllocResult, Error=()>> {
panic!()
impl SchedulerHandler for SccacheScheduler {
/// Assign the job to the (single) registered server and tell the client
/// where to send the compile request.
fn handle_allocation_request(&self, req: JobAllocRequest) -> SFuture<JobAllocResult> {
assert!(self.servers.len() == 1);
// Only one server exists in this prototype, so always assign to index 0.
self.do_allocation_assign(0, AllocAssignment);
// The client should contact that server's client-facing port.
let ip_addr = self.servers[0].peer_addr().unwrap().ip();
f_ok(JobAllocResult { addr: SocketAddr::new(ip_addr, SERVER_CLIENTS_PORT) })
}
}
/// Scheduler-side implementation of requests sent to daemon servers.
impl SchedulerRequester for SccacheScheduler {
/// Notify the server identified by `server_id` of a new allocation.
/// The write happens eagerly; the returned future is already resolved.
fn do_allocation_assign(&self, server_id: usize, req: AllocAssignment) -> SFuture<()> {
// Fix: honour `server_id` instead of always writing to servers[0]
// (behaviour is unchanged for the current single caller, which passes 0).
f_ok(bincode::serialize_into(&mut &self.servers[server_id], &req, bincode::Infinite).unwrap())
}
}
struct SccacheDaemonClient;
// TODO: possibly shouldn't be public
/// Client-side handle for talking to the scheduler and daemon servers.
pub struct SccacheDaemonClient {
/// Serializes check-then-create access to the on-disk toolchain cache file.
cache_mutex: Mutex<()>,
}
impl SccacheDaemonClient {
fn new(addr: SocketAddr) -> SccacheDaemonClient {
SccacheDaemonClient
/// Create a new daemon client.
pub fn new() -> Self {
SccacheDaemonClient {
cache_mutex: Mutex::new(()),
}
}
}
impl DaemonClient for SccacheDaemonClient {
// From Client
fn compile_request(&self, req: CompileRequest) -> Box<Future<Item=CompileResult, Error=()>> {
panic!()
impl DaemonClientHandler for SccacheDaemonClient {
}
impl DaemonClientRequester for SccacheDaemonClient {
/// Connect to the scheduler, send the allocation request, read the reply.
fn do_allocation_request(&self, req: JobAllocRequest) -> SFuture<JobAllocResult> {
// Wrapped in future::lazy so the blocking connect/IO only happens on poll.
Box::new(future::lazy(move || -> SFuture<JobAllocResult> {
let conn = TcpStream::connect(("127.0.0.1", SCHEDULER_CLIENTS_PORT)).unwrap();
bincode::serialize_into(&mut &conn, &req, bincode::Infinite).unwrap();
f_ok(bincode::deserialize_from(&mut &conn, bincode::Infinite).unwrap())
}))
}
/// Send the job to the daemon server address the scheduler handed back.
fn do_compile_request(&self, ja_res: JobAllocResult, req: JobRequest) -> SFuture<JobResult> {
// Wrapped in future::lazy so the blocking connect/IO only happens on poll.
Box::new(future::lazy(move || -> SFuture<JobResult> {
let conn = TcpStream::connect(ja_res.addr).unwrap();
bincode::serialize_into(&mut &conn, &req, bincode::Infinite).unwrap();
f_ok(bincode::deserialize_from(&mut &conn, bincode::Infinite).unwrap())
}))
}
/// Return the path to the cached toolchain archive, invoking `create` to
/// build it first if it doesn't exist yet.
fn toolchain_cache(&self, create: &mut FnMut(fs::File)) -> PathBuf {
{
// Hold the lock across the whole check-then-create so two callers
// don't race to create the archive (create_new would make the loser panic).
let _l = self.cache_mutex.lock();
if !Path::new("/tmp/sccache_rust_cache.tar").exists() {
let file = fs::OpenOptions::new()
.create_new(true)
.append(true)
.open("/tmp/sccache_rust_cache.tar");
match file {
Ok(f) => create(f),
Err(e) => panic!("{}", e),
}
}
}
"/tmp/sccache_rust_cache.tar".into()
}
}
struct SccacheDaemonServer;
/// A build server: receives allocations from the scheduler and compile
/// requests from clients, delegating the actual build to `builder`.
pub struct SccacheDaemonServer {
builder: Box<BuilderHandler>,
/// Persistent connection to the scheduler, opened at construction.
sched_conn: TcpStream,
}
impl SccacheDaemonServer {
fn new<B: Builder>(addr: SocketAddr, builder: B) -> SccacheDaemonServer {
SccacheDaemonServer
/// Connect to the scheduler (which registers this server) and wrap `builder`.
pub fn new(builder: Box<BuilderHandler>) -> SccacheDaemonServer {
SccacheDaemonServer { builder, sched_conn: TcpStream::connect(("127.0.0.1", SCHEDULER_SERVERS_PORT)).unwrap() }
}
/// Run forever: wait for an allocation from the scheduler, then accept one
/// client connection and service its compile request, and repeat.
pub fn start(self) -> ! {
let mut core = tokio_core::reactor::Core::new().unwrap();
loop {
// Blocking read of the next allocation from the scheduler.
let req = bincode::deserialize_from(&mut &self.sched_conn, bincode::Infinite).unwrap();
let () = core.run(self.handle_allocation_assign(req)).unwrap();
// NOTE(review): listener is re-bound each iteration; one client per loop.
let listener = TcpListener::bind(("127.0.0.1", SERVER_CLIENTS_PORT)).unwrap();
let conn = listener.accept().unwrap().0;
core.run(future::lazy(|| {
// Read one bincode-framed job, run it, write back the result.
let req = bincode::deserialize_from(&mut &conn, bincode::Infinite).unwrap();
self.handle_compile_request(req).and_then(|res| {
f_ok(bincode::serialize_into(&mut &conn, &res, bincode::Infinite).unwrap())
})
})).unwrap()
}
}
}
impl DaemonServer for SccacheDaemonServer {
// From Scheduler
fn allocation_assign(&self, alloc: AllocAssignment) -> Box<Future<Item=(), Error=()>> {
panic!()
impl DaemonServerHandler for SccacheDaemonServer {
/// Acknowledge an allocation from the scheduler. Currently a no-op.
fn handle_allocation_assign(&self, alloc: AllocAssignment) -> SFuture<()> {
// TODO: track ID of incoming job so scheduler is kept up-to-date
f_ok(())
}
// From DaemonClient
fn compile_request(&self, req: JobRequest) -> Box<Future<Item=JobResult, Error=()>> {
panic!()
/// Hand the job to the builder and repackage its output as a `JobResult`.
fn handle_compile_request(&self, req: JobRequest) -> SFuture<JobResult> {
Box::new(self.builder.handle_compile_request(BuildRequest(req)).map(|res| JobResult { output: res.output }))
}
}
impl DaemonServerRequester for SccacheDaemonServer {
}
struct SccacheBuilder;
pub struct SccacheBuilder;
impl SccacheBuilder {
fn new() -> SccacheBuilder {
pub fn new() -> SccacheBuilder {
SccacheBuilder
}
}
impl Builder for SccacheBuilder {
impl BuilderHandler for SccacheBuilder {
// From DaemonServer
fn compile_request(&self, req: BuildRequest) -> Box<Future<Item=BuildResult, Error=()>> {
panic!()
/// Run a compile job inside a fresh docker container:
/// 1. unpack the job's toolchain archive to /tmp/sccache_rust_cache,
/// 2. start a detached container with that directory mounted at /toolchain,
/// 3. recreate and copy the client's working directory into the container,
/// 4. exec the compiler there and capture its output.
fn handle_compile_request(&self, req: BuildRequest) -> SFuture<BuildResult> {
let BuildRequest(job_req) = req;
// Replace any previously-unpacked toolchain wholesale.
let cache_dir = Path::new("/tmp/sccache_rust_cache");
if cache_dir.is_dir() {
fs::remove_dir_all("/tmp/sccache_rust_cache").unwrap();
}
let mut ar = tar::Archive::new(fs::File::open(job_req.toolchain.archive).unwrap());
ar.unpack("/tmp/sccache_rust_cache").unwrap();
let cid = {
// This odd construction is to ensure bash stays as the root process - without
// the &&, bash will just exec since it's the last command in the pipeline.
let cmd = "sleep infinity && true";
let args = &["run", "--rm", "-d", "-v", "/tmp/sccache_rust_cache:/toolchain", &job_req.toolchain.docker_img, "bash", "-c", cmd];
let output = Command::new("docker").args(args).output().unwrap();
assert!(output.status.success());
// `docker run -d` prints the new container id on stdout.
let stdout = String::from_utf8(output.stdout).unwrap();
stdout.trim().to_owned()
};
// TODO: dirname to make sure the source is copied onto the correct directory (otherwise it
// copies *under* the target directory), though this is still flawed if the dir exists before mkdir
let output = Command::new("docker").args(&["exec", &cid, "mkdir", "-p", job_req.cwd.parent().unwrap().to_str().unwrap()]).output().unwrap();
if !output.status.success() {
error!("===========\n{}\n==========\n\n\n\n=========\n{}\n===============\n\n\n",
String::from_utf8_lossy(&output.stdout), String::from_utf8_lossy(&output.stderr));
panic!()
}
// Copy the client's cwd into the container at the same absolute path.
let output = Command::new("docker").args(&["cp", job_req.cwd.to_str().unwrap(), &format!("{}:{}", cid, job_req.cwd.to_str().unwrap())]).output().unwrap();
if !output.status.success() {
error!("===========\n{}\n==========\n\n\n\n=========\n{}\n===============\n\n\n",
String::from_utf8_lossy(&output.stdout), String::from_utf8_lossy(&output.stderr));
panic!()
}
// cd into the job's cwd, then exec the compiler; "sh" below becomes $0 so
// the real compiler arguments are forwarded via "$@".
let cmdstr = format!("cd '{}' && exec '{}' \"$@\"", job_req.cwd.to_str().unwrap(), job_req.executable.to_str().unwrap());
info!("{:?}", job_req.env_vars);
info!("{:?}", cmdstr);
info!("{:?}", job_req.arguments);
let mut cmd = Command::new("docker");
cmd.arg("exec");
// Forward the client's environment variables into the container.
for (k, v) in job_req.env_vars {
let mut env = k;
env.push("=");
env.push(v);
cmd.arg("-e").arg(env);
}
cmd.args(&[&cid, "bash", "-c", &cmdstr, "sh"]).args(job_req.arguments);
let output = cmd.output().unwrap();
println!("output: {:?}", output);
// NOTE(review): nothing stops the container afterwards; it appears to run
// (`sleep infinity`) until cleaned up externally — confirm intent.
f_ok(BuildResult { output: output.into() })
}
}
impl BuilderRequester for SccacheBuilder {
}

Просмотреть файл

@ -60,6 +60,7 @@ error_chain! {
display("didn't get a successful HTTP status, got `{}`", status)
}
ProcessError(output: process::Output)
DistError(msg: String) // TODO: make this not a string
}
}

Просмотреть файл

@ -70,6 +70,7 @@ extern crate serde_json;
#[macro_use]
extern crate serde_derive;
extern crate strip_ansi_escapes;
extern crate tar;
extern crate tempdir;
extern crate tempfile;
extern crate time;