`fxrecorder` now starts a recording session via its `Recorder` and
requests `fxrunner` to start Firefox. After a delay, Firefox is shut
down and the recording stops.

Recorder and Splash mocks are also added for integration tests, but as
of yet they do nothing.
This commit is contained in:
Barret Rennie 2020-08-24 15:55:26 -04:00
Родитель 3cf57dccc8
Коммит 4f52550351
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4D71D86C09132D72
13 изменённых файлов: 335 добавлений и 24 удалений

1
Cargo.lock сгенерированный
Просмотреть файл

@ -486,6 +486,7 @@ dependencies = [
"serde",
"slog",
"structopt",
"tempfile",
"thiserror",
"tokio",
"toml",

Просмотреть файл

@ -3,12 +3,14 @@ host = "127.0.0.1:8888"
[fxrecorder.recording]
ffmpeg_path = "C:\\ffmpeg\\ffmpeg.exe"
video_size = { y = 1920, x = 1080 }
output_size = { y = 1366, x = 768 }
video_size = { x = 1920, y = 1080 }
output_size = { x = 1366, y = 768 }
frame_rate = 60
device = "AVerMedia GC551 Video Capture"
buffer_size = "1000M"
minimum_recording_time_secs = 90
[fxrunner]
host = "0.0.0.0:8888"
session_dir = "C:\\fxrunner\\sessions"
display_size = { x = 1366, y = 768 }

Просмотреть файл

@ -19,6 +19,7 @@ libfxrecord = { path = "../libfxrecord" }
serde = { version = "1.0.110", features = ["derive"] }
slog = "2.5.2"
structopt = "0.3.14"
tempfile = "3.1.0"
thiserror = "1.0.20"
tokio = { version = "0.2.21", features = ["process", "tcp", "rt-threaded", "time"] }
toml = "0.5.6"

Просмотреть файл

@ -12,6 +12,7 @@ use libfxrecord::prefs::{parse_pref, PrefValue};
use libfxrecord::{run, CommonOptions};
use libfxrecorder::config::Config;
use libfxrecorder::proto::RecorderProto;
use libfxrecorder::recorder::FfmpegRecorder;
use libfxrecorder::retry::delayed_exponential_retry;
use slog::{error, info, Logger};
use structopt::StructOpt;
@ -75,7 +76,13 @@ async fn fxrecorder(log: Logger, options: Options, config: Config) -> Result<(),
let stream = TcpStream::connect(&config.host).await?;
info!(log, "Connected"; "peer" => config.host);
let mut proto = RecorderProto::new(log.clone(), stream);
// TODO: Ideally we would split new_session and resume_session into
// static methods so that we do not need to specify the recorder here.
let mut proto = RecorderProto::new(
log.clone(),
stream,
FfmpegRecorder::new(log.clone(), &config.recording),
);
proto
.new_session(&task_id, profile_path.as_deref(), prefs)
@ -104,7 +111,11 @@ async fn fxrecorder(log: Logger, options: Options, config: Config) -> Result<(),
info!(log, "Re-connected"; "peer" => config.host);
let mut proto = RecorderProto::new(log, stream);
let mut proto = RecorderProto::new(
log.clone(),
stream,
FfmpegRecorder::new(log.clone(), &config.recording),
);
let idle = if options.skip_idle {
Idle::Skip

Просмотреть файл

@ -56,6 +56,9 @@ pub struct RecordingConfig {
///
/// This corresponds to the `-rtbufsize` argument to `ffmpeg`.
pub buffer_size: String,
/// The minimum recording time. `ffmpeg` will record for at least this long.
pub minimum_recording_time_secs: u8,
}
/// The size of a video.

Просмотреть файл

@ -4,4 +4,5 @@
pub mod config;
pub mod proto;
pub mod recorder;
pub mod retry;

Просмотреть файл

@ -2,29 +2,39 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
use std::error::Error;
use std::fmt::Debug;
use std::io;
use std::path::Path;
use libfxrecord::error::ErrorMessage;
use libfxrecord::net::*;
use libfxrecord::prefs::PrefValue;
use slog::{error, info, Logger};
use slog::{error, info, warn, Logger};
use tempfile::TempDir;
use thiserror::Error;
use tokio::fs::File;
use tokio::net::TcpStream;
use crate::recorder::Recorder;
/// The recorder side of the protocol.
pub struct RecorderProto {
pub struct RecorderProto<R> {
inner: Option<Proto<RunnerMessage, RecorderMessage, RunnerMessageKind, RecorderMessageKind>>,
log: Logger,
recorder: R,
}
impl RecorderProto {
impl<R> RecorderProto<R>
where
R: Recorder,
{
/// Create a new RecorderProto.
pub fn new(log: Logger, stream: TcpStream) -> RecorderProto {
pub fn new(log: Logger, stream: TcpStream, recorder: R) -> Self {
Self {
inner: Some(Proto::new(stream)),
log,
recorder,
}
}
@ -34,7 +44,7 @@ impl RecorderProto {
task_id: &str,
profile_path: Option<&Path>,
prefs: Vec<(String, PrefValue)>,
) -> Result<String, RecorderProtoError> {
) -> Result<String, RecorderProtoError<R::Error>> {
info!(self.log, "Requesting new session");
let profile_size = match profile_path {
@ -120,7 +130,9 @@ impl RecorderProto {
&mut self,
session_id: &str,
idle: Idle,
) -> Result<(), RecorderProtoError> {
) -> Result<(), RecorderProtoError<R::Error>> {
let tmpdir = TempDir::new().expect("could not create temporary directory");
info!(self.log, "Resuming session");
self.send::<Session>(
ResumeSessionRequest {
@ -152,6 +164,61 @@ impl RecorderProto {
info!(self.log, "Runner became idle");
}
info!(self.log, "Beginning recording...");
let handle = self
.recorder
.start_recording(tmpdir.path())
.await
.map_err(RecorderProtoError::Recording)?;
info!(self.log, "requesting Firefox start...");
self.send(StartFirefox).await?;
if let Err(e) = self.recv::<StartedFirefox>().await?.result {
error!(self.log, "recorder could not launch firefox"; "error" => %e);
return Err(e.into());
}
info!(self.log, "runner started Firefox.");
let recording_path = self
.recorder
.wait_for_recording_finished(handle)
.await
.map_err(RecorderProtoError::Recording)?;
info!(self.log, "requesting runner stop Firefox...");
self.send(StopFirefox).await?;
if let Err(errors) = self.recv::<StoppedFirefox>().await?.result {
if errors.len() > 1 {
for error in &errors {
warn!(
self.log,
"recorder could not stop firefox (multiple errors)";
"error" => %error
);
}
} else {
assert!(!errors.is_empty());
warn!(
self.log,
"recorder could not stop Firefox";
"error" => %errors[0]
);
}
}
info!(self.log, "runner stopped Firefox");
if let Err(e) = self.recv::<SessionFinished>().await?.result {
warn!(self.log, "runner did not clean up successfully"; "error" => ?e);
}
info!(
self.log,
"recorded firefox";
"path" => %recording_path.display(),
);
Ok(())
}
@ -160,7 +227,7 @@ impl RecorderProto {
&mut self,
profile_path: &Path,
profile_size: u64,
) -> Result<(), RecorderProtoError> {
) -> Result<(), RecorderProtoError<R::Error>> {
let RecvProfile { result } = self.recv().await?;
match result? {
@ -177,7 +244,7 @@ impl RecorderProto {
}
let mut stream = self.inner.take().unwrap().into_inner();
let result = RecorderProto::send_profile_impl(&mut stream, profile_path).await;
let result = Self::send_profile_impl(&mut stream, profile_path).await;
self.inner = Some(Proto::new(stream));
result?;
@ -222,7 +289,7 @@ impl RecorderProto {
async fn send_profile_impl(
stream: &mut TcpStream,
profile_path: &Path,
) -> Result<(), RecorderProtoError> {
) -> Result<(), RecorderProtoError<R::Error>> {
let mut f = File::open(profile_path).await?;
tokio::io::copy(&mut f, stream)
@ -230,7 +297,6 @@ impl RecorderProto {
.map_err(Into::into)
.map(drop)
}
/// Send the given message to the recorder.
///
/// If the underlying proto is None, this will panic.
@ -252,8 +318,14 @@ impl RecorderProto {
}
}
/// An error in the RecordingProto.
///
/// For a `RecordingProto<R: Recorder>`, `RecordingError` is `<R as Recorder>::Error`.
#[derive(Debug, Error)]
pub enum RecorderProtoError {
pub enum RecorderProtoError<RecordingError>
where
RecordingError: Error + 'static,
{
#[error(transparent)]
Proto(#[from] ProtoError<RunnerMessageKind>),
@ -266,15 +338,24 @@ pub enum RecorderProtoError {
expected: DownloadStatus,
received: DownloadStatus,
},
#[error(transparent)]
Recording(RecordingError),
}
impl From<ErrorMessage<String>> for RecorderProtoError {
impl<RecordingError> From<ErrorMessage<String>> for RecorderProtoError<RecordingError>
where
RecordingError: Error + 'static,
{
fn from(e: ErrorMessage<String>) -> Self {
RecorderProtoError::Proto(ProtoError::from(e))
}
}
impl From<io::Error> for RecorderProtoError {
impl<RecordingError> From<io::Error> for RecorderProtoError<RecordingError>
where
RecordingError: Error + 'static,
{
fn from(e: io::Error) -> Self {
RecorderProtoError::Proto(ProtoError::from(e))
}

Просмотреть файл

@ -12,6 +12,7 @@ use libfxrunner::config::Config;
use libfxrunner::osapi::{WindowsPerfProvider, WindowsShutdownProvider};
use libfxrunner::proto::RunnerProto;
use libfxrunner::session::DefaultSessionManager;
use libfxrunner::splash::WindowsSplash;
use libfxrunner::taskcluster::FirefoxCi;
use slog::{error, info, warn, Logger};
use structopt::StructOpt;
@ -83,8 +84,9 @@ async fn fxrunner(log: Logger, options: Options, config: Config) -> Result<(), B
let (stream, addr) = listener.accept().await?;
info!(log, "Received connection"; "peer" => addr);
let result = RunnerProto::handle_request(
let result = RunnerProto::<_, _, _, _, WindowsSplash>::handle_request(
log.clone(),
config.display_size,
stream,
shutdown_provider(&options),
FirefoxCi::default(),

Просмотреть файл

@ -15,4 +15,16 @@ pub struct Config {
/// The directory to store session state in.
pub session_dir: PathBuf,
/// The size of the display.
pub display_size: Size,
}
/// The size of a video.
#[derive(Copy, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct Size {
/// The size in the y dimension.
pub y: u16,
/// The size in the x dimension.
pub x: u16,
}

Просмотреть файл

@ -3,7 +3,9 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
use std::io;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use indoc::indoc;
use libfxrecord::error::ErrorExt;
@ -15,36 +17,45 @@ use thiserror::Error;
use tokio::fs::{create_dir, rename, File, OpenOptions};
use tokio::net::TcpStream;
use tokio::prelude::*;
use tokio::process::Command;
use tokio::task::spawn_blocking;
use crate::config::Size;
use crate::fs::PathExt;
use crate::osapi::process::{child_processes, open_process, terminate_process};
use crate::osapi::{cpu_and_disk_idle, PerfProvider, ShutdownProvider, WaitForIdleError};
use crate::session::{
cleanup_session, NewSessionError, ResumeSessionError, SessionInfo, SessionManager,
};
use crate::splash::Splash;
use crate::taskcluster::Taskcluster;
use crate::zip::{unzip, ZipError};
/// The runner side of the protocol.
pub struct RunnerProto<S, T, P, R> {
pub struct RunnerProto<S, T, P, R, Sp> {
inner: Option<Proto<RecorderMessage, RunnerMessage, RecorderMessageKind, RunnerMessageKind>>,
log: Logger,
display_size: Size,
shutdown_handler: S,
tc: T,
perf_provider: P,
session_manager: R,
_marker: PhantomData<Sp>,
}
impl<S, T, P, R> RunnerProto<S, T, P, R>
impl<S, T, P, R, Sp> RunnerProto<S, T, P, R, Sp>
where
S: ShutdownProvider,
T: Taskcluster,
P: PerfProvider + 'static,
R: SessionManager,
Sp: Splash,
{
/// Handle a request from the recorder.
pub async fn handle_request(
log: Logger,
display_size: Size,
stream: TcpStream,
shutdown_handler: S,
tc: T,
@ -53,11 +64,13 @@ where
) -> Result<bool, RunnerProtoError<S, T, P>> {
let mut proto = Self {
inner: Some(Proto::new(stream)),
display_size,
log,
shutdown_handler,
tc,
perf_provider,
session_manager,
_marker: PhantomData,
};
match proto.recv::<Session>().await? {
@ -231,6 +244,27 @@ where
self.send(WaitForIdle { result: Ok(()) }).await?;
}
self.recv::<StartFirefox>().await?;
let mut splash = Sp::new(self.display_size.x as u32, self.display_size.y as u32).await?;
let run_firefox_result = self
.run_firefox(&session_info.firefox_path(), &session_info.profile_path())
.await;
if let Err(e) = splash.destroy() {
error!(self.log, "Could not destroy splash"; "error" => %e);
self.send(SessionFinished {
result: Err(e.into_error_message()),
})
.await?;
}
if let Err(e) = run_firefox_result {
return Err(e);
}
self.send(SessionFinished { result: Ok(()) }).await?;
Ok(())
}
@ -445,6 +479,95 @@ where
Ok(zip_path)
}
/// Run the given Firefox binary with the specified profile.
///
/// The process will be terminated after 45 seconds.
async fn run_firefox(
&mut self,
firefox_bin: &Path,
profile: &Path,
) -> Result<(), RunnerProtoError<S, T, P>> {
info!(self.log, "starting Firefox...");
let firefox_launcher = match Command::new(firefox_bin)
.arg("--profile")
.arg(profile)
.arg("--new-instance")
.arg("--wait-for-browser")
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
{
Ok(launcher) => launcher,
Err(e) => {
error!(self.log, "could not start Firefox"; "error" => %e);
self.send(StartedFirefox {
result: Err(e.into_error_message()),
})
.await?;
return Err(RunnerProtoError::StartFirefox(e));
}
};
self.send(StartedFirefox { result: Ok(()) }).await?;
self.recv::<StopFirefox>().await?;
info!(self.log, "stopping Firefox...");
let mut errors = Vec::new();
{
info!(self.log, "opening firefox process...");
let firefox_launcher_handle =
open_process(firefox_launcher.id(), winapi::um::winnt::PROCESS_ALL_ACCESS)?;
let mut terminated = false;
info!(self.log, "iterating child processes...");
for firefox_main_handle in child_processes(
firefox_launcher_handle,
winapi::um::winnt::PROCESS_TERMINATE,
)? {
let firefox_main_handle = match firefox_main_handle {
Ok(handle) => handle,
Err(e) => {
error!(self.log, "could not retrieve handle to Firefox main process"; "error" => %e);
errors.push(e.into_error_message());
break;
}
};
info!(self.log, "child_process()"; "handle" => ?firefox_main_handle.as_ptr());
if let Err(e) = terminate_process(&firefox_main_handle, 1) {
error!(self.log, "could not terminate Firefox main process"; "error" => %e);
errors.push(e.into_error_message());
continue;
}
terminated = true;
}
if let Err(e) = firefox_launcher.await {
error!(self.log, "could not wait for Firefox launcher process to exit"; "error" => %e);
errors.push(e.into_error_message());
}
if !errors.is_empty() {
self.send(StoppedFirefox {
result: Err(errors),
})
.await?;
} else if !terminated {
error!(self.log, "did not find a main Firefox process to terminate");
}
}
info!(self.log, "terminated Firefox");
self.send(StoppedFirefox { result: Ok(()) }).await?;
Ok(())
}
/// Send the given message to the runner.
///
/// If the underlying proto is None, this will panic.
@ -505,6 +628,9 @@ where
#[error(transparent)]
EnsureProfile(io::Error),
#[error("Could not start Firefox: {}", .0)]
StartFirefox(#[source] io::Error),
}
impl<S, T, P> From<io::Error> for RunnerProtoError<S, T, P>

Просмотреть файл

@ -10,10 +10,12 @@ use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use libfxrecord::error::ErrorMessage;
use libfxrecorder::recorder::Recorder;
use libfxrunner::osapi::{IoCounters, PerfProvider, ShutdownProvider};
use libfxrunner::session::{
NewSessionError, ResumeSessionError, ResumeSessionErrorKind, SessionInfo, SessionManager,
};
use libfxrunner::splash::Splash;
use libfxrunner::taskcluster::Taskcluster;
use tempfile::TempDir;
use tokio::fs;
@ -333,3 +335,36 @@ fn clone_new_session_err(err: &NewSessionError) -> NewSessionError {
}
}
}
pub struct TestSplash;
#[async_trait]
impl Splash for TestSplash {
async fn new(_display_width: u32, _display_height: u32) -> Result<Self, io::Error> {
Ok(TestSplash)
}
fn destroy(&mut self) -> Result<(), io::Error> {
Ok(())
}
}
pub struct TestRecorder;
pub struct TestRecorderHandle(PathBuf);
#[async_trait]
impl Recorder for TestRecorder {
type Error = io::Error;
type Handle = TestRecorderHandle;
async fn start_recording(&self, directory: &Path) -> Result<Self::Handle, Self::Error> {
Ok(TestRecorderHandle(directory.join("recording.mp4")))
}
async fn wait_for_recording_finished(
&self,
handle: Self::Handle,
) -> Result<PathBuf, Self::Error> {
Ok(handle.0)
}
}

Просмотреть файл

@ -14,6 +14,7 @@ use futures::join;
use indoc::indoc;
use libfxrecord::net::*;
use libfxrecorder::proto::{RecorderProto, RecorderProtoError};
use libfxrunner::config::Size;
use libfxrunner::osapi::WaitForIdleError;
use libfxrunner::proto::{RunnerProto, RunnerProtoError};
use libfxrunner::session::{
@ -34,11 +35,20 @@ fn test_logger() -> Logger {
Logger::root(slog::Discard, slog::o! {})
}
type TestRunnerProto =
RunnerProto<TestShutdownProvider, TestTaskcluster, TestPerfProvider, TestSessionManager>;
type TestRunnerProto = RunnerProto<
TestShutdownProvider,
TestTaskcluster,
TestPerfProvider,
TestSessionManager,
TestSplash,
>;
type TestRunnerProtoError =
RunnerProtoError<TestShutdownProvider, TestTaskcluster, TestPerfProvider>;
type TestRecorderProto = RecorderProto<TestRecorder>;
const DISPLAY_SIZE: Size = Size { x: 640, y: 480 };
struct RunnerInfo {
result: Result<bool, TestRunnerProtoError>,
session_info: Option<SessionInfo<'static>>,
@ -51,7 +61,7 @@ async fn run_proto_test<'a, Fut>(
tc: TestTaskcluster,
perf_provider: TestPerfProvider,
session_manager: TestSessionManager,
recorder_fn: impl FnOnce(RecorderProto) -> Fut,
recorder_fn: impl FnOnce(TestRecorderProto) -> Fut,
runner_fn: impl FnOnce(RunnerInfo),
) where
Fut: Future<Output = ()>,
@ -65,6 +75,7 @@ async fn run_proto_test<'a, Fut>(
let result = TestRunnerProto::handle_request(
test_logger(),
DISPLAY_SIZE,
stream,
shutdown_provider,
tc,
@ -81,7 +92,7 @@ async fn run_proto_test<'a, Fut>(
let recorder = async {
let stream = TcpStream::connect(&addr).await.unwrap();
let proto = RecorderProto::new(test_logger(), stream);
let proto = TestRecorderProto::new(test_logger(), stream, TestRecorder);
recorder_fn(proto).await;
};

Просмотреть файл

@ -142,6 +142,16 @@ message_type! {
/// request](enum.RecorderSession.html#variant.NewSession).
ResumeSession(ResumeSessionRequest),
}
/// Request the runner start Firefox.
///
/// Sent once the recorder has started ffmpeg.
pub struct StartFirefox;
/// Request the runner to stop Firefox.
///
/// Send once the recorder has finished recording.
pub struct StopFirefox;
}
message_type! {
@ -197,4 +207,19 @@ message_type! {
pub struct WaitForIdle {
pub result: ForeignResult<()>,
}
/// The status of the StartFirefox phase.
pub struct StartedFirefox {
pub result: ForeignResult<()>,
}
/// The status of the StopFirefox phase.
pub struct StoppedFirefox {
pub result: Result<(), Vec<ErrorMessage<String>>>,
}
/// The status of any cleanup or teardown before the session finishes.
pub struct SessionFinished {
pub result: ForeignResult<()>,
}
}