No need for Arcs since the serving threads are 'scoped'

Aidan Hobson Sayers 2018-07-13 21:58:08 +01:00, committed by Ted Mielczarek
Parent 3cda701671
Commit 82d41f6058
3 changed files: 35 additions and 32 deletions
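
Why this works: the dist server drives builds from scoped serving threads, which are joined before the data they borrow goes out of scope, so plain references to the builder state and the toolchain cache are sufficient and Arc reference counting adds nothing. Below is a minimal sketch of the idea, using std::thread::scope purely as an illustration (the commit predates that API, and Builder is a hypothetical stand-in for DockerBuilder, not the real server code):

use std::collections::HashMap;
use std::sync::Mutex;
use std::thread;

// Hypothetical stand-in for DockerBuilder: Mutex for interior mutability,
// but no Arc around the maps.
struct Builder {
    container_lists: Mutex<HashMap<String, Vec<String>>>,
}

fn main() {
    let builder = Builder { container_lists: Mutex::new(HashMap::new()) };

    // Scoped threads are guaranteed to finish before the scope returns, so
    // each thread can borrow a plain &builder instead of cloning an Arc.
    thread::scope(|s| {
        for i in 0..4 {
            let builder = &builder;
            s.spawn(move || {
                builder.container_lists.lock().unwrap()
                    .entry(format!("toolchain-{}", i))
                    .or_insert_with(Vec::new)
                    .push(format!("container-{}", i));
            });
        }
    });

    // All threads have been joined; main still owns the builder.
    println!("{:?}", builder.container_lists.lock().unwrap());
}

The same reasoning applies to the cache parameter in the diff below: run_build only needs the TcCache for the duration of the call, so it can borrow &Mutex<TcCache> instead of cloning an Arc<Mutex<TcCache>>.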

src/dist/build.rs (51 lines changed)

@@ -18,17 +18,12 @@ use std::collections::HashMap;
 use std::io;
 use std::path::{Path, PathBuf};
 use std::process::{Command, Output, Stdio};
-use std::sync::{Arc, Mutex};
+use std::sync::{Mutex};
 
 use super::{CompileCommand, InputsReader, Toolchain};
 use super::{BuildResult, BuilderIncoming};
 use errors::*;
 
-pub struct DockerBuilder {
-    image_map: Arc<Mutex<HashMap<Toolchain, String>>>,
-    container_lists: Arc<Mutex<HashMap<Toolchain, Vec<String>>>>,
-}
-
 fn check_output(output: &Output) {
     if !output.status.success() {
         error!("===========\n{}\n==========\n\n\n\n=========\n{}\n===============\n\n\n",
@@ -37,18 +32,27 @@ fn check_output(output: &Output) {
     }
 }
 
+pub struct DockerBuilder {
+    image_map: Mutex<HashMap<Toolchain, String>>,
+    container_lists: Mutex<HashMap<Toolchain, Vec<String>>>,
+}
+
 impl DockerBuilder {
     // TODO: this should accept a unique string, e.g. inode of the tccache directory
     // having locked a pidfile, or at minimum should loudly detect other running
     // instances - pidfile in /tmp
     pub fn new() -> Self {
-        Self::cleanup();
-        Self {
-            image_map: Arc::new(Mutex::new(HashMap::new())),
-            container_lists: Arc::new(Mutex::new(HashMap::new())),
-        }
+        let ret = Self {
+            image_map: Mutex::new(HashMap::new()),
+            container_lists: Mutex::new(HashMap::new()),
+        };
+        ret.cleanup();
+        ret
     }
 
     // TODO: this should really reclaim, and should check in the image map and container lists, so
     // that when things are removed from there it becomes a form of GC
-    fn cleanup() {
+    fn cleanup(&self) {
         info!("Performing initial Docker cleanup");
 
         let containers = {
@@ -103,9 +107,9 @@ impl DockerBuilder {
     // If we have a spare running container, claim it and remove it from the available list,
     // otherwise try and create a new container (possibly creating the Docker image along
     // the way)
-    fn get_container(image_map: &Mutex<HashMap<Toolchain, String>>, container_lists: &Mutex<HashMap<Toolchain, Vec<String>>>, tc: &Toolchain, cache: Arc<Mutex<TcCache>>) -> String {
+    fn get_container(&self, tc: &Toolchain, cache: &Mutex<TcCache>) -> String {
         let container = {
-            let mut map = container_lists.lock().unwrap();
+            let mut map = self.container_lists.lock().unwrap();
             map.entry(tc.clone()).or_insert_with(Vec::new).pop()
         };
         match container {
@@ -114,7 +118,7 @@ impl DockerBuilder {
                 // TODO: can improve parallelism (of creating multiple images at a time) by using another
                 // (more fine-grained) mutex around the entry value and checking if its empty a second time
                 let image = {
-                    let mut map = image_map.lock().unwrap();
+                    let mut map = self.image_map.lock().unwrap();
                     map.entry(tc.clone()).or_insert_with(|| {
                         info!("Creating Docker image for {:?} (may block requests)", tc);
                         Self::make_image(tc, cache)
@@ -125,7 +129,7 @@ impl DockerBuilder {
         }
     }
 
-    fn finish_container(container_lists: &Mutex<HashMap<Toolchain, Vec<String>>>, tc: &Toolchain, cid: String) {
+    fn finish_container(&self, tc: &Toolchain, cid: String) {
         // TODO: collect images
 
         // Clean up any running processes
         let output = Command::new("docker").args(&["exec", &cid, "/busybox", "kill", "-9", "-1"]).output().unwrap();
         check_output(&output);
@@ -184,10 +190,10 @@ impl DockerBuilder {
 
         // Good as new, add it back to the container list
         trace!("Reclaimed container");
-        container_lists.lock().unwrap().get_mut(&tc).unwrap().push(cid);
+        self.container_lists.lock().unwrap().get_mut(&tc).unwrap().push(cid);
     }
 
-    fn make_image(tc: &Toolchain, cache: Arc<Mutex<TcCache>>) -> String {
+    fn make_image(tc: &Toolchain, cache: &Mutex<TcCache>) -> String {
         let cid = {
             let output = Command::new("docker").args(&["create", &tc.docker_img, "/busybox", "true"]).output().unwrap();
             check_output(&output);
@@ -289,16 +295,13 @@ impl DockerBuilder {
 
 impl BuilderIncoming for DockerBuilder {
     // From Server
-    fn run_build(&self, tc: Toolchain, command: CompileCommand, outputs: Vec<String>, inputs_rdr: InputsReader, cache: Arc<Mutex<TcCache>>) -> Result<BuildResult> {
-        let image_map = self.image_map.clone();
-        let container_lists = self.container_lists.clone();
-
+    fn run_build(&self, tc: Toolchain, command: CompileCommand, outputs: Vec<String>, inputs_rdr: InputsReader, cache: &Mutex<TcCache>) -> Result<BuildResult> {
         debug!("Finding container");
-        let cid = Self::get_container(&image_map, &container_lists, &tc, cache);
+        let cid = self.get_container(&tc, cache);
         debug!("Performing build with container {}", cid);
         let res = Self::perform_build(command, inputs_rdr, outputs, &cid);
         debug!("Finishing with container {}", cid);
-        Self::finish_container(&container_lists, &tc, cid);
+        self.finish_container(&tc, cid);
         debug!("Returning result");
         Ok(res)
     }

src/dist/mod.rs (14 lines changed)

@@ -24,7 +24,7 @@ use std::net::SocketAddr;
 use std::path::PathBuf;
 use std::process;
 use std::str::FromStr;
-use std::sync::{Arc, Mutex};
+use std::sync::{Mutex};
 use std::time::Instant;
 
 use errors::*;
@@ -242,7 +242,7 @@ pub trait ServerIncoming: Send + Sync {
 
 pub trait BuilderIncoming: Send + Sync {
     // From Server
-    fn run_build(&self, toolchain: Toolchain, command: CompileCommand, outputs: Vec<String>, inputs_rdr: InputsReader, cache: Arc<Mutex<TcCache>>) -> Result<BuildResult>;
+    fn run_build(&self, toolchain: Toolchain, command: CompileCommand, outputs: Vec<String>, inputs_rdr: InputsReader, cache: &Mutex<TcCache>) -> Result<BuildResult>;
 }
 
 /////////
@@ -297,7 +297,7 @@ pub struct Scheduler {
     // Acts as a ring buffer of most recently completed jobs
     finished_jobs: VecDeque<JobStatus>,
 
-    servers: Arc<Mutex<HashMap<ServerId, ServerDetails>>>,
+    servers: Mutex<HashMap<ServerId, ServerDetails>>,
 }
 
 struct ServerDetails {
@@ -312,7 +312,7 @@ impl Scheduler {
             job_count: Mutex::new(0),
             //jobs: HashMap::new(),
             finished_jobs: VecDeque::new(),
-            servers: Arc::new(Mutex::new(HashMap::new())),
+            servers: Mutex::new(HashMap::new()),
         }
     }
 }
@@ -373,7 +373,7 @@ impl SchedulerIncoming for Scheduler {
 
 pub struct Server {
     builder: Box<BuilderIncoming>,
-    cache: Arc<Mutex<TcCache>>,
+    cache: Mutex<TcCache>,
     job_toolchains: Mutex<HashMap<JobId, Toolchain>>,
 }
@@ -381,7 +381,7 @@ impl Server {
     pub fn new(builder: Box<BuilderIncoming>) -> Server {
         Server {
             builder,
-            cache: Arc::new(Mutex::new(TcCache::new(&CONFIG.dist.cache_dir.join("server")).unwrap())),
+            cache: Mutex::new(TcCache::new(&CONFIG.dist.cache_dir.join("server")).unwrap()),
             job_toolchains: Mutex::new(HashMap::new()),
         }
     }
@@ -418,7 +418,7 @@ impl ServerIncoming for Server {
             Some(tc) => tc,
             None => return Ok(RunJobResult::JobNotFound),
         };
-        let res = self.builder.run_build(tc, command, outputs, inputs_rdr, self.cache.clone()).unwrap();
+        let res = self.builder.run_build(tc, command, outputs, inputs_rdr, &self.cache).unwrap();
         requester.do_update_job_status(job_id, JobStatus::Complete).unwrap();
         Ok(RunJobResult::Complete(JobComplete { output: res.output, outputs: res.outputs }))
     }

src/dist/test/system.rs (2 lines changed)

@@ -131,7 +131,7 @@ impl Drop for DistSystem {
 
 struct FailingBuilder;
 impl dist::BuilderIncoming for FailingBuilder {
-    fn run_build(&self, _toolchain: Toolchain, _command: CompileCommand, _outputs: Vec<String>, _inputs_rdr: InputsReader, _cache: Arc<Mutex<TcCache>>) -> Result<BuildResult> {
+    fn run_build(&self, _toolchain: Toolchain, _command: CompileCommand, _outputs: Vec<String>, _inputs_rdr: InputsReader, _cache: &Mutex<TcCache>) -> Result<BuildResult> {
         Err("FailingBuilder".into())
     }
 }