Support custom toolchain overrides

This commit is contained in:
Aidan Hobson Sayers 2018-07-19 19:13:19 +01:00 коммит произвёл Ted Mielczarek
Родитель bf1ec382d7
Коммит 11fe7f81f4
8 изменённых файлов: 158 добавлений и 83 удалений

Просмотреть файл

@ -102,7 +102,7 @@ impl OverlayBuilder {
fs::create_dir(&toolchain_dir)?;
let mut tccache = tccache.lock().unwrap();
let toolchain_rdr = match tccache.get(&tc.archive_id) {
let toolchain_rdr = match tccache.get(tc) {
Ok(rdr) => rdr,
Err(LruError::FileNotInCache) => bail!("expected toolchain {}, but not available", tc.archive_id),
Err(e) => return Err(Error::with_chain(e, "failed to get toolchain from cache")),
@ -397,7 +397,7 @@ impl DockerBuilder {
// Good as new, add it back to the container list
trace!("Reclaimed container");
self.container_lists.lock().unwrap().get_mut(&tc).unwrap().push(cid);
self.container_lists.lock().unwrap().get_mut(tc).unwrap().push(cid);
}
fn make_image(tc: &Toolchain, tccache: &Mutex<TcCache>) -> String {
@ -409,7 +409,7 @@ impl DockerBuilder {
};
let mut tccache = tccache.lock().unwrap();
let toolchain_rdr = match tccache.get(&tc.archive_id) {
let toolchain_rdr = match tccache.get(tc) {
Ok(rdr) => rdr,
Err(LruError::FileNotInCache) => panic!("expected toolchain, but not available"),
Err(e) => panic!("{}", e),

Просмотреть файл

@ -296,7 +296,7 @@ impl Server {
impl ServerIncoming for Server {
type Error = Error;
fn handle_assign_job(&self, job_id: JobId, tc: Toolchain) -> Result<AssignJobResult> {
let need_toolchain = !self.cache.lock().unwrap().contains_key(&tc.archive_id);
let need_toolchain = !self.cache.lock().unwrap().contains_toolchain(&tc);
assert!(self.job_toolchains.lock().unwrap().insert(job_id, tc).is_none());
if !need_toolchain {
// TODO: can start prepping the container now
@ -313,10 +313,10 @@ impl ServerIncoming for Server {
};
let mut cache = self.cache.lock().unwrap();
// TODO: this returns before reading all the data, is that valid?
if cache.contains_key(&tc.archive_id) {
if cache.contains_toolchain(&tc) {
return Ok(SubmitToolchainResult::Success)
}
Ok(cache.insert_with(&tc.archive_id, |mut file| io::copy(&mut {tc_rdr}, &mut file).map(|_| ()))
Ok(cache.insert_with(&tc, |mut file| io::copy(&mut {tc_rdr}, &mut file).map(|_| ()))
.map(|_| SubmitToolchainResult::Success)
.unwrap_or(SubmitToolchainResult::CannotCache))
}

Просмотреть файл

@ -351,12 +351,13 @@ fn dist_or_local_compile<T>(dist_client: Arc<dist::Client>,
compilation.into_inputs_creator()
.map(|inputs_creator| (dist_compile_cmd, inputs_creator, output_paths))
})
.and_then(move |(dist_compile_cmd, inputs_creator, output_paths)| {
.and_then(move |(mut dist_compile_cmd, inputs_creator, output_paths)| {
debug!("[{}]: Distributed compile request created, requesting allocation", compile_out_pretty);
let toolchain_creator_cb = BoxFnOnce::from(move |f| toolchain_creator.write_pkg(f));
// TODO: put on a thread
let archive_id = ftry!(dist_client.put_toolchain_cache(&weak_toolchain_key, toolchain_creator_cb));
let dist_toolchain = dist::Toolchain { archive_id };
let (dist_toolchain, dist_compile_executable) =
ftry!(dist_client.put_toolchain(&dist_compile_cmd.executable, &weak_toolchain_key, toolchain_creator_cb));
dist_compile_cmd.executable = dist_compile_executable;
Box::new(dist_client.do_alloc_job(dist_toolchain.clone()).map_err(Into::into)
.and_then(move |jares| {
debug!("[{}]: Allocation successful, sending compile", compile_out_pretty);
@ -371,7 +372,7 @@ fn dist_or_local_compile<T>(dist_client: Arc<dist::Client>,
}
}).map_err(Into::into)),
dist::AllocJobResult::Success { job_alloc, need_toolchain: false } => f_ok(job_alloc),
dist::AllocJobResult::Fail { msg: _ } => panic!(),
dist::AllocJobResult::Fail { msg: _ } => panic!("failed to allocate"),
};
alloc
.and_then(move |job_alloc| {
@ -432,8 +433,7 @@ pub struct HashResult {
pub compilation: Box<Compilation + 'static>,
/// A weak key that may be used to identify the toolchain
pub weak_toolchain_key: String,
/// A function that may be called to save the toolchain to a file
// TODO: more correct to be a Box<FnOnce> or FnBox
/// A object that may be used to package the toolchain into a file
pub toolchain_creator: Box<CompilerPackager>,
}

Просмотреть файл

@ -180,6 +180,14 @@ impl CacheConfigs {
}
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Serialize, Deserialize)]
pub struct CustomToolchain {
pub compiler_executable: String,
pub archive: PathBuf,
pub archive_compiler_executable: String,
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Serialize, Deserialize)]
#[serde(default)]
@ -187,6 +195,7 @@ impl CacheConfigs {
pub struct DistConfig {
pub scheduler_addr: Option<IpAddr>,
pub cache_dir: PathBuf,
pub custom_toolchains: Vec<CustomToolchain>,
pub toolchain_cache_size: u64,
}
@ -195,6 +204,7 @@ impl Default for DistConfig {
Self {
scheduler_addr: Default::default(),
cache_dir: default_dist_cache_dir(),
custom_toolchains: Default::default(),
toolchain_cache_size: default_toolchain_cache_size(),
}
}

179
src/dist/cache.rs поставляемый
Просмотреть файл

@ -1,4 +1,6 @@
use boxfnonce::BoxFnOnce;
use config;
use dist::Toolchain;
use lru_disk_cache::{LruDiskCache, ReadSeek};
use lru_disk_cache::Error as LruError;
use lru_disk_cache::Result as LruResult;
@ -15,23 +17,33 @@ use util;
use errors::*;
#[derive(Clone, Debug)]
pub struct CustomToolchain {
archive: PathBuf,
compiler_executable: String,
}
// TODO: possibly shouldn't be public
pub struct ClientToolchainCache {
pub struct ClientToolchains {
cache_dir: PathBuf,
cache: Mutex<TcCache>,
// Lookup from dist toolchain -> toolchain details
custom_toolchains: Mutex<HashMap<Toolchain, CustomToolchain>>,
// Lookup from local path -> toolchain details
custom_toolchain_paths: Mutex<HashMap<String, (CustomToolchain, Option<Toolchain>)>>,
// Local machine mapping from 'weak' hashes to strong toolchain hashes
// - Weak hashes are what sccache uses to determine if a compiler has changed
// on the local machine - they're fast and 'good enough' (assuming we trust
// the local machine), but not safe if other users can update the cache.
// - Strong hashes are the hash of the complete compiler contents that will
// be sent over the wire for use in distributed compilation - it is assumed
// - Strong hashes (or archive ids) are the hash of the complete compiler contents that
// will be sent over the wire for use in distributed compilation - it is assumed
// that if two of them match, the contents of a compiler archive cannot
// have been tampered with
weak_map: Mutex<HashMap<String, String>>,
}
impl ClientToolchainCache {
pub fn new(cache_dir: &Path, cache_size: u64) -> Self {
impl ClientToolchains {
pub fn new(cache_dir: &Path, cache_size: u64, config_custom_toolchains: &[config::CustomToolchain]) -> Self {
let cache_dir = cache_dir.to_owned();
fs::create_dir_all(&cache_dir).unwrap();
@ -50,9 +62,27 @@ impl ClientToolchainCache {
let tc_cache_dir = cache_dir.join("tc");
let cache = Mutex::new(TcCache::new(&tc_cache_dir, cache_size).unwrap());
let mut custom_toolchain_paths = HashMap::new();
for ct in config_custom_toolchains.into_iter() {
if custom_toolchain_paths.contains_key(&ct.compiler_executable) {
panic!("Multiple toolchains for {:?}", ct.compiler_executable)
}
let config::CustomToolchain { compiler_executable, archive, archive_compiler_executable } = ct;
debug!("Registering custom toolchain for {}", compiler_executable);
let custom_tc = CustomToolchain {
archive: archive.clone(),
compiler_executable: archive_compiler_executable.clone(),
};
assert!(custom_toolchain_paths.insert(compiler_executable.clone(), (custom_tc, None)).is_none());
}
let custom_toolchain_paths = Mutex::new(custom_toolchain_paths);
Self {
cache_dir,
cache,
custom_toolchains: Mutex::new(HashMap::new()),
custom_toolchain_paths,
// TODO: shouldn't clear on restart, but also should have some
// form of pruning
weak_map: Mutex::new(weak_map),
@ -60,22 +90,31 @@ impl ClientToolchainCache {
}
// Get the bytes of a toolchain tar
pub fn get_toolchain_cache(&self, key: &str) -> Option<Vec<u8>> {
let mut toolchain_reader = match self.cache.lock().unwrap().get(key) {
Ok(rdr) => rdr,
Err(LruError::FileNotInCache) => return None,
Err(e) => panic!("{}", e),
// TODO: by this point the toolchain should be known to exist
pub fn get_toolchain(&self, tc: &Toolchain) -> Option<Vec<u8>> {
let mut rdr = if let Some(custom_tc) = self.custom_toolchains.lock().unwrap().get(tc) {
Box::new(fs::File::open(&custom_tc.archive).unwrap())
} else {
match self.cache.lock().unwrap().get(tc) {
Ok(rdr) => rdr,
Err(LruError::FileNotInCache) => return None,
Err(e) => panic!("{}", e),
}
};
let mut ret = vec![];
toolchain_reader.read_to_end(&mut ret).unwrap();
rdr.read_to_end(&mut ret).unwrap();
Some(ret)
}
// TODO: It's more correct to have a FnBox or Box<FnOnce> here
// If the toolchain doesn't already exist, create it and insert into the cache
pub fn put_toolchain_cache(&self, weak_key: &str, create: BoxFnOnce<(fs::File,), io::Result<()>>) -> Result<String> {
if let Some(strong_key) = self.weak_to_strong(weak_key) {
debug!("Using cached toolchain {} -> {}", weak_key, strong_key);
return Ok(strong_key)
pub fn put_toolchain(&self, compiler_path: &str, weak_key: &str, create: BoxFnOnce<(fs::File,), io::Result<()>>) -> Result<(Toolchain, String)> {
if let Some(tc_and_compiler_path) = self.get_custom_toolchain(compiler_path) {
debug!("Using custom toolchain for {:?}", compiler_path);
return Ok(tc_and_compiler_path.unwrap())
}
if let Some(archive_id) = self.weak_to_strong(weak_key) {
debug!("Using cached toolchain {} -> {}", weak_key, archive_id);
return Ok((Toolchain { archive_id }, compiler_path.to_owned()))
}
debug!("Weak key {} appears to be new", weak_key);
// Only permit one toolchain creation at a time. Not an issue if there are multiple attempts
@ -83,9 +122,26 @@ impl ClientToolchainCache {
let mut cache = self.cache.lock().unwrap();
let tmpfile = tempfile::NamedTempFile::new_in(self.cache_dir.join("toolchain_tmp"))?;
create.call(tmpfile.reopen()?)?;
let strong_key = cache.insert_file(tmpfile.path())?;
self.record_weak(weak_key.to_owned(), strong_key.clone());
Ok(strong_key)
let tc = cache.insert_file(tmpfile.path())?;
self.record_weak(weak_key.to_owned(), tc.archive_id.clone());
Ok((tc, compiler_path.to_owned()))
}
fn get_custom_toolchain(&self, compiler_path: &str) -> Option<Result<(Toolchain, String)>> {
return match self.custom_toolchain_paths.lock().unwrap().get_mut(compiler_path) {
Some((custom_tc, Some(tc))) => Some(Ok((tc.clone(), custom_tc.compiler_executable.clone()))),
Some((custom_tc, maybe_tc @ None)) => {
let archive_id = match path_key(&custom_tc.archive) {
Ok(archive_id) => archive_id,
Err(e) => return Some(Err(e)),
};
let tc = Toolchain { archive_id };
*maybe_tc = Some(tc.clone());
assert!(self.custom_toolchains.lock().unwrap().insert(tc.clone(), custom_tc.clone()).is_none());
Some(Ok((tc, custom_tc.compiler_executable.clone())))
},
None => None,
}
}
fn weak_to_strong(&self, weak_key: &str) -> Option<String> {
@ -99,6 +155,49 @@ impl ClientToolchainCache {
}
}
pub struct TcCache {
inner: LruDiskCache,
}
impl TcCache {
pub fn new(cache_dir: &Path, cache_size: u64) -> Result<TcCache> {
trace!("Using TcCache({:?}, {})", cache_dir, cache_size);
Ok(TcCache { inner: LruDiskCache::new(cache_dir, cache_size)? })
}
pub fn contains_toolchain(&self, tc: &Toolchain) -> bool {
self.inner.contains_key(make_lru_key_path(&tc.archive_id))
}
pub fn insert_with<F: FnOnce(File) -> io::Result<()>>(&mut self, tc: &Toolchain, with: F) -> Result<()> {
self.inner.insert_with(make_lru_key_path(&tc.archive_id), with).map_err(|e| -> Error { e.into() })?;
let verified_archive_id = file_key(self.get(tc)?)?;
// TODO: remove created toolchain?
if verified_archive_id == tc.archive_id { Ok(()) } else { Err("written file does not match expected hash key".into()) }
}
pub fn get(&mut self, tc: &Toolchain) -> LruResult<Box<ReadSeek>> {
self.inner.get(make_lru_key_path(&tc.archive_id))
}
fn insert_file<P: AsRef<OsStr>>(&mut self, path: P) -> Result<Toolchain> {
let archive_id = path_key(&path)?;
self.inner.insert_file(make_lru_key_path(&archive_id), path).map_err(|e| -> Error { e.into() })?;
Ok(Toolchain { archive_id })
}
}
fn path_key<P: AsRef<OsStr>>(path: P) -> Result<String> {
file_key(File::open(path.as_ref())?)
}
fn file_key<RS: ReadSeek + 'static>(rs: RS) -> Result<String> {
hash_reader(rs)
}
/// Make a path to the cache entry with key `key`.
fn make_lru_key_path(key: &str) -> PathBuf {
Path::new(&key[0..1]).join(&key[1..2]).join(key)
}
// Partially copied from util.rs
fn hash_reader<R: Read + Send + 'static>(rdr: R) -> Result<String> {
let mut m = Context::new(&SHA512);
@ -113,47 +212,3 @@ fn hash_reader<R: Read + Send + 'static>(rdr: R) -> Result<String> {
}
Ok(util::hex(m.finish().as_ref()))
}
/// Make a path to the cache entry with key `key`.
fn make_key_path(key: &str) -> PathBuf {
Path::new(&key[0..1]).join(&key[1..2]).join(key)
}
pub struct TcCache {
inner: LruDiskCache,
}
impl TcCache {
pub fn new(cache_dir: &Path, cache_size: u64) -> Result<TcCache> {
trace!("Using TcCache({:?}, {})", cache_dir, cache_size);
Ok(TcCache { inner: LruDiskCache::new(cache_dir, cache_size)? })
}
pub fn contains_key(&self, key: &str) -> bool {
self.inner.contains_key(make_key_path(key))
}
fn file_key<RS: ReadSeek + 'static>(&self, rs: RS) -> Result<String> {
// TODO: should explicitly pick the hash
hash_reader(rs)
}
pub fn insert_with<F: FnOnce(File) -> io::Result<()>>(&mut self, key: &str, with: F) -> Result<()> {
self.inner.insert_with(make_key_path(key), with).map_err(|e| -> Error { e.into() })?;
let verified_key = self.get(key).map_err(Into::into)
.and_then(|rs| self.file_key(rs))?;
// TODO: remove created toolchain?
if verified_key == key { Ok(()) } else { Err("written file does not match expected hash key".into()) }
}
pub fn insert_file<P: AsRef<OsStr>>(&mut self, path: P) -> Result<String> {
let file = File::open(path.as_ref())?;
let key = self.file_key(file)?;
self.inner.insert_file(make_key_path(&key), path).map_err(|e| -> Error { e.into() })?;
Ok(key)
}
pub fn get(&mut self, key: &str) -> LruResult<Box<ReadSeek>> {
self.inner.get(make_key_path(key))
}
}

13
src/dist/http.rs поставляемый
Просмотреть файл

@ -15,6 +15,7 @@
use bincode;
use boxfnonce::BoxFnOnce;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use config;
use futures::{Future, Stream};
use num_cpus;
use reqwest;
@ -402,15 +403,15 @@ impl ServerOutgoing for ServerRequester {
pub struct Client {
scheduler_addr: SocketAddr,
client: reqwest::unstable::async::Client,
tc_cache: cache::ClientToolchainCache,
tc_cache: cache::ClientToolchains,
}
impl Client {
pub fn new(handle: &tokio_core::reactor::Handle, scheduler_addr: IpAddr, cache_dir: &Path, cache_size: u64) -> Self {
pub fn new(handle: &tokio_core::reactor::Handle, scheduler_addr: IpAddr, cache_dir: &Path, cache_size: u64, custom_toolchains: &[config::CustomToolchain]) -> Self {
Self {
scheduler_addr: Cfg::scheduler_connect_addr(scheduler_addr),
client: reqwest::unstable::async::Client::new(handle),
tc_cache: cache::ClientToolchainCache::new(cache_dir, cache_size),
tc_cache: cache::ClientToolchains::new(cache_dir, cache_size, custom_toolchains),
}
}
}
@ -422,7 +423,7 @@ impl super::Client for Client {
}
fn do_submit_toolchain(&self, job_alloc: JobAlloc, tc: Toolchain) -> SFuture<SubmitToolchainResult> {
let url = format!("http://{}/api/v1/distserver/submit_toolchain/{}", job_alloc.server_id.addr(), job_alloc.job_id);
if let Some(toolchain_bytes) = self.tc_cache.get_toolchain_cache(&tc.archive_id) {
if let Some(toolchain_bytes) = self.tc_cache.get_toolchain(&tc) {
bincode_req_fut(self.client.post(&url).bytes(toolchain_bytes))
} else {
f_err("couldn't find toolchain locally")
@ -444,8 +445,8 @@ impl super::Client for Client {
bincode_req_fut(self.client.post(&url).bytes(body))
}
fn put_toolchain_cache(&self, weak_key: &str, create: BoxFnOnce<(fs::File,), io::Result<()>>) -> Result<String> {
self.tc_cache.put_toolchain_cache(weak_key, create)
fn put_toolchain(&self, compiler_path: &str, weak_key: &str, create: BoxFnOnce<(fs::File,), io::Result<()>>) -> Result<(Toolchain, String)> {
self.tc_cache.put_toolchain(compiler_path, weak_key, create)
}
fn may_dist(&self) -> bool {
true

4
src/dist/mod.rs поставляемый
Просмотреть файл

@ -277,7 +277,7 @@ pub trait Client {
// TODO: ideally Box<FnOnce or FnBox
// BoxFnOnce library doesn't work due to incorrect lifetime inference - https://github.com/rust-lang/rust/issues/28796#issuecomment-410071058
fn do_run_job(&self, job_alloc: JobAlloc, command: CompileCommand, outputs: Vec<PathBuf>, write_inputs: Box<FnMut(&mut Write)>) -> SFuture<RunJobResult>;
fn put_toolchain_cache(&self, weak_key: &str, create: BoxFnOnce<(fs::File,), io::Result<()>>) -> Result<String>;
fn put_toolchain(&self, compiler_path: &str, weak_key: &str, create: BoxFnOnce<(fs::File,), io::Result<()>>) -> Result<(Toolchain, String)>;
fn may_dist(&self) -> bool;
}
@ -296,7 +296,7 @@ impl Client for NoopClient {
panic!("NoopClient");
}
fn put_toolchain_cache(&self, _weak_key: &str, _create: BoxFnOnce<(fs::File,), io::Result<()>>) -> Result<String> {
fn put_toolchain(&self, _compiler_path: &str, _weak_key: &str, _create: BoxFnOnce<(fs::File,), io::Result<()>>) -> Result<(Toolchain, String)> {
bail!("NoopClient");
}
fn may_dist(&self) -> bool {

Просмотреть файл

@ -138,7 +138,16 @@ pub fn start_server(port: u16) -> Result<()> {
let pool = CpuPool::new(20);
let dist_client: Arc<dist::Client> = match CONFIG.dist.scheduler_addr {
#[cfg(feature = "dist")]
Some(addr) => Arc::new(dist::http::Client::new(&core.handle(), addr, &CONFIG.dist.cache_dir.join("client"), CONFIG.dist.toolchain_cache_size)),
Some(addr) => {
info!("Enabling distributed sccache to {}", addr);
Arc::new(dist::http::Client::new(
&core.handle(),
addr,
&CONFIG.dist.cache_dir.join("client"),
CONFIG.dist.toolchain_cache_size,
&CONFIG.dist.custom_toolchains,
))
},
#[cfg(not(feature = "dist"))]
Some(_) => {
warn!("Scheduler address configured but dist feature disabled, disabling distributed sccache");