/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim:set ts=2 sw=2 sts=2 et cindent: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#if !defined(StateMirroring_h_)
#define StateMirroring_h_

#include "MediaPromise.h"

#include "StateWatching.h"
#include "TaskDispatcher.h"

#include "mozilla/Maybe.h"
#include "mozilla/UniquePtr.h"
#include "mozilla/unused.h"

#include "prlog.h"
#include "nsISupportsImpl.h"

/*
 * The state-mirroring machinery allows pieces of interesting state to be
 * observed on multiple threads without locking. The basic strategy is to track
 * changes in a canonical value and post updates to other threads that hold
 * mirrors for that value.
 *
 * One problem with the naive implementation of such a system is that some pieces
 * of state need to be updated atomically, and certain other operations need to
 * wait for these atomic updates to complete before executing. The state-mirroring
 * machinery solves this problem by requiring that its owner thread uses tail
 * dispatch, and posting state update events (which should always be run first by
 * TaskDispatcher implementations) to that tail dispatcher. This ensures that
 * state changes are always atomic from the perspective of observing threads.
 *
 * Given that state-mirroring is an automatic background process, we try to avoid
 * burdening the caller with worrying too much about teardown. To that end, we
 * don't assert dispatch success for any of the notifications, and assume that
 * any canonical or mirror owned by a thread for whom dispatch fails will soon
 * be disconnected by its holder anyway.
 *
 * Given that semantics may change and comments tend to go out of date, we
 * deliberately don't provide usage examples here. Grep around to find them.
*/ namespace mozilla { // Mirror and Canonical inherit WatchTarget, so we piggy-back on the // logging that WatchTarget already does. Given that, it makes sense to share // the same log module. #define MIRROR_LOG(x, ...) \ MOZ_ASSERT(gStateWatchingLog); \ PR_LOG(gStateWatchingLog, PR_LOG_DEBUG, (x, ##__VA_ARGS__)) template class AbstractMirror; /* * AbstractCanonical is a superclass from which all Canonical values must * inherit. It serves as the interface of operations which may be performed (via * asynchronous dispatch) by other threads, in particular by the corresponding * Mirror value. */ template class AbstractCanonical { public: NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AbstractCanonical) AbstractCanonical(AbstractThread* aThread) : mOwnerThread(aThread) {} virtual void AddMirror(AbstractMirror* aMirror) = 0; virtual void RemoveMirror(AbstractMirror* aMirror) = 0; AbstractThread* OwnerThread() const { return mOwnerThread; } protected: virtual ~AbstractCanonical() {} nsRefPtr mOwnerThread; }; /* * AbstractMirror is a superclass from which all Mirror values must * inherit. It serves as the interface of operations which may be performed (via * asynchronous dispatch) by other threads, in particular by the corresponding * Canonical value. */ template class AbstractMirror { public: NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AbstractMirror) AbstractMirror(AbstractThread* aThread) : mOwnerThread(aThread) {} virtual void UpdateValue(const T& aNewValue) = 0; virtual void NotifyDisconnected() = 0; AbstractThread* OwnerThread() const { return mOwnerThread; } protected: virtual ~AbstractMirror() {} nsRefPtr mOwnerThread; }; /* * Canonical is a wrapper class that allows a given value to be mirrored by other * threads. It maintains a list of active mirrors, and queues updates for them * when the internal value changes. When changing the value, the caller needs to * pass a TaskDispatcher object, which fires the updates at the appropriate time. 
* Canonical is also a WatchTarget, and may be set up to trigger other routines * (on the same thread) when the canonical value changes. * * Do not instantiate a Canonical directly as a member. Instead, instantiate a * Canonical::Holder, which handles lifetime issues and may eventually be * extended to do other things as well. */ template class Canonical : public AbstractCanonical, public WatchTarget { public: using AbstractCanonical::OwnerThread; Canonical(AbstractThread* aThread, const T& aInitialValue, const char* aName) : AbstractCanonical(aThread), WatchTarget(aName), mValue(aInitialValue) { MIRROR_LOG("%s [%p] initialized", mName, this); MOZ_ASSERT(aThread->RequiresTailDispatch(), "Can't get coherency without tail dispatch"); } void AddMirror(AbstractMirror* aMirror) override { MIRROR_LOG("%s [%p] adding mirror %p", mName, this, aMirror); MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); MOZ_ASSERT(!mMirrors.Contains(aMirror)); mMirrors.AppendElement(aMirror); aMirror->OwnerThread()->Dispatch(MakeNotifier(aMirror), AbstractThread::DontAssertDispatchSuccess); } void RemoveMirror(AbstractMirror* aMirror) override { MIRROR_LOG("%s [%p] removing mirror %p", mName, this, aMirror); MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); MOZ_ASSERT(mMirrors.Contains(aMirror)); mMirrors.RemoveElement(aMirror); } void DisconnectAll() { MIRROR_LOG("%s [%p] Disconnecting all mirrors", mName, this); for (size_t i = 0; i < mMirrors.Length(); ++i) { nsCOMPtr r = NS_NewRunnableMethod(mMirrors[i], &AbstractMirror::NotifyDisconnected); mMirrors[i]->OwnerThread()->Dispatch(r.forget(), AbstractThread::DontAssertDispatchSuccess); } mMirrors.Clear(); } operator const T&() { MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); return mValue; } void Set(const T& aNewValue) { MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); if (aNewValue == mValue) { return; } // Notify same-thread watchers. The state watching machinery will make sure // that notifications run at the right time. 
NotifyWatchers(); // Check if we've already got a pending update. If so we won't schedule another // one. bool alreadyNotifying = mInitialValue.isSome(); // Stash the initial value if needed, then update to the new value. if (mInitialValue.isNothing()) { mInitialValue.emplace(mValue); } mValue = aNewValue; // We wait until things have stablized before sending state updates so that // we can avoid sending multiple updates, and possibly avoid sending any // updates at all if the value ends up where it started. if (!alreadyNotifying) { nsCOMPtr r = NS_NewRunnableMethod(this, &Canonical::DoNotify); AbstractThread::GetCurrent()->TailDispatcher().AddDirectTask(r.forget()); } } Canonical& operator=(const T& aNewValue) { Set(aNewValue); return *this; } Canonical& operator=(const Canonical& aOther) { Set(aOther); return *this; } Canonical(const Canonical& aOther) = delete; class Holder { public: Holder() {} ~Holder() { MOZ_DIAGNOSTIC_ASSERT(mCanonical, "Should have initialized me"); } // NB: Because mirror-initiated disconnection can race with canonical- // initiated disconnection, a canonical should never be reinitialized. void Init(AbstractThread* aThread, const T& aInitialValue, const char* aName) { mCanonical = new Canonical(aThread, aInitialValue, aName); } // Forward control operations to the Canonical. void DisconnectAll() { return mCanonical->DisconnectAll(); } // Access to the Canonical. operator Canonical&() { return *mCanonical; } Canonical* operator&() { return mCanonical; } // Access to the T. 
const T& Ref() const { return *mCanonical; } operator const T&() const { return Ref(); } void Set(const T& aNewValue) { mCanonical->Set(aNewValue); } Holder& operator=(const T& aNewValue) { Set(aNewValue); return *this; } Holder& operator=(const Holder& aOther) { Set(aOther); return *this; } Holder(const Holder& aOther) = delete; private: nsRefPtr> mCanonical; }; protected: ~Canonical() { MOZ_DIAGNOSTIC_ASSERT(mMirrors.IsEmpty()); } private: void DoNotify() { MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); MOZ_ASSERT(mInitialValue.isSome()); bool same = mInitialValue.ref() == mValue; mInitialValue.reset(); if (same) { MIRROR_LOG("%s [%p] unchanged - not sending update", mName, this); return; } for (size_t i = 0; i < mMirrors.Length(); ++i) { OwnerThread()->TailDispatcher().AddStateChangeTask(mMirrors[i]->OwnerThread(), MakeNotifier(mMirrors[i])); } } already_AddRefed MakeNotifier(AbstractMirror* aMirror) { nsCOMPtr r = NS_NewRunnableMethodWithArg(aMirror, &AbstractMirror::UpdateValue, mValue); return r.forget(); } T mValue; Maybe mInitialValue; nsTArray>> mMirrors; }; /* * Mirror is a wrapper class that allows a given value to mirror that of a * Canonical owned by another thread. It registers itself with a Canonical, * and is periodically updated with new values. Mirror is also a WatchTarget, * and may be set up to trigger other routines (on the same thread) when the * mirrored value changes. * * Do not instantiate a Mirror directly as a member. Instead, instantiate a * Mirror::Holder, which handles lifetime issues and whose destructor * initiates an asynchronous teardown of the reference-counted Mirror, * breaking the inherent cycle between Mirror and Canonical. 
*/ template class Mirror : public AbstractMirror, public WatchTarget { public: using AbstractMirror::OwnerThread; Mirror(AbstractThread* aThread, const T& aInitialValue, const char* aName, AbstractCanonical* aCanonical) : AbstractMirror(aThread), WatchTarget(aName), mValue(aInitialValue) { MIRROR_LOG("%s [%p] initialized", mName, this); if (aCanonical) { ConnectInternal(aCanonical); } } operator const T&() { MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); return mValue; } virtual void UpdateValue(const T& aNewValue) override { MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); if (mValue != aNewValue) { mValue = aNewValue; WatchTarget::NotifyWatchers(); } } virtual void NotifyDisconnected() override { MIRROR_LOG("%s [%p] Notifed of disconnection from %p", mName, this, mCanonical.get()); MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); mCanonical = nullptr; } bool IsConnected() const { return !!mCanonical; } void Connect(AbstractCanonical* aCanonical) { MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); ConnectInternal(aCanonical); } private: // We separate the guts of Connect into a helper so that we can call it from // initialization while not necessarily on the owner thread. 
void ConnectInternal(AbstractCanonical* aCanonical) { MIRROR_LOG("%s [%p] Connecting to %p", mName, this, aCanonical); MOZ_ASSERT(!IsConnected()); nsCOMPtr r = NS_NewRunnableMethodWithArg>> (aCanonical, &AbstractCanonical::AddMirror, this); aCanonical->OwnerThread()->Dispatch(r.forget(), AbstractThread::DontAssertDispatchSuccess); mCanonical = aCanonical; } public: void DisconnectIfConnected() { MOZ_ASSERT(OwnerThread()->IsCurrentThreadIn()); if (!IsConnected()) { return; } MIRROR_LOG("%s [%p] Disconnecting from %p", mName, this, mCanonical.get()); nsCOMPtr r = NS_NewRunnableMethodWithArg>> (mCanonical, &AbstractCanonical::RemoveMirror, this); mCanonical->OwnerThread()->Dispatch(r.forget(), AbstractThread::DontAssertDispatchSuccess); mCanonical = nullptr; } class Holder { public: Holder() {} ~Holder() { MOZ_DIAGNOSTIC_ASSERT(mMirror, "Should have initialized me"); if (mMirror->OwnerThread()->IsCurrentThreadIn()) { mMirror->DisconnectIfConnected(); } else { // If holder destruction happens on a thread other than the mirror's // owner thread, manual disconnection is mandatory. We should make this // more automatic by hooking it up to task queue shutdown. MOZ_DIAGNOSTIC_ASSERT(!mMirror->IsConnected()); } } // NB: Because mirror-initiated disconnection can race with canonical- // initiated disconnection, a mirror should never be reinitialized. void Init(AbstractThread* aThread, const T& aInitialValue, const char* aName, AbstractCanonical* aCanonical = nullptr) { mMirror = new Mirror(aThread, aInitialValue, aName, aCanonical); } // Forward control operations to the Mirror. void Connect(AbstractCanonical* aCanonical) { mMirror->Connect(aCanonical); } void DisconnectIfConnected() { mMirror->DisconnectIfConnected(); } // Access to the Mirror. operator Mirror&() { return *mMirror; } Mirror* operator&() { return mMirror; } // Access to the T. 
const T& Ref() const { return *mMirror; } operator const T&() const { return Ref(); } private: nsRefPtr> mMirror; }; protected: ~Mirror() { MOZ_DIAGNOSTIC_ASSERT(!IsConnected()); } private: T mValue; nsRefPtr> mCanonical; }; #undef MIRROR_LOG } // namespace mozilla #endif