diff --git a/docs/diagrams/download-build.png b/docs/diagrams/download-build.png deleted file mode 100644 index 75f270e..0000000 Binary files a/docs/diagrams/download-build.png and /dev/null differ diff --git a/docs/diagrams/handshake-failure.png b/docs/diagrams/handshake-failure.png deleted file mode 100644 index 9de76cf..0000000 Binary files a/docs/diagrams/handshake-failure.png and /dev/null differ diff --git a/docs/diagrams/handshake.png b/docs/diagrams/handshake.png deleted file mode 100644 index 965a0c9..0000000 Binary files a/docs/diagrams/handshake.png and /dev/null differ diff --git a/docs/diagrams/send-prefs.png b/docs/diagrams/send-prefs.png deleted file mode 100644 index 8ce42fc..0000000 Binary files a/docs/diagrams/send-prefs.png and /dev/null differ diff --git a/docs/diagrams/send-profile-empty.png b/docs/diagrams/send-profile-empty.png deleted file mode 100644 index cdd60a9..0000000 Binary files a/docs/diagrams/send-profile-empty.png and /dev/null differ diff --git a/docs/diagrams/send-profile.png b/docs/diagrams/send-profile.png deleted file mode 100644 index 28626f6..0000000 Binary files a/docs/diagrams/send-profile.png and /dev/null differ diff --git a/docs/diagrams/wait-for-idle.png b/docs/diagrams/wait-for-idle.png deleted file mode 100644 index 88a485c..0000000 Binary files a/docs/diagrams/wait-for-idle.png and /dev/null differ diff --git a/docs/protocol.md b/docs/protocol.md deleted file mode 100644 index 9e94fdc..0000000 --- a/docs/protocol.md +++ /dev/null @@ -1,99 +0,0 @@ -# fxrecord protocol - -The fxrecord protocol is broken up into a number of sections: - -1. Handshake -2. DownloadBuild -3. SendProfile -4. SendPrefs -5. WaitForIdle - -## Message Format - -Messages are encoded as JSON blobs (via Serde). Each message is prefixed with -a 4-byte length. - -Example: - -``` -00 00 00 1F # Length of Message (31) -{"Handshake":{"restart":false}} -``` - -In replies, it is common for the recorder to send a `Result` back. If the -result is `Ok`, then this indicates that the corresponding request was -successful. However, if an `Err` is returned to the recorder, then a fatal -error has occurred and the protocol cannot continue. At this point, the -recorder and runner will disconnect from eachother. - -An example of protocol failure can be seen below in Figure 2. - -## 1. Handshake - -The protocol is initiated by the recorder connecting to the runner over TCP. -The recorder will send a `Handshake` message to the runner, indicating that -it should restart. The runner replies with a `HandshakeReply` with the status -of the restart operation. They then disconnect and the recorder waits for the -runner to restart. - -> ![](/docs/diagrams/handshake.png) -> -> Figure 1: Handshake - -If something goes wrong with the handshake on the runner's end (such as a -failure with the Windows API when attempting to restart), it will instead -reply with an error message inside its `HandshakeReply`: - -> ![](/docs/diagrams/handshake-failure.png) -> -> Figure 2: Handshake Failure - -If the recorder requested a restart, it will then attempt to reconnect to the -runner with exponential backoff and handshake again, this time not requesting -a restart - -## 2. DownloadBuild - -After reconnecting, the next message from the recorder will be for the runner -to download a specific build of Firefox from Taskcluster. - -> ![](/docs/diagrams/download-build.png) -> -> Figure 3: Download Build - -## 3. SendProfile - -After fxrunner has downloaded a build, fxrecorder can optionally send a -zipped profile for it to use when running Firefox. If it does, it will send a -`SendProfile` message with the given profile size. It will then drop the -protocol down to a raw TCP connection and transfer the profile as raw bytes. -The runner will receive these bytes and write them to disk, then extract the -profile. - -> ![](/docs/diagrams/send-profile.png) -> -> Figure 4: Send Profile - -However, if no preset profile is to be used, an empty `SendProfile` message is -sent and fxrunner will have Firefox generate a new profile on start. - -> ![](/docs/diagrams/send-profile-empty.png) -> -> Figure 5: Send Profile (Empty Profile Case) - -## 4. SendPrefs - -Next, the fxrecorder may send a list of prefs that fxrunner should use when -running Firefox. If provided, they will be written to the `prefs.js` in the -profile directory from the `SendProfile` phase. If no profile was transferred -in that phase, a new profile directory will be created containing `prefs.js`. - -> ![](/docs/diagrams/send-prefs.png) -> -> Figure 6: Send Prefs - -## 5. WaitForIdle - -> ![](/docs/diagrams/wait-for-idle.png) -> -> Figure 7: Wait for Idle diff --git a/fxrecorder/src/bin/main.rs b/fxrecorder/src/bin/main.rs index 2570df8..defd84c 100644 --- a/fxrecorder/src/bin/main.rs +++ b/fxrecorder/src/bin/main.rs @@ -72,9 +72,13 @@ async fn fxrecorder(log: Logger, options: Options, config: Config) -> Result<(), let mut proto = RecorderProto::new(log.clone(), stream); - proto.handshake(true).await?; + proto + .send_new_request(&task_id, profile_path.as_ref().map(PathBuf::as_path), prefs) + .await?; } + info!(log, "Disconnected from runner. Waiting to reconnect..."); + { let reconnect = || { info!(log, "Attempting re-connection to runner..."); @@ -97,13 +101,7 @@ async fn fxrecorder(log: Logger, options: Options, config: Config) -> Result<(), let mut proto = RecorderProto::new(log, stream); - proto.handshake(false).await?; - proto.download_build(&task_id).await?; - proto - .send_profile(profile_path.as_ref().map(PathBuf::as_path)) - .await?; - proto.send_prefs(prefs).await?; - proto.wait_for_idle().await?; + proto.send_resume_request().await?; } Ok(()) diff --git a/fxrecorder/src/lib/proto.rs b/fxrecorder/src/lib/proto.rs index d799fb3..92e4313 100644 --- a/fxrecorder/src/lib/proto.rs +++ b/fxrecorder/src/lib/proto.rs @@ -21,6 +21,7 @@ pub struct RecorderProto { } impl RecorderProto { + /// Create a new RecorderProto. pub fn new(log: Logger, stream: TcpStream) -> RecorderProto { Self { inner: Some(Proto::new(stream)), @@ -28,11 +29,174 @@ impl RecorderProto { } } - /// Consume the RecorderProto and return the underlying `Proto`. - pub fn into_inner( - self, - ) -> Proto { - self.inner.unwrap() + /// Send a new request to the runner. + pub async fn send_new_request( + &mut self, + task_id: &str, + profile_path: Option<&Path>, + prefs: Vec<(String, PrefValue)>, + ) -> Result<(), RecorderProtoError> { + info!(self.log, "Sending request"); + + let profile_size = match profile_path { + None => None, + Some(profile_path) => Some(tokio::fs::metadata(profile_path).await?.len()), + }; + + self.send::( + NewRequest { + build_task_id: task_id.into(), + profile_size, + prefs, + } + .into(), + ) + .await?; + + loop { + let DownloadBuild { result } = self.recv().await?; + + match result { + Ok(DownloadStatus::Downloading) => { + info!(self.log, "Downloading build ..."); + } + + Ok(DownloadStatus::Downloaded) => { + info!(self.log, "Build download complete; extracting build ..."); + } + + Ok(DownloadStatus::Extracted) => { + info!(self.log, "Build extracted"); + break; + } + + Err(e) => { + error!(self.log, "Build download failed"; "task_id" => task_id, "error" => ?e); + return Err(e.into()); + } + } + } + + if let Some(profile_path) = profile_path { + self.send_profile(profile_path, profile_size.unwrap()) + .await? + } else { + info!(self.log, "No profile to send"); + } + + if let WritePrefs { result: Err(e) } = self.recv().await? { + error!(self.log, "Runner could not write prefs"; "error" => ?e); + return Err(e.into()); + } + + if let Restarting { result: Err(e) } = self.recv().await? { + error!(self.log, "Runner could not restart"; "error" => ?e); + return Err(e.into()); + } + + info!(self.log, "Runner is restarting..."); + + Ok(()) + } + + /// Send a resume request to the runner. + pub async fn send_resume_request(&mut self) -> Result<(), RecorderProtoError> { + info!(self.log, "Resuming request"); + self.send::(ResumeRequest {}.into()).await?; + + if let ResumeResponse { result: Err(e) } = self.recv().await? { + error!(self.log, "Could not resume request with runner"; "error" => ?e); + return Err(e.into()); + } + + info!(self.log, "Waiting for runner to become idle..."); + + if let WaitForIdle { result: Err(e) } = self.recv().await? { + error!(self.log, "Runner could not become idle"; "error" => ?e); + return Err(e.into()); + } + + info!(self.log, "Runner became idle"); + + Ok(()) + } + + /// Send the profile at the given path to the runner. + async fn send_profile( + &mut self, + profile_path: &Path, + profile_size: u64, + ) -> Result<(), RecorderProtoError> { + let RecvProfile { result } = self.recv().await?; + + match result? { + DownloadStatus::Downloading => { + info!(self.log, "Sending profile"; "profile_size" => profile_size); + } + + unexpected => { + return Err(RecorderProtoError::RecvProfileMismatch { + received: unexpected, + expected: DownloadStatus::Downloading, + } + .into()) + } + } + + let mut stream = self.inner.take().unwrap().into_inner(); + let result = RecorderProto::send_profile_impl(&mut stream, profile_path).await; + self.inner = Some(Proto::new(stream)); + + result?; + + let mut state = DownloadStatus::Downloading; + loop { + let next_state = self.recv::().await?.result?; + + assert_ne!(state, DownloadStatus::Extracted); + let expected = state.next().unwrap(); + + if expected != next_state { + return Err(RecorderProtoError::RecvProfileMismatch { + received: next_state, + expected: expected, + } + .into()); + } + + state = next_state; + + match state { + // This would be caught above because this is never an expected state. + DownloadStatus::Downloading => unreachable!(), + + DownloadStatus::Downloaded => { + info!(self.log, "Profile sent; extracting..."); + } + + DownloadStatus::Extracted => { + info!(self.log, "Profile extracted"); + break; + } + } + } + + assert!(state == DownloadStatus::Extracted); + + Ok(()) + } + + /// Write the raw bytes from the profile to the runner. + async fn send_profile_impl( + stream: &mut TcpStream, + profile_path: &Path, + ) -> Result<(), RecorderProtoError> { + let mut f = File::open(profile_path).await?; + + tokio::io::copy(&mut f, stream) + .await + .map_err(Into::into) + .map(drop) } /// Send the given message to the recorder. @@ -54,209 +218,6 @@ impl RecorderProto { { self.inner.as_mut().unwrap().recv::().await } - - /// Handshake with FxRunner. - pub async fn handshake(&mut self, restart: bool) -> Result<(), RecorderProtoError> { - info!(self.log, "Handshaking ..."); - self.send(Handshake { restart }).await?; - let HandshakeReply { result } = self.recv().await?; - - match result { - Ok(..) => { - info!(self.log, "Handshake complete"); - Ok(()) - } - Err(e) => { - info!(self.log, "Handshake failed: runner could not restart"; "error" => ?e); - Err(e.into()) - } - } - } - - pub async fn download_build(&mut self, task_id: &str) -> Result<(), RecorderProtoError> { - info!(self.log, "Requesting download of build from task"; "task_id" => task_id); - self.send(DownloadBuild { - task_id: task_id.into(), - }) - .await?; - - loop { - let DownloadBuildReply { result } = self.recv().await?; - - match result { - Ok(DownloadStatus::Downloading) => { - info!(self.log, "Downloading build ..."); - } - - Ok(DownloadStatus::Downloaded) => { - info!(self.log, "Build download complete; extracting build ..."); - } - - Ok(DownloadStatus::Extracted) => { - info!(self.log, "Build extracted"); - return Ok(()); - } - - Err(e) => { - error!(self.log, "Build download failed"; "task_id" => task_id, "error" => ?e); - return Err(e.into()); - } - } - } - } - - /// Send the profile at the given path to the runner. - /// - /// If the profile path is specified, the profile must exist, or this function will panic. - pub async fn send_profile( - &mut self, - profile_path: Option<&Path>, - ) -> Result<(), RecorderProtoError> { - let profile_path = match profile_path { - Some(profile_path) => profile_path, - - None => { - info!(self.log, "No profile to send"); - - self.send(SendProfile { profile_size: None }).await?; - let SendProfileReply { result } = self.recv().await?; - - return match result? { - Some(unexpected) => Err(RecorderProtoError::SendProfileMismatch { - expected: None, - received: Some(unexpected), - } - .into()), - - None => Ok(()), - }; - } - }; - - assert!(profile_path.exists()); - let profile_size = tokio::fs::metadata(profile_path).await?.len(); - - self.send(SendProfile { - profile_size: Some(profile_size), - }) - .await?; - - let SendProfileReply { result } = self.recv().await?; - - match result? { - Some(DownloadStatus::Downloading) => { - info!(self.log, "Sending profile"; "profile_size" => profile_size); - } - - unexpected => { - return Err(RecorderProtoError::SendProfileMismatch { - received: unexpected, - expected: Some(DownloadStatus::Downloading), - } - .into()) - } - } - - let mut stream = self.inner.take().unwrap().into_inner(); - let result = RecorderProto::send_profile_impl(&mut stream, profile_path).await; - self.inner = Some(Proto::new(stream)); - - result?; - - let mut state = DownloadStatus::Downloading; - loop { - let SendProfileReply { result } = self.recv().await?; - - match result? { - Some(next_state) => { - assert_ne!(state, DownloadStatus::Extracted); - let expected = state.next().unwrap(); - - if expected != next_state { - return Err(RecorderProtoError::SendProfileMismatch { - received: Some(next_state), - expected: Some(expected), - } - .into()); - } - - state = next_state; - - match state { - // This would be caught above because this is never an expected state. - DownloadStatus::Downloading => unreachable!(), - - DownloadStatus::Downloaded => { - info!(self.log, "Profile sent; extracting..."); - } - - DownloadStatus::Extracted => { - info!(self.log, "Profile extracted"); - break; - } - } - } - - None => { - return Err(RecorderProtoError::SendProfileMismatch { - received: None, - expected: state.next(), - } - .into()) - } - } - } - - assert!(state == DownloadStatus::Extracted); - - Ok(()) - } - - async fn send_profile_impl( - stream: &mut TcpStream, - profile_path: &Path, - ) -> Result<(), RecorderProtoError> { - let mut f = File::open(profile_path).await?; - - tokio::io::copy(&mut f, stream) - .await - .map_err(Into::into) - .map(drop) - } - - /// Send the preferences that the runner should use. - pub async fn send_prefs( - &mut self, - prefs: Vec<(String, PrefValue)>, - ) -> Result<(), RecorderProtoError> { - info!(self.log, "Sending prefs ..."); - self.send(SendPrefs { prefs }).await?; - let SendPrefsReply { result } = self.recv().await?; - - if let Err(e) = result { - error!(self.log, "Could not send prefs"; "error" => ?e); - return Err(e.into()); - } - - info!(self.log, "Prefs sent"); - - Ok(()) - } - - pub async fn wait_for_idle(&mut self) -> Result<(), RecorderProtoError> { - info!(self.log, "Waiting for runner to become idle..."); - self.send(WaitForIdle).await?; - - let WaitForIdleReply { result } = self.recv().await?; - - if let Err(e) = result { - error!(self.log, "Runner did not go idle"; "error" => %e); - Err(e.into()) - } else { - info!(self.log, "Runner is now idle"); - Ok(()) - } - } } #[derive(Debug, Display)] @@ -264,13 +225,13 @@ pub enum RecorderProtoError { Proto(ProtoError), #[display( - fmt = "Expected a download status of `{:?}', but received `{:?}' instead", + fmt = "Expected a download status of `{}', but received `{}' instead", expected, received )] - SendProfileMismatch { - expected: Option, - received: Option, + RecvProfileMismatch { + expected: DownloadStatus, + received: DownloadStatus, }, } @@ -278,7 +239,7 @@ impl Error for RecorderProtoError { fn source(&self) -> Option<&(dyn Error + 'static)> { match self { RecorderProtoError::Proto(ref e) => Some(e), - RecorderProtoError::SendProfileMismatch { .. } => None, + RecorderProtoError::RecvProfileMismatch { .. } => None, } } } diff --git a/fxrunner/src/bin/main.rs b/fxrunner/src/bin/main.rs index 9f59619..42e2ec1 100644 --- a/fxrunner/src/bin/main.rs +++ b/fxrunner/src/bin/main.rs @@ -13,8 +13,6 @@ use libfxrunner::proto::RunnerProto; use libfxrunner::taskcluster::Taskcluster; use slog::{info, Logger}; use structopt::StructOpt; -use tempfile::TempDir; -use tokio::fs::create_dir_all; use tokio::net::TcpListener; use tokio::time::delay_for; @@ -79,31 +77,9 @@ async fn fxrunner(log: Logger, options: Options, config: Config) -> Result<(), B WindowsPerfProvider::default(), ); - if proto.handshake_reply().await? { + if proto.handle_request().await? { break; } - // We download everything into a temporary directory that will be - // cleaned up after the connection closes. - let download_dir = TempDir::new()?; - let firefox_bin = proto.download_build_reply(download_dir.path()).await?; - assert!(firefox_bin.is_file()); - - let profile_path = match proto.send_profile_reply(download_dir.path()).await? { - Some(profile_path) => profile_path, - None => { - let profile_path = download_dir.path().join("profile"); - info!(log, "Creating new empty profile"); - create_dir_all(&profile_path).await?; - profile_path - } - }; - assert!(profile_path.is_dir()); - - proto - .send_prefs_reply(&profile_path.join("user.js")) - .await?; - - proto.wait_for_idle_reply().await?; info!(log, "Client disconnected"); } diff --git a/fxrunner/src/lib/proto.rs b/fxrunner/src/lib/proto.rs index 735b24b..47ad8cb 100644 --- a/fxrunner/src/lib/proto.rs +++ b/fxrunner/src/lib/proto.rs @@ -11,7 +11,8 @@ use libfxrecord::error::ErrorExt; use libfxrecord::net::*; use libfxrecord::prefs::write_prefs; use slog::{error, info, Logger}; -use tokio::fs::{File, OpenOptions}; +use tempfile::TempDir; +use tokio::fs::{create_dir_all, File, OpenOptions}; use tokio::net::TcpStream; use tokio::prelude::*; use tokio::task::spawn_blocking; @@ -50,11 +51,282 @@ where } } - /// Consume the RunnerProto and return the underlying `Proto`. - pub fn into_inner( - self, - ) -> Proto { - self.inner.unwrap() + /// Handle a request from the recorder. + pub async fn handle_request(&mut self) -> Result> { + match self.recv::().await?.request { + RecorderRequest::NewRequest(req) => { + self.handle_new_request(req).await?; + Ok(true) + } + + RecorderRequest::ResumeRequest(req) => { + self.handle_resume_request(req).await?; + Ok(false) + } + } + } + + /// Handle a new request from the recorder. + async fn handle_new_request( + &mut self, + request: NewRequest, + ) -> Result<(), RunnerProtoError> { + let download_dir = TempDir::new()?; + + let firefox_bin = self + .download_build(&request.build_task_id, download_dir.path()) + .await?; + assert!(firefox_bin.is_file()); + + let profile_path = match request.profile_size { + Some(profile_size) => self.recv_profile(profile_size, download_dir.path()).await?, + None => { + let profile_path = download_dir.path().join("profile"); + info!(self.log, "Creating new empty profile"); + create_dir_all(&profile_path).await?; + profile_path + } + }; + assert!(profile_path.is_dir()); + + if request.prefs.len() > 0 { + let prefs_path = profile_path.join("user.js"); + let mut f = match OpenOptions::new() + .append(true) + .create(true) + .open(&prefs_path) + .await + { + Ok(f) => f, + Err(e) => { + self.send(WritePrefs { + result: Err(e.into_error_message()), + }) + .await?; + + return Err(e.into()); + } + }; + + if let Err(e) = write_prefs(&mut f, request.prefs.into_iter()).await { + self.send(WritePrefs { + result: Err(e.into_error_message()), + }) + .await?; + return Err(e.into()); + } + } + + self.send(WritePrefs { result: Ok(()) }).await?; + + // TODO: Persist the profile and Firefox instance for a restart + + if let Err(e) = self + .shutdown_handler + .initiate_restart("fxrunner: restarting for cold Firefox start") + { + // TODO: Once we persist firefox and profile, we need + error!(self.log, "Could not restart"; "error" => ?e); + self.send(Restarting { + result: Err(e.into_error_message()), + }) + .await?; + + return Err(RunnerProtoError::Shutdown(e)); + } + + self.send(Restarting { result: Ok(()) }).await?; + + Ok(()) + } + + /// Handle a resume request from the runner. + async fn handle_resume_request( + &mut self, + _request: ResumeRequest, + ) -> Result<(), RunnerProtoError> { + info!(self.log, "Received resumption request"); + + self.send(ResumeResponse { result: Ok(()) }).await?; + + info!(self.log, "Waiting to become idle"); + + if let Err(e) = cpu_and_disk_idle(&self.perf_provider).await { + error!(self.log, "CPU and disk did not become idle"; "error" => %e); + self.send(WaitForIdle { + result: Err(e.into_error_message()), + }) + .await?; + + return Err(RunnerProtoError::WaitForIdle(e)); + } + info!(self.log, "Became idle"); + + self.send(WaitForIdle { result: Ok(()) }).await?; + + Ok(()) + } + + /// Download a build from taskcluster. + async fn download_build( + &mut self, + task_id: &str, + download_dir: &Path, + ) -> Result> { + info!(self.log, "Download build from Taskcluster"; "task_id" => &task_id); + self.send(DownloadBuild { + result: Ok(DownloadStatus::Downloading), + }) + .await?; + + let download_path = match self.tc.download_build_artifact(task_id, download_dir).await { + Ok(download_path) => download_path, + Err(e) => { + error!(self.log, "Could not download build"; "error" => ?e); + self.send(DownloadBuild { + result: Err(e.into_error_message()), + }) + .await?; + return Err(e.into()); + } + }; + + self.send(DownloadBuild { + result: Ok(DownloadStatus::Downloaded), + }) + .await?; + info!(self.log, "Extracting downloaded artifact..."); + + let unzip_result = spawn_blocking({ + let download_dir = PathBuf::from(download_dir); + move || unzip(&download_path, &download_dir) + }) + .await + .expect("unzip task was cancelled or panicked"); + + if let Err(e) = unzip_result { + self.send(DownloadBuild { + result: Err(e.into_error_message()), + }) + .await?; + return Err(e.into()); + } + + let firefox_path = download_dir.join("firefox").join("firefox.exe"); + if !firefox_path.exists() { + let err = RunnerProtoError::MissingFirefox; + + self.send(DownloadBuild { + result: Err(err.into_error_message()), + }) + .await?; + + return Err(err); + } + + info!(self.log, "Extracted build"); + self.send(DownloadBuild { + result: Ok(DownloadStatus::Extracted), + }) + .await?; + Ok(firefox_path) + } + + /// Receive a profile from the recorder. + async fn recv_profile( + &mut self, + profile_size: u64, + download_dir: &Path, + ) -> Result> { + info!(self.log, "Receiving profile..."); + self.send(RecvProfile { + result: Ok(DownloadStatus::Downloading), + }) + .await?; + + let mut stream = self.inner.take().unwrap().into_inner(); + let result = Self::recv_profile_raw(&mut stream, download_dir, profile_size).await; + self.inner = Some(Proto::new(stream)); + + let zip_path = match result { + Ok(zip_path) => zip_path, + Err(e) => { + self.send(DownloadBuild { + result: Err(e.into_error_message()), + }) + .await?; + return Err(e.into()); + } + }; + + info!(self.log, "Profile received; extracting..."); + self.send(RecvProfile { + result: Ok(DownloadStatus::Downloaded), + }) + .await?; + + let unzip_path = download_dir.join("profile"); + + let unzip_result = spawn_blocking({ + let zip_path = zip_path.clone(); + let unzip_path = unzip_path.clone(); + move || unzip(&zip_path, &unzip_path) + }) + .await + .expect("unzip profile task was cancelled or panicked"); + + let stats = match unzip_result { + Ok(stats) => stats, + Err(e) => { + error!(self.log, "Could not extract profile"; "error" => ?e); + + self.send(RecvProfile { + result: Err(e.into_error_message()), + }) + .await?; + + return Err(e.into()); + } + }; + + if stats.extracted == 0 { + error!(self.log, "Profile was empty"); + let e = RunnerProtoError::EmptyProfile; + self.send(RecvProfile { + result: Err(e.into_error_message()), + }) + .await?; + + return Err(e); + } + + info!(self.log, "Profile extracted"); + + let profile_dir = match stats.top_level_dir { + Some(top_level_dir) => unzip_path.join(top_level_dir), + None => unzip_path, + }; + + self.send(RecvProfile { + result: { Ok(DownloadStatus::Extracted) }, + }) + .await?; + + Ok(profile_dir) + } + + /// Receive the raw bytes of a profile from the recorder. + async fn recv_profile_raw( + stream: &mut TcpStream, + download_dir: &Path, + profile_size: u64, + ) -> Result> { + let zip_path = download_dir.join("profile.zip"); + let mut f = File::create(&zip_path).await?; + + tokio::io::copy(&mut stream.take(profile_size), &mut f).await?; + + Ok(zip_path) } /// Send the given message to the runner. @@ -76,285 +348,6 @@ where { self.inner.as_mut().unwrap().recv::().await } - - /// Handshake with FxRecorder. - pub async fn handshake_reply(&mut self) -> Result> { - info!(self.log, "Handshaking ..."); - let Handshake { restart } = self.recv().await?; - - if restart { - if let Err(e) = self - .shutdown_handler - .initiate_restart("fxrecord: recorder requested restart") - { - error!(self.log, "an error occurred while handshaking"; "error" => ?e); - self.send(HandshakeReply { - result: Err(e.into_error_message()), - }) - .await?; - - return Err(RunnerProtoError::Shutdown(e)); - } - info!(self.log, "Restart requested; restarting ..."); - } - - self.send(HandshakeReply { result: Ok(()) }).await?; - info!(self.log, "Handshake complete"); - - Ok(restart) - } - - pub async fn download_build_reply( - &mut self, - download_dir: &Path, - ) -> Result> { - let DownloadBuild { task_id } = self.recv().await?; - - info!(self.log, "Received build download request"; "task_id" => &task_id); - - self.send(DownloadBuildReply { - result: Ok(DownloadStatus::Downloading), - }) - .await?; - - match self - .tc - .download_build_artifact(&task_id, download_dir) - .await - { - Ok(download_path) => { - self.send(DownloadBuildReply { - result: Ok(DownloadStatus::Downloaded), - }) - .await?; - - let unzip_result = spawn_blocking({ - let download_dir = PathBuf::from(download_dir); - move || unzip(&download_path, &download_dir) - }) - .await - .expect("unzip task was cancelled or panicked"); - - if let Err(e) = unzip_result { - self.send(DownloadBuildReply { - result: Err(e.into_error_message()), - }) - .await?; - - Err(e.into()) - } else { - let firefox_path = download_dir.join("firefox").join("firefox.exe"); - - if !firefox_path.exists() { - let err = RunnerProtoError::MissingFirefox; - self.send(DownloadBuildReply { - result: Err(err.into_error_message()), - }) - .await?; - - Err(err) - } else { - self.send(DownloadBuildReply { - result: Ok(DownloadStatus::Extracted), - }) - .await?; - - Ok(firefox_path) - } - } - } - - Err(e) => { - error!(self.log, "could not download build"; "error" => ?e); - self.send(DownloadBuildReply { - result: Err(e.into_error_message()), - }) - .await?; - Err(e.into()) - } - } - } - - pub async fn send_profile_reply( - &mut self, - download_dir: &Path, - ) -> Result, RunnerProtoError> { - info!(self.log, "Waiting for profile..."); - - let SendProfile { profile_size } = self.recv().await?; - - let profile_size = match profile_size { - Some(profile_size) => profile_size, - None => { - info!(self.log, "No profile provided"); - self.send(SendProfileReply { result: Ok(None) }).await?; - - return Ok(None); - } - }; - - info!(self.log, "Receiving profile..."); - self.send(SendProfileReply { - result: Ok(Some(DownloadStatus::Downloading)), - }) - .await?; - - let mut stream = self.inner.take().unwrap().into_inner(); - let result = Self::send_profile_reply_impl( - &mut stream, - download_dir, - profile_size, - ) - .await; - self.inner = Some(Proto::new(stream)); - - info!(self.log, "Profile received; extracting..."); - - let zip_path = match result { - Ok(zip_path) => { - self.send(SendProfileReply { - result: { Ok(Some(DownloadStatus::Downloaded)) }, - }) - .await?; - zip_path - } - - Err(e) => { - self.send(SendProfileReply { - result: { Err(e.into_error_message()) }, - }) - .await?; - return Err(e); - } - }; - - let unzip_path = download_dir.join("profile"); - - let unzip_result = spawn_blocking({ - let zip_path = zip_path.clone(); - let unzip_path = unzip_path.clone(); - move || unzip(&zip_path, &unzip_path) - }) - .await - .expect("unzip profile task was cancelled or panicked"); - - let stats = match unzip_result { - Ok(stats) => stats, - Err(e) => { - error!(self.log, "Could not extract profile"; "error" => ?e); - - self.send(SendProfileReply { - result: Err(e.into_error_message()), - }) - .await?; - - return Err(e.into()); - } - }; - - if stats.extracted == 0 { - error!(self.log, "Profile was empty!"); - let e = RunnerProtoError::EmptyProfile; - self.send(SendProfileReply { - result: Err(e.into_error_message()), - }) - .await?; - - return Err(e); - } - - error!(self.log, "Profile extracted"); - - let profile_dir = match stats.top_level_dir { - Some(top_level_dir) => unzip_path.join(top_level_dir), - None => unzip_path, - }; - - self.send(SendProfileReply { - result: { Ok(Some(DownloadStatus::Extracted)) }, - }) - .await?; - - Ok(Some(profile_dir)) - } - - async fn send_profile_reply_impl( - stream: &mut TcpStream, - download_dir: &Path, - profile_size: u64, - ) -> Result> { - let zip_path = download_dir.join("profile.zip"); - let mut f = File::create(&zip_path).await?; - - tokio::io::copy(&mut stream.take(profile_size), &mut f).await?; - - Ok(zip_path) - } - - pub async fn send_prefs_reply( - &mut self, - prefs_path: &Path, - ) -> Result<(), RunnerProtoError> { - let SendPrefs { prefs } = self.recv().await?; - - if prefs.is_empty() { - return self - .send(SendPrefsReply { result: Ok(()) }) - .await - .map_err(Into::into); - } - - let mut f = match OpenOptions::new() - .append(true) - .create(true) - .open(&prefs_path) - .await - { - Ok(f) => f, - Err(e) => { - self.send(SendPrefsReply { - result: Err(e.into_error_message()), - }) - .await?; - return Err(e.into()); - } - }; - - match write_prefs(&mut f, prefs.into_iter()).await { - Ok(()) => { - self.send(SendPrefsReply { result: Ok(()) }).await?; - Ok(()) - } - Err(e) => { - self.send(SendPrefsReply { - result: Err(e.into_error_message()), - }) - .await?; - Err(e.into()) - } - } - } - - pub async fn wait_for_idle_reply(&mut self) -> Result<(), RunnerProtoError> { - self.recv::().await?; - - info!(self.log, "Waiting for CPU and disk to become idle..."); - - if let Err(e) = cpu_and_disk_idle(&self.perf_provider).await { - error!(self.log, "CPU and disk did not become idle"; "error" => %e); - self.send(WaitForIdleReply { - result: Err(e.into_error_message()), - }) - .await?; - - return Err(RunnerProtoError::WaitForIdle(e)); - } else { - self.send(WaitForIdleReply { result: Ok(()) }).await?; - } - - info!(self.log, "Did become idle"); - Ok(()) - } } #[derive(Debug, Display)] diff --git a/libfxrecord/src/net/message.rs b/libfxrecord/src/net/message.rs index 5b356de..57e299a 100644 --- a/libfxrecord/src/net/message.rs +++ b/libfxrecord/src/net/message.rs @@ -110,7 +110,7 @@ where fn kind() -> K; } -/// An error that occurs when attempting to extract a message variant.. +/// An error that occurs when attempting to extract a message variant. #[derive(Debug, Display)] #[display( fmt = "could not convert message of kind `{}' to kind `{}'", @@ -308,6 +308,7 @@ macro_rules! impl_message { type Error = KindMismatch<$kind_ty>; fn try_from(msg: $msg_ty) -> Result { + #[allow(irrefutable_let_patterns)] if let $msg_ty::$inner_ty(msg) = msg { Ok(msg) } else { @@ -328,6 +329,54 @@ macro_rules! impl_message { }; } +/// A request from the recorder to the runner. +#[derive(Debug, Deserialize, Serialize)] +pub enum RecorderRequest { + /// A new request. + /// + /// If successful, the runner will restart and the recorder should send a + /// [`ResumeRequest`](enum.RecorderRequest.html#variant.ResumeRequest) + /// upon reconnection. + NewRequest(NewRequest), + + /// A request to resume a [previous + /// request](enum.RecorderRequest.html#variant.NewRequest). + ResumeRequest(ResumeRequest), +} + +impl From for Request { + fn from(req: NewRequest) -> Request { + Request { + request: RecorderRequest::NewRequest(req), + } + } +} + +impl From for Request { + fn from(req: ResumeRequest) -> Request { + Request { + request: RecorderRequest::ResumeRequest(req), + } + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct NewRequest { + /// The task ID of the Taskcluster build task. + /// + /// The build artifact from this task will be downloaded by the runner. + pub build_task_id: String, + + /// The size of the profile that will be sent, if any. + pub profile_size: Option, + + /// Prefs to override in the profile. + pub prefs: Vec<(String, PrefValue)>, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct ResumeRequest {} + impl_message! { /// A message from FxRecorder to FxRunner. RecorderMessage, @@ -335,35 +384,13 @@ impl_message! { /// The kind of a [`RecorderMessage`](struct.RecorderMessage.html). RecorderMessageKind; - /// A handshake from FxRecorder to FxRunner. - Handshake { - /// Whether or not the runner should restart. - restart: bool, + /// A request from the recorder to the runner. + Request { + request: RecorderRequest, }; - - /// A request to download a specific build of Firefox. - DownloadBuild { - /// The build task ID. - task_id: String, - }; - - /// A request to send a profile of the given size. - /// - /// A size of zero indicates that there is no profile. - SendProfile { - profile_size: Option, - }; - - /// A request for the runner to use the provided prefs. - SendPrefs { - prefs: Vec<(String, PrefValue)>, - }; - - /// A request for the runner to wait for its CPU and disk to become idle. - WaitForIdle; } -#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Display, Eq, PartialEq, Serialize, Deserialize)] pub enum DownloadStatus { Downloading, Downloaded, @@ -381,6 +408,8 @@ impl DownloadStatus { } } +pub type ForeignResult = Result>; + impl_message! { /// A message from FxRunner to FxRecorder. RunnerMessage, @@ -388,32 +417,33 @@ impl_message! { /// The kind of a [`RunnerMessage`](struct.RunnerMessage.html). RunnerMessageKind; - /// A reply to a [`Handshake`](struct.Handshake.html) from FxRecorder. - HandshakeReply { - result: Result<(), ErrorMessage>, + /// The status of the DownloadBuild phase. + DownloadBuild { + result: ForeignResult, }; - /// A reply to a [`DownloadBuild`](struct.DownloadBuild.html) message from - /// FxRecorder. - DownloadBuildReply { - result: Result>, + /// The status of the RecvProfile phase. + RecvProfile { + result: ForeignResult, }; - /// A reply to a [`SendProfile`](struct.SendProfile.html) message from - /// FxRecorder. - SendProfileReply { - result: Result, ErrorMessage>, + /// The status of the WritePrefs phase. + WritePrefs { + result: ForeignResult<()>, }; - /// A reply to a [`SendPrefs`](struct.SendPrefs.html) message from - /// FxRecorder. - SendPrefsReply { - result: Result<(), ErrorMessage>, + /// The status of the Restarting phase. + Restarting { + result: ForeignResult<()>, }; - /// A reply to a [`WaitForIdle`](struct.WaitForIdle.html) message from - /// FxRecorder. - WaitForIdleReply { - result: Result<(), ErrorMessage>, + /// The status of the ResumeResponse phase. + ResumeResponse { + result: ForeignResult<()>, + }; + + /// The status of the WaitForIdle phase. + WaitForIdle { + result: ForeignResult<()>, }; }