Refresh all outdated dependencies

This commit is contained in:
Mathieu Duponchelle 2022-02-27 03:13:41 +01:00
Родитель 4247ca2c33
Коммит 205bd41de3
14 изменённых файлов: 904 добавлений и 1841 удалений

2161
Cargo.lock сгенерированный

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -6,7 +6,7 @@ edition = "2018"
license = "MIT"
[dependencies]
serde = "1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
uuid = { version = "0.8", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }

Просмотреть файл

@ -8,14 +8,14 @@ license = "MIT"
[dependencies]
anyhow = "1"
futures = "0.3"
tokio = { version = "0.2", features = ["signal"] }
clap = "3.0.0-beta.2"
async-tungstenite = { version = "0.9", features = ["tokio-runtime", "tokio-openssl"] }
tokio = { version = "1.0", features = ["signal", "rt"] }
clap = { version = "3", features = ["derive"] }
async-tungstenite = { version = "0.17", features = ["tokio-runtime", "tokio-openssl"] }
serde_json = "1"
log = "0.4"
openssl = "0.10"
env_logger = "0.8"
uuid = { version = "0.8", features = ["serde"] }
env_logger = "0.9"
uuid = { version = "0.8", features = ["serde", "v4"] }
chrono = "0.4"
auteur-controlling = { path = "../common" }

Просмотреть файл

