Handshake between the runner and recorder

The runner and recorder are now networked. The runner sets up a TCP
server and listens for incoming connections. The recorder connects to
the runner.

The bulk of this change is implementing the datatypes for messages and
protocols.

The `Proto` struct represents a protocol, which provides a way to send
and receive typed messages across a TCP socket. The `RunnerProto` and
`RecorderProto` are specializations of this type that implement that
actual business logic of sending and receiving messages in a certain
order for `fxrecorder` and `fxrunner` respectively.

Messages are the actual data sent across the socket. They are encoded by
the Proto as length-prefixed JSON blobs for transfer. There are two
types of messages:

1. `RecorderMessage`, which is sent from `fxrecorder` to `fxrunner`.
2. `RunnerMessage`, which is sent from `fxrecorder` to `fxrunner`.

Each type of message implements the `Message` interface, which allows
discriminationg between them with the `Message::kind()` method (and
imposes the serialization and deserialization constraints).

Message types are generated by the `impl_message!` and
`impl_message_inner!` macros to avoid writing boilerplate for each
message type.
This commit is contained in:
Barret Rennie 2020-06-10 18:24:57 -04:00
Родитель 7a0ceb3ae2
Коммит be29d2c9f2
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4D71D86C09132D72
12 изменённых файлов: 1035 добавлений и 255 удалений

676
Cargo.lock сгенерированный

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -11,4 +11,5 @@ libfxrecord = { path = "../libfxrecord" }
serde = { version = "1.0.110", features = ["derive"] }
slog = "2.5.2"
structopt = "0.3.14"
tokio = { version = "0.2.21", features = ["tcp", "rt-threaded"] }
toml = "0.5.6"

Просмотреть файл

