/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim:set ts=2 sw=2 sts=2 et cindent: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #if !defined(TaskDispatcher_h_) #define TaskDispatcher_h_ #include "mozilla/AbstractThread.h" #include "mozilla/UniquePtr.h" #include "mozilla/unused.h" #include "nsISupportsImpl.h" #include "nsTArray.h" #include "nsThreadUtils.h" #include namespace mozilla { /* * A classic approach to cross-thread communication is to dispatch asynchronous * runnables to perform updates on other threads. This generally works well, but * there are sometimes reasons why we might want to delay the actual dispatch of * these tasks until a specified moment. At present, this is primarily useful to * ensure that mirrored state gets updated atomically - but there may be other * applications as well. * * TaskDispatcher is a general abstract class that accepts tasks and dispatches * them at some later point. These groups of tasks are per-target-thread, and * contain separate queues for several kinds of tasks (see comments below). - "state change tasks" (which * run first, and are intended to be used to update the value held by mirrors), * and regular tasks, which are other arbitrary operations that the are gated * to run after all the state changes have completed. */ class TaskDispatcher { public: TaskDispatcher() {} virtual ~TaskDispatcher() {} // Direct tasks are run directly (rather than dispatched asynchronously) when // the tail dispatcher fires. A direct task may cause other tasks to be added // to the tail dispatcher. virtual void AddDirectTask(already_AddRefed aRunnable) = 0; // State change tasks are dispatched asynchronously always run before regular // tasks. They are intended to be used to update the value held by mirrors // before any other dispatched tasks are run on the target thread. virtual void AddStateChangeTask(AbstractThread* aThread, already_AddRefed aRunnable) = 0; // Regular tasks are dispatched asynchronously, and run after state change // tasks. virtual void AddTask(AbstractThread* aThread, already_AddRefed aRunnable, AbstractThread::DispatchFailureHandling aFailureHandling = AbstractThread::AssertDispatchSuccess) = 0; virtual void DispatchTasksFor(AbstractThread* aThread) = 0; virtual bool HasTasksFor(AbstractThread* aThread) = 0; virtual void DrainDirectTasks() = 0; }; /* * AutoTaskDispatcher is a stack-scoped TaskDispatcher implementation that fires * its queued tasks when it is popped off the stack. */ class AutoTaskDispatcher : public TaskDispatcher { public: explicit AutoTaskDispatcher(bool aIsTailDispatcher = false) : mIsTailDispatcher(aIsTailDispatcher) {} ~AutoTaskDispatcher() { // Given that direct tasks may trigger other code that uses the tail // dispatcher, it's better to avoid processing them in the tail dispatcher's // destructor. So we require TailDispatchers to manually invoke // DrainDirectTasks before the AutoTaskDispatcher gets destroyed. In truth, // this is only necessary in the case where this AutoTaskDispatcher can be // accessed by the direct tasks it dispatches (true for TailDispatchers, but // potentially not true for other hypothetical AutoTaskDispatchers). Feel // free to loosen this restriction to apply only to mIsTailDispatcher if a // use-case requires it. MOZ_ASSERT(mDirectTasks.empty()); for (size_t i = 0; i < mTaskGroups.Length(); ++i) { DispatchTaskGroup(Move(mTaskGroups[i])); } } void DrainDirectTasks() override { while (!mDirectTasks.empty()) { nsCOMPtr r = mDirectTasks.front(); mDirectTasks.pop(); r->Run(); } } void AddDirectTask(already_AddRefed aRunnable) override { mDirectTasks.push(Move(aRunnable)); } void AddStateChangeTask(AbstractThread* aThread, already_AddRefed aRunnable) override { EnsureTaskGroup(aThread).mStateChangeTasks.AppendElement(aRunnable); } void AddTask(AbstractThread* aThread, already_AddRefed aRunnable, AbstractThread::DispatchFailureHandling aFailureHandling) override { PerThreadTaskGroup& group = EnsureTaskGroup(aThread); group.mRegularTasks.AppendElement(aRunnable); // The task group needs to assert dispatch success if any of the runnables // it's dispatching want to assert it. if (aFailureHandling == AbstractThread::AssertDispatchSuccess) { group.mFailureHandling = AbstractThread::AssertDispatchSuccess; } } bool HasTasksFor(AbstractThread* aThread) override { return !!GetTaskGroup(aThread) || (aThread == AbstractThread::GetCurrent() && !mDirectTasks.empty()); } void DispatchTasksFor(AbstractThread* aThread) override { for (size_t i = 0; i < mTaskGroups.Length(); ++i) { if (mTaskGroups[i]->mThread == aThread) { DispatchTaskGroup(Move(mTaskGroups[i])); mTaskGroups.RemoveElementAt(i); return; } } } private: struct PerThreadTaskGroup { public: explicit PerThreadTaskGroup(AbstractThread* aThread) : mThread(aThread), mFailureHandling(AbstractThread::DontAssertDispatchSuccess) { MOZ_COUNT_CTOR(PerThreadTaskGroup); } ~PerThreadTaskGroup() { MOZ_COUNT_DTOR(PerThreadTaskGroup); } nsRefPtr mThread; nsTArray> mStateChangeTasks; nsTArray> mRegularTasks; AbstractThread::DispatchFailureHandling mFailureHandling; }; class TaskGroupRunnable : public nsRunnable { public: explicit TaskGroupRunnable(UniquePtr&& aTasks) : mTasks(Move(aTasks)) {} NS_IMETHODIMP Run() { // State change tasks get run all together before any code is run, so // that all state changes are made in an atomic unit. for (size_t i = 0; i < mTasks->mStateChangeTasks.Length(); ++i) { mTasks->mStateChangeTasks[i]->Run(); } // Once the state changes have completed, drain any direct tasks // generated by those state changes (i.e. watcher notification tasks). // This needs to be outside the loop because we don't want to run code // that might observe intermediate states. MaybeDrainDirectTasks(); for (size_t i = 0; i < mTasks->mRegularTasks.Length(); ++i) { mTasks->mRegularTasks[i]->Run(); // Scope direct tasks tightly to the task that generated them. MaybeDrainDirectTasks(); } return NS_OK; } private: void MaybeDrainDirectTasks() { AbstractThread* currentThread = AbstractThread::GetCurrent(); if (currentThread) { currentThread->TailDispatcher().DrainDirectTasks(); } } UniquePtr mTasks; }; PerThreadTaskGroup& EnsureTaskGroup(AbstractThread* aThread) { PerThreadTaskGroup* existing = GetTaskGroup(aThread); if (existing) { return *existing; } mTaskGroups.AppendElement(new PerThreadTaskGroup(aThread)); return *mTaskGroups.LastElement(); } PerThreadTaskGroup* GetTaskGroup(AbstractThread* aThread) { for (size_t i = 0; i < mTaskGroups.Length(); ++i) { if (mTaskGroups[i]->mThread == aThread) { return mTaskGroups[i].get(); } } // Not found. return nullptr; } void DispatchTaskGroup(UniquePtr aGroup) { nsRefPtr thread = aGroup->mThread; AbstractThread::DispatchFailureHandling failureHandling = aGroup->mFailureHandling; AbstractThread::DispatchReason reason = mIsTailDispatcher ? AbstractThread::TailDispatch : AbstractThread::NormalDispatch; nsCOMPtr r = new TaskGroupRunnable(Move(aGroup)); thread->Dispatch(r.forget(), failureHandling, reason); } // Direct tasks. std::queue> mDirectTasks; // Task groups, organized by thread. nsTArray> mTaskGroups; // True if this TaskDispatcher represents the tail dispatcher for the thread // upon which it runs. const bool mIsTailDispatcher; }; // Little utility class to allow declaring AutoTaskDispatcher as a default // parameter for methods that take a TaskDispatcher&. template class PassByRef { public: PassByRef() {} operator T&() { return mVal; } private: T mVal; }; } // namespace mozilla #endif