diff --git a/Cargo.lock b/Cargo.lock index d94c1d0..efe5ff6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -558,6 +558,7 @@ dependencies = [ "libc", "num-integer", "num-traits", + "serde", "time 0.1.43", "winapi 0.3.9", ] @@ -571,12 +572,44 @@ dependencies = [ "ansi_term", "atty", "bitflags", - "strsim", - "textwrap", + "strsim 0.8.0", + "textwrap 0.11.0", "unicode-width", "vec_map", ] +[[package]] +name = "clap" +version = "3.0.0-beta.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd1061998a501ee7d4b6d449020df3266ca3124b941ec56cf2005c3779ca142" +dependencies = [ + "atty", + "bitflags", + "clap_derive", + "indexmap", + "lazy_static", + "os_str_bytes", + "strsim 0.10.0", + "termcolor", + "textwrap 0.12.1", + "unicode-width", + "vec_map", +] + +[[package]] +name = "clap_derive" +version = "3.0.0-beta.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "370f715b81112975b1b69db93e0b56ea4cd4e5002ac43b2da8474106a54096a1" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "const_fn" version = "0.4.7" @@ -1463,6 +1496,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "os_str_bytes" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afb2e1c3ee07430c2cf76151675e583e0f19985fa6efae47d6848a3e2c824f85" + [[package]] name = "parking_lot" version = "0.11.1" @@ -1743,12 +1782,29 @@ dependencies = [ "gstreamer-base", "log", "openssl", + "rtmp-switcher-controlling", "serde", "serde_json", "structopt", "uuid", ] +[[package]] +name = "rtmp-switcher-controller" +version = "0.1.0" +dependencies = [ + "clap 3.0.0-beta.2", +] + +[[package]] +name = "rtmp-switcher-controlling" +version = "0.1.0" +dependencies = [ + "chrono", + "serde", + "uuid", +] + [[package]] name = "rustc_version" version = "0.2.3" @@ -1943,13 +1999,19 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "structopt" version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5277acd7ee46e63e5168a80734c9f6ee81b1367a7d8772a2d765df2a3705d28c" dependencies = [ - "clap", + "clap 2.33.3", "lazy_static", "structopt-derive", ] @@ -2029,6 +2091,15 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "textwrap" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "203008d98caf094106cfaba70acfed15e18ed3ddb7d94e49baec153a2b462789" +dependencies = [ + "unicode-width", +] + [[package]] name = "thiserror" version = "1.0.24" @@ -2316,6 +2387,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" dependencies = [ "getrandom 0.2.2", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 582c8e6..cca74ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,7 @@ [workspace] members = [ + "common", "server", + "controller", ] diff --git a/common/Cargo.toml b/common/Cargo.toml new file mode 100644 index 0000000..3c9ba88 --- /dev/null +++ b/common/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "rtmp-switcher-controlling" +version = "0.1.0" +authors = ["Mathieu Duponchelle "] +edition = "2018" +license = "MIT" + +[dependencies] +serde = "1" +uuid = { version = "0.8", features = ["serde"] } +chrono = { version = "0.4", features = ["serde"] } diff --git a/common/src/controller.rs b/common/src/controller.rs new file mode 100644 index 0000000..853fbb5 --- /dev/null +++ b/common/src/controller.rs @@ -0,0 +1,75 @@ +// Copyright (C) 2021 Mathieu Duponchelle +// +// Licensed under the MIT license, see the LICENSE file or + +use chrono::offset::Utc; +use chrono::DateTime; +use serde::{Deserialize, Serialize}; + +/// Messages sent from the controller to the switcher. +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum ControllerMessage { + StartChannel { + /// Display name + name: String, + /// RTMP address + destination: String, + }, + StopChannel { + /// Assigned identifier + id: uuid::Uuid, + }, + GetChannelInfo { + /// Assigned identifier + id: uuid::Uuid, + }, + AddSource { + /// What channel the source should be added to + id: uuid::Uuid, + /// URI of the source + uri: String, + /// When the source should be cued, if None the source will be + /// switched to after the last source cued on the channel is over + cue_time: Option>, + }, + ModifySource { + /// The ID of the source to modify + id: uuid::Uuid, + /// When the source should be cued, if None the source will be + /// switched to after the last source cued on the channel is over + cue_time: Option>, + }, + /// Immediately switch to a source + SwitchSource { + /// The ID of the source to switch to. All previous sources + /// on that channel are discarded + id: uuid::Uuid, + }, + /// Remove a source + RemoveSource { + /// The ID of the source to remove + id: uuid::Uuid, + }, + /// List all channel IDs + ListChannels, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub struct ChannelInfo { + pub id: uuid::Uuid, + pub name: String, + pub destination: String, +} + +/// Messages sent from the the server to the controller. +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum ServerMessage { + Error { message: String }, + ChannelList { channels: Vec }, + ChannelStarted { id: uuid::Uuid }, + ChannelStopped { id: uuid::Uuid }, + ChannelInfo(ChannelInfo), +} diff --git a/common/src/lib.rs b/common/src/lib.rs new file mode 100644 index 0000000..cb9e0ac --- /dev/null +++ b/common/src/lib.rs @@ -0,0 +1 @@ +pub mod controller; diff --git a/controller/Cargo.toml b/controller/Cargo.toml new file mode 100644 index 0000000..e9ea8b4 --- /dev/null +++ b/controller/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "rtmp-switcher-controller" +version = "0.1.0" +authors = ["Mathieu Duponchelle "] +edition = "2018" +license = "MIT" + +[dependencies] +clap = "3.0.0-beta.2" diff --git a/controller/src/main.rs b/controller/src/main.rs new file mode 100644 index 0000000..b2db783 --- /dev/null +++ b/controller/src/main.rs @@ -0,0 +1,55 @@ +// Copyright (C) 2021 Mathieu Duponchelle +// +// Licensed under the MIT license, see the LICENSE file or + +use clap::{AppSettings, Clap}; +use std::path::PathBuf; + +#[derive(Clap, Debug)] +#[clap(author = "Mathieu Duponchelle ")] +#[clap(setting = AppSettings::ColoredHelp)] +struct Opts { + /// Address of the rtmp switcher, e.g. https://localhost:8080 + server: String, + /// TLS Certificate chain file. + pub certificate_file: Option, + + #[clap(subcommand)] + subcmd: SubCommand, +} + +#[derive(Clap, Debug)] +enum SubCommand { + /// List all currently-running channels + List, + /// Control individual channels + Channel { + #[clap(subcommand)] + subcmd: ChannelSubCommand, + }, +} + +#[derive(Clap, Debug)] +enum ChannelSubCommand { + /// Start a channel + Start { + /// The name of the new channel + name: String, + }, + /// Stop a channel + Stop { + /// The id of an existing channel + id: String, + }, + /// Display information about a channel + Show { + /// The id of an existing channel + id: String, + }, +} + +fn main() { + let opts: Opts = Opts::parse(); + + eprintln!("Opts: {:?}", opts); +} diff --git a/server/Cargo.toml b/server/Cargo.toml index be9016e..d1989ff 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -27,3 +27,5 @@ serde_json = "1" structopt = "0.3" uuid = { version = "0.8", features = ["v4"] } chrono = "0.4" + +rtmp-switcher-controlling = { path = "../common" } diff --git a/server/src/channel.rs b/server/src/channel.rs new file mode 100644 index 0000000..643187a --- /dev/null +++ b/server/src/channel.rs @@ -0,0 +1,47 @@ +use crate::config::Config; +use actix::{Actor, Context, Handler, Message, MessageResult}; +use rtmp_switcher_controlling::controller::ChannelInfo; +use std::sync::Arc; + +/// Actor that represents a channel +#[derive(Debug)] +pub struct Channel { + cfg: Arc, + pub id: uuid::Uuid, + name: String, + destination: String, +} + +impl Channel { + pub fn new(cfg: Arc, name: &str, destination: &str) -> Self { + Self { + cfg, + id: uuid::Uuid::new_v4(), + name: name.to_string(), + destination: destination.to_string(), + } + } +} + +impl Actor for Channel { + type Context = Context; +} + +#[derive(Debug)] +pub struct GetInfoMessage {} + +impl Message for GetInfoMessage { + type Result = ChannelInfo; +} + +impl Handler for Channel { + type Result = MessageResult; + + fn handle(&mut self, msg: GetInfoMessage, ctx: &mut Context) -> Self::Result { + MessageResult(ChannelInfo { + id: self.id, + name: self.name.to_string(), + destination: self.destination.to_string(), + }) + } +} diff --git a/server/src/config.rs b/server/src/config.rs index a4892eb..854054b 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -2,8 +2,8 @@ // // Licensed under the MIT license, see the LICENSE file or -use structopt::StructOpt; use std::path::PathBuf; +use structopt::StructOpt; #[derive(Debug, StructOpt)] #[structopt( @@ -23,5 +23,4 @@ pub struct Config { /// Certificate private key file. #[structopt(short = "k", long)] pub key_file: Option, - } diff --git a/server/src/controller.rs b/server/src/controller.rs new file mode 100644 index 0000000..d45cb3b --- /dev/null +++ b/server/src/controller.rs @@ -0,0 +1,202 @@ +// Copyright (C) 2021 Mathieu Duponchelle +// +// Licensed under the MIT license, see the LICENSE file or + +use crate::channel::Channel; +use crate::{channel::GetInfoMessage, config::Config}; + +use anyhow::{format_err, Error}; + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use actix::{ + Actor, ActorContext, ActorFuture, Addr, AsyncContext, Handler, Message, StreamHandler, + WrapFuture, +}; +use actix_web::dev::ConnectionInfo; +use actix_web_actors::ws; + +use log::{debug, error, trace}; + +use rtmp_switcher_controlling::controller::{ControllerMessage, ServerMessage}; + +/// Actor that represents an application controller. +#[derive(Debug)] +pub struct Controller { + cfg: Arc, + channels: Arc>>>, + remote_addr: String, +} + +impl Controller { + /// Create a new `Controller` actor. + pub fn new( + cfg: Arc, + channels: Arc>>>, + connection_info: &ConnectionInfo, + ) -> Result { + debug!("Creating new controller {:?}", connection_info); + + let remote_addr = connection_info + .realip_remote_addr() + .ok_or_else(|| format_err!("WebSocket connection without remote address"))?; + + Ok(Controller { + cfg, + channels, + remote_addr: String::from(remote_addr), + }) + } + + fn get_channel_info_future( + &self, + channel: Addr, + ) -> impl ActorFuture { + async move { channel.send(crate::channel::GetInfoMessage {}).await } + .into_actor(self) + .then(move |res, _, ctx| { + ctx.text( + serde_json::to_string(&res.unwrap()) + .expect("failed to serialize ChannelList message"), + ); + actix::fut::ready(()) + }) + } + + /// Handle JSON messages from the controller. + fn handle_message(&mut self, ctx: &mut ws::WebsocketContext, text: &str) { + trace!("Handling message: {}", text); + match serde_json::from_str::(text) { + Ok(ControllerMessage::StartChannel { name, destination }) => { + let channel = Channel::new(self.cfg.clone(), &name, &destination); + let id = channel.id; + + self.channels.lock().unwrap().insert(id, channel.start()); + + ctx.text( + serde_json::to_string(&ServerMessage::ChannelStarted { id }) + .expect("failed to serialize ChannelStarted message"), + ); + } + Ok(ControllerMessage::StopChannel { id }) => { + self.channels.lock().unwrap().remove(&id); + + ctx.text( + serde_json::to_string(&ServerMessage::ChannelStopped { id }) + .expect("failed to serialize ChannelStopped message"), + ); + } + Ok(ControllerMessage::ListChannels) => { + ctx.text( + serde_json::to_string(&ServerMessage::ChannelList { + channels: self + .channels + .lock() + .unwrap() + .keys() + .map(|id| id.clone()) + .collect(), + }) + .expect("failed to serialize ChannelList message"), + ); + } + Ok(ControllerMessage::GetChannelInfo { id }) => { + if let Some(channel) = self.channels.lock().unwrap().get(&id) { + ctx.spawn(self.get_channel_info_future(channel.clone())); + } + } + Ok(_) => {} + Err(err) => { + error!( + "Controller {} has websocket error: {}", + self.remote_addr, err + ); + ctx.notify(ErrorMessage(String::from("Internal processing error"))); + self.shutdown(ctx, false); + } + } + } + + fn shutdown(&mut self, ctx: &mut ws::WebsocketContext, from_close: bool) { + debug!("Shutting down controller {}", self.remote_addr); + + if !from_close { + ctx.close(None); + } + ctx.stop(); + } +} + +impl Actor for Controller { + type Context = ws::WebsocketContext; + + /// Called once the controller is started. + fn started(&mut self, ctx: &mut Self::Context) { + trace!("Started controller {}", self.remote_addr); + } + + /// Called when the controller is fully stopped. + fn stopped(&mut self, _ctx: &mut Self::Context) { + trace!("Controller {} stopped", self.remote_addr); + } +} + +impl StreamHandler> for Controller { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + match msg { + Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), + Ok(ws::Message::Text(text)) => { + self.handle_message(ctx, &text); + } + Ok(ws::Message::Close(reason)) => { + debug!( + "Controller {} websocket connection closed: {:?}", + self.remote_addr, reason + ); + self.shutdown(ctx, true); + } + Ok(ws::Message::Binary(_binary)) => { + error!("Unsupported binary message, ignoring"); + } + Ok(ws::Message::Continuation(_)) => { + error!("Unsupported continuation message, ignoring"); + } + Ok(ws::Message::Nop) | Ok(ws::Message::Pong(_)) => { + // Do nothing + } + Err(err) => { + error!( + "Controller {} websocket connection error: {:?}", + self.remote_addr, err + ); + self.shutdown(ctx, false); + } + } + } +} + +/// Message to report pipeline errors to the actor. +#[derive(Debug)] +struct ErrorMessage(String); + +impl Message for ErrorMessage { + type Result = (); +} + +impl Handler for Controller { + type Result = (); + + fn handle(&mut self, msg: ErrorMessage, ctx: &mut ws::WebsocketContext) -> Self::Result { + error!( + "Got error message '{}' on controller {}", + msg.0, self.remote_addr + ); + + ctx.text( + serde_json::to_string(&ServerMessage::Error { message: msg.0 }) + .expect("Failed to serialize error message"), + ); + self.shutdown(ctx, false); + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 28fda43..3f6937c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -2,7 +2,9 @@ // // Licensed under the MIT license, see the LICENSE file or +mod channel; mod config; +mod controller; mod server; use anyhow::Error; diff --git a/server/src/server.rs b/server/src/server.rs index 9f27fdd..a01609c 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -2,36 +2,40 @@ // // Licensed under the MIT license, see the LICENSE file or +use crate::channel::Channel; use crate::config::Config; +use crate::controller::Controller; +use actix::Addr; use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer}; use actix_web_actors::ws; +use std::collections::HashMap; +use std::sync::Mutex; + use log::error; /// Create Subscriber/Publisher WebSocket actors. async fn ws( cfg: web::Data, + channels: web::Data>>>, path: web::Path, req: HttpRequest, stream: web::Payload, ) -> Result { match path.as_str() { "control" => { - /* - let publisher = Publisher::new( + let controller = Controller::new( cfg.into_inner(), - rooms.as_ref().clone(), + channels.into_inner(), &req.connection_info(), ) .map_err(|err| { - error!("Failed to create publisher: {}", err); + error!("Failed to create controller: {}", err); HttpResponse::InternalServerError() })?; - ws::start(publisher, &req, stream) - */ - Ok(HttpResponse::NotFound().finish()) + ws::start(controller, &req, stream) } _ => Ok(HttpResponse::NotFound().finish()), } @@ -39,8 +43,11 @@ async fn ws( /// Start the server based on the passed `Config`. pub async fn run(cfg: Config) -> Result<(), anyhow::Error> { + let channels: HashMap> = HashMap::new(); + let channels = web::Data::new(Mutex::new(channels)); let cfg = web::Data::new(cfg); let cfg_clone = cfg.clone(); + let channels_clone = channels.clone(); let server = HttpServer::new(move || { let cors = actix_cors::Cors::default().allow_any_origin().max_age(3600); @@ -49,6 +56,7 @@ pub async fn run(cfg: Config) -> Result<(), anyhow::Error> { .wrap(actix_web::middleware::Logger::default()) .wrap(cors) .app_data(cfg_clone.clone()) + .app_data(channels_clone.clone()) .route("/ws/{mode:(control)}", web::get().to(ws)) });