Request a restart from the runner

fxrecorder will now request fxrunner to restart, wait, and re-connect.
This is accomplished with a exponential-backoff retrying algorithm.
The runner doesn't actually restart yet; instead, it just disconnects
from the recorder and sleeps for a duration.
This commit is contained in:
Barret Rennie 2019-11-18 16:01:16 -05:00
Родитель 482f734a54
Коммит 1f24929893
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4D71D86C09132D72
11 изменённых файлов: 162 добавлений и 25 удалений

9
Cargo.lock сгенерированный
Просмотреть файл

@ -21,6 +21,12 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8"
[[package]]
name = "assert_matches"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7deb0a829ca7bcfaf5da70b073a8d128619259a7be8216a355e23f00763059e5"
[[package]]
name = "atty"
version = "0.2.14"
@ -340,8 +346,10 @@ dependencies = [
name = "integration-tests"
version = "0.1.0"
dependencies = [
"assert_matches",
"fxrecorder",
"fxrunner",
"libfxrecord",
"slog",
"tokio",
]
@ -798,6 +806,7 @@ dependencies = [
"mio",
"num_cpus",
"pin-project-lite",
"slab",
"tokio-macros",
]

Просмотреть файл

@ -19,5 +19,5 @@ libfxrecord = { path = "../libfxrecord" }
serde = { version = "1.0.110", features = ["derive"] }
slog = "2.5.2"
structopt = "0.3.14"
tokio = { version = "0.2.21", features = ["tcp", "rt-threaded"] }
tokio = { version = "0.2.21", features = ["tcp", "rt-threaded", "time"] }
toml = "0.5.6"

Просмотреть файл

@ -4,11 +4,15 @@
use std::error::Error;
use std::path::{Path, PathBuf};
use std::time::Duration;
use libfxrecord::{run, CommonOptions};
use libfxrecorder::config::Config;
use slog::{info, Logger};
use libfxrecorder::proto::RecorderProto;
use libfxrecorder::retry::delayed_exponential_retry;
use slog::{error, info, Logger};
use structopt::StructOpt;
use tokio::net::TcpStream;
#[derive(Debug, StructOpt)]
#[structopt(name = "fxrecorder", about = "Start FxRecorder")]
@ -29,15 +33,39 @@ fn main() {
}
async fn fxrecorder(log: Logger, _options: Options, config: Config) -> Result<(), Box<dyn Error>> {
use libfxrecorder::proto::RecorderProto;
use tokio::net::TcpStream;
{
let stream = TcpStream::connect(&config.host).await?;
info!(log, "Connected"; "peer" => config.host);
let stream = TcpStream::connect(&config.host).await?;
info!(log, "Connected"; "peer" => config.host);
let mut proto = RecorderProto::new(log.clone(), stream);
let mut proto = RecorderProto::new(log, stream);
proto.handshake(true).await?;
}
proto.handshake().await?;
{
let reconnect = || {
info!(log, "Attempting re-connection to runner...");
TcpStream::connect(&config.host)
};
// This will attempt to reconnect for 0:30 + 1:00 + 2:00 + 4:00 = 7:30.
let stream = delayed_exponential_retry(reconnect, Duration::from_secs(30), 4)
.await
.map_err(|e| {
error!(
log,
"Could not connect to runner";
"last_error" => ?e.source().unwrap()
);
e
})?;
info!(log, "Re-connected"; "peer" => config.host);
let mut proto = RecorderProto::new(log, stream);
proto.handshake(false).await?;
}
Ok(())
}

Просмотреть файл

@ -4,3 +4,4 @@
pub mod config;
pub mod proto;
pub mod retry;

Просмотреть файл

@ -21,10 +21,17 @@ impl RecorderProto {
}
}
/// Handshake with Fxrunner.
pub async fn handshake(&mut self) -> Result<(), ProtoError<RunnerMessageKind>> {
/// Consume the RecorderProto and return the underlying `Proto`.
pub fn into_inner(
self,
) -> Proto<RunnerMessage, RecorderMessage, RunnerMessageKind, RecorderMessageKind> {
self.inner
}
/// Handshake with FxRunner.
pub async fn handshake(&mut self, restart: bool) -> Result<(), ProtoError<RunnerMessageKind>> {
info!(self.log, "Handshaking ...");
self.inner.send(Handshake).await?;
self.inner.send(Handshake { restart }).await?;
self.inner.recv::<HandshakeReply>().await?;
info!(self.log, "Handshake complete");
Ok(())

Просмотреть файл

@ -0,0 +1,60 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
use std::error::Error;
use std::future::Future;
use std::time::Duration;
use derive_more::Display;
use tokio::time::delay_for;
#[derive(Debug, Display)]
#[display(fmt = "failed after {} retries", retries)]
/// An error that occurred when retrying a fallable operation.
pub struct RetryError<E: Error + 'static> {
/// The last error that occurred.
source: E,
/// The number of retries.
retries: u32,
}
impl<E: Error + 'static> Error for RetryError<E> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
Some(&self.source)
}
}
/// Attempt to resolve the future returned by the given function `retries` times
/// using exponential backoff before the first attempt and between subsequent
/// attempts.
pub async fn delayed_exponential_retry<F, Fut, T, E>(
f: F,
wait: Duration,
retries: u32,
) -> Result<T, RetryError<E>>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, E>>,
E: Error + 'static,
{
let mut t = wait;
let mut last_error = None;
for _ in 0..retries {
delay_for(t).await;
match f().await {
Ok(r) => return Ok(r),
Err(e) => last_error = Some(e),
}
t *= 2;
}
Err(RetryError {
source: last_error.unwrap(),
retries,
})
}