@ -3,12 +3,13 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
mod config;
mod proto;
use std::error::Error;
use std::path::{Path, PathBuf};
use libfxrecord::{run, CommonOptions};
use slog::Logger;
use slog::{info, Logger};
use structopt::StructOpt;
use crate::config::Config;
@ -31,10 +32,16 @@ fn main() {
run::<Options, Config, _, _>(fxrecorder, "fxrecorder");
}
async fn fxrecorder(
_log: Logger,
_options: Options,
_config: Config,
) -> Result<(), Box<dyn Error>> {
async fn fxrecorder(log: Logger, _options: Options, config: Config) -> Result<(), Box<dyn Error>> {
use crate::proto::RecorderProto;
use tokio::net::TcpStream;
let stream = TcpStream::connect(&config.host).await?;
info!(log, "Connected"; "peer" => config.host);
let mut proto = RecorderProto::new(log, stream);
proto.handshake().await?;
Ok(())
}

32
fxrecorder/src/proto.rs Normal file
Просмотреть файл

@ -0,0 +1,32 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
use slog::{info, Logger};
use tokio::net::TcpStream;
use libfxrecord::net::*;
/// The recorder side of the protocol.
pub struct RecorderProto {
inner: Proto<RunnerMessage, RecorderMessage, RunnerMessageKind, RecorderMessageKind>,
log: Logger,
}
impl RecorderProto {
pub fn new(log: Logger, stream: TcpStream) -> RecorderProto {
Self {
inner: Proto::new(stream),
log,
}
}
/// Handshake with Fxrunner.
pub async fn handshake(&mut self) -> Result<(), ProtoError<RunnerMessageKind>> {
info!(self.log, "Handshaking ...");
self.inner.send(Handshake).await?;
self.inner.recv::<HandshakeReply>().await?;
info!(self.log, "Handshake complete");
Ok(())
}
}

Просмотреть файл

@ -11,4 +11,5 @@ libfxrecord = { path = "../libfxrecord" }
serde = { version = "1.0.110", features = ["derive"] }
slog = "2.5.2"
structopt = "0.3.14"
tokio = { version = "0.2.21", features = ["tcp", "rt-threaded"] }
toml = "0.5.6"

Просмотреть файл

@ -3,12 +3,13 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
mod config;
mod proto;
use std::error::Error;
use std::path::{Path, PathBuf};
use libfxrecord::{run, CommonOptions};
use slog::Logger;
use slog::{info, Logger};
use structopt::StructOpt;
use crate::config::Config;
@ -31,6 +32,17 @@ fn main() {
run::<Options, Config, _, _>(fxrunner, "fxrunner");
}
async fn fxrunner(_log: Logger, _options: Options, _config: Config) -> Result<(), Box<dyn Error>> {
async fn fxrunner(log: Logger, _options: Options, config: Config) -> Result<(), Box<dyn Error>> {
use crate::proto::RunnerProto;
use tokio::net::TcpListener;
let mut listener = TcpListener::bind(&config.host).await?;
let (stream, addr) = listener.accept().await?;
info!(log, "Received connection"; "peer" => addr);
let mut proto = RunnerProto::new(log, stream);
proto.handshake_reply().await?;
Ok(())
}

32
fxrunner/src/proto.rs Normal file
Просмотреть файл

@ -0,0 +1,32 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
use slog::{info, Logger};
use tokio::net::TcpStream;
use libfxrecord::net::*;
/// The runner side of the protocol.
pub struct RunnerProto {
inner: Proto<RecorderMessage, RunnerMessage, RecorderMessageKind, RunnerMessageKind>,
log: Logger,
}
impl RunnerProto {
pub fn new(log: Logger, stream: TcpStream) -> RunnerProto {
Self {
inner: Proto::new(stream),
log,
}
}
/// Handshake with FxRecorder.
pub async fn handshake_reply(&mut self) -> Result<(), ProtoError<RecorderMessageKind>> {
info!(self.log, "Handshaking ...");
self.inner.recv::<Handshake>().await?;
self.inner.send(HandshakeReply).await?;
info!(self.log, "Handshake complete");
Ok(())
}
}

Просмотреть файл

@ -14,4 +14,6 @@ slog-async = "2.5.0"
slog-term = "2.5.0"
structopt = "0.3.14"
toml = "0.5.6"
tokio = "0.2.21"
tokio = { version = "0.2.21", features = ["tcp"] }
tokio-util = { version = "0.3.1", features = ["codec"] }
tokio-serde = { version = "0.6.1", features = ["json"] }

Просмотреть файл

@ -3,8 +3,8 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
use std::error::Error;
use std::future::Future;
use std::fmt::Debug;
use std::future::Future;
use std::path::Path;
use std::process::exit;
@ -20,6 +20,7 @@ use crate::logging::build_logger;
pub mod config;
pub mod error;
pub mod logging;
pub mod net;
/// A trait for exposing options common to both fxrunner and fxrecorder.
pub trait CommonOptions: StructOpt + Debug {

9
libfxrecord/src/net.rs Normal file
Просмотреть файл

@ -0,0 +1,9 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
pub mod message;
pub mod proto;
pub use message::*;
pub use proto::*;

Просмотреть файл

@ -0,0 +1,348 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//! Message types used throughout `fxrunner` and `fxrecorder`.
//!
//! This module consists of various helper traits and structures, as well as
//! the [`impl_message!`][impl_message] macro, which together provide a
//! convenient way for [`Proto`][Proto] instances to send and receive typed
//! messages at a high level.
//!
//! Each invocation of [`impl_message!`][impl_message] will generate several types:
//!
//! 1. The message type. This is a wrapper enum type that contains all message
//! variants. It is the type that is serialized/deserialized by the
//! [`Proto`][Proto]. It will also implement the [`Message`][Message] so that
//! its variants can be differentiated by the message kind type.
//!
//! For example, for this invocation of [`impl_message!`][impl_message]
//!
//! ```ignore
//! impl_message! {
//! Msg,
//! MsgKind;
//! FooMsg;
//! BarMsg {
//! field: u32,
//! };
//! }
//! ```
//!
//! the following message type would be generated:
//!
//! ```ignore
//! pub enum Msg {
//! FooMsg(FooMsg),
//! BarMsg(BarMsg),
//! }
//!
//! impl Message<'_> for Msg { /* ... */ }
//! ```
//!
//! Here `FooMsg` and `BarMsg` will be generated structures containing actual
//! message data.
//!
//! 2. A message kind type. This is an enum with one variant for each kind of message.
//!
//! For example, for the same invocation of `impl_message!` as before, the
//! following kind type would be generated:
//!
//! ```ignore
//! pub enum MsgKind {
//! FooMsg,
//! BarMsg,
//! }
//! ```
//!
//! 3. Message content types for each message.
//!
//! In the example above, this is the `FooMsg` and `BarMsg` structures. The
//! following would be generated for that invocation:
//!
//! ```ignore
//! pub struct FooMsg;
//!
//! pub struct BarMsg {
//! pub enum field: u32,
//! }
//! ```
//!
//! as well as implementations for [`MessageContent`][MessageContent] and
//! conversion traits.
//!
//! These are the concrete message types that
//! [`Proto::recv_kind`][Proto::recv_kind] will receive.
//!
//! [Proto]: ../proto/struct.Proto.html
//! [Proto::recv_kind]: ../proto/struct.Proto.html#fn.recv_kind
//! [Message]: trait.Message.html
//! [MessageContent]: trait.MessageContent.html
//! [impl_message]: ../../macro.impl_message.html
use std::convert::TryFrom;
use std::error::Error;
use std::fmt::{Debug, Display};
use derive_more::Display;
use serde::{Deserialize, Serialize};
/// A message is a serializable and deserializable type.
pub trait Message<'de>: Serialize + Deserialize<'de> + Unpin {
/// Each message has a kind that uniquely identifies it.
type Kind: Debug + Display;
/// Return the kind of the message.
fn kind(&self) -> Self::Kind;
}
/// A trait that links message contents to their message wrapper enums.
pub trait MessageContent<'de, M, K>:
Serialize + Deserialize<'de> + Unpin + Into<M> + TryFrom<M, Error = KindMismatch<K>>
where
M: Message<'de, Kind = K>,
K: Debug + Display,
{
/// Return the kind of the message.
fn kind() -> K;
}
/// An error that occurs when attempting to extract a message variant..
#[derive(Debug, Display)]
#[display(
fmt = "could not convert message of kind `{}' to kind `{}'",
expected,
actual
)]
pub struct KindMismatch<K: Debug + Display> {
pub expected: K,
pub actual: K,
}
impl<K: Debug + Display> Error for KindMismatch<K> {}
/// Generate an inner message type.
///
/// The generated type will either be a unit struct or a non-empty struct with
/// named fields.
#[macro_export] // Only exported for doctests.
macro_rules! impl_message_inner {
// Generate a unit struct.
(
$(#[doc = $doc:expr])*
$name:ident
) => {
$(#[doc = $doc])*
#[derive(Debug, Deserialize, Serialize)]
pub struct $name;
};
// Generate a struct with named fields.
(
$(#[doc = $doc:expr])*
$name:ident {
$(
$(#[doc = $field_doc:expr])?
$field:ident : $field_ty:ty,
)*
}) => {
$(#[doc = $doc])*
#[derive(Debug, Deserialize, Serialize)]
pub struct $name {
$(
$(#[doc = $field_doc])?
pub $field: $field_ty,
)*
}
};
}
/// Generate messages and their implementations.
///
/// The first argument is the name of the message type. This will generate a
/// wrapper enum with this name that contains tuple variants for each named message.
///
/// The second argument is the name of the message kind type. This will generate
/// an enum with unit variants for each named message.
///
/// The rest of the arguments are the message variants, which will generate both
/// an enum variant in the message type and a standalone struct.
///
/// Doc-comments applied to items are persisted.
///
/// This will take a macro of the form:
///
/// ```
/// # #[macro_use] extern crate libfxrecord;
/// # use std::convert::{TryFrom, TryInto};
/// # use derive_more::Display;
/// # use serde::{Serialize, Deserialize};
/// # use libfxrecord::net::message::*;
/// impl_message! {
/// Msg,
/// MsgKind;
/// FooMsg;
/// BarMsg {
/// field: u32,
/// };
/// }
///
/// # let _m: FooMsg = Msg::FooMsg(FooMsg).try_into().unwrap();
/// # assert_eq!(FooMsg::kind(), MsgKind::FooMsg);
/// # assert_eq!(Msg::FooMsg(FooMsg).kind(), MsgKind::FooMsg);
/// #
/// # let _m: BarMsg = Msg::BarMsg(BarMsg { field: 1 }).try_into().unwrap();
/// # assert_eq!(BarMsg::kind(), MsgKind::BarMsg);
/// # assert_eq!(Msg::BarMsg(BarMsg { field: 1 }).kind(), MsgKind::BarMsg);
/// ```
///
/// and generate:
///
/// ```ignore
/// pub enum MsgKind {
/// FooMsg,
/// BarMsg,
/// }
///
/// pub enum Msg {
/// FooMsg(FooMsg),
/// BarMsg(BarMsg),
/// }
///
/// pub struct FooMsg;
///
/// pub struct BarMsg {
/// pub field: u32,
/// }
///
/// impl Message for Msg {
/// type Kind = MsgKind;
///
/// fn kind(&self) -> MsgKind {
/// match self {
/// Msg::FooMsg(..) => MsgKind::FooMsg,
/// Msg::BarMsg(..) => MsgKind::BarMsg,
/// }
/// }
/// }
///
/// impl Into<Msg> for FooMsg { /* ... */ }
/// impl TryFrom<Msg> for FooMsg { /* ... */ }
/// impl MessageContent<'_, Msg, MsgKind> for FooMsg { /* ... */ }
///
/// impl TryFrom<Msg> for BarMsg { /*... */ }
/// impl Into<Msg> for BarMsg { /* ... */ }
/// impl MessageContent<'_, Msg, MsgKind> for BarMsg { /* ... */ }
/// ```
#[macro_export] // Only exported for doctests.
macro_rules! impl_message {
(
$(#[doc = $msg_doc:expr])*
$msg_ty:ident,
$(#[doc = $kind_doc:expr])*
$kind_ty:ident;
$(
$(#[doc = $inner_ty_doc:expr])*
$inner_ty:ident $({
$(
$(#[doc = $field_doc:expr])*
$field:ident: $field_ty:ty,
)*
})?;
)*
) => {
$(#[doc = $kind_doc])*
#[derive(Clone, Copy, Debug, Display, Eq, PartialEq)]
pub enum $kind_ty {
$(
$(#[doc = $inner_ty_doc])*
$inner_ty,
)*
}
$(#[doc = $msg_doc])*
#[derive(Debug, Deserialize, Serialize)]
pub enum $msg_ty {
$(
$(#[doc = $inner_ty_doc])*
$inner_ty($inner_ty),
)*
}
impl Message<'_> for $msg_ty {
type Kind = $kind_ty;
fn kind(&self) -> Self::Kind {
match self {
$(
$msg_ty::$inner_ty(..) => $kind_ty::$inner_ty,
)*
}
}
}
$(
impl_message_inner! {
$(#[doc = $inner_ty_doc])*
$inner_ty $({
$(
$(#[doc = $field_doc])*
$field: $field_ty,
)*
})?
}
impl From<$inner_ty> for $msg_ty {
fn from(m: $inner_ty) -> Self {
$msg_ty::$inner_ty(m)
}
}
impl TryFrom<$msg_ty> for $inner_ty {
type Error = KindMismatch<$kind_ty>;
fn try_from(msg: $msg_ty) -> Result<Self, Self::Error> {
if let $msg_ty::$inner_ty(msg) = msg {
Ok(msg)
} else {
Err(KindMismatch {
expected: $kind_ty::$inner_ty,
actual: msg.kind(),
})
}
}
}
impl MessageContent<'_, $msg_ty, $kind_ty> for $inner_ty {
fn kind() -> $kind_ty {
$kind_ty::$inner_ty
}
}
)*
};
}
impl_message! {
/// A message from FxRecorder to FxRunner.
RecorderMessage,
/// The kind of a [`RecorderMessage`](struct.RecorderMessage.html).
RecorderMessageKind;
/// A handshake from FxRecorder to FxRunner.
Handshake;
}
impl_message! {
/// A message from FxRunner to FxRecorder.
RunnerMessage,
/// The kind of a [`RunnerMessage`](struct.RunnerMessage.html).
RunnerMessageKind;
/// A reply to a [`Handshake`](struct.Handshake.html) from FxRecorder.
HandshakeReply;
}

Просмотреть файл

@ -0,0 +1,149 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
use std::error::Error;
use std::fmt::{Debug, Display};
use std::io;
use derive_more::Display;
use futures::prelude::*;
use tokio::net::TcpStream;
use tokio_serde::formats::Json;
use tokio_util::codec::LengthDelimitedCodec;
use crate::error::ErrorMessage;
pub use crate::net::message::{Message, MessageContent, RecorderMessage, RunnerMessage};
/// A protocol for receiving messages of type `R` and sending messages of type
/// `S` over a `TcpStream`.
///
/// Messages are JSON-encoded and prefixed with their length before transmission.
///
/// Here `RK` and `SK` are the kinds of the message types `R` and `S`
/// respectively, as per the [`Message`](trait.Message.html#associatedtype.Kind) trait.
pub struct Proto<R, S, RK, SK>
where
for<'de> R: Message<'de, Kind = RK>,
for<'de> S: Message<'de, Kind = SK>,
RK: Debug + Display + Eq + PartialEq,
SK: Debug + Display + Eq + PartialEq,
{
stream: tokio_serde::Framed<
tokio_util::codec::Framed<TcpStream, LengthDelimitedCodec>,
R,
S,
Json<R, S>,
>,
// We need to include `RK` and `SK ` in the type signature for this struct
// to get around limitations with HKT.
_marker: std::marker::PhantomData<(RK, SK)>,
}
impl<R, S, RK, SK> Proto<R, S, RK, SK>
where
for<'de> R: Message<'de, Kind = RK>,
for<'de> S: Message<'de, Kind = SK>,
RK: Debug + Display + Eq + PartialEq,
SK: Debug + Display + Eq + PartialEq,
{
/// Wrap the stream for communicating via messages.
pub fn new(stream: TcpStream) -> Self {
Self {
stream: tokio_serde::Framed::new(
tokio_util::codec::Framed::new(stream, LengthDelimitedCodec::new()),
Json::default(),
),
_marker: std::marker::PhantomData,
}
}
/// Send a message.
pub async fn send<M>(&mut self, msg: M) -> Result<(), ProtoError<RK>>
where
for<'de> M: MessageContent<'de, S, SK>,
{
self.stream.send(msg.into()).await.map_err(Into::into)
}
/// Receive a specific message kind.
///
/// Any message returned that is not of the specified kind will cause an error.
pub async fn recv<M>(&mut self) -> Result<M, ProtoError<RK>>
where
for<'de> M: MessageContent<'de, R, RK>,
{
let msg = self
.stream
.try_next()
.await?
.ok_or(ProtoError::EndOfStream)?;
let received = msg.kind();
if M::kind() != received {
return Err(ProtoError::Unexpected {
expected: M::kind(),
received,
});
}
// We know that `M::kind() == msg.kind()` and this is true if and only
// if `msg` matches the enum variant for the type `M`.
Ok(M::try_from(msg).expect("M::kind() and msg.kind() are equal"))
}
/// Consume the `Proto`, returning the underlying stream.
pub fn into_inner(self) -> TcpStream {
self.stream.into_inner().into_inner()
}
}
/// An error in the protocol.
#[derive(Debug, Display)]
pub enum ProtoError<K: Debug + Display> {
/// An IO error occurred.
#[display(fmt = "IO error: {}", _0)]
Io(io::Error),
/// An error occurred on the remote side of the protocol.
///
/// Due to the error being serialized across the protocol, the underlying
/// error cannot have a cause.
#[display(fmt = "a remote error occurred: {}", _0)]
Foreign(ErrorMessage<String>),
/// The stream was closed unexpectedly.
#[display(fmt = "unexpected end of stream")]
EndOfStream,
/// An unexpected message type arrived.
#[display(
fmt = "expected message of kind `{}' but received message of kind `{}'",
expected,
received
)]
Unexpected {
/// The type of message that was expected.
expected: K,
/// The type of message that was received.
received: K,
},
}
impl<K: Debug + Display> Error for ProtoError<K> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
ProtoError::Io(ref e) => Some(e),
ProtoError::Foreign(ref e) => Some(e),
ProtoError::EndOfStream => None,
ProtoError::Unexpected { .. } => None,
}
}
}
impl<K: Debug + Display> From<io::Error> for ProtoError<K> {
fn from(e: io::Error) -> Self {
ProtoError::Io(e)
}
}