server: replace our StreamProducer implementation with the upstream one

We still keep a wrapper around it as we need our implementation to
keep track of consumer ids.
This commit is contained in:
Mathieu Duponchelle 2022-05-26 17:06:01 +02:00
Родитель 1376a0aeea
Коммит d8aef952d0
6 изменённых файлов: 58 добавлений и 282 удалений

16
Cargo.lock сгенерированный
Просмотреть файл

@ -353,6 +353,7 @@ dependencies = [
"gstreamer",
"gstreamer-app",
"gstreamer-base",
"gstreamer-utils",
"gstreamer-video",
"openssl",
"priority-queue",
@ -1037,6 +1038,18 @@ dependencies = [
"system-deps",
]
[[package]]
name = "gstreamer-utils"
version = "0.19.0"
source = "git+https://gitlab.freedesktop.org/gstreamer/gstreamer-rs#50f3eee8ebd85a1d71c83e317383e070201e8695"
dependencies = [
"gstreamer",
"gstreamer-app",
"gstreamer-video",
"once_cell",
"thiserror",
]
[[package]]
name = "gstreamer-video"
version = "0.19.0"
@ -1999,7 +2012,8 @@ dependencies = [
[[package]]
name = "tracing-actix"
version = "0.4.0"
source = "git+https://github.com/MathieuDuponchelle/tracing-actix.git?branch=actix-0.13#1722abd361f88b19e6ec26aeb97802a5d007ae28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c817336e87c6e0c86d1fc220be24ebfb18ad3f3a14d80c0ca8bdba9f3ea261e7"
dependencies = [
"actix",
"futures",

Просмотреть файл

@ -11,6 +11,7 @@ gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/g
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
gst-utils = { package = "gstreamer-utils", git="https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
glib = "0.15"
futures = "0.3"
tokio = { version = "1.6.1", features = ["time", "test-util"] }

Просмотреть файл

@ -130,9 +130,7 @@ impl Destination {
};
for appsrc in [&video_appsrc, &audio_appsrc].iter().copied().flatten() {
appsrc.set_format(gst::Format::Time);
appsrc.set_is_live(true);
appsrc.set_handle_segment_change(true);
gst_utils::StreamProducer::configure_consumer(appsrc);
}
let pipeline = gst::Pipeline::new(None);

Просмотреть файл

@ -1016,9 +1016,7 @@ impl Mixer {
}
for slot in [&video_slot, &audio_slot].iter().copied().flatten() {
slot.appsrc.set_format(gst::Format::Time);
slot.appsrc.set_is_live(true);
slot.appsrc.set_handle_segment_change(true);
gst_utils::StreamProducer::configure_consumer(&slot.appsrc);
}
let mut slot = ConsumerSlot {

Просмотреть файл

@ -125,7 +125,7 @@ impl Source {
pipeline.add(&deinterlace)?;
let appsink: &gst::Element = video_producer.appsink().upcast_ref();
let appsink = video_producer.appsink();
debug!(appsink = %appsink.name(), "linking video stream");
@ -133,9 +133,9 @@ impl Source {
let sinkpad = deinterlace.static_pad("sink").unwrap();
pad.link(&sinkpad)?;
deinterlace.link(appsink)?;
deinterlace.link(&appsink)?;
Ok(Some(appsink.clone()))
Ok(Some(appsink.upcast()))
} else {
Ok(None)
}
@ -145,19 +145,19 @@ impl Source {
pipeline.add_many(&[&aconv, &level])?;
let appsink: &gst::Element = audio_producer.appsink().upcast_ref();
let appsink = audio_producer.appsink();
debug!(appsink = %appsink.name(), "linking audio stream to appsink");
aconv.sync_state_with_parent()?;
level.sync_state_with_parent()?;
gst::Element::link_many(&[&aconv, &level, appsink])?;
gst::Element::link_many(&[&aconv, &level, appsink.upcast_ref()])?;
let sinkpad = aconv.static_pad("sink").unwrap();
pad.link(&sinkpad)?;
Ok(Some(appsink.clone()))
Ok(Some(appsink.upcast()))
} else {
Ok(None)
}

Просмотреть файл

@ -1,79 +1,46 @@
//! Data interface between nodes
use std::collections::HashMap;
use std::mem;
use std::sync::{atomic, Arc, Mutex};
use gst::prelude::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tracing::debug;
use tracing::{debug, error, trace, warn};
#[derive(Debug)]
struct StreamProducerInner {
producer: gst_utils::StreamProducer,
links: HashMap<String, gst_utils::ConsumptionLink>,
}
/// The interface for transporting media data from one node
/// to another.
///
/// A producer is essentially a GStreamer `appsink` whose output
/// is sent to a set of consumers, who are essentially `appsrc` wrappers
#[derive(Debug, Clone)]
/// Wrapper around `gst_utils::StreamProducer`, additionally keeping
/// track of consumer ids
pub struct StreamProducer {
/// The appsink to dispatch data for
appsink: gst_app::AppSink,
/// The consumers to dispatch data to
consumers: Arc<Mutex<StreamConsumers>>,
inner: Arc<Mutex<StreamProducerInner>>,
}
impl PartialEq for StreamProducer {
fn eq(&self, other: &Self) -> bool {
self.appsink.eq(&other.appsink)
}
}
impl Eq for StreamProducer {}
impl StreamProducer {
/// Add an appsrc to dispatch data to
pub fn add_consumer(&self, consumer: &gst_app::AppSrc, consumer_id: &str) {
let mut consumers = self.consumers.lock().unwrap();
if consumers.consumers.get(consumer_id).is_some() {
error!(appsink = %self.appsink.name(), appsrc = %consumer.name(), "Consumer already added");
return;
}
let mut inner = self.inner.lock().unwrap();
debug!(appsink = %self.appsink.name(), appsrc = %consumer.name(), "Adding consumer");
let link = inner.producer.add_consumer(consumer).unwrap();
consumer.set_property("max-buffers", 0u64);
consumer.set_property("max-bytes", 0u64);
consumer.set_property("max-time", 500 * gst::ClockTime::MSECOND);
consumer.set_property_from_str("leaky-type", "downstream");
// Forward force-keyunit events upstream to the appsink
let srcpad = consumer.static_pad("src").unwrap();
let appsink_clone = self.appsink.clone();
let appsrc = consumer.clone();
let fku_probe_id = srcpad
.add_probe(gst::PadProbeType::EVENT_UPSTREAM, move |_pad, info| {
if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
if gst_video::UpstreamForceKeyUnitEvent::parse(ev).is_ok() {
trace!(appsink = %appsink_clone.name(), appsrc = %appsrc.name(), "Requesting keyframe");
let _ = appsink_clone.send_event(ev.clone());
}
}
gst::PadProbeReturn::Ok
})
.unwrap();
consumers.consumers.insert(
consumer_id.to_string(),
StreamConsumer::new(consumer, fku_probe_id, consumer_id),
);
inner.links.insert(consumer_id.to_string(), link);
}
/// Remove a consumer appsrc by id
pub fn remove_consumer(&self, consumer_id: &str) {
if let Some(consumer) = self.consumers.lock().unwrap().consumers.remove(consumer_id) {
debug!(appsink = %self.appsink.name(), appsrc = %consumer.appsrc.name(), "Removed consumer");
if self
.inner
.lock()
.unwrap()
.links
.remove(consumer_id)
.is_some()
{
debug!(appsink = %self.appsink().name(), consumer_id = %consumer_id, "Removed consumer");
} else {
debug!(appsink = %self.appsink.name(), consumer_id = %consumer_id, "Consumer not found");
debug!(appsink = %self.appsink().name(), consumer_id = %consumer_id, "Consumer not found");
}
}
@ -81,12 +48,12 @@ impl StreamProducer {
///
/// This is useful for example for prerolling live sources.
pub fn forward(&self) {
self.consumers.lock().unwrap().discard = false;
self.inner.lock().unwrap().producer.forward();
}
/// Get the GStreamer `appsink` wrapped by this producer
pub fn appsink(&self) -> &gst_app::AppSink {
&self.appsink
pub fn appsink(&self) -> gst_app::AppSink {
self.inner.lock().unwrap().producer.appsink().clone()
}
/// Get the unique identifiers of all the consumers currently connected
@ -95,10 +62,10 @@ impl StreamProducer {
/// This is useful for disconnecting those automatically when the parent node
/// stops
pub fn get_consumer_ids(&self) -> Vec<String> {
self.consumers
self.inner
.lock()
.unwrap()
.consumers
.links
.keys()
.map(|id| id.to_string())
.collect()
@ -107,213 +74,11 @@ impl StreamProducer {
impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
fn from(appsink: &'a gst_app::AppSink) -> Self {
let consumers = Arc::new(Mutex::new(StreamConsumers {
current_latency: None,
latency_updated: false,
consumers: HashMap::new(),
discard: true,
}));
let consumers_clone = consumers.clone();
let consumers_clone2 = consumers.clone();
appsink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |appsink| {
let mut consumers = consumers_clone.lock().unwrap();
let sample = match appsink.pull_sample() {
Ok(sample) => sample,
Err(_err) => {
debug!("Failed to pull sample");
return Err(gst::FlowError::Flushing);
}
};
if consumers.discard {
return Ok(gst::FlowSuccess::Ok);
}
let span = tracing::trace_span!("New sample", appsink = %appsink.name());
let _guard = span.enter();
let latency = consumers.current_latency;
let latency_updated = mem::replace(&mut consumers.latency_updated, false);
let mut requested_keyframe = false;
let current_consumers = consumers
.consumers
.values()
.map(|c| {
if let Some(latency) = latency {
if c.forwarded_latency
.compare_exchange(
false,
true,
atomic::Ordering::SeqCst,
atomic::Ordering::SeqCst,
)
.is_ok()
|| latency_updated
{
c.appsrc.set_latency(latency, gst::ClockTime::NONE);
}
}
if c.first_buffer
.compare_exchange(
true,
false,
atomic::Ordering::SeqCst,
atomic::Ordering::SeqCst,
)
.is_ok() && !requested_keyframe {
trace!(appsrc = %c.appsrc.name(), "Requesting keyframe for first buffer");
appsink.send_event(
gst_video::UpstreamForceKeyUnitEvent::builder()
.all_headers(true)
.build(),
);
requested_keyframe = true;
}
c.appsrc.clone()
})
.collect::<smallvec::SmallVec<[_; 16]>>();
drop(consumers);
//trace!("Appsink pushing sample {:?}, current running time: {}", sample, appsink.current_running_time());
for consumer in current_consumers {
if let Err(err) = consumer.push_sample(&sample) {
warn!(appsrc = %consumer.name(), "Failed to push sample: {}", err);
}
}
Ok(gst::FlowSuccess::Ok)
})
.eos(move |appsink| {
let span = tracing::debug_span!("EOS", appsink = %appsink.name());
let _guard = span.enter();
let current_consumers = consumers_clone2
.lock()
.unwrap()
.consumers
.values()
.map(|c| c.appsrc.clone())
.collect::<smallvec::SmallVec<[_; 16]>>();
for consumer in current_consumers {
let _ = consumer.end_of_stream();
}
})
.build(),
);
let consumers_clone = consumers.clone();
let sinkpad = appsink.static_pad("sink").unwrap();
sinkpad.add_probe(gst::PadProbeType::EVENT_UPSTREAM, move |pad, info| {
if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
use gst::EventView;
if let EventView::Latency(ev) = ev.view() {
if let Some(parent) = pad.parent() {
let latency = ev.latency();
trace!(appsink = %parent.name(), latency = %latency, "Latency updated");
let mut consumers = consumers_clone.lock().unwrap();
consumers.current_latency = Some(latency);
consumers.latency_updated = true;
}
}
}
gst::PadProbeReturn::Ok
});
StreamProducer {
appsink: appsink.clone(),
consumers,
Self {
inner: Arc::new(Mutex::new(StreamProducerInner {
producer: gst_utils::StreamProducer::from(appsink),
links: HashMap::new(),
})),
}
}
}
/// Wrapper around a HashMap of consumers, exists for thread safety
/// and also protects some of the producer state
#[derive(Debug)]
struct StreamConsumers {
/// The currently-observed latency
current_latency: Option<gst::ClockTime>,
/// Whether the consumers' appsrc latency needs updating
latency_updated: bool,
/// The consumers, link id -> consumer
consumers: HashMap<String, StreamConsumer>,
/// Whether appsrc samples should be forwarded to consumers yet
discard: bool,
}
/// Wrapper around a consumer's `appsrc`
#[derive(Debug)]
struct StreamConsumer {
/// The GStreamer `appsrc` of the consumer
appsrc: gst_app::AppSrc,
/// The id of a pad probe that intercepts force-key-unit events
fku_probe_id: Option<gst::PadProbeId>,
/// Whether an initial latency was forwarded to the `appsrc`
forwarded_latency: atomic::AtomicBool,
/// Whether a first buffer has made it through, used to determine
/// whether a new key unit should be requested. Only useful for encoded
/// streams.
first_buffer: atomic::AtomicBool,
}
impl StreamConsumer {
/// Create a new consumer
fn new(appsrc: &gst_app::AppSrc, fku_probe_id: gst::PadProbeId, consumer_id: &str) -> Self {
let consumer_id = consumer_id.to_string();
appsrc.set_callbacks(
gst_app::AppSrcCallbacks::builder()
.enough_data(move |_appsrc| {
trace!(
"consumer {} is not consuming fast enough, old samples are getting dropped",
consumer_id
);
})
.build(),
);
StreamConsumer {
appsrc: appsrc.clone(),
fku_probe_id: Some(fku_probe_id),
forwarded_latency: atomic::AtomicBool::new(false),
first_buffer: atomic::AtomicBool::new(true),
}
}
}
impl Drop for StreamConsumer {
fn drop(&mut self) {
if let Some(fku_probe_id) = self.fku_probe_id.take() {
let srcpad = self.appsrc.static_pad("src").unwrap();
srcpad.remove_probe(fku_probe_id);
}
}
}
impl PartialEq for StreamConsumer {
fn eq(&self, other: &Self) -> bool {
self.appsrc.eq(&other.appsrc)
}
}
impl Eq for StreamConsumer {}
impl std::hash::Hash for StreamConsumer {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
std::hash::Hash::hash(&self.appsrc, state);
}
}
impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
fn borrow(&self) -> &gst_app::AppSrc {
&self.appsrc
}
}