Bug 1214710 - [1.11] Implement ReaderQueue for simultaneous decoder limit enforcement. r=jya,jwwang

This commit is contained in:
Eugen Sawin 2016-02-08 21:48:25 +01:00
Родитель f8ebb731be
Коммит 5a015f6bec
5 изменённых файлов: 188 добавлений и 1 удалений

Просмотреть файл

@ -9,11 +9,14 @@
#include "MediaResource.h"
#include "VideoUtils.h"
#include "ImageContainer.h"
#include "MediaPrefs.h"
#include "nsPrintfCString.h"
#include "mozilla/mozalloc.h"
#include "mozilla/Mutex.h"
#include <stdint.h>
#include <algorithm>
#include <list>
using namespace mozilla::media;
@ -62,6 +65,154 @@ public:
size_t mSize;
};
// The ReaderQueue is used to keep track of the numer of active readers to
// enforce a given limit on the number of simultaneous active decoders.
// Readers are added/removed during construction/destruction and are
// suspended and resumed by the queue. The max number of active decoders is
// controlled by the "media.decoder.limit" pref.
class ReaderQueue
{
public:
static ReaderQueue& Instance()
{
static StaticMutex sMutex;
StaticMutexAutoLock lock(sMutex);
if (!sInstance) {
sInstance = new ReaderQueue;
sInstance->MaxNumActive(MediaPrefs::MediaDecoderLimit());
ClearOnShutdown(&sInstance);
}
MOZ_ASSERT(sInstance);
return *sInstance;
}
void MaxNumActive(int32_t aNumActive)
{
MutexAutoLock lock(mMutex);
if (aNumActive < 0) {
mNumMaxActive = std::numeric_limits<uint32_t>::max();
} else {
mNumMaxActive = aNumActive;
}
}
void Add(MediaDecoderReader* aReader)
{
MutexAutoLock lock(mMutex);
if (mActive.Length() < mNumMaxActive) {
// Below active limit, resume the new reader.
mActive.AppendElement(aReader);
DispatchResume(aReader);
} else if (mActive.IsEmpty()) {
MOZ_ASSERT(mNumMaxActive == 0);
mSuspended.AppendElement(aReader);
} else {
// We're past the active limit, suspend an old reader and resume the new.
mActive.AppendElement(aReader);
MediaDecoderReader* suspendReader = mActive.ElementAt(0);
mSuspended.AppendElement(suspendReader);
mActive.RemoveElementAt(0);
DispatchSuspendResume(suspendReader, aReader);
}
}
void Remove(MediaDecoderReader* aReader)
{
MutexAutoLock lock(mMutex);
if (aReader->IsSuspended()) {
// Removing suspended readers has no immediate side-effects.
DebugOnly<bool> result = mSuspended.RemoveElement(aReader);
MOZ_ASSERT(result, "Suspended reader must be in mSuspended");
} else {
// For each removed active reader, we resume a suspended one.
DebugOnly<bool> result = mActive.RemoveElement(aReader);
MOZ_ASSERT(result, "Non-suspended reader must be in mActive");
if (mSuspended.IsEmpty()) {
return;
}
MediaDecoderReader* resumeReader = mSuspended.LastElement();
mActive.AppendElement(resumeReader);
mSuspended.RemoveElementAt(mSuspended.Length() - 1);
DispatchResume(resumeReader);
}
}
private:
ReaderQueue()
: mNumMaxActive(std::numeric_limits<uint32_t>::max())
, mMutex("ReaderQueue:mMutex")
{
}
static void Resume(MediaDecoderReader* aReader)
{
if (!aReader->IsSuspended()) {
return;
}
aReader->SetIsSuspended(false);
}
static void Suspend(MediaDecoderReader* aReader)
{
if (aReader->IsSuspended()) {
return;
}
aReader->SetIsSuspended(true);
aReader->ReleaseMediaResources();
}
static void DispatchResume(MediaDecoderReader* aReader)
{
RefPtr<MediaDecoderReader> reader = aReader;
nsCOMPtr<nsIRunnable> task = NS_NewRunnableFunction(
[reader]() {
Resume(reader);
});
reader->OwnerThread()->Dispatch(task.forget());
}
static void DispatchSuspend(MediaDecoderReader* aReader)
{
RefPtr<MediaDecoderReader> reader = aReader;
nsCOMPtr<nsIRunnable> task = NS_NewRunnableFunction(
[reader]() {
Suspend(reader);
});
reader->OwnerThread()->Dispatch(task.forget());
}
static void DispatchSuspendResume(MediaDecoderReader* aSuspend,
MediaDecoderReader* aResume)
{
RefPtr<MediaDecoderReader> suspend = aSuspend;
RefPtr<MediaDecoderReader> resume = aResume;
nsCOMPtr<nsIRunnable> task = NS_NewRunnableFunction(
[suspend, resume] () {
Suspend(suspend);
DispatchResume(resume);
});
suspend->OwnerThread()->Dispatch(task.forget());
}
static StaticAutoPtr<ReaderQueue> sInstance;
nsTArray<RefPtr<MediaDecoderReader>> mActive;
nsTArray<RefPtr<MediaDecoderReader>> mSuspended;
uint32_t mNumMaxActive;
mutable Mutex mMutex;
};
StaticAutoPtr<ReaderQueue> ReaderQueue::sInstance;
MediaDecoderReader::MediaDecoderReader(AbstractMediaDecoder* aDecoder)
: mAudioCompactor(mAudioQueue)
, mDecoder(aDecoder)
@ -75,6 +226,7 @@ MediaDecoderReader::MediaDecoderReader(AbstractMediaDecoder* aDecoder)
, mShutdown(false)
, mAudioDiscontinuity(false)
, mVideoDiscontinuity(false)
, mIsSuspended(true)
{
MOZ_COUNT_CTOR(MediaDecoderReader);
MOZ_ASSERT(NS_IsMainThread());
@ -84,6 +236,8 @@ MediaDecoderReader::MediaDecoderReader(AbstractMediaDecoder* aDecoder)
mTaskQueue, this, &MediaDecoderReader::NotifyDataArrived);
}
ReaderQueue::Instance().Add(this);
// Dispatch initialization that needs to happen on that task queue.
mTaskQueue->Dispatch(NewRunnableMethod(this, &MediaDecoderReader::InitializationTask));
}
@ -376,6 +530,8 @@ MediaDecoderReader::Shutdown()
mDecoder = nullptr;
ReaderQueue::Instance().Remove(this);
return mTaskQueue->BeginShutdown();
}

