Bug 1529581 - Rewrite the pacing part of VideoFrameConverter as Pacer<T>. r=bwc

The new Pacer includes both pacing of incoming frames, and the duplication logic
from VideoFrameConverter. It guarantees that no events happen early, thanks to
MediaTimer.

The largest difference from the old pacing logic is that only one timer is used,
VideoFrameConverter used two -- one for pacing and one for same-frame
duplications. The same-frame timer was positioned later in the pipe, to avoid
convert the same input frame multiple times, something users of the new Pacer
will have to handle. Two timers however uncovered flaws from nsTimer -- namely
that the ordering between two timers is not guaranteed as there is no nsTimer
API using absolute timestamps. Using only one timer avoids this.

Differential Revision: https://phabricator.services.mozilla.com/D129653
This commit is contained in:
Andreas Pehrson 2021-11-03 15:23:27 +00:00
Родитель 163106b58b
Коммит e9d46c62f3
3 изменённых файлов: 342 добавлений и 0 удалений

164
dom/media/Pacer.h Normal file
Просмотреть файл

@ -0,0 +1,164 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim:set ts=2 sw=2 sts=2 et cindent: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "MediaEventSource.h"
#include "MediaTimer.h"
#include "mozilla/TaskQueue.h"
#include "nsDeque.h"
#ifndef DOM_MEDIA_PACER_H_
# define DOM_MEDIA_PACER_H_
namespace mozilla {
/**
* Pacer<T> takes a queue of Ts tied to timestamps, and emits PacedItemEvents
* for every T at its corresponding timestamp.
*
* The queue is ordered. Enqueing an item at time t will drop all items at times
* later than T. This is because of how video sources work (some send out frames
* in the future, some don't), and to allow swapping one source for another.
*
* It supports a duplication interval. If there is no new item enqueued within
* the duplication interval since the last enqueued item, the last enqueud item
* is emitted again.
*/
template <typename T>
class Pacer {
public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(Pacer)
Pacer(RefPtr<TaskQueue> aTaskQueue, TimeDuration aDuplicationInterval)
: mTaskQueue(std::move(aTaskQueue)),
mDuplicationInterval(aDuplicationInterval),
mTimer(MakeAndAddRef<MediaTimer>()) {}
/**
* Enqueues an item and schedules a timer to pass it on to PacedItemEvent() at
* t=aTime. Already queued items with t>=aTime will be dropped.
*/
void Enqueue(T aItem, TimeStamp aTime) {
MOZ_ALWAYS_SUCCEEDS(mTaskQueue->Dispatch(NS_NewRunnableFunction(
__func__,
[this, self = RefPtr<Pacer>(this), aItem = std::move(aItem), aTime] {
MOZ_DIAGNOSTIC_ASSERT(!mIsShutdown);
while (const auto* item = mQueue.Peek()) {
if (item->mTime < aTime) {
break;
}
RefPtr<QueueItem> dropping = mQueue.Pop();
}
mQueue.Push(MakeAndAddRef<QueueItem>(std::move(aItem), aTime));
EnsureTimerScheduled(aTime);
})));
}
RefPtr<GenericPromise> Shutdown() {
return InvokeAsync(
mTaskQueue, __func__, [this, self = RefPtr<Pacer>(this)] {
mIsShutdown = true;
mTimer->Cancel();
mQueue.Erase();
mCurrentTimerTarget = Nothing();
return GenericPromise::CreateAndResolve(true, "Pacer::Shutdown");
});
}
MediaEventSourceExc<T, TimeStamp>& PacedItemEvent() {
return mPacedItemEvent;
}
protected:
~Pacer() = default;
void EnsureTimerScheduled(TimeStamp aTime) {
if (mCurrentTimerTarget && *mCurrentTimerTarget <= aTime) {
return;
}
if (mCurrentTimerTarget) {
mTimer->Cancel();
mCurrentTimerTarget = Nothing();
}
mTimer->WaitUntil(aTime, __func__)
->Then(
mTaskQueue, __func__,
[this, self = RefPtr<Pacer>(this)] { OnTimerTick(); },
[] {
// Timer was rejected. This is fine.
});
mCurrentTimerTarget = Some(aTime);
}
void OnTimerTick() {
MOZ_ASSERT(mTaskQueue->IsOnCurrentThread());
mCurrentTimerTarget = Nothing();
while (RefPtr<QueueItem> item = mQueue.PopFront()) {
auto now = TimeStamp::Now();
if (item->mTime <= now) {
// It's time to process this item.
if (const auto& next = mQueue.PeekFront();
!next || next->mTime > (item->mTime + mDuplicationInterval)) {
// No future frame within the duplication interval exists. Schedule
// a copy.
mQueue.PushFront(MakeAndAddRef<QueueItem>(
item->mItem, item->mTime + mDuplicationInterval));
}
mPacedItemEvent.Notify(std::move(item->mItem), item->mTime);
continue;
}
// This item is in the future. Put it back.
mQueue.PushFront(item.forget());
break;
}
if (const auto& next = mQueue.PeekFront(); next) {
// The queue is not empty. Schedule the timer.
EnsureTimerScheduled(next->mTime);
}
}
public:
const RefPtr<TaskQueue> mTaskQueue;
const TimeDuration mDuplicationInterval;
protected:
struct QueueItem {
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(QueueItem)
QueueItem(T aItem, TimeStamp aTime)
: mItem(std::forward<T>(aItem)), mTime(aTime) {}
T mItem;
TimeStamp mTime;
private:
~QueueItem() = default;
};
// Accessed on mTaskQueue.
nsRefPtrDeque<QueueItem> mQueue;
// Accessed on mTaskQueue.
RefPtr<MediaTimer> mTimer;
// Accessed on mTaskQueue.
Maybe<TimeStamp> mCurrentTimerTarget;
// Accessed on mTaskQueue.
bool mIsShutdown = false;
MediaEventProducerExc<T, TimeStamp> mPacedItemEvent;
};
} // namespace mozilla
#endif

