Bug 1533252 - P2. Make LocalAllocPolicy inherit from AllocPolicy. r=pehrsons

Despite its name and apparent use LocalAllocPolicy could only handle a single call to Alloc() at a time which was okay due to how it was used by the MediaFormatReader.

However, we do want to handle multiple calls to Alloc().

These changes allow to perform simultaneous requests and have them be processed one at a time serially.

Differential Revision: https://phabricator.services.mozilla.com/D23651

--HG--
extra : moz-landing-system : lando
This commit is contained in:
Jean-Yves Avenard 2019-03-15 13:35:42 +00:00
Родитель bbdbe29869
Коммит cbf34767dd
3 изменённых файлов: 170 добавлений и 154 удалений

Просмотреть файл

@ -262,8 +262,8 @@ void MediaFormatReader::DecoderData::Flush() {
class MediaFormatReader::DecoderFactory {
using InitPromise = MediaDataDecoder::InitPromise;
using TokenPromise = GlobalAllocPolicy::Promise;
using Token = GlobalAllocPolicy::Token;
using TokenPromise = AllocPolicy::Promise;
using Token = AllocPolicy::Token;
public:
explicit DecoderFactory(MediaFormatReader* aOwner)

Просмотреть файл

@ -18,18 +18,62 @@ namespace mozilla {
using TrackType = TrackInfo::TrackType;
StaticMutex GlobalAllocPolicy::sMutex;
class GlobalAllocPolicy::AutoDeallocToken : public Token {
class AllocPolicyImpl::AutoDeallocToken : public Token {
public:
explicit AutoDeallocToken(GlobalAllocPolicy& aPolicy) : mPolicy(aPolicy) {}
explicit AutoDeallocToken(const RefPtr<AllocPolicyImpl>& aPolicy)
: mPolicy(aPolicy) {}
private:
~AutoDeallocToken() { mPolicy.Dealloc(); }
~AutoDeallocToken() { mPolicy->Dealloc(); }
GlobalAllocPolicy& mPolicy; // reference to a singleton object.
RefPtr<AllocPolicyImpl> mPolicy;
};
AllocPolicyImpl::AllocPolicyImpl(int aDecoderLimit)
: mMaxDecoderLimit(aDecoderLimit),
mMonitor("AllocPolicyImpl"),
mDecoderLimit(aDecoderLimit) {}
AllocPolicyImpl::~AllocPolicyImpl() { RejectAll(); }
auto AllocPolicyImpl::Alloc() -> RefPtr<Promise> {
ReentrantMonitorAutoEnter mon(mMonitor);
// No decoder limit set.
if (mDecoderLimit < 0) {
return Promise::CreateAndResolve(new Token(), __func__);
}
RefPtr<PromisePrivate> p = new PromisePrivate(__func__);
mPromises.push(p);
ResolvePromise(mon);
return p.forget();
}
void AllocPolicyImpl::Dealloc() {
ReentrantMonitorAutoEnter mon(mMonitor);
++mDecoderLimit;
ResolvePromise(mon);
}
void AllocPolicyImpl::ResolvePromise(ReentrantMonitorAutoEnter& aProofOfLock) {
MOZ_ASSERT(mDecoderLimit >= 0);
if (mDecoderLimit > 0 && !mPromises.empty()) {
--mDecoderLimit;
RefPtr<PromisePrivate> p = mPromises.front().forget();
mPromises.pop();
p->Resolve(new AutoDeallocToken(this), __func__);
}
}
void AllocPolicyImpl::RejectAll() {
ReentrantMonitorAutoEnter mon(mMonitor);
while (!mPromises.empty()) {
RefPtr<PromisePrivate> p = mPromises.front().forget();
mPromises.pop();
p->Reject(true, __func__);
}
}
static int32_t MediaDecoderLimitDefault() {
#ifdef MOZ_WIDGET_ANDROID
if (jni::GetAPIVersion() < 18) {
@ -42,109 +86,88 @@ static int32_t MediaDecoderLimitDefault() {
return -1;
}
GlobalAllocPolicy::GlobalAllocPolicy()
: mMonitor("DecoderAllocPolicy::mMonitor"),
mDecoderLimit(MediaDecoderLimitDefault()) {
SystemGroup::Dispatch(
TaskCategory::Other,
NS_NewRunnableFunction("GlobalAllocPolicy::GlobalAllocPolicy", [this]() {
ClearOnShutdown(this, ShutdownPhase::ShutdownThreads);
}));
}
StaticMutex GlobalAllocPolicy::sMutex;
GlobalAllocPolicy::~GlobalAllocPolicy() {
while (!mPromises.empty()) {
RefPtr<PromisePrivate> p = mPromises.front().forget();
mPromises.pop();
p->Reject(true, __func__);
}
}
GlobalAllocPolicy& GlobalAllocPolicy::Instance(TrackType aTrack) {
NotNull<AllocPolicy*> GlobalAllocPolicy::Instance(TrackType aTrack) {
StaticMutexAutoLock lock(sMutex);
if (aTrack == TrackType::kAudioTrack) {
static auto sAudioPolicy = new GlobalAllocPolicy();
return *sAudioPolicy;
} else {
static auto sVideoPolicy = new GlobalAllocPolicy();
return *sVideoPolicy;
static RefPtr<AllocPolicyImpl> sAudioPolicy = []() {
SystemGroup::Dispatch(
TaskCategory::Other,
NS_NewRunnableFunction(
"GlobalAllocPolicy::GlobalAllocPolicy:Audio", []() {
ClearOnShutdown(&sAudioPolicy, ShutdownPhase::ShutdownThreads);
}));
return new AllocPolicyImpl(MediaDecoderLimitDefault());
}();
return WrapNotNull(sAudioPolicy.get());
}
static RefPtr<AllocPolicyImpl> sVideoPolicy = []() {
SystemGroup::Dispatch(
TaskCategory::Other,
NS_NewRunnableFunction(
"GlobalAllocPolicy::GlobalAllocPolicy:Audio", []() {
ClearOnShutdown(&sVideoPolicy, ShutdownPhase::ShutdownThreads);
}));
return new AllocPolicyImpl(MediaDecoderLimitDefault());
}();
return WrapNotNull(sVideoPolicy.get());
}
auto GlobalAllocPolicy::Alloc() -> RefPtr<Promise> {
// No decoder limit set.
if (mDecoderLimit < 0) {
return Promise::CreateAndResolve(new Token(), __func__);
}
class LocalAllocPolicy::AutoDeallocCombinedToken : public Token {
public:
AutoDeallocCombinedToken(already_AddRefed<Token> aLocalAllocPolicyToken,
already_AddRefed<Token> aGlobalAllocPolicyToken)
: mLocalToken(aLocalAllocPolicyToken),
mGlobalToken(aGlobalAllocPolicyToken) {}
ReentrantMonitorAutoEnter mon(mMonitor);
RefPtr<PromisePrivate> p = new PromisePrivate(__func__);
mPromises.push(p);
ResolvePromise(mon);
return p.forget();
}
private:
// Release tokens allocated from GlobalAllocPolicy and LocalAllocPolicy
// and process next token request if any.
~AutoDeallocCombinedToken() = default;
const RefPtr<Token> mLocalToken;
const RefPtr<Token> mGlobalToken;
};
void GlobalAllocPolicy::Dealloc() {
ReentrantMonitorAutoEnter mon(mMonitor);
++mDecoderLimit;
ResolvePromise(mon);
}
void GlobalAllocPolicy::ResolvePromise(
ReentrantMonitorAutoEnter& aProofOfLock) {
MOZ_ASSERT(mDecoderLimit >= 0);
if (mDecoderLimit > 0 && !mPromises.empty()) {
--mDecoderLimit;
RefPtr<PromisePrivate> p = mPromises.front().forget();
mPromises.pop();
p->Resolve(new AutoDeallocToken(*this), __func__);
}
}
void GlobalAllocPolicy::operator=(std::nullptr_t) { delete this; }
RefPtr<LocalAllocPolicy::Promise> LocalAllocPolicy::Alloc() {
MOZ_ASSERT(mOwnerThread->IsCurrentThreadIn());
MOZ_DIAGNOSTIC_ASSERT(mPendingPromise.IsEmpty());
RefPtr<Promise> p = mPendingPromise.Ensure(__func__);
if (mDecoderLimit > 0) {
ProcessRequest();
}
return p.forget();
}
void LocalAllocPolicy::ProcessRequest() {
MOZ_ASSERT(mOwnerThread->IsCurrentThreadIn());
MOZ_DIAGNOSTIC_ASSERT(mDecoderLimit > 0);
// No pending request.
if (mPendingPromise.IsEmpty()) {
return;
}
RefPtr<AutoDeallocToken> token = new AutoDeallocToken(this);
auto LocalAllocPolicy::Alloc() -> RefPtr<Promise> {
MOZ_DIAGNOSTIC_ASSERT(MaxDecoderLimit() == 1,
"We can only handle at most one token out at a time.");
RefPtr<LocalAllocPolicy> self = this;
return AllocPolicyImpl::Alloc()->Then(
mOwnerThread, __func__,
[self](RefPtr<Token> aToken) {
RefPtr<Token> localToken = aToken.forget();
RefPtr<Promise> p = self->mPendingPromise.Ensure(__func__);
GlobalAllocPolicy::Instance(self->mTrack)
->Alloc()
->Then(self->mOwnerThread, __func__,
[self, localToken = std::move(localToken)](
RefPtr<Token> aToken) mutable {
self->mTokenRequest.Complete();
RefPtr<Token> combinedToken = new AutoDeallocCombinedToken(
localToken.forget(), aToken.forget());
self->mPendingPromise.Resolve(combinedToken, __func__);
},
[self]() {
self->mTokenRequest.Complete();
self->mPendingPromise.Reject(true, __func__);
})
->Track(self->mTokenRequest);
return p;
},
[]() { return Promise::CreateAndReject(true, __func__); });
}
GlobalAllocPolicy::Instance(mTrack)
.Alloc()
->Then(mOwnerThread, __func__,
[self, token](RefPtr<Token> aToken) {
self->mTokenRequest.Complete();
token->Append(aToken);
self->mPendingPromise.Resolve(token, __func__);
},
[self, token]() {
self->mTokenRequest.Complete();
self->mPendingPromise.Reject(true, __func__);
})
->Track(mTokenRequest);
LocalAllocPolicy::~LocalAllocPolicy() {
mPendingPromise.RejectIfExists(true, __func__);
mTokenRequest.DisconnectIfExists();
}
void LocalAllocPolicy::Cancel() {
MOZ_ASSERT(mOwnerThread->IsCurrentThreadIn());
mPendingPromise.RejectIfExists(true, __func__);
mTokenRequest.DisconnectIfExists();
RejectAll();
}
AllocationWrapper::AllocationWrapper(
@ -187,7 +210,7 @@ AllocationWrapper::CreateDecoder(const CreateDecoderParams& aParams) {
RefPtr<AllocateDecoderPromise> p =
GlobalAllocPolicy::Instance(aParams.mType)
.Alloc()
->Alloc()
->Then(AbstractThread::GetCurrent(), __func__,
[=](RefPtr<Token> aToken) {
// result may not always be updated by

Просмотреть файл

@ -12,51 +12,79 @@
#include "PlatformDecoderModule.h"
#include "TimeUnits.h"
#include "mozilla/MozPromise.h"
#include "mozilla/NotNull.h"
#include "mozilla/ReentrantMonitor.h"
#include "mozilla/StaticMutex.h"
namespace mozilla {
/**
* This is a singleton which controls the number of decoders that can be
* created concurrently. Before calling PDMFactory::CreateDecoder(), Alloc()
* must be called to get a token object as a permission to create a decoder.
* The token should stay alive until Shutdown() is called on the decoder.
* The destructor of the token will restore the decoder count so it is available
* Before calling PDMFactory::CreateDecoder(), Alloc() must be called on the
* policy to get a token object as a permission to create a decoder. The
* token should stay alive until Shutdown() is called on the decoder. The
* destructor of the token will restore the decoder count so it is available
* for next calls of Alloc().
*/
class GlobalAllocPolicy {
class AllocPolicy {
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AllocPolicy)
public:
class Token {
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(Token)
protected:
virtual ~Token() {}
virtual ~Token() = default;
};
using Promise = MozPromise<RefPtr<Token>, bool, true>;
// Acquire a token for decoder creation. Thread-safe.
RefPtr<Promise> Alloc();
virtual RefPtr<Promise> Alloc() = 0;
// Called by ClearOnShutdown() to delete the singleton.
void operator=(decltype(nullptr));
protected:
virtual ~AllocPolicy() = default;
};
/**
* This is a singleton which controls the number of decoders that can be created
* concurrently.
* Instance() will return the TrackType global AllocPolicy.
* Instance() will always return a non-null value.
*/
class GlobalAllocPolicy {
public:
// Get the singleton for the given track type. Thread-safe.
static GlobalAllocPolicy& Instance(TrackInfo::TrackType aTrack);
static NotNull<AllocPolicy*> Instance(TrackInfo::TrackType aTrack);
private:
// Protect access to Instance().
static StaticMutex sMutex;
};
/** This the actual base implementation underneath all AllocPolicy objects and
* control how many decoders can be created concurrently.
* Alloc() must be called to get a token object as a permission to perform an
* action. The token should stay alive until Shutdown() is called on the
* decoder. The destructor of the token will restore the decoder count so it is
* available for next calls of Alloc().
**/
class AllocPolicyImpl : public AllocPolicy {
public:
explicit AllocPolicyImpl(int aDecoderLimit);
RefPtr<Promise> Alloc() override;
protected:
virtual ~AllocPolicyImpl();
void RejectAll();
int MaxDecoderLimit() const { return mMaxDecoderLimit; }
private:
class AutoDeallocToken;
using PromisePrivate = Promise::Private;
GlobalAllocPolicy();
~GlobalAllocPolicy();
// Called by the destructor of TokenImpl to restore the decoder limit.
void Dealloc();
// Decrement the decoder limit and resolve a promise if available.
void ResolvePromise(ReentrantMonitorAutoEnter& aProofOfLock);
// Protect access to Instance().
static StaticMutex sMutex;
const int mMaxDecoderLimit;
ReentrantMonitor mMonitor;
// The number of decoders available for creation.
int mDecoderLimit;
@ -68,58 +96,23 @@ class GlobalAllocPolicy {
* This class allows to track and serialise a single decoder allocation at a
* time
*/
class LocalAllocPolicy {
class LocalAllocPolicy : public AllocPolicyImpl {
using TrackType = TrackInfo::TrackType;
using Promise = GlobalAllocPolicy::Promise;
using Token = GlobalAllocPolicy::Token;
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(LocalAllocPolicy)
public:
LocalAllocPolicy(TrackType aTrack, TaskQueue* aOwnerThread)
: mTrack(aTrack), mOwnerThread(aOwnerThread) {}
: AllocPolicyImpl(1), mTrack(aTrack), mOwnerThread(aOwnerThread) {}
// Acquire a token for decoder creation. Note the resolved token will
// aggregate a GlobalAllocPolicy token to comply to its policy. Note
// this function shouldn't be called again until the returned promise
// is resolved or rejected.
RefPtr<Promise> Alloc();
RefPtr<Promise> Alloc() override;
// Cancel the request to GlobalAllocPolicy and reject the current token
// request. Note this must happen before mOwnerThread->BeginShutdown().
void Cancel();
private:
/*
* An RAII class to manage LocalAllocPolicy::mDecoderLimit.
*/
class AutoDeallocToken : public Token {
public:
explicit AutoDeallocToken(LocalAllocPolicy* aOwner) : mOwner(aOwner) {
MOZ_DIAGNOSTIC_ASSERT(mOwner->mDecoderLimit > 0);
--mOwner->mDecoderLimit;
}
// Aggregate a GlobalAllocPolicy token to present a single instance of
// Token to the client so the client doesn't have to deal with
// GlobalAllocPolicy and LocalAllocPolicy separately.
void Append(Token* aToken) { mToken = aToken; }
class AutoDeallocCombinedToken;
virtual ~LocalAllocPolicy();
private:
// Release tokens allocated from GlobalAllocPolicy and LocalAllocPolicy
// and process next token request if any.
~AutoDeallocToken() {
mToken = nullptr; // Dealloc the global token.
++mOwner->mDecoderLimit; // Dealloc the local token.
mOwner->ProcessRequest(); // Process next pending request.
}
RefPtr<LocalAllocPolicy> mOwner;
RefPtr<Token> mToken;
};
~LocalAllocPolicy() = default;
void ProcessRequest();
int mDecoderLimit = 1;
const TrackType mTrack;
RefPtr<TaskQueue> mOwnerThread;
MozPromiseHolder<Promise> mPendingPromise;
@ -127,7 +120,7 @@ class LocalAllocPolicy {
};
class AllocationWrapper : public MediaDataDecoder {
using Token = GlobalAllocPolicy::Token;
using Token = AllocPolicy::Token;
public:
AllocationWrapper(already_AddRefed<MediaDataDecoder> aDecoder,