Просмотреть файл

@ -4,11 +4,15 @@
use std::error::Error;
use std::path::{Path, PathBuf};
use std::time::Duration;
use libfxrecord::{run, CommonOptions};
use libfxrunner::config::Config;
use libfxrunner::proto::RunnerProto;
use slog::{info, Logger};
use structopt::StructOpt;
use tokio::net::TcpListener;
use tokio::time::delay_for;
#[derive(Debug, StructOpt)]
#[structopt(name = "fxrunner", about = "Start FxRunner")]
@ -29,16 +33,25 @@ fn main() {
}
async fn fxrunner(log: Logger, _options: Options, config: Config) -> Result<(), Box<dyn Error>> {
use libfxrunner::proto::RunnerProto;
use tokio::net::TcpListener;
loop {
let mut listener = TcpListener::bind(&config.host).await?;
let mut listener = TcpListener::bind(&config.host).await?;
let (stream, addr) = listener.accept().await?;
loop {
let (stream, addr) = listener.accept().await?;
info!(log, "Received connection"; "peer" => addr);
let mut proto = RunnerProto::new(log.clone(), stream);
info!(log, "Received connection"; "peer" => addr);
let mut proto = RunnerProto::new(log, stream);
let restart = proto.handshake_reply().await?;
proto.handshake_reply().await?;
if restart {
drop(proto);
drop(listener);
Ok(())
delay_for(Duration::from_secs(30)).await;
info!(log, "\"Restarted\"");
listener = TcpListener::bind(&config.host).await?;
}
}
}
}

Просмотреть файл

@ -7,6 +7,8 @@ use tokio::net::TcpStream;
use libfxrecord::net::*;
type Error = ProtoError<RecorderMessageKind>;
/// The runner side of the protocol.
pub struct RunnerProto {
inner: Proto<RecorderMessage, RunnerMessage, RecorderMessageKind, RunnerMessageKind>,
@ -21,12 +23,20 @@ impl RunnerProto {
}
}
/// Consume the RunnerProto and return the underlying `Proto`.
pub fn into_inner(
self,
) -> Proto<RecorderMessage, RunnerMessage, RecorderMessageKind, RunnerMessageKind> {
self.inner
}
/// Handshake with FxRecorder.
pub async fn handshake_reply(&mut self) -> Result<(), ProtoError<RecorderMessageKind>> {
pub async fn handshake_reply(&mut self) -> Result<bool, Error> {
info!(self.log, "Handshaking ...");
self.inner.recv::<Handshake>().await?;
let Handshake { restart } = self.inner.recv().await?;
self.inner.send(HandshakeReply).await?;
info!(self.log, "Handshake complete");
Ok(())
Ok(restart)
}
}

Просмотреть файл

@ -10,6 +10,7 @@ name = "integration-tests"
path = "src/test.rs"
[dev-dependencies]
assert_matches = "1.3.0"
slog = "2.5.2"
tokio = { version = "0.2.21", features = ["dns", "macros", "rt-threaded", "tcp"] }
@ -18,3 +19,6 @@ path = "../fxrecorder"
[dev-dependencies.fxrunner]
path = "../fxrunner"
[dev-dependencies.libfxrecord]
path = "../libfxrecord"

Просмотреть файл

@ -18,15 +18,17 @@ async fn test_handshake() {
tokio::spawn(async move {
let (runner, _) = listener.accept().await.unwrap();
RunnerProto::new(test_logger(), runner)
let should_restart = RunnerProto::new(test_logger(), runner)
.handshake_reply()
.await
.unwrap();
assert!(should_restart);
});
let recorder = TcpStream::connect(&addr).await.unwrap();
RecorderProto::new(test_logger(), recorder)
.handshake()
.handshake(true)
.await
.unwrap();
}

Просмотреть файл

@ -333,7 +333,10 @@ impl_message! {
RecorderMessageKind;
/// A handshake from FxRecorder to FxRunner.
Handshake;
Handshake {
/// Whether or not the runner should restart.
restart: bool,
};
}
impl_message! {