Instead of the recorder constantly sending messages to the runner, it
now sends a single `Request` message and receives a series of responses
from the runner.

Tests will be fixed in a later commit.

Received profiles, prefs, and Firefox builds are not presently persisted
through restarts. This will be fixed in an upcoming patch.

The protocol docs no longer match what happens, so they've been removed
for now.
This commit is contained in:
Barret Rennie 2020-06-26 16:15:01 -04:00
Родитель 54483275ae
Коммит 4870a145c8
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4D71D86C09132D72
13 изменённых файлов: 535 добавлений и 676 удалений

Двоичные данные
docs/diagrams/download-build.png

Двоичный файл не отображается.

До

Ширина:  |  Высота:  |  Размер: 20 KiB

Двоичные данные
docs/diagrams/handshake-failure.png

Двоичный файл не отображается.

До

Ширина:  |  Высота:  |  Размер: 9.1 KiB

Двоичные данные
docs/diagrams/handshake.png

Двоичный файл не отображается.

До

Ширина:  |  Высота:  |  Размер: 14 KiB

Двоичные данные
docs/diagrams/send-prefs.png

Двоичный файл не отображается.

До

Ширина:  |  Высота:  |  Размер: 9.4 KiB

Двоичные данные
docs/diagrams/send-profile-empty.png

Двоичный файл не отображается.

До

Ширина:  |  Высота:  |  Размер: 7.6 KiB

Двоичные данные
docs/diagrams/send-profile.png

Двоичный файл не отображается.

До

Ширина:  |  Высота:  |  Размер: 18 KiB

Двоичные данные
docs/diagrams/wait-for-idle.png

Двоичный файл не отображается.

До

Ширина:  |  Высота:  |  Размер: 9.4 KiB

Просмотреть файл

@ -1,99 +0,0 @@
# fxrecord protocol
The fxrecord protocol is broken up into a number of sections:
1. Handshake
2. DownloadBuild
3. SendProfile
4. SendPrefs
5. WaitForIdle
## Message Format
Messages are encoded as JSON blobs (via Serde). Each message is prefixed with
a 4-byte length.
Example:
```
00 00 00 1F # Length of Message (31)
{"Handshake":{"restart":false}}
```
In replies, it is common for the recorder to send a `Result` back. If the
result is `Ok`, then this indicates that the corresponding request was
successful. However, if an `Err` is returned to the recorder, then a fatal
error has occurred and the protocol cannot continue. At this point, the
recorder and runner will disconnect from eachother.
An example of protocol failure can be seen below in Figure 2.
## 1. Handshake
The protocol is initiated by the recorder connecting to the runner over TCP.
The recorder will send a `Handshake` message to the runner, indicating that
it should restart. The runner replies with a `HandshakeReply` with the status
of the restart operation. They then disconnect and the recorder waits for the
runner to restart.
> ![](/docs/diagrams/handshake.png)
>
> Figure 1: Handshake
If something goes wrong with the handshake on the runner's end (such as a
failure with the Windows API when attempting to restart), it will instead
reply with an error message inside its `HandshakeReply`:
> ![](/docs/diagrams/handshake-failure.png)
>
> Figure 2: Handshake Failure
If the recorder requested a restart, it will then attempt to reconnect to the
runner with exponential backoff and handshake again, this time not requesting
a restart
## 2. DownloadBuild
After reconnecting, the next message from the recorder will be for the runner
to download a specific build of Firefox from Taskcluster.
> ![](/docs/diagrams/download-build.png)
>
> Figure 3: Download Build
## 3. SendProfile
After fxrunner has downloaded a build, fxrecorder can optionally send a
zipped profile for it to use when running Firefox. If it does, it will send a
`SendProfile` message with the given profile size. It will then drop the
protocol down to a raw TCP connection and transfer the profile as raw bytes.
The runner will receive these bytes and write them to disk, then extract the
profile.
> ![](/docs/diagrams/send-profile.png)
>
> Figure 4: Send Profile
However, if no preset profile is to be used, an empty `SendProfile` message is
sent and fxrunner will have Firefox generate a new profile on start.
> ![](/docs/diagrams/send-profile-empty.png)
>
> Figure 5: Send Profile (Empty Profile Case)
## 4. SendPrefs
Next, the fxrecorder may send a list of prefs that fxrunner should use when
running Firefox. If provided, they will be written to the `prefs.js` in the
profile directory from the `SendProfile` phase. If no profile was transferred
in that phase, a new profile directory will be created containing `prefs.js`.
> ![](/docs/diagrams/send-prefs.png)
>
> Figure 6: Send Prefs
## 5. WaitForIdle
> ![](/docs/diagrams/wait-for-idle.png)
>
> Figure 7: Wait for Idle

Просмотреть файл

@ -72,9 +72,13 @@ async fn fxrecorder(log: Logger, options: Options, config: Config) -> Result<(),
let mut proto = RecorderProto::new(log.clone(), stream); let mut proto = RecorderProto::new(log.clone(), stream);
proto.handshake(true).await?; proto
.send_new_request(&task_id, profile_path.as_ref().map(PathBuf::as_path), prefs)
.await?;
} }
info!(log, "Disconnected from runner. Waiting to reconnect...");
{ {
let reconnect = || { let reconnect = || {
info!(log, "Attempting re-connection to runner..."); info!(log, "Attempting re-connection to runner...");
@ -97,13 +101,7 @@ async fn fxrecorder(log: Logger, options: Options, config: Config) -> Result<(),
let mut proto = RecorderProto::new(log, stream); let mut proto = RecorderProto::new(log, stream);
proto.handshake(false).await?; proto.send_resume_request().await?;
proto.download_build(&task_id).await?;
proto
.send_profile(profile_path.as_ref().map(PathBuf::as_path))
.await?;
proto.send_prefs(prefs).await?;
proto.wait_for_idle().await?;
} }
Ok(()) Ok(())

Просмотреть файл

@ -21,6 +21,7 @@ pub struct RecorderProto {
} }
impl RecorderProto { impl RecorderProto {
/// Create a new RecorderProto.
pub fn new(log: Logger, stream: TcpStream) -> RecorderProto { pub fn new(log: Logger, stream: TcpStream) -> RecorderProto {
Self { Self {
inner: Some(Proto::new(stream)), inner: Some(Proto::new(stream)),
@ -28,11 +29,174 @@ impl RecorderProto {
} }
} }
/// Consume the RecorderProto and return the underlying `Proto`. /// Send a new request to the runner.
pub fn into_inner( pub async fn send_new_request(
self, &mut self,
) -> Proto<RunnerMessage, RecorderMessage, RunnerMessageKind, RecorderMessageKind> { task_id: &str,
self.inner.unwrap() profile_path: Option<&Path>,
prefs: Vec<(String, PrefValue)>,
) -> Result<(), RecorderProtoError> {
info!(self.log, "Sending request");
let profile_size = match profile_path {
None => None,
Some(profile_path) => Some(tokio::fs::metadata(profile_path).await?.len()),
};
self.send::<Request>(
NewRequest {
build_task_id: task_id.into(),
profile_size,
prefs,
}
.into(),
)
.await?;
loop {
let DownloadBuild { result } = self.recv().await?;
match result {
Ok(DownloadStatus::Downloading) => {
info!(self.log, "Downloading build ...");
}
Ok(DownloadStatus::Downloaded) => {
info!(self.log, "Build download complete; extracting build ...");
}
Ok(DownloadStatus::Extracted) => {
info!(self.log, "Build extracted");
break;
}
Err(e) => {
error!(self.log, "Build download failed"; "task_id" => task_id, "error" => ?e);
return Err(e.into());
}
}
}
if let Some(profile_path) = profile_path {
self.send_profile(profile_path, profile_size.unwrap())
.await?
} else {
info!(self.log, "No profile to send");
}
if let WritePrefs { result: Err(e) } = self.recv().await? {
error!(self.log, "Runner could not write prefs"; "error" => ?e);
return Err(e.into());
}
if let Restarting { result: Err(e) } = self.recv().await? {
error!(self.log, "Runner could not restart"; "error" => ?e);
return Err(e.into());
}
info!(self.log, "Runner is restarting...");
Ok(())
}
/// Send a resume request to the runner.
pub async fn send_resume_request(&mut self) -> Result<(), RecorderProtoError> {
info!(self.log, "Resuming request");
self.send::<Request>(ResumeRequest {}.into()).await?;
if let ResumeResponse { result: Err(e) } = self.recv().await? {
error!(self.log, "Could not resume request with runner"; "error" => ?e);
return Err(e.into());
}
info!(self.log, "Waiting for runner to become idle...");
if let WaitForIdle { result: Err(e) } = self.recv().await? {
error!(self.log, "Runner could not become idle"; "error" => ?e);
return Err(e.into());
}
info!(self.log, "Runner became idle");
Ok(())
}
/// Send the profile at the given path to the runner.
async fn send_profile(
&mut self,
profile_path: &Path,
profile_size: u64,
) -> Result<(), RecorderProtoError> {
let RecvProfile { result } = self.recv().await?;
match result? {
DownloadStatus::Downloading => {
info!(self.log, "Sending profile"; "profile_size" => profile_size);
}
unexpected => {
return Err(RecorderProtoError::RecvProfileMismatch {
received: unexpected,
expected: DownloadStatus::Downloading,
}
.into())
}
}
let mut stream = self.inner.take().unwrap().into_inner();
let result = RecorderProto::send_profile_impl(&mut stream, profile_path).await;
self.inner = Some(Proto::new(stream));
result?;
let mut state = DownloadStatus::Downloading;
loop {
let next_state = self.recv::<RecvProfile>().await?.result?;
assert_ne!(state, DownloadStatus::Extracted);
let expected = state.next().unwrap();
if expected != next_state {
return Err(RecorderProtoError::RecvProfileMismatch {
received: next_state,
expected: expected,
}
.into());
}
state = next_state;
match state {
// This would be caught above because this is never an expected state.
DownloadStatus::Downloading => unreachable!(),
DownloadStatus::Downloaded => {
info!(self.log, "Profile sent; extracting...");
}
DownloadStatus::Extracted => {
info!(self.log, "Profile extracted");
break;
}
}
}
assert!(state == DownloadStatus::Extracted);
Ok(())
}
/// Write the raw bytes from the profile to the runner.
async fn send_profile_impl(
stream: &mut TcpStream,
profile_path: &Path,
) -> Result<(), RecorderProtoError> {
let mut f = File::open(profile_path).await?;
tokio::io::copy(&mut f, stream)
.await
.map_err(Into::into)
.map(drop)
} }
/// Send the given message to the recorder. /// Send the given message to the recorder.
@ -54,209 +218,6 @@ impl RecorderProto {
{ {
self.inner.as_mut().unwrap().recv::<M>().await self.inner.as_mut().unwrap().recv::<M>().await
} }
/// Handshake with FxRunner.
pub async fn handshake(&mut self, restart: bool) -> Result<(), RecorderProtoError> {
info!(self.log, "Handshaking ...");
self.send(Handshake { restart }).await?;
let HandshakeReply { result } = self.recv().await?;
match result {
Ok(..) => {
info!(self.log, "Handshake complete");
Ok(())
}
Err(e) => {
info!(self.log, "Handshake failed: runner could not restart"; "error" => ?e);
Err(e.into())
}
}
}
pub async fn download_build(&mut self, task_id: &str) -> Result<(), RecorderProtoError> {
info!(self.log, "Requesting download of build from task"; "task_id" => task_id);
self.send(DownloadBuild {
task_id: task_id.into(),
})
.await?;
loop {
let DownloadBuildReply { result } = self.recv().await?;
match result {
Ok(DownloadStatus::Downloading) => {
info!(self.log, "Downloading build ...");
}
Ok(DownloadStatus::Downloaded) => {
info!(self.log, "Build download complete; extracting build ...");
}
Ok(DownloadStatus::Extracted) => {
info!(self.log, "Build extracted");
return Ok(());
}
Err(e) => {
error!(self.log, "Build download failed"; "task_id" => task_id, "error" => ?e);
return Err(e.into());
}
}
}
}
/// Send the profile at the given path to the runner.
///
/// If the profile path is specified, the profile must exist, or this function will panic.
pub async fn send_profile(
&mut self,
profile_path: Option<&Path>,
) -> Result<(), RecorderProtoError> {
let profile_path = match profile_path {
Some(profile_path) => profile_path,
None => {
info!(self.log, "No profile to send");
self.send(SendProfile { profile_size: None }).await?;
let SendProfileReply { result } = self.recv().await?;
return match result? {
Some(unexpected) => Err(RecorderProtoError::SendProfileMismatch {
expected: None,
received: Some(unexpected),
}
.into()),
None => Ok(()),
};
}
};
assert!(profile_path.exists());
let profile_size = tokio::fs::metadata(profile_path).await?.len();
self.send(SendProfile {
profile_size: Some(profile_size),
})
.await?;
let SendProfileReply { result } = self.recv().await?;
match result? {
Some(DownloadStatus::Downloading) => {
info!(self.log, "Sending profile"; "profile_size" => profile_size);
}
unexpected => {
return Err(RecorderProtoError::SendProfileMismatch {
received: unexpected,
expected: Some(DownloadStatus::Downloading),
}
.into())
}
}
let mut stream = self.inner.take().unwrap().into_inner();
let result = RecorderProto::send_profile_impl(&mut stream, profile_path).await;
self.inner = Some(Proto::new(stream));
result?;
let mut state = DownloadStatus::Downloading;
loop {
let SendProfileReply { result } = self.recv().await?;
match result? {
Some(next_state) => {
assert_ne!(state, DownloadStatus::Extracted);
let expected = state.next().unwrap();
if expected != next_state {
return Err(RecorderProtoError::SendProfileMismatch {
received: Some(next_state),
expected: Some(expected),
}
.into());
}
state = next_state;
match state {
// This would be caught above because this is never an expected state.
DownloadStatus::Downloading => unreachable!(),
DownloadStatus::Downloaded => {
info!(self.log, "Profile sent; extracting...");
}
DownloadStatus::Extracted => {
info!(self.log, "Profile extracted");
break;
}
}
}
None => {
return Err(RecorderProtoError::SendProfileMismatch {
received: None,
expected: state.next(),
}
.into())
}
}
}
assert!(state == DownloadStatus::Extracted);
Ok(())
}
async fn send_profile_impl(
stream: &mut TcpStream,
profile_path: &Path,
) -> Result<(), RecorderProtoError> {
let mut f = File::open(profile_path).await?;
tokio::io::copy(&mut f, stream)
.await
.map_err(Into::into)
.map(drop)
}
/// Send the preferences that the runner should use.
pub async fn send_prefs(
&mut self,
prefs: Vec<(String, PrefValue)>,
) -> Result<(), RecorderProtoError> {
info!(self.log, "Sending prefs ...");
self.send(SendPrefs { prefs }).await?;
let SendPrefsReply { result } = self.recv().await?;
if let Err(e) = result {
error!(self.log, "Could not send prefs"; "error" => ?e);
return Err(e.into());
}
info!(self.log, "Prefs sent");
Ok(())
}
pub async fn wait_for_idle(&mut self) -> Result<(), RecorderProtoError> {
info!(self.log, "Waiting for runner to become idle...");
self.send(WaitForIdle).await?;
let WaitForIdleReply { result } = self.recv().await?;
if let Err(e) = result {
error!(self.log, "Runner did not go idle"; "error" => %e);
Err(e.into())
} else {
info!(self.log, "Runner is now idle");
Ok(())
}
}
} }
#[derive(Debug, Display)] #[derive(Debug, Display)]
@ -264,13 +225,13 @@ pub enum RecorderProtoError {
Proto(ProtoError<RunnerMessageKind>), Proto(ProtoError<RunnerMessageKind>),
#[display( #[display(
fmt = "Expected a download status of `{:?}', but received `{:?}' instead", fmt = "Expected a download status of `{}', but received `{}' instead",
expected, expected,
received received
)] )]
SendProfileMismatch { RecvProfileMismatch {
expected: Option<DownloadStatus>, expected: DownloadStatus,
received: Option<DownloadStatus>, received: DownloadStatus,
}, },
} }
@ -278,7 +239,7 @@ impl Error for RecorderProtoError {
fn source(&self) -> Option<&(dyn Error + 'static)> { fn source(&self) -> Option<&(dyn Error + 'static)> {
match self { match self {
RecorderProtoError::Proto(ref e) => Some(e), RecorderProtoError::Proto(ref e) => Some(e),
RecorderProtoError::SendProfileMismatch { .. } => None, RecorderProtoError::RecvProfileMismatch { .. } => None,
} }
} }
} }

Просмотреть файл

@ -13,8 +13,6 @@ use libfxrunner::proto::RunnerProto;
use libfxrunner::taskcluster::Taskcluster; use libfxrunner::taskcluster::Taskcluster;
use slog::{info, Logger}; use slog::{info, Logger};
use structopt::StructOpt; use structopt::StructOpt;
use tempfile::TempDir;
use tokio::fs::create_dir_all;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::time::delay_for; use tokio::time::delay_for;
@ -79,31 +77,9 @@ async fn fxrunner(log: Logger, options: Options, config: Config) -> Result<(), B
WindowsPerfProvider::default(), WindowsPerfProvider::default(),
); );
if proto.handshake_reply().await? { if proto.handle_request().await? {
break; break;
} }
// We download everything into a temporary directory that will be
// cleaned up after the connection closes.
let download_dir = TempDir::new()?;
let firefox_bin = proto.download_build_reply(download_dir.path()).await?;
assert!(firefox_bin.is_file());
let profile_path = match proto.send_profile_reply(download_dir.path()).await? {
Some(profile_path) => profile_path,
None => {
let profile_path = download_dir.path().join("profile");
info!(log, "Creating new empty profile");
create_dir_all(&profile_path).await?;
profile_path
}
};
assert!(profile_path.is_dir());
proto
.send_prefs_reply(&profile_path.join("user.js"))
.await?;
proto.wait_for_idle_reply().await?;
info!(log, "Client disconnected"); info!(log, "Client disconnected");
} }

Просмотреть файл

@ -11,7 +11,8 @@ use libfxrecord::error::ErrorExt;
use libfxrecord::net::*; use libfxrecord::net::*;
use libfxrecord::prefs::write_prefs; use libfxrecord::prefs::write_prefs;
use slog::{error, info, Logger}; use slog::{error, info, Logger};
use tokio::fs::{File, OpenOptions}; use tempfile::TempDir;
use tokio::fs::{create_dir_all, File, OpenOptions};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::prelude::*; use tokio::prelude::*;
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
@ -50,11 +51,282 @@ where
} }
} }
/// Consume the RunnerProto and return the underlying `Proto`. /// Handle a request from the recorder.
pub fn into_inner( pub async fn handle_request(&mut self) -> Result<bool, RunnerProtoError<S, P>> {
self, match self.recv::<Request>().await?.request {
) -> Proto<RecorderMessage, RunnerMessage, RecorderMessageKind, RunnerMessageKind> { RecorderRequest::NewRequest(req) => {
self.inner.unwrap() self.handle_new_request(req).await?;
Ok(true)
}
RecorderRequest::ResumeRequest(req) => {
self.handle_resume_request(req).await?;
Ok(false)
}
}
}
/// Handle a new request from the recorder.
async fn handle_new_request(
&mut self,
request: NewRequest,
) -> Result<(), RunnerProtoError<S, P>> {
let download_dir = TempDir::new()?;
let firefox_bin = self
.download_build(&request.build_task_id, download_dir.path())
.await?;
assert!(firefox_bin.is_file());
let profile_path = match request.profile_size {
Some(profile_size) => self.recv_profile(profile_size, download_dir.path()).await?,
None => {
let profile_path = download_dir.path().join("profile");
info!(self.log, "Creating new empty profile");
create_dir_all(&profile_path).await?;
profile_path
}
};
assert!(profile_path.is_dir());
if request.prefs.len() > 0 {
let prefs_path = profile_path.join("user.js");
let mut f = match OpenOptions::new()
.append(true)
.create(true)
.open(&prefs_path)
.await
{
Ok(f) => f,
Err(e) => {
self.send(WritePrefs {
result: Err(e.into_error_message()),
})
.await?;
return Err(e.into());
}
};
if let Err(e) = write_prefs(&mut f, request.prefs.into_iter()).await {
self.send(WritePrefs {
result: Err(e.into_error_message()),
})
.await?;
return Err(e.into());
}
}
self.send(WritePrefs { result: Ok(()) }).await?;
// TODO: Persist the profile and Firefox instance for a restart
if let Err(e) = self
.shutdown_handler
.initiate_restart("fxrunner: restarting for cold Firefox start")
{
// TODO: Once we persist firefox and profile, we need
error!(self.log, "Could not restart"; "error" => ?e);
self.send(Restarting {
result: Err(e.into_error_message()),
})
.await?;
return Err(RunnerProtoError::Shutdown(e));
}
self.send(Restarting { result: Ok(()) }).await?;
Ok(())
}
/// Handle a resume request from the runner.
async fn handle_resume_request(
&mut self,
_request: ResumeRequest,
) -> Result<(), RunnerProtoError<S, P>> {
info!(self.log, "Received resumption request");
self.send(ResumeResponse { result: Ok(()) }).await?;
info!(self.log, "Waiting to become idle");
if let Err(e) = cpu_and_disk_idle(&self.perf_provider).await {
error!(self.log, "CPU and disk did not become idle"; "error" => %e);
self.send(WaitForIdle {
result: Err(e.into_error_message()),
})
.await?;
return Err(RunnerProtoError::WaitForIdle(e));
}
info!(self.log, "Became idle");
self.send(WaitForIdle { result: Ok(()) }).await?;
Ok(())
}
/// Download a build from taskcluster.
async fn download_build(
&mut self,
task_id: &str,
download_dir: &Path,
) -> Result<PathBuf, RunnerProtoError<S, P>> {
info!(self.log, "Download build from Taskcluster"; "task_id" => &task_id);
self.send(DownloadBuild {
result: Ok(DownloadStatus::Downloading),
})
.await?;
let download_path = match self.tc.download_build_artifact(task_id, download_dir).await {
Ok(download_path) => download_path,
Err(e) => {
error!(self.log, "Could not download build"; "error" => ?e);
self.send(DownloadBuild {
result: Err(e.into_error_message()),
})
.await?;
return Err(e.into());
}
};
self.send(DownloadBuild {
result: Ok(DownloadStatus::Downloaded),
})
.await?;
info!(self.log, "Extracting downloaded artifact...");
let unzip_result = spawn_blocking({
let download_dir = PathBuf::from(download_dir);
move || unzip(&download_path, &download_dir)
})
.await
.expect("unzip task was cancelled or panicked");
if let Err(e) = unzip_result {
self.send(DownloadBuild {
result: Err(e.into_error_message()),
})
.await?;
return Err(e.into());
}
let firefox_path = download_dir.join("firefox").join("firefox.exe");
if !firefox_path.exists() {
let err = RunnerProtoError::MissingFirefox;
self.send(DownloadBuild {
result: Err(err.into_error_message()),
})
.await?;
return Err(err);
}
info!(self.log, "Extracted build");
self.send(DownloadBuild {
result: Ok(DownloadStatus::Extracted),
})
.await?;
Ok(firefox_path)
}
/// Receive a profile from the recorder.
async fn recv_profile(
&mut self,
profile_size: u64,
download_dir: &Path,
) -> Result<PathBuf, RunnerProtoError<S, P>> {
info!(self.log, "Receiving profile...");
self.send(RecvProfile {
result: Ok(DownloadStatus::Downloading),
})
.await?;
let mut stream = self.inner.take().unwrap().into_inner();
let result = Self::recv_profile_raw(&mut stream, download_dir, profile_size).await;
self.inner = Some(Proto::new(stream));
let zip_path = match result {
Ok(zip_path) => zip_path,
Err(e) => {
self.send(DownloadBuild {
result: Err(e.into_error_message()),
})
.await?;
return Err(e.into());
}
};
info!(self.log, "Profile received; extracting...");
self.send(RecvProfile {
result: Ok(DownloadStatus::Downloaded),
})
.await?;
let unzip_path = download_dir.join("profile");
let unzip_result = spawn_blocking({
let zip_path = zip_path.clone();
let unzip_path = unzip_path.clone();
move || unzip(&zip_path, &unzip_path)
})
.await
.expect("unzip profile task was cancelled or panicked");
let stats = match unzip_result {
Ok(stats) => stats,
Err(e) => {
error!(self.log, "Could not extract profile"; "error" => ?e);
self.send(RecvProfile {
result: Err(e.into_error_message()),
})
.await?;
return Err(e.into());
}
};
if stats.extracted == 0 {
error!(self.log, "Profile was empty");
let e = RunnerProtoError::EmptyProfile;
self.send(RecvProfile {
result: Err(e.into_error_message()),
})
.await?;
return Err(e);
}
info!(self.log, "Profile extracted");
let profile_dir = match stats.top_level_dir {
Some(top_level_dir) => unzip_path.join(top_level_dir),
None => unzip_path,
};
self.send(RecvProfile {
result: { Ok(DownloadStatus::Extracted) },
})
.await?;
Ok(profile_dir)
}
/// Receive the raw bytes of a profile from the recorder.
async fn recv_profile_raw(
stream: &mut TcpStream,
download_dir: &Path,
profile_size: u64,
) -> Result<PathBuf, RunnerProtoError<S, P>> {
let zip_path = download_dir.join("profile.zip");
let mut f = File::create(&zip_path).await?;
tokio::io::copy(&mut stream.take(profile_size), &mut f).await?;
Ok(zip_path)
} }
/// Send the given message to the runner. /// Send the given message to the runner.
@ -76,285 +348,6 @@ where
{ {
self.inner.as_mut().unwrap().recv::<M>().await self.inner.as_mut().unwrap().recv::<M>().await
} }
/// Handshake with FxRecorder.
pub async fn handshake_reply(&mut self) -> Result<bool, RunnerProtoError<S, P>> {
info!(self.log, "Handshaking ...");
let Handshake { restart } = self.recv().await?;
if restart {
if let Err(e) = self
.shutdown_handler
.initiate_restart("fxrecord: recorder requested restart")
{
error!(self.log, "an error occurred while handshaking"; "error" => ?e);
self.send(HandshakeReply {
result: Err(e.into_error_message()),
})
.await?;
return Err(RunnerProtoError::Shutdown(e));
}
info!(self.log, "Restart requested; restarting ...");
}
self.send(HandshakeReply { result: Ok(()) }).await?;
info!(self.log, "Handshake complete");
Ok(restart)
}
pub async fn download_build_reply(
&mut self,
download_dir: &Path,
) -> Result<PathBuf, RunnerProtoError<S, P>> {
let DownloadBuild { task_id } = self.recv().await?;
info!(self.log, "Received build download request"; "task_id" => &task_id);
self.send(DownloadBuildReply {
result: Ok(DownloadStatus::Downloading),
})
.await?;
match self
.tc
.download_build_artifact(&task_id, download_dir)
.await
{
Ok(download_path) => {
self.send(DownloadBuildReply {
result: Ok(DownloadStatus::Downloaded),
})
.await?;
let unzip_result = spawn_blocking({
let download_dir = PathBuf::from(download_dir);
move || unzip(&download_path, &download_dir)
})
.await
.expect("unzip task was cancelled or panicked");
if let Err(e) = unzip_result {
self.send(DownloadBuildReply {
result: Err(e.into_error_message()),
})
.await?;
Err(e.into())
} else {
let firefox_path = download_dir.join("firefox").join("firefox.exe");
if !firefox_path.exists() {
let err = RunnerProtoError::MissingFirefox;
self.send(DownloadBuildReply {
result: Err(err.into_error_message()),
})
.await?;
Err(err)
} else {
self.send(DownloadBuildReply {
result: Ok(DownloadStatus::Extracted),
})
.await?;
Ok(firefox_path)
}
}
}
Err(e) => {
error!(self.log, "could not download build"; "error" => ?e);
self.send(DownloadBuildReply {
result: Err(e.into_error_message()),
})
.await?;
Err(e.into())
}
}
}
pub async fn send_profile_reply(
&mut self,
download_dir: &Path,
) -> Result<Option<PathBuf>, RunnerProtoError<S, P>> {
info!(self.log, "Waiting for profile...");
let SendProfile { profile_size } = self.recv().await?;
let profile_size = match profile_size {
Some(profile_size) => profile_size,
None => {
info!(self.log, "No profile provided");
self.send(SendProfileReply { result: Ok(None) }).await?;
return Ok(None);
}
};
info!(self.log, "Receiving profile...");
self.send(SendProfileReply {
result: Ok(Some(DownloadStatus::Downloading)),
})
.await?;
let mut stream = self.inner.take().unwrap().into_inner();
let result = Self::send_profile_reply_impl(
&mut stream,
download_dir,
profile_size,
)
.await;
self.inner = Some(Proto::new(stream));
info!(self.log, "Profile received; extracting...");
let zip_path = match result {
Ok(zip_path) => {
self.send(SendProfileReply {
result: { Ok(Some(DownloadStatus::Downloaded)) },
})
.await?;
zip_path
}
Err(e) => {
self.send(SendProfileReply {
result: { Err(e.into_error_message()) },
})
.await?;
return Err(e);
}
};
let unzip_path = download_dir.join("profile");
let unzip_result = spawn_blocking({
let zip_path = zip_path.clone();
let unzip_path = unzip_path.clone();
move || unzip(&zip_path, &unzip_path)
})
.await
.expect("unzip profile task was cancelled or panicked");
let stats = match unzip_result {
Ok(stats) => stats,
Err(e) => {
error!(self.log, "Could not extract profile"; "error" => ?e);
self.send(SendProfileReply {
result: Err(e.into_error_message()),
})
.await?;
return Err(e.into());
}
};
if stats.extracted == 0 {
error!(self.log, "Profile was empty!");
let e = RunnerProtoError::EmptyProfile;
self.send(SendProfileReply {
result: Err(e.into_error_message()),
})
.await?;
return Err(e);
}
error!(self.log, "Profile extracted");
let profile_dir = match stats.top_level_dir {
Some(top_level_dir) => unzip_path.join(top_level_dir),
None => unzip_path,
};
self.send(SendProfileReply {
result: { Ok(Some(DownloadStatus::Extracted)) },
})
.await?;
Ok(Some(profile_dir))
}
async fn send_profile_reply_impl(
stream: &mut TcpStream,
download_dir: &Path,
profile_size: u64,
) -> Result<PathBuf, RunnerProtoError<S, P>> {
let zip_path = download_dir.join("profile.zip");
let mut f = File::create(&zip_path).await?;
tokio::io::copy(&mut stream.take(profile_size), &mut f).await?;
Ok(zip_path)
}
pub async fn send_prefs_reply(
&mut self,
prefs_path: &Path,
) -> Result<(), RunnerProtoError<S, P>> {
let SendPrefs { prefs } = self.recv().await?;
if prefs.is_empty() {
return self
.send(SendPrefsReply { result: Ok(()) })
.await
.map_err(Into::into);
}
let mut f = match OpenOptions::new()
.append(true)
.create(true)
.open(&prefs_path)
.await
{
Ok(f) => f,
Err(e) => {
self.send(SendPrefsReply {
result: Err(e.into_error_message()),
})
.await?;
return Err(e.into());
}
};
match write_prefs(&mut f, prefs.into_iter()).await {
Ok(()) => {
self.send(SendPrefsReply { result: Ok(()) }).await?;
Ok(())
}
Err(e) => {
self.send(SendPrefsReply {
result: Err(e.into_error_message()),
})
.await?;
Err(e.into())
}
}
}
pub async fn wait_for_idle_reply(&mut self) -> Result<(), RunnerProtoError<S, P>> {
self.recv::<WaitForIdle>().await?;
info!(self.log, "Waiting for CPU and disk to become idle...");
if let Err(e) = cpu_and_disk_idle(&self.perf_provider).await {
error!(self.log, "CPU and disk did not become idle"; "error" => %e);
self.send(WaitForIdleReply {
result: Err(e.into_error_message()),
})
.await?;
return Err(RunnerProtoError::WaitForIdle(e));
} else {
self.send(WaitForIdleReply { result: Ok(()) }).await?;
}
info!(self.log, "Did become idle");
Ok(())
}
} }
#[derive(Debug, Display)] #[derive(Debug, Display)]

Просмотреть файл

@ -110,7 +110,7 @@ where
fn kind() -> K; fn kind() -> K;
} }
/// An error that occurs when attempting to extract a message variant.. /// An error that occurs when attempting to extract a message variant.
#[derive(Debug, Display)] #[derive(Debug, Display)]
#[display( #[display(
fmt = "could not convert message of kind `{}' to kind `{}'", fmt = "could not convert message of kind `{}' to kind `{}'",
@ -308,6 +308,7 @@ macro_rules! impl_message {
type Error = KindMismatch<$kind_ty>; type Error = KindMismatch<$kind_ty>;
fn try_from(msg: $msg_ty) -> Result<Self, Self::Error> { fn try_from(msg: $msg_ty) -> Result<Self, Self::Error> {
#[allow(irrefutable_let_patterns)]
if let $msg_ty::$inner_ty(msg) = msg { if let $msg_ty::$inner_ty(msg) = msg {
Ok(msg) Ok(msg)
} else { } else {
@ -328,6 +329,54 @@ macro_rules! impl_message {
}; };
} }
/// A request from the recorder to the runner.
#[derive(Debug, Deserialize, Serialize)]
pub enum RecorderRequest {
/// A new request.
///
/// If successful, the runner will restart and the recorder should send a
/// [`ResumeRequest`](enum.RecorderRequest.html#variant.ResumeRequest)
/// upon reconnection.
NewRequest(NewRequest),
/// A request to resume a [previous
/// request](enum.RecorderRequest.html#variant.NewRequest).
ResumeRequest(ResumeRequest),
}
impl From<NewRequest> for Request {
fn from(req: NewRequest) -> Request {
Request {
request: RecorderRequest::NewRequest(req),
}
}
}
impl From<ResumeRequest> for Request {
fn from(req: ResumeRequest) -> Request {
Request {
request: RecorderRequest::ResumeRequest(req),
}
}
}
#[derive(Debug, Deserialize, Serialize)]
pub struct NewRequest {
/// The task ID of the Taskcluster build task.
///
/// The build artifact from this task will be downloaded by the runner.
pub build_task_id: String,
/// The size of the profile that will be sent, if any.
pub profile_size: Option<u64>,
/// Prefs to override in the profile.
pub prefs: Vec<(String, PrefValue)>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct ResumeRequest {}
impl_message! { impl_message! {
/// A message from FxRecorder to FxRunner. /// A message from FxRecorder to FxRunner.
RecorderMessage, RecorderMessage,
@ -335,35 +384,13 @@ impl_message! {
/// The kind of a [`RecorderMessage`](struct.RecorderMessage.html). /// The kind of a [`RecorderMessage`](struct.RecorderMessage.html).
RecorderMessageKind; RecorderMessageKind;
/// A handshake from FxRecorder to FxRunner. /// A request from the recorder to the runner.
Handshake { Request {
/// Whether or not the runner should restart. request: RecorderRequest,
restart: bool,
}; };
/// A request to download a specific build of Firefox.
DownloadBuild {
/// The build task ID.
task_id: String,
};
/// A request to send a profile of the given size.
///
/// A size of zero indicates that there is no profile.
SendProfile {
profile_size: Option<u64>,
};
/// A request for the runner to use the provided prefs.
SendPrefs {
prefs: Vec<(String, PrefValue)>,
};
/// A request for the runner to wait for its CPU and disk to become idle.
WaitForIdle;
} }
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] #[derive(Debug, Display, Eq, PartialEq, Serialize, Deserialize)]
pub enum DownloadStatus { pub enum DownloadStatus {
Downloading, Downloading,
Downloaded, Downloaded,
@ -381,6 +408,8 @@ impl DownloadStatus {
} }
} }
pub type ForeignResult<T> = Result<T, ErrorMessage<String>>;
impl_message! { impl_message! {
/// A message from FxRunner to FxRecorder. /// A message from FxRunner to FxRecorder.
RunnerMessage, RunnerMessage,
@ -388,32 +417,33 @@ impl_message! {
/// The kind of a [`RunnerMessage`](struct.RunnerMessage.html). /// The kind of a [`RunnerMessage`](struct.RunnerMessage.html).
RunnerMessageKind; RunnerMessageKind;
/// A reply to a [`Handshake`](struct.Handshake.html) from FxRecorder. /// The status of the DownloadBuild phase.
HandshakeReply { DownloadBuild {
result: Result<(), ErrorMessage<String>>, result: ForeignResult<DownloadStatus>,
}; };
/// A reply to a [`DownloadBuild`](struct.DownloadBuild.html) message from /// The status of the RecvProfile phase.
/// FxRecorder. RecvProfile {
DownloadBuildReply { result: ForeignResult<DownloadStatus>,
result: Result<DownloadStatus, ErrorMessage<String>>,
}; };
/// A reply to a [`SendProfile`](struct.SendProfile.html) message from /// The status of the WritePrefs phase.
/// FxRecorder. WritePrefs {
SendProfileReply { result: ForeignResult<()>,
result: Result<Option<DownloadStatus>, ErrorMessage<String>>,
}; };
/// A reply to a [`SendPrefs`](struct.SendPrefs.html) message from /// The status of the Restarting phase.
/// FxRecorder. Restarting {
SendPrefsReply { result: ForeignResult<()>,
result: Result<(), ErrorMessage<String>>,
}; };
/// A reply to a [`WaitForIdle`](struct.WaitForIdle.html) message from /// The status of the ResumeResponse phase.
/// FxRecorder. ResumeResponse {
WaitForIdleReply { result: ForeignResult<()>,
result: Result<(), ErrorMessage<String>>, };
/// The status of the WaitForIdle phase.
WaitForIdle {
result: ForeignResult<()>,
}; };
} }