Просмотреть файл

@ -0,0 +1,177 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim:set ts=2 sw=2 sts=2 et cindent: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "Pacer.h"
#include "VideoUtils.h"
#include "WaitFor.h"
using namespace mozilla;
template <typename T>
class PacerTest {
protected:
explicit PacerTest(TimeDuration aDuplicationInterval)
: mTaskQueue(MakeRefPtr<TaskQueue>(
GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER), "PacerTest")),
mPacer(MakeRefPtr<Pacer<T>>(mTaskQueue, aDuplicationInterval)),
mInterval(aDuplicationInterval) {}
void TearDown() {
mPacer->Shutdown()->Then(mTaskQueue, __func__,
[tq = mTaskQueue] { tq->BeginShutdown(); });
}
const RefPtr<TaskQueue> mTaskQueue;
const RefPtr<Pacer<T>> mPacer;
const TimeDuration mInterval;
};
class PacerTestInt : public PacerTest<int>, public ::testing::Test {
protected:
explicit PacerTestInt(TimeDuration aDuplicationInterval)
: PacerTest<int>(aDuplicationInterval) {}
void TearDown() override { PacerTest::TearDown(); }
};
class PacerTestIntLongDuplication : public PacerTestInt {
protected:
PacerTestIntLongDuplication() : PacerTestInt(TimeDuration::FromSeconds(10)) {}
};
class PacerTestIntTenMsDuplication : public PacerTestInt {
protected:
PacerTestIntTenMsDuplication()
: PacerTestInt(TimeDuration::FromMilliseconds(10)) {}
};
TEST_F(PacerTestIntLongDuplication, Single) {
auto now = TimeStamp::Now();
auto d1 = TimeDuration::FromMilliseconds(100);
mPacer->Enqueue(1, now + d1);
auto [i, time] = WaitFor(TakeN(mPacer->PacedItemEvent(), 1)).unwrap()[0];
EXPECT_GE(TimeStamp::Now() - now, d1);
EXPECT_EQ(i, 1);
EXPECT_EQ(time - now, d1);
}
TEST_F(PacerTestIntLongDuplication, Past) {
auto now = TimeStamp::Now();
auto d1 = TimeDuration::FromMilliseconds(100);
mPacer->Enqueue(1, now - d1);
auto [i, time] = WaitFor(TakeN(mPacer->PacedItemEvent(), 1)).unwrap()[0];
EXPECT_GE(TimeStamp::Now() - now, -d1);
EXPECT_EQ(i, 1);
EXPECT_EQ(time - now, -d1);
}
TEST_F(PacerTestIntLongDuplication, TimeReset) {
auto now = TimeStamp::Now();
auto d1 = TimeDuration::FromMilliseconds(100);
auto d2 = TimeDuration::FromMilliseconds(200);
auto d3 = TimeDuration::FromMilliseconds(300);
mPacer->Enqueue(1, now + d1);
mPacer->Enqueue(2, now + d3);
mPacer->Enqueue(3, now + d2);
auto items = WaitFor(TakeN(mPacer->PacedItemEvent(), 2)).unwrap();
{
auto [i, time] = items[0];
EXPECT_GE(TimeStamp::Now() - now, d1);
EXPECT_EQ(i, 1);
EXPECT_EQ(time - now, d1);
}
{
auto [i, time] = items[1];
EXPECT_GE(TimeStamp::Now() - now, d2);
EXPECT_EQ(i, 3);
EXPECT_EQ(time - now, d2);
}
}
TEST_F(PacerTestIntTenMsDuplication, SingleDuplication) {
auto now = TimeStamp::Now();
auto d1 = TimeDuration::FromMilliseconds(100);
mPacer->Enqueue(1, now + d1);
auto items = WaitFor(TakeN(mPacer->PacedItemEvent(), 2)).unwrap();
{
auto [i, time] = items[0];
EXPECT_GE(TimeStamp::Now() - now, d1);
EXPECT_EQ(i, 1);
EXPECT_EQ(time - now, d1);
}
{
auto [i, time] = items[1];
EXPECT_GE(TimeStamp::Now() - now, d1 + mInterval);
EXPECT_EQ(i, 1);
EXPECT_EQ(time - now, d1 + mInterval);
}
}
TEST_F(PacerTestIntTenMsDuplication, RacyDuplication1) {
auto now = TimeStamp::Now();
auto d1 = TimeDuration::FromMilliseconds(100);
auto d2 = d1 + mInterval - TimeDuration::FromMicroseconds(1);
mPacer->Enqueue(1, now + d1);
mPacer->Enqueue(2, now + d2);
auto items = WaitFor(TakeN(mPacer->PacedItemEvent(), 3)).unwrap();
{
auto [i, time] = items[0];
EXPECT_GE(TimeStamp::Now() - now, d1);
EXPECT_EQ(i, 1);
EXPECT_EQ(time - now, d1);
}
{
auto [i, time] = items[1];
EXPECT_GE(TimeStamp::Now() - now, d2);
EXPECT_EQ(i, 2);
EXPECT_EQ(time - now, d2);
}
{
auto [i, time] = items[2];
EXPECT_GE(TimeStamp::Now() - now, d2 + mInterval);
EXPECT_EQ(i, 2);
EXPECT_EQ(time - now, d2 + mInterval);
}
}
TEST_F(PacerTestIntTenMsDuplication, RacyDuplication2) {
auto now = TimeStamp::Now();
auto d1 = TimeDuration::FromMilliseconds(100);
auto d2 = d1 + mInterval + TimeDuration::FromMicroseconds(1);
mPacer->Enqueue(1, now + d1);
mPacer->Enqueue(2, now + d2);
auto items = WaitFor(TakeN(mPacer->PacedItemEvent(), 3)).unwrap();
{
auto [i, time] = items[0];
EXPECT_GE(TimeStamp::Now() - now, d1);
EXPECT_EQ(i, 1);
EXPECT_EQ(time - now, d1);
}
{
auto [i, time] = items[1];
EXPECT_GE(TimeStamp::Now() - now, d1 + mInterval);
EXPECT_EQ(i, 1);
EXPECT_EQ(time - now, d1 + mInterval);
}
{
auto [i, time] = items[2];
EXPECT_GE(TimeStamp::Now() - now, d2);
EXPECT_EQ(i, 2);
EXPECT_EQ(time - now, d2);
}
}

Просмотреть файл

@ -54,6 +54,7 @@ UNIFIED_SOURCES += [
"TestMuxer.cpp",
"TestOggWriter.cpp",
"TestOpusParser.cpp",
"TestPacer.cpp",
"TestRTCStatsTimestampMaker.cpp",
"TestRust.cpp",
"TestTimeUnit.cpp",