gecko-dev/gfx/2d/JobScheduler.cpp

251 строка
7.1 KiB
C++

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "JobScheduler.h"
#include "Logging.h"
namespace mozilla {
namespace gfx {
JobScheduler* JobScheduler::sSingleton = nullptr;
bool JobScheduler::Init(uint32_t aNumThreads, uint32_t aNumQueues) {
MOZ_ASSERT(!sSingleton);
MOZ_ASSERT(aNumThreads >= aNumQueues);
sSingleton = new JobScheduler();
sSingleton->mNextQueue = 0;
for (uint32_t i = 0; i < aNumQueues; ++i) {
sSingleton->mDrawingQueues.push_back(new MultiThreadedJobQueue());
}
for (uint32_t i = 0; i < aNumThreads; ++i) {
sSingleton->mWorkerThreads.push_back(
WorkerThread::Create(sSingleton->mDrawingQueues[i % aNumQueues]));
}
return true;
}
void JobScheduler::ShutDown() {
MOZ_ASSERT(IsEnabled());
if (!IsEnabled()) {
return;
}
for (auto queue : sSingleton->mDrawingQueues) {
queue->ShutDown();
delete queue;
}
for (WorkerThread* thread : sSingleton->mWorkerThreads) {
// this will block until the thread is joined.
delete thread;
}
sSingleton->mWorkerThreads.clear();
delete sSingleton;
sSingleton = nullptr;
}
JobStatus JobScheduler::ProcessJob(Job* aJob) {
MOZ_ASSERT(aJob);
auto status = aJob->Run();
if (status == JobStatus::Error || status == JobStatus::Complete) {
delete aJob;
}
return status;
}
void JobScheduler::SubmitJob(Job* aJob) {
MOZ_ASSERT(aJob);
RefPtr<SyncObject> start = aJob->GetStartSync();
if (start && start->Register(aJob)) {
// The Job buffer starts with a non-signaled sync object, it
// is now registered in the list of task buffers waiting on the
// sync object, so we should not place it in the queue.
return;
}
GetQueueForJob(aJob)->SubmitJob(aJob);
}
void JobScheduler::Join(SyncObject* aCompletion) {
RefPtr<EventObject> waitForCompletion = new EventObject();
JobScheduler::SubmitJob(new SetEventJob(waitForCompletion, aCompletion));
waitForCompletion->Wait();
}
MultiThreadedJobQueue* JobScheduler::GetQueueForJob(Job* aJob) {
return aJob->IsPinnedToAThread() ? aJob->GetWorkerThread()->GetJobQueue()
: GetDrawingQueue();
}
Job::Job(SyncObject* aStart, SyncObject* aCompletion, WorkerThread* aThread)
: mNextWaitingJob(nullptr),
mStartSync(aStart),
mCompletionSync(aCompletion),
mPinToThread(aThread) {
if (mStartSync) {
mStartSync->AddSubsequent(this);
}
if (mCompletionSync) {
mCompletionSync->AddPrerequisite(this);
}
}
Job::~Job() {
if (mCompletionSync) {
// printf(" -- Job %p dtor completion %p\n", this, mCompletionSync);
mCompletionSync->Signal();
mCompletionSync = nullptr;
}
}
JobStatus SetEventJob::Run() {
mEvent->Set();
return JobStatus::Complete;
}
SetEventJob::SetEventJob(EventObject* aEvent, SyncObject* aStart,
SyncObject* aCompletion, WorkerThread* aWorker)
: Job(aStart, aCompletion, aWorker), mEvent(aEvent) {}
SetEventJob::~SetEventJob() {}
SyncObject::SyncObject(uint32_t aNumPrerequisites)
: mSignals(aNumPrerequisites),
mFirstWaitingJob(nullptr)
#ifdef DEBUG
,
mNumPrerequisites(aNumPrerequisites),
mAddedPrerequisites(0)
#endif
{
}
SyncObject::~SyncObject() { MOZ_ASSERT(mFirstWaitingJob == nullptr); }
bool SyncObject::Register(Job* aJob) {
MOZ_ASSERT(aJob);
// For now, ensure that when we schedule the first subsequent, we have already
// created all of the prerequisites. This is an arbitrary restriction because
// we specify the number of prerequisites in the constructor, but in the
// typical scenario, if the assertion FreezePrerequisite blows up here it
// probably means we got the initial nmber of prerequisites wrong. We can
// decide to remove this restriction if needed.
FreezePrerequisites();
int32_t signals = mSignals;
if (signals > 0) {
AddWaitingJob(aJob);
// Since Register and Signal can be called concurrently, it can happen that
// reading mSignals in Register happens before decrementing mSignals in
// Signal, but SubmitWaitingJobs happens before AddWaitingJob. This ordering
// means the SyncObject ends up in the signaled state with a task sitting in
// the waiting list. To prevent that we check mSignals a second time and
// submit again if signals reached zero in the mean time. We do this instead
// of holding a mutex around mSignals+mJobs to reduce lock contention.
int32_t signals2 = mSignals;
if (signals2 == 0) {
SubmitWaitingJobs();
}
return true;
}
return false;
}
void SyncObject::Signal() {
int32_t signals = --mSignals;
MOZ_ASSERT(signals >= 0);
if (signals == 0) {
SubmitWaitingJobs();
}
}
void SyncObject::AddWaitingJob(Job* aJob) {
// Push (using atomics) the task into the list of waiting tasks.
for (;;) {
Job* first = mFirstWaitingJob;
aJob->mNextWaitingJob = first;
if (mFirstWaitingJob.compareExchange(first, aJob)) {
break;
}
}
}
void SyncObject::SubmitWaitingJobs() {
// Scheduling the tasks can cause code that modifies <this>'s reference
// count to run concurrently, and cause the caller of this function to
// be owned by another thread. We need to make sure the reference count
// does not reach 0 on another thread before the end of this method, so
// hold a strong ref to prevent that!
RefPtr<SyncObject> kungFuDeathGrip(this);
// First atomically swap mFirstWaitingJob and waitingJobs...
Job* waitingJobs = nullptr;
for (;;) {
waitingJobs = mFirstWaitingJob;
if (mFirstWaitingJob.compareExchange(waitingJobs, nullptr)) {
break;
}
}
// ... and submit all of the waiting tasks in waitingJob now that they belong
// to this thread.
while (waitingJobs) {
Job* next = waitingJobs->mNextWaitingJob;
waitingJobs->mNextWaitingJob = nullptr;
JobScheduler::GetQueueForJob(waitingJobs)->SubmitJob(waitingJobs);
waitingJobs = next;
}
}
bool SyncObject::IsSignaled() { return mSignals == 0; }
void SyncObject::FreezePrerequisites() {
MOZ_ASSERT(mAddedPrerequisites == mNumPrerequisites);
}
void SyncObject::AddPrerequisite(Job* aJob) {
MOZ_ASSERT(++mAddedPrerequisites <= mNumPrerequisites);
}
void SyncObject::AddSubsequent(Job* aJob) {}
WorkerThread::WorkerThread(MultiThreadedJobQueue* aJobQueue)
: mQueue(aJobQueue) {
aJobQueue->RegisterThread();
}
void WorkerThread::Run() {
SetName("gfx worker");
for (;;) {
Job* commands = nullptr;
if (!mQueue->WaitForJob(commands)) {
mQueue->UnregisterThread();
return;
}
JobStatus status = JobScheduler::ProcessJob(commands);
if (status == JobStatus::Error) {
// Don't try to handle errors for now, but that's open to discussions.
// I expect errors to be mostly OOM issues.
gfxDevCrash(LogReason::JobStatusError)
<< "Invalid job status " << (int)status;
}
}
}
} // namespace gfx
} // namespace mozilla