@ -2,7 +2,7 @@
use anyhow::Error;
use chrono::{DateTime, Utc};
use clap::{AppSettings, Clap};
use clap::{ArgEnum, Parser, Subcommand};
use std::path::PathBuf;
mod controller;
@ -10,9 +10,8 @@ use controller::Controller;
use auteur_controlling::controller::{Command, ControlMode, ControlPoint, DestinationFamily};
#[derive(Clap, Debug)]
#[derive(Parser, Debug)]
#[clap(author = "Mathieu Duponchelle <mathieu@centricular.com>")]
#[clap(setting = AppSettings::ColoredHelp)]
/// Top-level options
struct Opts {
/// Address of the Auteur server, e.g. https://localhost:8080
@ -25,7 +24,7 @@ struct Opts {
}
/// Top-level subcommands
#[derive(Clap, Debug)]
#[derive(Subcommand, Debug)]
enum SubCommand {
/// Create and connect nodes
Node {
@ -34,7 +33,7 @@ enum SubCommand {
},
}
#[derive(Clap, Debug)]
#[derive(ArgEnum, Debug, Clone)]
enum ArgControlMode {
Interpolate,
Set,
@ -74,7 +73,7 @@ where
}
/// Create and connect nodes
#[derive(Clap, Debug)]
#[derive(Subcommand, Debug)]
enum NodeSubCommand {
/// Create a new node
Create {
@ -164,7 +163,7 @@ enum NodeSubCommand {
}
/// Node-specific creation commands
#[derive(Clap, Debug)]
#[derive(Subcommand, Debug)]
enum CreateNodeSubCommand {
/// Create a new source
Source {
@ -207,7 +206,7 @@ enum CreateNodeSubCommand {
}
/// Create a destination
#[derive(Clap, Debug)]
#[derive(Subcommand, Debug)]
enum CreateDestinationSubCommand {
/// Create a new RTMP destination
Rtmp {
@ -234,18 +233,6 @@ enum CreateDestinationSubCommand {
},
}
/// Source-specific commands
#[derive(Clap, Debug)]
enum SourceSubCommand {}
/// Destination-specific commands
#[derive(Clap, Debug)]
enum DestinationSubCommand {}
/// Mixer-specific commands
#[derive(Clap, Debug)]
enum MixerSubCommand {}
/// Client application entry point
fn main() -> Result<(), Error> {
let opts: Opts = Opts::parse();
@ -255,8 +242,7 @@ fn main() -> Result<(), Error> {
.write_style("AUTEUR_CONTROLLER_LOG_STYLE");
env_logger::init_from_env(env);
let mut runtime = tokio::runtime::Builder::new()
.basic_scheduler()
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;

Просмотреть файл

@ -11,27 +11,23 @@ gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/g
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
glib = "0.10"
glib = "0.15"
futures = "0.3"
tokio = { version = "1.6.1", features = ["time", "test-util"] }
actix = "0.11"
actix = "0.12"
actix-rt = "2"
actix-web = { version = "4.0.0-beta.5", features = ["openssl"] }
actix-web-actors = "4.0.0-beta.4"
actix-service = "=2.0.0-beta.5"
actix-files = "0.6.0-beta.4"
actix-http = "=3.0.0-beta.5"
actix-web-httpauth = "0.6.0-beta.1"
actix-web = { version = "4", features = ["openssl"] }
actix-web-actors = "4"
openssl = "0.10"
tracing-actix-web = "0.4.0-beta.1"
tracing-actix-web = "0.5"
tracing = { version = "0.1", features = ["log"] }
tracing-actix = { git = "https://github.com/MathieuDuponchelle/tracing-actix.git", branch="actix-0.11" }
tracing-futures = { version = "0.1", features = ["std-future"] }
tracing-subscriber = { version = "0.2", features = ["registry", "env-filter"] }
tracing-actix = "0.3.0"
tracing-futures = { version = "0.2", features = ["std-future"] }
tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] }
tracing-log = "0.1"
tracing-error = "0.1"
tracing-appender = "0.1"
test-env-log = { version = "0.2", features = ["trace"], default-features = false }
tracing-error = "0.2"
tracing-appender = "0.2"
test-log = { version = "0.2", features = ["trace"], default-features = false }
serde = "1"
serde_json = "1"
structopt = "0.3"

Просмотреть файл

@ -203,16 +203,15 @@ impl Destination {
self.pipeline.add_many(&[&mux, &mux_queue, &sink])?;
sink.set_property("location", uri).unwrap();
sink.set_property("location", uri);
mux.set_property("streamable", &true).unwrap();
mux.set_property("latency", &1000000000u64).unwrap();
mux.set_property("streamable", &true);
mux.set_property("latency", &1000000000u64);
mux.set_property(
"start-time-selection",
gst_base::AggregatorStartTimeSelection::First,
)
.unwrap();
);
gst::Element::link_many(&[&mux, &mux_queue, &sink])?;
@ -237,25 +236,23 @@ impl Destination {
if venc.has_property("tune", None) {
venc.set_property_from_str("tune", "zerolatency");
} else if venc.has_property("zerolatency", None) {
venc.set_property("zerolatency", &true).unwrap();
venc.set_property("zerolatency", &true);
}
if venc.has_property("key-int-max", None) {
venc.set_property("key-int-max", &30u32).unwrap();
venc.set_property("key-int-max", &30u32);
} else if venc.has_property("gop-size", None) {
venc.set_property("gop-size", &30i32).unwrap();
venc.set_property("gop-size", &30i32);
}
vparse.set_property("config-interval", &-1i32).unwrap();
vparse.set_property("config-interval", &-1i32);
timecodestamper.set_property_from_str("source", "rtc");
timeoverlay.set_property_from_str("time-mode", "time-code");
venc_queue
.set_properties(&[
("max-size-buffers", &0u32),
("max-size-bytes", &0u32),
("max-size-time", &(3 * gst::SECOND)),
])
.unwrap();
venc_queue.set_properties(&[
("max-size-buffers", &0u32),
("max-size-bytes", &0u32),
("max-size-time", &(3 * gst::ClockTime::SECOND)),
]);
gst::Element::link_many(&[
appsrc.upcast_ref(),
@ -283,13 +280,11 @@ impl Destination {
&aenc_queue,
])?;
aenc_queue
.set_properties(&[
("max-size-buffers", &0u32),
("max-size-bytes", &0u32),
("max-size-time", &(3 * gst::SECOND)),
])
.unwrap();
aenc_queue.set_properties(&[
("max-size-buffers", &0u32),
("max-size-bytes", &0u32),
("max-size-time", &(3 * gst::ClockTime::SECOND)),
]);
gst::Element::link_many(&[
appsrc.upcast_ref(),
@ -307,8 +302,7 @@ impl Destination {
let id_clone = self.id.clone();
ctx.run_interval(std::time::Duration::from_secs(1), move |_s, _ctx| {
if let Some(sink) = sink_clone.upgrade() {
let val = sink.property("stats").unwrap();
let s = val.get::<gst::Structure>().unwrap();
let s = sink.property::<gst::Structure>("stats");
trace!(id = %id_clone, "rtmp destination statistics: {}", s.to_string());
}
@ -342,18 +336,19 @@ impl Destination {
self.pipeline.add_many(&[&multiqueue, &sink])?;
if let Some(max_size_time) = max_size_time {
sink.set_property("max-size-time", (max_size_time as u64) * gst::MSECOND)
.unwrap();
sink.set_property("use-robust-muxing", &true).unwrap();
sink.set_property(
"max-size-time",
(max_size_time as u64) * gst::ClockTime::MSECOND,
);
sink.set_property("use-robust-muxing", &true);
let mux = make_element("qtmux", None)?;
mux.set_property("reserved-moov-update-period", &gst::SECOND)
.unwrap();
sink.set_property("muxer", &mux).unwrap();
mux.set_property("reserved-moov-update-period", &gst::ClockTime::SECOND);
sink.set_property("muxer", &mux);
let location = base_name.to_owned() + "%05d.mp4";
sink.set_property("location", &location).unwrap();
sink.set_property("location", &location);
} else {
let location = base_name.to_owned() + ".mp4";
sink.set_property("location", &location).unwrap();
sink.set_property("location", &location);
}
if let Some(ref appsrc) = self.video_appsrc {
@ -738,7 +733,7 @@ mod tests {
use super::*;
use crate::utils::tests::*;
use tempfile::tempdir;
use test_env_log::test;
use test_log::test;
#[actix_rt::test]
#[test]

Просмотреть файл

@ -50,7 +50,7 @@ struct ConsumerSlot {
#[derive(Debug)]
pub struct VideoMixingState {
/// For how long no pad other than our base plate has selected samples
base_plate_timeout: gst::ClockTime,
base_plate_timeout: Option<gst::ClockTime>,
/// Whether our base plate is opaque
showing_base_plate: bool,
/// Our slot controllers
@ -58,7 +58,7 @@ pub struct VideoMixingState {
/// Our controllers (width, height, ...)
mixer_controllers: Option<HashMap<String, SettingController>>,
/// The last observed PTS, for interpolating
last_pts: gst::ClockTime,
last_pts: Option<gst::ClockTime>,
/// For resizing our output video stream
capsfilter: Option<gst::Element>,
}
@ -69,7 +69,7 @@ pub struct AudioMixingState {
/// Our slot controllers
slot_controllers: Option<HashMap<String, PropertyController>>,
/// The last observed PTS, for interpolating
last_pts: gst::ClockTime,
last_pts: Option<gst::ClockTime>,
}
/// One audio or video output branch
@ -121,11 +121,11 @@ impl VideoOutput {
id,
},
mixing_state: Arc::new(Mutex::new(VideoMixingState {
base_plate_timeout: gst::CLOCK_TIME_NONE,
base_plate_timeout: gst::ClockTime::NONE,
showing_base_plate: false,
slot_controllers: Some(HashMap::new()),
mixer_controllers: Some(HashMap::new()),
last_pts: gst::CLOCK_TIME_NONE,
last_pts: gst::ClockTime::NONE,
capsfilter: None,
})),
}
@ -144,7 +144,7 @@ impl VideoOutput {
let ghost = match fallback_image {
"" => {
let vsrc = make_element("videotestsrc", None)?;
vsrc.set_property("is-live", &true).unwrap();
vsrc.set_property("is-live", &true);
vsrc.set_property_from_str("pattern", "black");
bin.add(&vsrc)?;
@ -157,8 +157,8 @@ impl VideoOutput {
let vconv = make_element("videoconvert", None)?;
let imagefreeze = make_element("imagefreeze", None)?;
filesrc.set_property("location", fallback_image).unwrap();
imagefreeze.set_property("is-live", &true).unwrap();
filesrc.set_property("location", fallback_image);
imagefreeze.set_property("is-live", &true);
bin.add_many(&[&filesrc, &decodebin, &imagefreeze, &vconv])?;
@ -191,19 +191,15 @@ impl VideoOutput {
agg: &gst_base::Aggregator,
base_plate_pad: &gst::Pad,
id: &str,
duration: gst::ClockTime,
duration: Option<gst::ClockTime>,
controllers: &mut HashMap<String, SettingController>,
capsfilter: &Option<gst::Element>,
) -> HashMap<String, SettingController> {
let now = get_now();
let mut updated_controllers = HashMap::new();
let mut caps = capsfilter.as_ref().map(|capsfilter| {
capsfilter
.property("caps")
.unwrap()
.get::<gst::Caps>()
.unwrap()
});
let mut caps = capsfilter
.as_ref()
.map(|capsfilter| capsfilter.property::<gst::Caps>("caps"));
for (id, mut controller) in controllers.drain() {
let setting = controller.setting.clone();
@ -215,17 +211,17 @@ impl VideoOutput {
if id == "width" {
let width = setting.lock().unwrap().as_i32().unwrap();
caps.make_mut().set_simple(&[("width", &width)]);
base_plate_pad.set_property("width", &width).unwrap();
base_plate_pad.set_property("width", &width);
} else if id == "height" {
let height = setting.lock().unwrap().as_i32().unwrap();
caps.make_mut().set_simple(&[("height", &height)]);
base_plate_pad.set_property("height", &height).unwrap();
base_plate_pad.set_property("height", &height);
}
}
}
if let Some(capsfilter) = capsfilter {
capsfilter.set_property("caps", &caps.unwrap()).unwrap();
capsfilter.set_property("caps", &caps.unwrap());
}
updated_controllers
@ -263,27 +259,27 @@ impl VideoOutput {
if base_plate_only {
if mixing_state.base_plate_timeout.is_none() {
mixing_state.base_plate_timeout = pts;
mixing_state.base_plate_timeout = Some(pts);
} else if !mixing_state.showing_base_plate
&& pts - mixing_state.base_plate_timeout > timeout
&& pts - mixing_state.base_plate_timeout.unwrap() > timeout
{
debug!("falling back to base plate {:?}", base_plate_pad);
base_plate_pad.set_property("alpha", &1.0f64).unwrap();
base_plate_pad.set_property("alpha", &1.0f64);
mixing_state.showing_base_plate = true;
}
} else {
if mixing_state.showing_base_plate {
debug!("hiding base plate: {:?}", base_plate_pad);
base_plate_pad.set_property("alpha", &0.0f64).unwrap();
base_plate_pad.set_property("alpha", &0.0f64);
mixing_state.showing_base_plate = false;
}
mixing_state.base_plate_timeout = gst::CLOCK_TIME_NONE;
mixing_state.base_plate_timeout = gst::ClockTime::NONE;
}
let duration = if mixing_state.last_pts.is_none() {
gst::CLOCK_TIME_NONE
gst::ClockTime::NONE
} else {
pts - mixing_state.last_pts
Some(pts - mixing_state.last_pts.unwrap())
};
mixing_state.slot_controllers = Some(Mixer::synchronize_slot_controllers(
@ -302,7 +298,7 @@ impl VideoOutput {
&mixing_state.capsfilter,
));
mixing_state.last_pts = pts;
mixing_state.last_pts = Some(pts);
}
/// Fill the pipeline for this output branch
@ -327,33 +323,27 @@ impl VideoOutput {
self.output
.mixer
.set_property_from_str("background", "black");
self.output.mixer.set_property(
"start-time-selection",
&gst_base::AggregatorStartTimeSelection::First,
);
self.output
.mixer
.set_property(
"start-time-selection",
&gst_base::AggregatorStartTimeSelection::First,
)
.unwrap();
self.output
.mixer
.set_property("ignore-inactive-pads", &true)
.unwrap();
.set_property("ignore-inactive-pads", &true);
vcapsfilter
.set_property(
"caps",
&gst::Caps::builder("video/x-raw")
.field("width", &width)
.field("height", &height)
.field("framerate", &gst::Fraction::new(30, 1))
.field("pixel-aspect-ratio", &gst::Fraction::new(1, 1))
.field("format", &"AYUV")
.field("colorimetry", &"bt601")
.field("chroma-site", &"jpeg")
.field("interlace-mode", &"progressive")
.build(),
)
.unwrap();
vcapsfilter.set_property(
"caps",
&gst::Caps::builder("video/x-raw")
.field("width", &width)
.field("height", &height)
.field("framerate", &gst::Fraction::new(30, 1))
.field("pixel-aspect-ratio", &gst::Fraction::new(1, 1))
.field("format", &"AYUV")
.field("colorimetry", &"bt601")
.field("chroma-site", &"jpeg")
.field("interlace-mode", &"progressive")
.build(),
);
pipeline.add_many(&[&vsrc, &vqueue, &self.output.mixer, &vcapsfilter])?;
@ -372,20 +362,17 @@ impl VideoOutput {
let base_plate_pad = self.output.mixer.static_pad("sink_0").unwrap();
base_plate_pad.set_property("alpha", &0.0f64).unwrap();
base_plate_pad.set_property("width", &width).unwrap();
base_plate_pad.set_property("height", &height).unwrap();
base_plate_pad.set_property("alpha", &0.0f64);
base_plate_pad.set_property("width", &width);
base_plate_pad.set_property("height", &height);
base_plate_pad.set_property_from_str("sizing-policy", "keep-aspect-ratio");
let mixing_state = self.mixing_state.clone();
mixing_state.lock().unwrap().capsfilter = Some(vcapsfilter);
let id = self.output.id.clone();
let timeout = timeout as u64 * gst::MSECOND;
let timeout = timeout as u64 * gst::ClockTime::MSECOND;
self.output
.mixer
.set_property("emit-signals", &true)
.unwrap();
self.output.mixer.set_property("emit-signals", &true);
self.output
.mixer
.downcast_ref::<gst_base::Aggregator>()
@ -393,7 +380,7 @@ impl VideoOutput {
.connect_samples_selected(
move |agg: &gst_base::Aggregator, _segment, pts, _dts, _duration, _info| {
let mut mixing_state = mixing_state.lock().unwrap();
Self::update_mixing_state(agg, &id, pts, &mut *mixing_state, timeout);
Self::update_mixing_state(agg, &id, pts.unwrap(), &mut *mixing_state, timeout);
},
);
@ -424,7 +411,7 @@ impl AudioOutput {
},
mixing_state: Arc::new(Mutex::new(AudioMixingState {
slot_controllers: Some(HashMap::new()),
last_pts: gst::CLOCK_TIME_NONE,
last_pts: gst::ClockTime::NONE,
})),
}
}
@ -438,9 +425,9 @@ impl AudioOutput {
mixing_state: &mut AudioMixingState,
) {
let duration = if mixing_state.last_pts.is_none() {
gst::CLOCK_TIME_NONE
gst::ClockTime::NONE
} else {
pts - mixing_state.last_pts
Some(pts - mixing_state.last_pts.unwrap())
};
mixing_state.slot_controllers = Some(Mixer::synchronize_slot_controllers(
@ -450,7 +437,7 @@ impl AudioOutput {
&mut mixing_state.slot_controllers.take().unwrap(),
));
mixing_state.last_pts = pts;
mixing_state.last_pts = Some(pts);
}
/// Fill the pipeline for this output branch
@ -469,50 +456,40 @@ impl AudioOutput {
let aresample = make_element("audioresample", None)?;
let aresamplecapsfilter = make_element("capsfilter", None)?;
asrc.set_property("is-live", &true).unwrap();
asrc.set_property("volume", &0.).unwrap();
asrc.set_property("is-live", true);
asrc.set_property("volume", 0f64);
self.output.mixer.set_property(
"start-time-selection",
&gst_base::AggregatorStartTimeSelection::First,
);
self.output
.mixer
.set_property(
"start-time-selection",
&gst_base::AggregatorStartTimeSelection::First,
)
.unwrap();
self.output
.mixer
.set_property("ignore-inactive-pads", &true)
.unwrap();
.set_property("ignore-inactive-pads", &true);
asrccapsfilter
.set_property(
"caps",
&gst::Caps::builder("audio/x-raw")
.field("channels", &2)
.field("format", &"S16LE")
.field("rate", &sample_rate)
.build(),
)
.unwrap();
asrccapsfilter.set_property(
"caps",
&gst::Caps::builder("audio/x-raw")
.field("channels", 2i32)
.field("format", &"S16LE")
.field("rate", &sample_rate)
.build(),
);
acapsfilter
.set_property(
"caps",
&gst::Caps::builder("audio/x-raw")
.field("channels", &2)
.field("format", &"S16LE")
.field("rate", &sample_rate)
.build(),
)
.unwrap();
acapsfilter.set_property(
"caps",
&gst::Caps::builder("audio/x-raw")
.field("channels", 2i32)
.field("format", &"S16LE")
.field("rate", &sample_rate)
.build(),
);
aresamplecapsfilter
.set_property(
"caps",
&gst::Caps::builder("audio/x-raw")
.field("rate", &sample_rate)
.build(),
)
.unwrap();
aresamplecapsfilter.set_property(
"caps",
&gst::Caps::builder("audio/x-raw")
.field("rate", &sample_rate)
.build(),
);
pipeline.add_many(&[
&asrc,
@ -544,10 +521,7 @@ impl AudioOutput {
let mixing_state = self.mixing_state.clone();
let id = self.output.id.clone();
self.output
.mixer
.set_property("emit-signals", &true)
.unwrap();
self.output.mixer.set_property("emit-signals", &true);
self.output
.mixer
.downcast_ref::<gst_base::Aggregator>()
@ -555,7 +529,7 @@ impl AudioOutput {
.connect_samples_selected(
move |agg: &gst_base::Aggregator, _segment, pts, _dts, _duration, _info| {
let mut mixing_state = mixing_state.lock().unwrap();
Self::update_mixing_state(agg, &id, pts, &mut *mixing_state);
Self::update_mixing_state(agg, &id, pts.unwrap(), &mut *mixing_state);
},
);
@ -803,7 +777,7 @@ impl Mixer {
"video" => Ok((true, split[1])),
"audio" => Ok((false, split[1])),
_ => Err(anyhow!(
"Slot controller property media type must be one of {audio, video}"
"Slot controller property media type must be one of [audio, video]"
)),
},
_ => Err(anyhow!(
@ -856,16 +830,14 @@ impl Mixer {
let queue = make_element("queue", None)?;
let appsrc_elem: &gst::Element = slot.appsrc.upcast_ref();
capsfilter
.set_property(
"caps",
&gst::Caps::builder("audio/x-raw")
.field("channels", &2)
.field("format", &"S16LE")
.field("rate", &sample_rate)
.build(),
)
.unwrap();
capsfilter.set_property(
"caps",
&gst::Caps::builder("audio/x-raw")
.field("channels", 2i32)
.field("format", &"S16LE")
.field("rate", &sample_rate)
.build(),
);
bin.add_many(&[appsrc_elem, &conv, &resample, &capsfilter, &queue])?;
pipeline.add(&bin)?;
@ -875,7 +847,7 @@ impl Mixer {
gst::GhostPad::with_target(Some("src"), &queue.static_pad("src").unwrap()).unwrap();
bin.add_pad(&ghost).unwrap();
slot.pad.set_property("volume", &volume).unwrap();
slot.pad.set_property("volume", &volume);
gst::Element::link_many(&[appsrc_elem, &conv, &resample, &capsfilter, &queue])?;
let srcpad = bin.static_pad("src").unwrap();
@ -897,7 +869,7 @@ impl Mixer {
fn synchronize_slot_controllers(
agg: &gst_base::Aggregator,
id: &str,
duration: gst::ClockTime,
duration: Option<gst::ClockTime>,
controllers: &mut HashMap<String, PropertyController>,
) -> HashMap<String, PropertyController> {
let now = get_now();

Просмотреть файл

@ -991,7 +991,7 @@ impl Handler<NodeStatusMessage> for NodeManager {
mod tests {
use super::*;
use crate::utils::tests::*;
use test_env_log::test;
use test_log::test;
#[actix_rt::test]
#[test]

Просмотреть файл

@ -5,7 +5,7 @@ use crate::controller::Controller;
use crate::node::{NodeManager, StopMessage};
use actix::SystemService;
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
use actix_web::{error, web, App, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use tracing::error;
@ -20,7 +20,7 @@ async fn ws(
"control" => {
let controller = Controller::new(&req.connection_info()).map_err(|err| {
error!("Failed to create controller: {}", err);
HttpResponse::InternalServerError()
error::ErrorInternalServerError(err)
})?;
ws::start(controller, &req, stream)

Просмотреть файл

@ -179,13 +179,11 @@ impl Source {
let src = make_element("fallbacksrc", None)?;
pipeline.add(&src)?;
src.set_property("uri", &self.uri).unwrap();
src.set_property("manual-unblock", &true).unwrap();
src.set_property("immediate-fallback", &true).unwrap();
src.set_property("enable-audio", &self.audio_producer.is_some())
.unwrap();
src.set_property("enable-video", &self.video_producer.is_some())
.unwrap();
src.set_property("uri", &self.uri);
src.set_property("manual-unblock", &true);
src.set_property("immediate-fallback", &true);
src.set_property("enable-audio", &self.audio_producer.is_some());
src.set_property("enable-video", &self.video_producer.is_some());
let pipeline_clone = pipeline.downgrade();
let addr = ctx.address();
@ -234,15 +232,13 @@ impl Source {
src.connect("notify::status", false, move |_args| {
let _ = addr_clone.do_send(SourceStatusMessage);
None
})
.unwrap();
});
let addr_clone = ctx.address();
src.connect("notify::statistics", false, move |_args| {
let _ = addr_clone.do_send(SourceStatusMessage);
None
})
.unwrap();
});
let src_bin: &gst::Bin = src.downcast_ref().unwrap();
@ -291,7 +287,7 @@ impl Source {
fn unblock(&mut self, ctx: &mut Context<Self>) -> Result<StateChangeResult, Error> {
let media = self.media.as_ref().unwrap();
media.src.emit_by_name("unblock", &[]).unwrap();
media.src.emit_by_name::<()>("unblock", &[]);
if let Some(ref producer) = self.video_producer {
producer.forward();
@ -309,8 +305,7 @@ impl Source {
move |s, _ctx| {
if let Some(ref media) = s.media {
if let Some(ref source_bin) = media.source_bin {
let val = source_bin.property("statistics").unwrap();
let s = val.get::<gst::Structure>().unwrap();
let s = source_bin.property::<gst::Structure>("statistics");
trace!(id = %id_clone, "source statistics: {}", s.to_string());
}
@ -345,20 +340,16 @@ impl Source {
fn monitor_switch(&mut self, ctx: &mut Context<Self>, switch: gst::Element) {
if let Some(ref mut media) = self.media {
let addr_clone = ctx.address();
switch
.connect("notify::primary-health", false, move |_args| {
let _ = addr_clone.do_send(SourceStatusMessage);
None
})
.unwrap();
switch.connect("notify::primary-health", false, move |_args| {
let _ = addr_clone.do_send(SourceStatusMessage);
None
});
let addr_clone = ctx.address();
switch
.connect("notify::fallback-health", false, move |_args| {
let _ = addr_clone.do_send(SourceStatusMessage);
None
})
.unwrap();
switch.connect("notify::fallback-health", false, move |_args| {
let _ = addr_clone.do_send(SourceStatusMessage);
None
});
media.switches.push(switch);
}
@ -368,12 +359,12 @@ impl Source {
#[instrument(level = "trace", name = "new-source-status", skip(self), fields(id = %self.id))]
fn log_source_status(&mut self) {
if let Some(ref media) = self.media {
let value = media.src.property("status").unwrap();
let value = media.src.property("status");
let status = gst::glib::EnumValue::from_value(&value).expect("Not an enum type");
trace!("Source status: {}", status.nick());
trace!("Source status: {}", status.1.nick());
trace!(
"Source statistics: {:?}",
media.src.property("statistics").unwrap()
media.src.property::<gst::Structure>("statistics")
);
for switch in &media.switches {
@ -385,13 +376,17 @@ impl Source {
None => "ANY",
};
let value = switch.property("primary-health").unwrap();
let value = switch.property_value("primary-health");
let health = gst::glib::EnumValue::from_value(&value).expect("Not an enum type");
trace!("switch {} primary health: {}", switch_name, health.nick());
trace!("switch {} primary health: {}", switch_name, health.1.nick());
let value = switch.property("fallback-health").unwrap();
let value = switch.property_value("fallback-health");
let health = gst::glib::EnumValue::from_value(&value).expect("Not an enum type");
trace!("switch {} fallback health: {}", switch_name, health.nick());
trace!(
"switch {} fallback health: {}",
switch_name,
health.1.nick()
);
}
}
}
@ -665,7 +660,7 @@ mod tests {
use crate::utils::get_now;
use crate::utils::tests::*;
use std::collections::VecDeque;
use test_env_log::test;
use test_log::test;
#[actix_rt::test]
#[test]

Просмотреть файл

@ -182,8 +182,8 @@ impl PipelineManager {
let (eos_sender, eos_receiver) = oneshot::channel::<()>();
pipeline.use_clock(Some(&gst::SystemClock::obtain()));
pipeline.set_start_time(gst::CLOCK_TIME_NONE);
pipeline.set_base_time(gst::ClockTime::from(0));
pipeline.set_start_time(gst::ClockTime::NONE);
pipeline.set_base_time(gst::ClockTime::from_nseconds(0));
Self {
pipeline,

Просмотреть файл

@ -68,27 +68,28 @@ impl PropertyController {
///
/// This function returns whether control points are still pending
#[instrument(level = "trace", name = "synchronizing controller", skip(self), fields(id = %self.controllee_id, propname = %self.propname))]
pub fn synchronize(&mut self, now: DateTime<Utc>, duration: gst::ClockTime) -> bool {
pub fn synchronize(&mut self, now: DateTime<Utc>, duration: Option<gst::ClockTime>) -> bool {
let mut control_points = self.control_points.take().unwrap();
if let Some((_id, Reverse(point))) = control_points.peek() {
let mut do_trace = false;
let initial = self.obj.property(self.propname.as_str()).unwrap();
let initial = self.obj.property_value(self.propname.as_str());
if match point.mode {
ControlMode::Interpolate => match duration {
gst::CLOCK_TIME_NONE => false,
_ => {
ControlMode::Interpolate => {
if let Some(duration) = duration {
do_trace = true;
PropertyController::interpolate_property(
&self.obj,
now,
duration.nseconds().unwrap(),
duration.nseconds(),
&self.propname,
point,
)
} else {
false
}
},
}
ControlMode::Set => {
PropertyController::set_property(&self.obj, now, &self.propname, point)
}
@ -98,7 +99,7 @@ impl PropertyController {
}
if do_trace {
let new = self.obj.property(self.propname.as_str()).unwrap();
let new = self.obj.property_value(self.propname.as_str());
trace!(obj = %self.obj.name(), property = %self.propname, "Synchronized controller: {:?} -> {:?}", initial, new);
}
@ -470,7 +471,7 @@ impl PropertyController {
property: &str,
point: &ControlPoint,
) -> bool {
let current = obj.property(property).unwrap();
let current = obj.property_value(property);
let period = if point.time < now {
duration
@ -489,7 +490,7 @@ impl PropertyController {
.mul_div_round(duration as i64, period as i64)
.unwrap();
obj.set_property(property, (current + step) as i32).unwrap();
obj.set_property(property, (current + step) as i32);
}
Type::U32 => {
let current: i64 = current.get::<u32>().unwrap() as i64;
@ -499,7 +500,7 @@ impl PropertyController {
.mul_div_round(duration as i64, period as i64)
.unwrap();
obj.set_property(property, (current + step) as u32).unwrap();
obj.set_property(property, (current + step) as u32);
}
Type::I_LONG | Type::I64 => {
let current: i64 = current.get().unwrap();
@ -511,13 +512,13 @@ impl PropertyController {
.mul_div_round(duration as i64, period as i64)
.unwrap();
obj.set_property(property, (current + step) as i64).unwrap();
obj.set_property(property, (current + step) as i64);
} else {
let step = (current - target)
.mul_div_round(duration as i64, period as i64)
.unwrap();
obj.set_property(property, (current - step) as i64).unwrap();
obj.set_property(property, (current - step) as i64);
}
}
Type::U_LONG | Type::U64 => {
@ -528,11 +529,11 @@ impl PropertyController {
if target >= current {
let step = (target - current).mul_div_round(duration, period).unwrap();
obj.set_property(property, (current + step) as u64).unwrap();
obj.set_property(property, (current + step) as u64);
} else {
let step = (current - target).mul_div_round(duration, period).unwrap();
obj.set_property(property, (current - step) as u64).unwrap();
obj.set_property(property, (current - step) as u64);
}
}
Type::F32 => {
@ -551,7 +552,7 @@ impl PropertyController {
new
};
obj.set_property(property, new as f32).unwrap();
obj.set_property(property, new as f32);
}
Type::F64 => {
let current: f64 = current.get().unwrap();
@ -569,7 +570,7 @@ impl PropertyController {
new
};
obj.set_property(property, new).unwrap();
obj.set_property(property, new);
}
_ => unreachable!(),
}
@ -587,42 +588,42 @@ impl PropertyController {
Type::STRING => {
let target = value.as_str().unwrap();
obj.set_property(property, target).unwrap();
obj.set_property(property, target);
}
Type::BOOL => {
let target = value.as_bool().unwrap();
obj.set_property(property, target).unwrap();
obj.set_property(property, target);
}
Type::I32 => {
let target = value.as_i64().unwrap();
obj.set_property(property, target as i32).unwrap();
obj.set_property(property, target as i32);
}
Type::U32 => {
let target = value.as_i64().unwrap();
obj.set_property(property, target as u32).unwrap();
obj.set_property(property, target as u32);
}
Type::I_LONG | Type::I64 => {
let target = value.as_i64().unwrap();
obj.set_property(property, target as i64).unwrap();
obj.set_property(property, target as i64);
}
Type::U_LONG | Type::U64 => {
let target = value.as_u64().unwrap();
obj.set_property(property, target as u64).unwrap();
obj.set_property(property, target as u64);
}
Type::F32 => {
let target = value.as_f64().unwrap();
obj.set_property(property, target as f32).unwrap();
obj.set_property(property, target as f32);
}
Type::F64 => {
let target = value.as_f64().unwrap();
obj.set_property(property, target as f64).unwrap();
obj.set_property(property, target as f64);
}
_ => {
if pspec.downcast_ref::<gst::glib::ParamSpecEnum>().is_some() {
@ -654,7 +655,7 @@ impl PropertyController {
pub fn properties(obj: &gst::Object, prefix: &str) -> HashMap<String, serde_json::Value> {
let mut ret = HashMap::new();
for pspec in obj.list_properties() {
for pspec in obj.list_properties().iter() {
if !pspec.flags().contains(gst::glib::ParamFlags::READABLE) {
continue;
}
@ -667,7 +668,7 @@ impl PropertyController {
continue;
}
let prop_value = obj.property(pspec.name()).unwrap();
let prop_value = obj.property_value(pspec.name());
let value = match pspec.value_type() {
Type::STRING => prop_value.get::<String>().unwrap().into(),
@ -798,7 +799,7 @@ pub mod tests {
gst::init().unwrap();
let queue = make_element("queue", None).unwrap();
queue.set_property("max-size-bytes", &10u32).unwrap();
queue.set_property("max-size-bytes", &10u32);
let now = get_now();
let point = ControlPoint {
id: "test-controller".to_string(),
@ -815,62 +816,34 @@ pub mod tests {
controller.push_control_point(point);
assert_eq!(
queue
.property("max-size-bytes")
.unwrap()
.get::<u32>()
.unwrap(),
10
);
assert_eq!(queue.property::<u32>("max-size-bytes"), 10);
assert_eq!(controller.synchronize(now, gst::CLOCK_TIME_NONE), false);
assert_eq!(controller.synchronize(now, gst::ClockTime::NONE), false);
assert_eq!(
queue
.property("max-size-bytes")
.unwrap()
.get::<u32>()
.unwrap(),
10
);
assert_eq!(queue.property::<u32>("max-size-bytes"), 10);
assert_eq!(
controller.synchronize(
now + chrono::Duration::nanoseconds(1),
gst::ClockTime::from_nseconds(1)
Some(gst::ClockTime::from_nseconds(1))
),
false
);
assert_eq!(
queue
.property("max-size-bytes")
.unwrap()
.get::<u32>()
.unwrap(),
10
);
assert_eq!(queue.property::<u32>("max-size-bytes"), 10);
// Control point should be consumed
assert_eq!(
controller.synchronize(
now + chrono::Duration::nanoseconds(2),
gst::ClockTime::from_nseconds(1)
Some(gst::ClockTime::from_nseconds(1))
),
true
);
assert!(controller.control_points().is_empty());
assert_eq!(
queue
.property("max-size-bytes")
.unwrap()
.get::<u32>()
.unwrap(),
0
);
assert_eq!(queue.property::<u32>("max-size-bytes"), 0);
}
#[test]
@ -878,7 +851,7 @@ pub mod tests {
gst::init().unwrap();
let queue = make_element("queue", None).unwrap();
queue.set_property("max-size-bytes", &10u32).unwrap();
queue.set_property("max-size-bytes", &10u32);
let now = get_now();
let point = ControlPoint {
id: "test-controller".to_string(),
@ -895,61 +868,33 @@ pub mod tests {
controller.push_control_point(point);
assert_eq!(
queue
.property("max-size-bytes")
.unwrap()
.get::<u32>()
.unwrap(),
10
);
assert_eq!(queue.property::<u32>("max-size-bytes"), 10);
assert_eq!(controller.synchronize(now, gst::CLOCK_TIME_NONE), false);
assert_eq!(controller.synchronize(now, gst::ClockTime::NONE), false);
assert_eq!(
queue
.property("max-size-bytes")
.unwrap()
.get::<u32>()
.unwrap(),
10
);
assert_eq!(queue.property::<u32>("max-size-bytes"), 10);
assert_eq!(
controller.synchronize(
now + chrono::Duration::nanoseconds(1),
gst::ClockTime::from_nseconds(1)
Some(gst::ClockTime::from_nseconds(1))
),
false
);
assert_eq!(
queue
.property("max-size-bytes")
.unwrap()
.get::<u32>()
.unwrap(),
5
);
assert_eq!(queue.property::<u32>("max-size-bytes"), 5);
// Control point should be consumed
assert_eq!(
controller.synchronize(
now + chrono::Duration::nanoseconds(2),
gst::ClockTime::from_nseconds(1)
Some(gst::ClockTime::from_nseconds(1))
),
true
);
assert!(controller.control_points().is_empty());
assert_eq!(
queue
.property("max-size-bytes")
.unwrap()
.get::<u32>()
.unwrap(),
0
);
assert_eq!(queue.property::<u32>("max-size-bytes"), 0);
}
}

Просмотреть файл

@ -109,7 +109,7 @@ impl SettingController {
///
/// This function returns whether control points are still pending
#[instrument(level = "trace", name = "synchronizing controller", skip(self), fields(id = %self.controllee_id, setting = %self.setting.lock().unwrap().name))]
pub fn synchronize(&mut self, now: DateTime<Utc>, duration: gst::ClockTime) -> bool {
pub fn synchronize(&mut self, now: DateTime<Utc>, duration: Option<gst::ClockTime>) -> bool {
let mut control_points = self.control_points.take().unwrap();
let mut setting = self.setting.lock().unwrap();
@ -122,18 +122,19 @@ impl SettingController {
};
if match point.mode {
ControlMode::Interpolate => match duration {
gst::CLOCK_TIME_NONE => false,
_ => {
ControlMode::Interpolate => {
if let Some(duration) = duration {
do_trace = true;
SettingController::interpolate(
&mut setting,
now,
duration.nseconds().unwrap(),
duration.nseconds(),
point,
)
} else {
false
}
},
}
ControlMode::Set => SettingController::set(&mut setting, now, point),
} {
do_trace = true;

Просмотреть файл

@ -40,11 +40,9 @@ impl StreamProducer {
debug!(appsink = %self.appsink.name(), appsrc = %consumer.name(), "Adding consumer");
consumer.set_property("max-buffers", 0u64).unwrap();
consumer.set_property("max-bytes", 0u64).unwrap();
consumer
.set_property("max-time", 500 * gst::MSECOND)
.unwrap();
consumer.set_property("max-buffers", 0u64);
consumer.set_property("max-bytes", 0u64);
consumer.set_property("max-time", 500 * gst::ClockTime::MSECOND);
consumer.set_property_from_str("leaky-type", "downstream");
// Forward force-keyunit events upstream to the appsink
@ -157,7 +155,7 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
.is_ok()
|| latency_updated
{
c.appsrc.set_latency(latency, gst::CLOCK_TIME_NONE);
c.appsrc.set_latency(latency, gst::ClockTime::NONE);
}
}