diff --git a/Cargo.lock b/Cargo.lock index bdff4ce..3f211b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -486,6 +486,7 @@ dependencies = [ "serde", "slog", "structopt", + "tempfile", "thiserror", "tokio", "toml", diff --git a/fxrecord.example.toml b/fxrecord.example.toml index 56034a6..cac2580 100644 --- a/fxrecord.example.toml +++ b/fxrecord.example.toml @@ -3,12 +3,14 @@ host = "127.0.0.1:8888" [fxrecorder.recording] ffmpeg_path = "C:\\ffmpeg\\ffmpeg.exe" -video_size = { y = 1920, x = 1080 } -output_size = { y = 1366, x = 768 } +video_size = { x = 1920, y = 1080 } +output_size = { x = 1366, y = 768 } frame_rate = 60 device = "AVerMedia GC551 Video Capture" buffer_size = "1000M" +minimum_recording_time_secs = 90 [fxrunner] host = "0.0.0.0:8888" session_dir = "C:\\fxrunner\\sessions" +display_size = { x = 1366, y = 768 } diff --git a/fxrecorder/Cargo.toml b/fxrecorder/Cargo.toml index 45b647b..b6d8330 100644 --- a/fxrecorder/Cargo.toml +++ b/fxrecorder/Cargo.toml @@ -19,6 +19,7 @@ libfxrecord = { path = "../libfxrecord" } serde = { version = "1.0.110", features = ["derive"] } slog = "2.5.2" structopt = "0.3.14" +tempfile = "3.1.0" thiserror = "1.0.20" tokio = { version = "0.2.21", features = ["process", "tcp", "rt-threaded", "time"] } toml = "0.5.6" diff --git a/fxrecorder/src/bin/main.rs b/fxrecorder/src/bin/main.rs index 74685fb..36b180f 100644 --- a/fxrecorder/src/bin/main.rs +++ b/fxrecorder/src/bin/main.rs @@ -12,6 +12,7 @@ use libfxrecord::prefs::{parse_pref, PrefValue}; use libfxrecord::{run, CommonOptions}; use libfxrecorder::config::Config; use libfxrecorder::proto::RecorderProto; +use libfxrecorder::recorder::FfmpegRecorder; use libfxrecorder::retry::delayed_exponential_retry; use slog::{error, info, Logger}; use structopt::StructOpt; @@ -75,7 +76,13 @@ async fn fxrecorder(log: Logger, options: Options, config: Config) -> Result<(), let stream = TcpStream::connect(&config.host).await?; info!(log, "Connected"; "peer" => config.host); - let mut proto = RecorderProto::new(log.clone(), stream); + // TODO: Ideally we would split new_session and resume_session into + // static methods so that we do not need to specify the recorder here. + let mut proto = RecorderProto::new( + log.clone(), + stream, + FfmpegRecorder::new(log.clone(), &config.recording), + ); proto .new_session(&task_id, profile_path.as_deref(), prefs) @@ -104,7 +111,11 @@ async fn fxrecorder(log: Logger, options: Options, config: Config) -> Result<(), info!(log, "Re-connected"; "peer" => config.host); - let mut proto = RecorderProto::new(log, stream); + let mut proto = RecorderProto::new( + log.clone(), + stream, + FfmpegRecorder::new(log.clone(), &config.recording), + ); let idle = if options.skip_idle { Idle::Skip diff --git a/fxrecorder/src/lib/config.rs b/fxrecorder/src/lib/config.rs index d10b3e8..a283f42 100644 --- a/fxrecorder/src/lib/config.rs +++ b/fxrecorder/src/lib/config.rs @@ -56,6 +56,9 @@ pub struct RecordingConfig { /// /// This corresponds to the `-rtbufsize` argument to `ffmpeg`. pub buffer_size: String, + + /// The minimum recording time. `ffmpeg` will record for at least this long. + pub minimum_recording_time_secs: u8, } /// The size of a video. diff --git a/fxrecorder/src/lib/lib.rs b/fxrecorder/src/lib/lib.rs index 35c2594..830d026 100644 --- a/fxrecorder/src/lib/lib.rs +++ b/fxrecorder/src/lib/lib.rs @@ -4,4 +4,5 @@ pub mod config; pub mod proto; +pub mod recorder; pub mod retry; diff --git a/fxrecorder/src/lib/proto.rs b/fxrecorder/src/lib/proto.rs index da8a312..4e07a5b 100644 --- a/fxrecorder/src/lib/proto.rs +++ b/fxrecorder/src/lib/proto.rs @@ -2,29 +2,39 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. +use std::error::Error; +use std::fmt::Debug; use std::io; use std::path::Path; use libfxrecord::error::ErrorMessage; use libfxrecord::net::*; use libfxrecord::prefs::PrefValue; -use slog::{error, info, Logger}; +use slog::{error, info, warn, Logger}; +use tempfile::TempDir; use thiserror::Error; use tokio::fs::File; use tokio::net::TcpStream; +use crate::recorder::Recorder; + /// The recorder side of the protocol. -pub struct RecorderProto { +pub struct RecorderProto { inner: Option>, log: Logger, + recorder: R, } -impl RecorderProto { +impl RecorderProto +where + R: Recorder, +{ /// Create a new RecorderProto. - pub fn new(log: Logger, stream: TcpStream) -> RecorderProto { + pub fn new(log: Logger, stream: TcpStream, recorder: R) -> Self { Self { inner: Some(Proto::new(stream)), log, + recorder, } } @@ -34,7 +44,7 @@ impl RecorderProto { task_id: &str, profile_path: Option<&Path>, prefs: Vec<(String, PrefValue)>, - ) -> Result { + ) -> Result> { info!(self.log, "Requesting new session"); let profile_size = match profile_path { @@ -120,7 +130,9 @@ impl RecorderProto { &mut self, session_id: &str, idle: Idle, - ) -> Result<(), RecorderProtoError> { + ) -> Result<(), RecorderProtoError> { + let tmpdir = TempDir::new().expect("could not create temporary directory"); + info!(self.log, "Resuming session"); self.send::( ResumeSessionRequest { @@ -152,6 +164,61 @@ impl RecorderProto { info!(self.log, "Runner became idle"); } + info!(self.log, "Beginning recording..."); + let handle = self + .recorder + .start_recording(tmpdir.path()) + .await + .map_err(RecorderProtoError::Recording)?; + + info!(self.log, "requesting Firefox start..."); + self.send(StartFirefox).await?; + if let Err(e) = self.recv::().await?.result { + error!(self.log, "recorder could not launch firefox"; "error" => %e); + return Err(e.into()); + } + info!(self.log, "runner started Firefox."); + + let recording_path = self + .recorder + .wait_for_recording_finished(handle) + .await + .map_err(RecorderProtoError::Recording)?; + + info!(self.log, "requesting runner stop Firefox..."); + self.send(StopFirefox).await?; + + if let Err(errors) = self.recv::().await?.result { + if errors.len() > 1 { + for error in &errors { + warn!( + self.log, + "recorder could not stop firefox (multiple errors)"; + "error" => %error + ); + } + } else { + assert!(!errors.is_empty()); + warn!( + self.log, + "recorder could not stop Firefox"; + "error" => %errors[0] + ); + } + } + + info!(self.log, "runner stopped Firefox"); + + if let Err(e) = self.recv::().await?.result { + warn!(self.log, "runner did not clean up successfully"; "error" => ?e); + } + + info!( + self.log, + "recorded firefox"; + "path" => %recording_path.display(), + ); + Ok(()) } @@ -160,7 +227,7 @@ impl RecorderProto { &mut self, profile_path: &Path, profile_size: u64, - ) -> Result<(), RecorderProtoError> { + ) -> Result<(), RecorderProtoError> { let RecvProfile { result } = self.recv().await?; match result? { @@ -177,7 +244,7 @@ impl RecorderProto { } let mut stream = self.inner.take().unwrap().into_inner(); - let result = RecorderProto::send_profile_impl(&mut stream, profile_path).await; + let result = Self::send_profile_impl(&mut stream, profile_path).await; self.inner = Some(Proto::new(stream)); result?; @@ -222,7 +289,7 @@ impl RecorderProto { async fn send_profile_impl( stream: &mut TcpStream, profile_path: &Path, - ) -> Result<(), RecorderProtoError> { + ) -> Result<(), RecorderProtoError> { let mut f = File::open(profile_path).await?; tokio::io::copy(&mut f, stream) @@ -230,7 +297,6 @@ impl RecorderProto { .map_err(Into::into) .map(drop) } - /// Send the given message to the recorder. /// /// If the underlying proto is None, this will panic. @@ -252,8 +318,14 @@ impl RecorderProto { } } +/// An error in the RecordingProto. +/// +/// For a `RecordingProto`, `RecordingError` is `::Error`. #[derive(Debug, Error)] -pub enum RecorderProtoError { +pub enum RecorderProtoError +where + RecordingError: Error + 'static, +{ #[error(transparent)] Proto(#[from] ProtoError), @@ -266,15 +338,24 @@ pub enum RecorderProtoError { expected: DownloadStatus, received: DownloadStatus, }, + + #[error(transparent)] + Recording(RecordingError), } -impl From> for RecorderProtoError { +impl From> for RecorderProtoError +where + RecordingError: Error + 'static, +{ fn from(e: ErrorMessage) -> Self { RecorderProtoError::Proto(ProtoError::from(e)) } } -impl From for RecorderProtoError { +impl From for RecorderProtoError +where + RecordingError: Error + 'static, +{ fn from(e: io::Error) -> Self { RecorderProtoError::Proto(ProtoError::from(e)) } diff --git a/fxrunner/src/bin/main.rs b/fxrunner/src/bin/main.rs index 8e0ec6f..3770fc3 100644 --- a/fxrunner/src/bin/main.rs +++ b/fxrunner/src/bin/main.rs @@ -12,6 +12,7 @@ use libfxrunner::config::Config; use libfxrunner::osapi::{WindowsPerfProvider, WindowsShutdownProvider}; use libfxrunner::proto::RunnerProto; use libfxrunner::session::DefaultSessionManager; +use libfxrunner::splash::WindowsSplash; use libfxrunner::taskcluster::FirefoxCi; use slog::{error, info, warn, Logger}; use structopt::StructOpt; @@ -83,8 +84,9 @@ async fn fxrunner(log: Logger, options: Options, config: Config) -> Result<(), B let (stream, addr) = listener.accept().await?; info!(log, "Received connection"; "peer" => addr); - let result = RunnerProto::handle_request( + let result = RunnerProto::<_, _, _, _, WindowsSplash>::handle_request( log.clone(), + config.display_size, stream, shutdown_provider(&options), FirefoxCi::default(), diff --git a/fxrunner/src/lib/config.rs b/fxrunner/src/lib/config.rs index b929eb5..25118fa 100644 --- a/fxrunner/src/lib/config.rs +++ b/fxrunner/src/lib/config.rs @@ -15,4 +15,16 @@ pub struct Config { /// The directory to store session state in. pub session_dir: PathBuf, + + /// The size of the display. + pub display_size: Size, +} + +/// The size of a video. +#[derive(Copy, Clone, Debug, Deserialize, Eq, PartialEq)] +pub struct Size { + /// The size in the y dimension. + pub y: u16, + /// The size in the x dimension. + pub x: u16, } diff --git a/fxrunner/src/lib/proto.rs b/fxrunner/src/lib/proto.rs index 98f4f73..90a94be 100644 --- a/fxrunner/src/lib/proto.rs +++ b/fxrunner/src/lib/proto.rs @@ -3,7 +3,9 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use std::io; +use std::marker::PhantomData; use std::path::{Path, PathBuf}; +use std::process::Stdio; use indoc::indoc; use libfxrecord::error::ErrorExt; @@ -15,36 +17,45 @@ use thiserror::Error; use tokio::fs::{create_dir, rename, File, OpenOptions}; use tokio::net::TcpStream; use tokio::prelude::*; +use tokio::process::Command; use tokio::task::spawn_blocking; +use crate::config::Size; use crate::fs::PathExt; +use crate::osapi::process::{child_processes, open_process, terminate_process}; use crate::osapi::{cpu_and_disk_idle, PerfProvider, ShutdownProvider, WaitForIdleError}; use crate::session::{ cleanup_session, NewSessionError, ResumeSessionError, SessionInfo, SessionManager, }; +use crate::splash::Splash; use crate::taskcluster::Taskcluster; use crate::zip::{unzip, ZipError}; /// The runner side of the protocol. -pub struct RunnerProto { +pub struct RunnerProto { inner: Option>, log: Logger, + display_size: Size, shutdown_handler: S, tc: T, perf_provider: P, session_manager: R, + + _marker: PhantomData, } -impl RunnerProto +impl RunnerProto where S: ShutdownProvider, T: Taskcluster, P: PerfProvider + 'static, R: SessionManager, + Sp: Splash, { /// Handle a request from the recorder. pub async fn handle_request( log: Logger, + display_size: Size, stream: TcpStream, shutdown_handler: S, tc: T, @@ -53,11 +64,13 @@ where ) -> Result> { let mut proto = Self { inner: Some(Proto::new(stream)), + display_size, log, shutdown_handler, tc, perf_provider, session_manager, + _marker: PhantomData, }; match proto.recv::().await? { @@ -231,6 +244,27 @@ where self.send(WaitForIdle { result: Ok(()) }).await?; } + self.recv::().await?; + + let mut splash = Sp::new(self.display_size.x as u32, self.display_size.y as u32).await?; + let run_firefox_result = self + .run_firefox(&session_info.firefox_path(), &session_info.profile_path()) + .await; + + if let Err(e) = splash.destroy() { + error!(self.log, "Could not destroy splash"; "error" => %e); + + self.send(SessionFinished { + result: Err(e.into_error_message()), + }) + .await?; + } + + if let Err(e) = run_firefox_result { + return Err(e); + } + + self.send(SessionFinished { result: Ok(()) }).await?; Ok(()) } @@ -445,6 +479,95 @@ where Ok(zip_path) } + /// Run the given Firefox binary with the specified profile. + /// + /// The process will be terminated after 45 seconds. + async fn run_firefox( + &mut self, + firefox_bin: &Path, + profile: &Path, + ) -> Result<(), RunnerProtoError> { + info!(self.log, "starting Firefox..."); + let firefox_launcher = match Command::new(firefox_bin) + .arg("--profile") + .arg(profile) + .arg("--new-instance") + .arg("--wait-for-browser") + .stdin(Stdio::piped()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + { + Ok(launcher) => launcher, + Err(e) => { + error!(self.log, "could not start Firefox"; "error" => %e); + self.send(StartedFirefox { + result: Err(e.into_error_message()), + }) + .await?; + return Err(RunnerProtoError::StartFirefox(e)); + } + }; + + self.send(StartedFirefox { result: Ok(()) }).await?; + self.recv::().await?; + + info!(self.log, "stopping Firefox..."); + let mut errors = Vec::new(); + + { + info!(self.log, "opening firefox process..."); + let firefox_launcher_handle = + open_process(firefox_launcher.id(), winapi::um::winnt::PROCESS_ALL_ACCESS)?; + + let mut terminated = false; + + info!(self.log, "iterating child processes..."); + for firefox_main_handle in child_processes( + firefox_launcher_handle, + winapi::um::winnt::PROCESS_TERMINATE, + )? { + let firefox_main_handle = match firefox_main_handle { + Ok(handle) => handle, + Err(e) => { + error!(self.log, "could not retrieve handle to Firefox main process"; "error" => %e); + errors.push(e.into_error_message()); + break; + } + }; + + info!(self.log, "child_process()"; "handle" => ?firefox_main_handle.as_ptr()); + + if let Err(e) = terminate_process(&firefox_main_handle, 1) { + error!(self.log, "could not terminate Firefox main process"; "error" => %e); + errors.push(e.into_error_message()); + continue; + } + + terminated = true; + } + + if let Err(e) = firefox_launcher.await { + error!(self.log, "could not wait for Firefox launcher process to exit"; "error" => %e); + errors.push(e.into_error_message()); + } + + if !errors.is_empty() { + self.send(StoppedFirefox { + result: Err(errors), + }) + .await?; + } else if !terminated { + error!(self.log, "did not find a main Firefox process to terminate"); + } + } + + info!(self.log, "terminated Firefox"); + self.send(StoppedFirefox { result: Ok(()) }).await?; + + Ok(()) + } + /// Send the given message to the runner. /// /// If the underlying proto is None, this will panic. @@ -505,6 +628,9 @@ where #[error(transparent)] EnsureProfile(io::Error), + + #[error("Could not start Firefox: {}", .0)] + StartFirefox(#[source] io::Error), } impl From for RunnerProtoError diff --git a/integration-tests/src/mocks.rs b/integration-tests/src/mocks.rs index 880d4e0..a86080a 100644 --- a/integration-tests/src/mocks.rs +++ b/integration-tests/src/mocks.rs @@ -10,10 +10,12 @@ use std::sync::{Arc, Mutex}; use async_trait::async_trait; use libfxrecord::error::ErrorMessage; +use libfxrecorder::recorder::Recorder; use libfxrunner::osapi::{IoCounters, PerfProvider, ShutdownProvider}; use libfxrunner::session::{ NewSessionError, ResumeSessionError, ResumeSessionErrorKind, SessionInfo, SessionManager, }; +use libfxrunner::splash::Splash; use libfxrunner::taskcluster::Taskcluster; use tempfile::TempDir; use tokio::fs; @@ -333,3 +335,36 @@ fn clone_new_session_err(err: &NewSessionError) -> NewSessionError { } } } + +pub struct TestSplash; + +#[async_trait] +impl Splash for TestSplash { + async fn new(_display_width: u32, _display_height: u32) -> Result { + Ok(TestSplash) + } + + fn destroy(&mut self) -> Result<(), io::Error> { + Ok(()) + } +} + +pub struct TestRecorder; +pub struct TestRecorderHandle(PathBuf); + +#[async_trait] +impl Recorder for TestRecorder { + type Error = io::Error; + type Handle = TestRecorderHandle; + + async fn start_recording(&self, directory: &Path) -> Result { + Ok(TestRecorderHandle(directory.join("recording.mp4"))) + } + + async fn wait_for_recording_finished( + &self, + handle: Self::Handle, + ) -> Result { + Ok(handle.0) + } +} diff --git a/integration-tests/src/test.rs b/integration-tests/src/test.rs index e982f0b..f64dc2d 100644 --- a/integration-tests/src/test.rs +++ b/integration-tests/src/test.rs @@ -14,6 +14,7 @@ use futures::join; use indoc::indoc; use libfxrecord::net::*; use libfxrecorder::proto::{RecorderProto, RecorderProtoError}; +use libfxrunner::config::Size; use libfxrunner::osapi::WaitForIdleError; use libfxrunner::proto::{RunnerProto, RunnerProtoError}; use libfxrunner::session::{ @@ -34,11 +35,20 @@ fn test_logger() -> Logger { Logger::root(slog::Discard, slog::o! {}) } -type TestRunnerProto = - RunnerProto; +type TestRunnerProto = RunnerProto< + TestShutdownProvider, + TestTaskcluster, + TestPerfProvider, + TestSessionManager, + TestSplash, +>; type TestRunnerProtoError = RunnerProtoError; +type TestRecorderProto = RecorderProto; + +const DISPLAY_SIZE: Size = Size { x: 640, y: 480 }; + struct RunnerInfo { result: Result, session_info: Option>, @@ -51,7 +61,7 @@ async fn run_proto_test<'a, Fut>( tc: TestTaskcluster, perf_provider: TestPerfProvider, session_manager: TestSessionManager, - recorder_fn: impl FnOnce(RecorderProto) -> Fut, + recorder_fn: impl FnOnce(TestRecorderProto) -> Fut, runner_fn: impl FnOnce(RunnerInfo), ) where Fut: Future, @@ -65,6 +75,7 @@ async fn run_proto_test<'a, Fut>( let result = TestRunnerProto::handle_request( test_logger(), + DISPLAY_SIZE, stream, shutdown_provider, tc, @@ -81,7 +92,7 @@ async fn run_proto_test<'a, Fut>( let recorder = async { let stream = TcpStream::connect(&addr).await.unwrap(); - let proto = RecorderProto::new(test_logger(), stream); + let proto = TestRecorderProto::new(test_logger(), stream, TestRecorder); recorder_fn(proto).await; }; diff --git a/libfxrecord/src/net/message.rs b/libfxrecord/src/net/message.rs index 569c4f3..61d2d44 100644 --- a/libfxrecord/src/net/message.rs +++ b/libfxrecord/src/net/message.rs @@ -142,6 +142,16 @@ message_type! { /// request](enum.RecorderSession.html#variant.NewSession). ResumeSession(ResumeSessionRequest), } + + /// Request the runner start Firefox. + /// + /// Sent once the recorder has started ffmpeg. + pub struct StartFirefox; + + /// Request the runner to stop Firefox. + /// + /// Send once the recorder has finished recording. + pub struct StopFirefox; } message_type! { @@ -197,4 +207,19 @@ message_type! { pub struct WaitForIdle { pub result: ForeignResult<()>, } + + /// The status of the StartFirefox phase. + pub struct StartedFirefox { + pub result: ForeignResult<()>, + } + + /// The status of the StopFirefox phase. + pub struct StoppedFirefox { + pub result: Result<(), Vec>>, + } + + /// The status of any cleanup or teardown before the session finishes. + pub struct SessionFinished { + pub result: ForeignResult<()>, + } }