This commit is contained in:
Mathieu Duponchelle 2021-05-14 03:56:12 +02:00
Родитель c293cf59a5
Коммит bbfaf0e865
13 изменённых файлов: 497 добавлений и 12 удалений

78
Cargo.lock сгенерированный
Просмотреть файл

@ -558,6 +558,7 @@ dependencies = [
"libc",
"num-integer",
"num-traits",
"serde",
"time 0.1.43",
"winapi 0.3.9",
]
@ -571,12 +572,44 @@ dependencies = [
"ansi_term",
"atty",
"bitflags",
"strsim",
"textwrap",
"strsim 0.8.0",
"textwrap 0.11.0",
"unicode-width",
"vec_map",
]
[[package]]
name = "clap"
version = "3.0.0-beta.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bd1061998a501ee7d4b6d449020df3266ca3124b941ec56cf2005c3779ca142"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"indexmap",
"lazy_static",
"os_str_bytes",
"strsim 0.10.0",
"termcolor",
"textwrap 0.12.1",
"unicode-width",
"vec_map",
]
[[package]]
name = "clap_derive"
version = "3.0.0-beta.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "370f715b81112975b1b69db93e0b56ea4cd4e5002ac43b2da8474106a54096a1"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "const_fn"
version = "0.4.7"
@ -1463,6 +1496,12 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "os_str_bytes"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afb2e1c3ee07430c2cf76151675e583e0f19985fa6efae47d6848a3e2c824f85"
[[package]]
name = "parking_lot"
version = "0.11.1"
@ -1743,12 +1782,29 @@ dependencies = [
"gstreamer-base",
"log",
"openssl",
"rtmp-switcher-controlling",
"serde",
"serde_json",
"structopt",
"uuid",
]
[[package]]
name = "rtmp-switcher-controller"
version = "0.1.0"
dependencies = [
"clap 3.0.0-beta.2",
]
[[package]]
name = "rtmp-switcher-controlling"
version = "0.1.0"
dependencies = [
"chrono",
"serde",
"uuid",
]
[[package]]
name = "rustc_version"
version = "0.2.3"
@ -1943,13 +1999,19 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "structopt"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5277acd7ee46e63e5168a80734c9f6ee81b1367a7d8772a2d765df2a3705d28c"
dependencies = [
"clap",
"clap 2.33.3",
"lazy_static",
"structopt-derive",
]
@ -2029,6 +2091,15 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "textwrap"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "203008d98caf094106cfaba70acfed15e18ed3ddb7d94e49baec153a2b462789"
dependencies = [
"unicode-width",
]
[[package]]
name = "thiserror"
version = "1.0.24"
@ -2316,6 +2387,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
dependencies = [
"getrandom 0.2.2",
"serde",
]
[[package]]

Просмотреть файл

@ -1,5 +1,7 @@
[workspace]
members = [
"common",
"server",
"controller",
]

11
common/Cargo.toml Normal file
Просмотреть файл

@ -0,0 +1,11 @@
[package]
name = "rtmp-switcher-controlling"
version = "0.1.0"
authors = ["Mathieu Duponchelle <mathieu@centricular.com>"]
edition = "2018"
license = "MIT"
[dependencies]
serde = "1"
uuid = { version = "0.8", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }

75
common/src/controller.rs Normal file
Просмотреть файл

@ -0,0 +1,75 @@
// Copyright (C) 2021 Mathieu Duponchelle <mathieu@centricular.com>
//
// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>
use chrono::offset::Utc;
use chrono::DateTime;
use serde::{Deserialize, Serialize};
/// Messages sent from the controller to the switcher.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ControllerMessage {
StartChannel {
/// Display name
name: String,
/// RTMP address
destination: String,
},
StopChannel {
/// Assigned identifier
id: uuid::Uuid,
},
GetChannelInfo {
/// Assigned identifier
id: uuid::Uuid,
},
AddSource {
/// What channel the source should be added to
id: uuid::Uuid,
/// URI of the source
uri: String,
/// When the source should be cued, if None the source will be
/// switched to after the last source cued on the channel is over
cue_time: Option<DateTime<Utc>>,
},
ModifySource {
/// The ID of the source to modify
id: uuid::Uuid,
/// When the source should be cued, if None the source will be
/// switched to after the last source cued on the channel is over
cue_time: Option<DateTime<Utc>>,
},
/// Immediately switch to a source
SwitchSource {
/// The ID of the source to switch to. All previous sources
/// on that channel are discarded
id: uuid::Uuid,
},
/// Remove a source
RemoveSource {
/// The ID of the source to remove
id: uuid::Uuid,
},
/// List all channel IDs
ListChannels,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub struct ChannelInfo {
pub id: uuid::Uuid,
pub name: String,
pub destination: String,
}
/// Messages sent from the the server to the controller.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ServerMessage {
Error { message: String },
ChannelList { channels: Vec<uuid::Uuid> },
ChannelStarted { id: uuid::Uuid },
ChannelStopped { id: uuid::Uuid },
ChannelInfo(ChannelInfo),
}

1
common/src/lib.rs Normal file
Просмотреть файл

@ -0,0 +1 @@
pub mod controller;

9
controller/Cargo.toml Normal file
Просмотреть файл

@ -0,0 +1,9 @@
[package]
name = "rtmp-switcher-controller"
version = "0.1.0"
authors = ["Mathieu Duponchelle <mathieu@centricular.com>"]
edition = "2018"
license = "MIT"
[dependencies]
clap = "3.0.0-beta.2"

55
controller/src/main.rs Normal file
Просмотреть файл

@ -0,0 +1,55 @@
// Copyright (C) 2021 Mathieu Duponchelle <mathieu@centricular.com>
//
// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>
use clap::{AppSettings, Clap};
use std::path::PathBuf;
#[derive(Clap, Debug)]
#[clap(author = "Mathieu Duponchelle <mathieu@centricular.com>")]
#[clap(setting = AppSettings::ColoredHelp)]
struct Opts {
/// Address of the rtmp switcher, e.g. https://localhost:8080
server: String,
/// TLS Certificate chain file.
pub certificate_file: Option<PathBuf>,
#[clap(subcommand)]
subcmd: SubCommand,
}
#[derive(Clap, Debug)]
enum SubCommand {
/// List all currently-running channels
List,
/// Control individual channels
Channel {
#[clap(subcommand)]
subcmd: ChannelSubCommand,
},
}
#[derive(Clap, Debug)]
enum ChannelSubCommand {
/// Start a channel
Start {
/// The name of the new channel
name: String,
},
/// Stop a channel
Stop {
/// The id of an existing channel
id: String,
},
/// Display information about a channel
Show {
/// The id of an existing channel
id: String,
},
}
fn main() {
let opts: Opts = Opts::parse();
eprintln!("Opts: {:?}", opts);
}

Просмотреть файл

@ -27,3 +27,5 @@ serde_json = "1"
structopt = "0.3"
uuid = { version = "0.8", features = ["v4"] }
chrono = "0.4"
rtmp-switcher-controlling = { path = "../common" }

47
server/src/channel.rs Normal file
Просмотреть файл

@ -0,0 +1,47 @@
use crate::config::Config;
use actix::{Actor, Context, Handler, Message, MessageResult};
use rtmp_switcher_controlling::controller::ChannelInfo;
use std::sync::Arc;
/// Actor that represents a channel
#[derive(Debug)]
pub struct Channel {
cfg: Arc<Config>,
pub id: uuid::Uuid,
name: String,
destination: String,
}
impl Channel {
pub fn new(cfg: Arc<Config>, name: &str, destination: &str) -> Self {
Self {
cfg,
id: uuid::Uuid::new_v4(),
name: name.to_string(),
destination: destination.to_string(),
}
}
}
impl Actor for Channel {
type Context = Context<Self>;
}
#[derive(Debug)]
pub struct GetInfoMessage {}
impl Message for GetInfoMessage {
type Result = ChannelInfo;
}
impl Handler<GetInfoMessage> for Channel {
type Result = MessageResult<GetInfoMessage>;
fn handle(&mut self, msg: GetInfoMessage, ctx: &mut Context<Self>) -> Self::Result {
MessageResult(ChannelInfo {
id: self.id,
name: self.name.to_string(),
destination: self.destination.to_string(),
})
}
}

Просмотреть файл

@ -2,8 +2,8 @@
//
// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>
use structopt::StructOpt;
use std::path::PathBuf;
use structopt::StructOpt;
#[derive(Debug, StructOpt)]
#[structopt(
@ -23,5 +23,4 @@ pub struct Config {
/// Certificate private key file.
#[structopt(short = "k", long)]
pub key_file: Option<PathBuf>,
}

202
server/src/controller.rs Normal file
Просмотреть файл

@ -0,0 +1,202 @@
// Copyright (C) 2021 Mathieu Duponchelle <mathieu@centricular.com>
//
// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>
use crate::channel::Channel;
use crate::{channel::GetInfoMessage, config::Config};
use anyhow::{format_err, Error};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use actix::{
Actor, ActorContext, ActorFuture, Addr, AsyncContext, Handler, Message, StreamHandler,
WrapFuture,
};
use actix_web::dev::ConnectionInfo;
use actix_web_actors::ws;
use log::{debug, error, trace};
use rtmp_switcher_controlling::controller::{ControllerMessage, ServerMessage};
/// Actor that represents an application controller.
#[derive(Debug)]
pub struct Controller {
cfg: Arc<Config>,
channels: Arc<Mutex<HashMap<uuid::Uuid, Addr<Channel>>>>,
remote_addr: String,
}
impl Controller {
/// Create a new `Controller` actor.
pub fn new(
cfg: Arc<Config>,
channels: Arc<Mutex<HashMap<uuid::Uuid, Addr<Channel>>>>,
connection_info: &ConnectionInfo,
) -> Result<Self, Error> {
debug!("Creating new controller {:?}", connection_info);
let remote_addr = connection_info
.realip_remote_addr()
.ok_or_else(|| format_err!("WebSocket connection without remote address"))?;
Ok(Controller {
cfg,
channels,
remote_addr: String::from(remote_addr),
})
}
fn get_channel_info_future(
&self,
channel: Addr<Channel>,
) -> impl ActorFuture<Actor = Self, Output = ()> {
async move { channel.send(crate::channel::GetInfoMessage {}).await }
.into_actor(self)
.then(move |res, _, ctx| {
ctx.text(
serde_json::to_string(&res.unwrap())
.expect("failed to serialize ChannelList message"),
);
actix::fut::ready(())
})
}
/// Handle JSON messages from the controller.
fn handle_message(&mut self, ctx: &mut ws::WebsocketContext<Self>, text: &str) {
trace!("Handling message: {}", text);
match serde_json::from_str::<ControllerMessage>(text) {
Ok(ControllerMessage::StartChannel { name, destination }) => {
let channel = Channel::new(self.cfg.clone(), &name, &destination);
let id = channel.id;
self.channels.lock().unwrap().insert(id, channel.start());
ctx.text(
serde_json::to_string(&ServerMessage::ChannelStarted { id })
.expect("failed to serialize ChannelStarted message"),
);
}
Ok(ControllerMessage::StopChannel { id }) => {
self.channels.lock().unwrap().remove(&id);
ctx.text(
serde_json::to_string(&ServerMessage::ChannelStopped { id })
.expect("failed to serialize ChannelStopped message"),
);
}
Ok(ControllerMessage::ListChannels) => {
ctx.text(
serde_json::to_string(&ServerMessage::ChannelList {
channels: self
.channels
.lock()
.unwrap()
.keys()
.map(|id| id.clone())
.collect(),
})
.expect("failed to serialize ChannelList message"),
);
}
Ok(ControllerMessage::GetChannelInfo { id }) => {
if let Some(channel) = self.channels.lock().unwrap().get(&id) {
ctx.spawn(self.get_channel_info_future(channel.clone()));
}
}
Ok(_) => {}
Err(err) => {
error!(
"Controller {} has websocket error: {}",
self.remote_addr, err
);
ctx.notify(ErrorMessage(String::from("Internal processing error")));
self.shutdown(ctx, false);
}
}
}
fn shutdown(&mut self, ctx: &mut ws::WebsocketContext<Self>, from_close: bool) {
debug!("Shutting down controller {}", self.remote_addr);
if !from_close {
ctx.close(None);
}
ctx.stop();
}
}
impl Actor for Controller {
type Context = ws::WebsocketContext<Self>;
/// Called once the controller is started.
fn started(&mut self, ctx: &mut Self::Context) {
trace!("Started controller {}", self.remote_addr);
}
/// Called when the controller is fully stopped.
fn stopped(&mut self, _ctx: &mut Self::Context) {
trace!("Controller {} stopped", self.remote_addr);
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Controller {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => {
self.handle_message(ctx, &text);
}
Ok(ws::Message::Close(reason)) => {
debug!(
"Controller {} websocket connection closed: {:?}",
self.remote_addr, reason
);
self.shutdown(ctx, true);
}
Ok(ws::Message::Binary(_binary)) => {
error!("Unsupported binary message, ignoring");
}
Ok(ws::Message::Continuation(_)) => {
error!("Unsupported continuation message, ignoring");
}
Ok(ws::Message::Nop) | Ok(ws::Message::Pong(_)) => {
// Do nothing
}
Err(err) => {
error!(
"Controller {} websocket connection error: {:?}",
self.remote_addr, err
);
self.shutdown(ctx, false);
}
}
}
}
/// Message to report pipeline errors to the actor.
#[derive(Debug)]
struct ErrorMessage(String);
impl Message for ErrorMessage {
type Result = ();
}
impl Handler<ErrorMessage> for Controller {
type Result = ();
fn handle(&mut self, msg: ErrorMessage, ctx: &mut ws::WebsocketContext<Self>) -> Self::Result {
error!(
"Got error message '{}' on controller {}",
msg.0, self.remote_addr
);
ctx.text(
serde_json::to_string(&ServerMessage::Error { message: msg.0 })
.expect("Failed to serialize error message"),
);
self.shutdown(ctx, false);
}
}

Просмотреть файл

@ -2,7 +2,9 @@
//
// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>
mod channel;
mod config;
mod controller;
mod server;
use anyhow::Error;

Просмотреть файл

@ -2,36 +2,40 @@
//
// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>
use crate::channel::Channel;
use crate::config::Config;
use crate::controller::Controller;
use actix::Addr;
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use std::collections::HashMap;
use std::sync::Mutex;
use log::error;
/// Create Subscriber/Publisher WebSocket actors.
async fn ws(
cfg: web::Data<Config>,
channels: web::Data<Mutex<HashMap<uuid::Uuid, Addr<Channel>>>>,
path: web::Path<String>,
req: HttpRequest,
stream: web::Payload,
) -> Result<HttpResponse, actix_web::Error> {
match path.as_str() {
"control" => {
/*
let publisher = Publisher::new(
let controller = Controller::new(
cfg.into_inner(),
rooms.as_ref().clone(),
channels.into_inner(),
&req.connection_info(),
)
.map_err(|err| {
error!("Failed to create publisher: {}", err);
error!("Failed to create controller: {}", err);
HttpResponse::InternalServerError()
})?;
ws::start(publisher, &req, stream)
*/
Ok(HttpResponse::NotFound().finish())
ws::start(controller, &req, stream)
}
_ => Ok(HttpResponse::NotFound().finish()),
}
@ -39,8 +43,11 @@ async fn ws(
/// Start the server based on the passed `Config`.
pub async fn run(cfg: Config) -> Result<(), anyhow::Error> {
let channels: HashMap<uuid::Uuid, Addr<Channel>> = HashMap::new();
let channels = web::Data::new(Mutex::new(channels));
let cfg = web::Data::new(cfg);
let cfg_clone = cfg.clone();
let channels_clone = channels.clone();
let server = HttpServer::new(move || {
let cors = actix_cors::Cors::default().allow_any_origin().max_age(3600);
@ -49,6 +56,7 @@ pub async fn run(cfg: Config) -> Result<(), anyhow::Error> {
.wrap(actix_web::middleware::Logger::default())
.wrap(cors)
.app_data(cfg_clone.clone())
.app_data(channels_clone.clone())
.route("/ws/{mode:(control)}", web::get().to(ws))
});