/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- * vim: set sw=2 ts=8 et tw=80 : */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #ifndef mozilla_net_ChannelEventQueue_h #define mozilla_net_ChannelEventQueue_h #include "nsTArray.h" #include "nsAutoPtr.h" #include "nsIEventTarget.h" #include "nsThreadUtils.h" #include "nsXULAppAPI.h" #include "mozilla/DebugOnly.h" #include "mozilla/Mutex.h" #include "mozilla/RecursiveMutex.h" #include "mozilla/UniquePtr.h" #include "mozilla/Unused.h" class nsISupports; namespace mozilla { namespace net { class ChannelEvent { public: ChannelEvent() { MOZ_COUNT_CTOR(ChannelEvent); } virtual ~ChannelEvent() { MOZ_COUNT_DTOR(ChannelEvent); } virtual void Run() = 0; virtual already_AddRefed GetEventTarget() = 0; }; // Note that MainThreadChannelEvent should not be used in child process since // GetEventTarget() directly returns an unlabeled event target. class MainThreadChannelEvent : public ChannelEvent { public: MainThreadChannelEvent() { MOZ_COUNT_CTOR(MainThreadChannelEvent); } virtual ~MainThreadChannelEvent() { MOZ_COUNT_DTOR(MainThreadChannelEvent); } already_AddRefed GetEventTarget() override { MOZ_ASSERT(XRE_IsParentProcess()); return do_AddRef(GetMainThreadEventTarget()); } }; // This event is designed to be only used for e10s child channels. // The goal is to force the child channel to implement GetNeckoTarget() // which should return a labeled main thread event target so that this // channel event can be dispatched correctly. template class NeckoTargetChannelEvent : public ChannelEvent { public: explicit NeckoTargetChannelEvent(T* aChild) : mChild(aChild) { MOZ_COUNT_CTOR(NeckoTargetChannelEvent); } virtual ~NeckoTargetChannelEvent() { MOZ_COUNT_DTOR(NeckoTargetChannelEvent); } already_AddRefed GetEventTarget() override { MOZ_ASSERT(mChild); return mChild->GetNeckoTarget(); } protected: T* mChild; }; // Workaround for Necko re-entrancy dangers. We buffer IPDL messages in a // queue if still dispatching previous one(s) to listeners/observers. // Otherwise synchronous XMLHttpRequests and/or other code that spins the // event loop (ex: IPDL rpc) could cause listener->OnDataAvailable (for // instance) to be dispatched and called before mListener->OnStartRequest has // completed. class ChannelEventQueue final { NS_INLINE_DECL_THREADSAFE_REFCOUNTING(ChannelEventQueue) public: explicit ChannelEventQueue(nsISupports* owner) : mSuspendCount(0), mSuspended(false), mForcedCount(0), mFlushing(false), mHasCheckedForXMLHttpRequest(false), mForXMLHttpRequest(false), mOwner(owner), mMutex("ChannelEventQueue::mMutex"), mRunningMutex("ChannelEventQueue::mRunningMutex") {} // Puts IPDL-generated channel event into queue, to be run later // automatically when EndForcedQueueing and/or Resume is called. // // @param aCallback - the ChannelEvent // @param aAssertionWhenNotQueued - this optional param will be used in an // assertion when the event is executed directly. inline void RunOrEnqueue(ChannelEvent* aCallback, bool aAssertionWhenNotQueued = false); // Append ChannelEvent in front of the event queue. inline nsresult PrependEvent(UniquePtr& aEvent); inline nsresult PrependEvents(nsTArray>& aEvents); // After StartForcedQueueing is called, RunOrEnqueue() will start enqueuing // events that will be run/flushed when EndForcedQueueing is called. // - Note: queueing may still be required after EndForcedQueueing() (if the // queue is suspended, etc): always call RunOrEnqueue() to avoid race // conditions. inline void StartForcedQueueing(); inline void EndForcedQueueing(); // Suspend/resume event queue. RunOrEnqueue() will start enqueuing // events and they will be run/flushed when resume is called. These should be // called when the channel owning the event queue is suspended/resumed. void Suspend(); // Resume flushes the queue asynchronously, i.e. items in queue will be // dispatched in a new event on the current thread. void Resume(); private: // Private destructor, to discourage deletion outside of Release(): ~ChannelEventQueue() {} void SuspendInternal(); void ResumeInternal(); bool MaybeSuspendIfEventsAreSuppressed(); inline void MaybeFlushQueue(); void FlushQueue(); inline void CompleteResume(); ChannelEvent* TakeEvent(); nsTArray> mEventQueue; uint32_t mSuspendCount; bool mSuspended; uint32_t mForcedCount; // Support ForcedQueueing on multiple thread. bool mFlushing; // Whether the queue is associated with an XHR. This is lazily instantiated // the first time it is needed. bool mHasCheckedForXMLHttpRequest; bool mForXMLHttpRequest; // Keep ptr to avoid refcount cycle: only grab ref during flushing. nsISupports* mOwner; // For atomic mEventQueue operation and state update Mutex mMutex; // To guarantee event execution order among threads RecursiveMutex mRunningMutex; friend class AutoEventEnqueuer; }; inline void ChannelEventQueue::RunOrEnqueue(ChannelEvent* aCallback, bool aAssertionWhenNotQueued) { MOZ_ASSERT(aCallback); // Events execution could be a destruction of the channel (and our own // destructor) unless we make sure its refcount doesn't drop to 0 while this // method is running. nsCOMPtr kungFuDeathGrip(mOwner); Unused << kungFuDeathGrip; // Not used in this function // To avoid leaks. UniquePtr event(aCallback); // To guarantee that the running event and all the events generated within // it will be finished before events on other threads. RecursiveMutexAutoLock lock(mRunningMutex); { MutexAutoLock lock(mMutex); bool enqueue = !!mForcedCount || mSuspended || mFlushing || !mEventQueue.IsEmpty() || MaybeSuspendIfEventsAreSuppressed(); if (enqueue) { mEventQueue.AppendElement(std::move(event)); return; } nsCOMPtr target = event->GetEventTarget(); MOZ_ASSERT(target); bool isCurrentThread = false; DebugOnly rv = target->IsOnCurrentThread(&isCurrentThread); MOZ_ASSERT(NS_SUCCEEDED(rv)); if (!isCurrentThread) { // Leverage Suspend/Resume mechanism to trigger flush procedure without // creating a new one. SuspendInternal(); mEventQueue.AppendElement(std::move(event)); ResumeInternal(); return; } } MOZ_RELEASE_ASSERT(!aAssertionWhenNotQueued); event->Run(); } inline void ChannelEventQueue::StartForcedQueueing() { MutexAutoLock lock(mMutex); ++mForcedCount; } inline void ChannelEventQueue::EndForcedQueueing() { bool tryFlush = false; { MutexAutoLock lock(mMutex); MOZ_ASSERT(mForcedCount > 0); if (!--mForcedCount) { tryFlush = true; } } if (tryFlush) { MaybeFlushQueue(); } } inline nsresult ChannelEventQueue::PrependEvent( UniquePtr& aEvent) { MutexAutoLock lock(mMutex); // Prepending event while no queue flush foreseen might cause the following // channel events not run. This assertion here guarantee there must be a // queue flush, either triggered by Resume or EndForcedQueueing, to execute // the added event. MOZ_ASSERT(mSuspended || !!mForcedCount); UniquePtr* newEvent = mEventQueue.InsertElementAt(0, std::move(aEvent)); if (!newEvent) { return NS_ERROR_OUT_OF_MEMORY; } return NS_OK; } inline nsresult ChannelEventQueue::PrependEvents( nsTArray>& aEvents) { MutexAutoLock lock(mMutex); // Prepending event while no queue flush foreseen might cause the following // channel events not run. This assertion here guarantee there must be a // queue flush, either triggered by Resume or EndForcedQueueing, to execute // the added events. MOZ_ASSERT(mSuspended || !!mForcedCount); UniquePtr* newEvents = mEventQueue.InsertElementsAt(0, aEvents.Length()); if (!newEvents) { return NS_ERROR_OUT_OF_MEMORY; } for (uint32_t i = 0; i < aEvents.Length(); i++) { newEvents[i] = std::move(aEvents[i]); } return NS_OK; } inline void ChannelEventQueue::CompleteResume() { bool tryFlush = false; { MutexAutoLock lock(mMutex); // channel may have been suspended again since Resume fired event to call // this. if (!mSuspendCount) { // we need to remain logically suspended (for purposes of queuing incoming // messages) until this point, else new incoming messages could run before // queued ones. mSuspended = false; tryFlush = true; } } if (tryFlush) { MaybeFlushQueue(); } } inline void ChannelEventQueue::MaybeFlushQueue() { // Don't flush if forced queuing on, we're already being flushed, or // suspended, or there's nothing to flush bool flushQueue = false; { MutexAutoLock lock(mMutex); flushQueue = !mForcedCount && !mFlushing && !mSuspended && !mEventQueue.IsEmpty() && !MaybeSuspendIfEventsAreSuppressed(); // Only one thread is allowed to run FlushQueue at a time. if (flushQueue) { mFlushing = true; } } if (flushQueue) { FlushQueue(); } } // Ensures that RunOrEnqueue() will be collecting events during its lifetime // (letting caller know incoming IPDL msgs should be queued). Flushes the queue // when it goes out of scope. class MOZ_STACK_CLASS AutoEventEnqueuer { public: explicit AutoEventEnqueuer(ChannelEventQueue* queue) : mEventQueue(queue), mOwner(queue->mOwner) { mEventQueue->StartForcedQueueing(); } ~AutoEventEnqueuer() { mEventQueue->EndForcedQueueing(); } private: RefPtr mEventQueue; // Ensure channel object lives longer than ChannelEventQueue. nsCOMPtr mOwner; }; } // namespace net } // namespace mozilla #endif