Просмотреть файл

@ -290,6 +290,18 @@ public:
// Notified by the OggReader during playback when chained ogg is detected.
MediaEventSource<void>& OnMediaNotSeekable() { return mOnMediaNotSeekable; }
bool IsSuspended() const
{
MOZ_ASSERT(OnTaskQueue());
return mIsSuspended;
}
void SetIsSuspended(bool aState)
{
MOZ_ASSERT(OnTaskQueue());
mIsSuspended = aState;
}
protected:
virtual ~MediaDecoderReader();
@ -435,6 +447,7 @@ private:
// "discontinuity" in the stream. For example after a seek.
bool mAudioDiscontinuity;
bool mVideoDiscontinuity;
bool mIsSuspended;
MediaEventListener mDataArrivedListener;
};

Просмотреть файл

@ -135,7 +135,6 @@ MediaFormatReader::Shutdown()
MOZ_ASSERT(!mVideo.HasPromise());
mDemuxer = nullptr;
mPlatform = nullptr;
return MediaDecoderReader::Shutdown();
@ -459,6 +458,11 @@ MediaFormatReader::EnsureDecoderInitialized(TrackType aTrack)
[self] (TrackType aTrack) {
auto& decoder = self->GetDecoderData(aTrack);
decoder.mInitPromise.Complete();
if (self->IsSuspended()) {
return;
}
decoder.mDecoderInitialized = true;
MonitorAutoLock mon(decoder.mMonitor);
decoder.mDescription = decoder.mDecoder->GetDescriptionName();
@ -532,6 +536,10 @@ MediaFormatReader::RequestVideoData(bool aSkipToNextKeyframe,
return MediaDataPromise::CreateAndReject(CANCELED, __func__);
}
if (IsSuspended()) {
return MediaDataPromise::CreateAndReject(CANCELED, __func__);
}
media::TimeUnit timeThreshold{media::TimeUnit::FromMicroseconds(aTimeThreshold)};
// Ensure we have no pending seek going as ShouldSkip could return out of date
// information.
@ -623,6 +631,10 @@ MediaFormatReader::RequestAudioData()
return MediaDataPromise::CreateAndReject(DECODE_ERROR, __func__);
}
if (IsSuspended()) {
return MediaDataPromise::CreateAndReject(CANCELED, __func__);
}
if (IsSeeking()) {
LOG("called mid-seek. Rejecting.");
return MediaDataPromise::CreateAndReject(CANCELED, __func__);
@ -917,6 +929,7 @@ MediaFormatReader::HandleDemuxedSamples(TrackType aTrack,
AbstractMediaDecoder::AutoNotifyDecoded& aA)
{
MOZ_ASSERT(OnTaskQueue());
auto& decoder = GetDecoderData(aTrack);
if (decoder.mQueuedSamples.IsEmpty()) {
@ -1835,6 +1848,9 @@ void MediaFormatReader::ReleaseMediaResources()
}
mVideo.mInitPromise.DisconnectIfExists();
mVideo.ShutdownDecoder();
mAudio.mInitPromise.DisconnectIfExists();
mAudio.ShutdownDecoder();
}
bool

Просмотреть файл

@ -101,6 +101,7 @@ public:
void GetMozDebugReaderData(nsAString& aString);
private:
bool HasVideo() { return mVideo.mTrackDemuxer; }
bool HasAudio() { return mAudio.mTrackDemuxer; }

Просмотреть файл

@ -120,6 +120,7 @@ private:
DECL_MEDIA_PREF("media.webspeech.recognition.force_enable", WebSpeechRecognitionForceEnabled, bool, false);
DECL_MEDIA_PREF("media.num-decode-threads", MediaThreadPoolDefaultCount, uint32_t, 4);
DECL_MEDIA_PREF("media.decoder.limit", MediaDecoderLimit, uint32_t, -1);
public:
// Manage the singleton: