Bug 1320744 - Part 2, Allow ChannelEventQueue to perform flush on multiple threads. r=mayhemer

MozReview-Commit-ID: Egu2mvwFTUF

--HG--
extra : rebase_source : 5627acad147bb0881897d93a7eddd2715cf9f3c5
This commit is contained in:
Shih-Chiang Chien 2017-03-18 11:36:08 +08:00
Родитель 395c4b3244
Коммит a99b965bf8
11 изменённых файлов: 448 добавлений и 111 удалений

Просмотреть файл

@ -5,11 +5,12 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this * License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "ChannelEventQueue.h"
#include "mozilla/Assertions.h"
#include "mozilla/Unused.h"
#include "nsISupports.h" #include "nsISupports.h"
#include "mozilla/net/ChannelEventQueue.h"
#include "mozilla/Unused.h"
#include "nsThreadUtils.h" #include "nsThreadUtils.h"
#include "mozilla/Unused.h"
namespace mozilla { namespace mozilla {
namespace net { namespace net {
@ -39,29 +40,94 @@ ChannelEventQueue::FlushQueue()
nsCOMPtr<nsISupports> kungFuDeathGrip(mOwner); nsCOMPtr<nsISupports> kungFuDeathGrip(mOwner);
mozilla::Unused << kungFuDeathGrip; // Not used in this function mozilla::Unused << kungFuDeathGrip; // Not used in this function
// Prevent flushed events from flushing the queue recursively bool needResumeOnOtherThread = false;
{ {
MutexAutoLock lock(mMutex); // Don't allow event enqueued during flush to make sure all events
mFlushing = true; // are run.
} ReentrantMonitorAutoEnter monitor(mRunningMonitor);
while (true) { // Prevent flushed events from flushing the queue recursively
UniquePtr<ChannelEvent> event(TakeEvent()); {
if (!event) { MutexAutoLock lock(mMutex);
break; MOZ_ASSERT(!mFlushing);
mFlushing = true;
} }
event->Run(); while (true) {
UniquePtr<ChannelEvent> event(TakeEvent());
if (!event) {
break;
}
nsCOMPtr<nsIEventTarget> target = event->GetEventTarget();
MOZ_ASSERT(target);
bool isCurrentThread = false;
nsresult rv = target->IsOnCurrentThread(&isCurrentThread);
if (NS_WARN_IF(NS_FAILED(rv))) {
// Simply run this event on current thread if we are not sure about it
// in release channel, or assert in Aurora/Nightly channel.
MOZ_DIAGNOSTIC_ASSERT(false);
isCurrentThread = true;
}
if (!isCurrentThread) {
// Next event needs to run on another thread. Put it back to
// the front of the queue can try resume on that thread.
Suspend();
PrependEvent(event);
needResumeOnOtherThread = true;
break;
}
event->Run();
}
{
MutexAutoLock lock(mMutex);
MOZ_ASSERT(mFlushing);
mFlushing = false;
MOZ_ASSERT(mEventQueue.IsEmpty() || (needResumeOnOtherThread || mSuspended || !!mForcedCount));
}
} }
MutexAutoLock lock(mMutex); // The flush procedure is aborted because next event cannot be run on current
mFlushing = false; // thread. We need to resume the event processing right after flush procedure
// is finished.
// Note: we cannot call Resume() while "mFlushing == true" because
// CompleteResume will not trigger FlushQueue while there is an ongoing flush.
if (needResumeOnOtherThread) {
Resume();
}
} }
void void
ChannelEventQueue::Resume() ChannelEventQueue::Suspend()
{ {
MutexAutoLock lock(mMutex); MutexAutoLock lock(mMutex);
SuspendInternal();
}
void
ChannelEventQueue::SuspendInternal()
{
mMutex.AssertCurrentThreadOwns();
mSuspended = true;
mSuspendCount++;
}
void ChannelEventQueue::Resume()
{
MutexAutoLock lock(mMutex);
ResumeInternal();
}
void
ChannelEventQueue::ResumeInternal()
{
mMutex.AssertCurrentThreadOwns();
// Resuming w/o suspend: error in debug mode, ignore in build // Resuming w/o suspend: error in debug mode, ignore in build
MOZ_ASSERT(mSuspendCount > 0); MOZ_ASSERT(mSuspendCount > 0);
@ -70,44 +136,24 @@ ChannelEventQueue::Resume()
} }
if (!--mSuspendCount) { if (!--mSuspendCount) {
RefPtr<Runnable> event = if (mEventQueue.IsEmpty()) {
NewRunnableMethod(this, &ChannelEventQueue::CompleteResume); // Nothing in queue to flush, simply clear the flag.
if (mTargetThread) { mSuspended = false;
mTargetThread->Dispatch(event.forget(), NS_DISPATCH_NORMAL); return;
} else {
MOZ_RELEASE_ASSERT(NS_IsMainThread());
Unused << NS_WARN_IF(NS_FAILED(NS_DispatchToCurrentThread(event.forget())));
} }
// Worker thread requires a CancelableRunnable.
RefPtr<Runnable> event =
NewCancelableRunnableMethod(this, &ChannelEventQueue::CompleteResume);
nsCOMPtr<nsIEventTarget> target;
target = mEventQueue[0]->GetEventTarget();
MOZ_ASSERT(target);
Unused << NS_WARN_IF(NS_FAILED(target->Dispatch(event.forget(),
NS_DISPATCH_NORMAL)));
} }
} }
nsresult
ChannelEventQueue::RetargetDeliveryTo(nsIEventTarget* aTargetThread)
{
MOZ_RELEASE_ASSERT(NS_IsMainThread());
MOZ_RELEASE_ASSERT(!mTargetThread);
MOZ_RELEASE_ASSERT(aTargetThread);
mTargetThread = do_QueryInterface(aTargetThread);
MOZ_RELEASE_ASSERT(mTargetThread);
return NS_OK;
}
nsresult
ChannelEventQueue::ResetDeliveryTarget()
{
MutexAutoLock lock(mMutex);
MOZ_RELEASE_ASSERT(mEventQueue.IsEmpty());
MOZ_RELEASE_ASSERT(mSuspendCount == 0);
MOZ_RELEASE_ASSERT(!mSuspended);
MOZ_RELEASE_ASSERT(!mForced);
MOZ_RELEASE_ASSERT(!mFlushing);
mTargetThread = nullptr;
return NS_OK;
}
} // namespace net } // namespace net
} // namespace mozilla } // namespace mozilla

Просмотреть файл

@ -10,11 +10,15 @@
#include "nsTArray.h" #include "nsTArray.h"
#include "nsAutoPtr.h" #include "nsAutoPtr.h"
#include "nsIEventTarget.h"
#include "nsThreadUtils.h"
#include "mozilla/DebugOnly.h"
#include "mozilla/Mutex.h" #include "mozilla/Mutex.h"
#include "mozilla/ReentrantMonitor.h"
#include "mozilla/UniquePtr.h" #include "mozilla/UniquePtr.h"
#include "mozilla/Unused.h"
class nsISupports; class nsISupports;
class nsIEventTarget;
namespace mozilla { namespace mozilla {
namespace net { namespace net {
@ -25,6 +29,20 @@ class ChannelEvent
ChannelEvent() { MOZ_COUNT_CTOR(ChannelEvent); } ChannelEvent() { MOZ_COUNT_CTOR(ChannelEvent); }
virtual ~ChannelEvent() { MOZ_COUNT_DTOR(ChannelEvent); } virtual ~ChannelEvent() { MOZ_COUNT_DTOR(ChannelEvent); }
virtual void Run() = 0; virtual void Run() = 0;
virtual already_AddRefed<nsIEventTarget> GetEventTarget() = 0;
};
class MainThreadChannelEvent : public ChannelEvent
{
public:
MainThreadChannelEvent() { MOZ_COUNT_CTOR(MainThreadChannelEvent); }
virtual ~MainThreadChannelEvent() { MOZ_COUNT_DTOR(MainThreadChannelEvent); }
already_AddRefed<nsIEventTarget>
GetEventTarget() override
{
return do_GetMainThread();
}
}; };
// Workaround for Necko re-entrancy dangers. We buffer IPDL messages in a // Workaround for Necko re-entrancy dangers. We buffer IPDL messages in a
@ -42,10 +60,11 @@ class ChannelEventQueue final
explicit ChannelEventQueue(nsISupports *owner) explicit ChannelEventQueue(nsISupports *owner)
: mSuspendCount(0) : mSuspendCount(0)
, mSuspended(false) , mSuspended(false)
, mForced(false) , mForcedCount(0)
, mFlushing(false) , mFlushing(false)
, mOwner(owner) , mOwner(owner)
, mMutex("ChannelEventQueue::mMutex") , mMutex("ChannelEventQueue::mMutex")
, mRunningMonitor("ChannelEventQueue::mRunningMonitor")
{} {}
// Puts IPDL-generated channel event into queue, to be run later // Puts IPDL-generated channel event into queue, to be run later
@ -56,6 +75,9 @@ class ChannelEventQueue final
// assertion when the event is executed directly. // assertion when the event is executed directly.
inline void RunOrEnqueue(ChannelEvent* aCallback, inline void RunOrEnqueue(ChannelEvent* aCallback,
bool aAssertionWhenNotQueued = false); bool aAssertionWhenNotQueued = false);
// Append ChannelEvent in front of the event queue.
inline nsresult PrependEvent(UniquePtr<ChannelEvent>& aEvent);
inline nsresult PrependEvents(nsTArray<UniquePtr<ChannelEvent>>& aEvents); inline nsresult PrependEvents(nsTArray<UniquePtr<ChannelEvent>>& aEvents);
// After StartForcedQueueing is called, RunOrEnqueue() will start enqueuing // After StartForcedQueueing is called, RunOrEnqueue() will start enqueuing
@ -69,25 +91,20 @@ class ChannelEventQueue final
// Suspend/resume event queue. RunOrEnqueue() will start enqueuing // Suspend/resume event queue. RunOrEnqueue() will start enqueuing
// events and they will be run/flushed when resume is called. These should be // events and they will be run/flushed when resume is called. These should be
// called when the channel owning the event queue is suspended/resumed. // called when the channel owning the event queue is suspended/resumed.
inline void Suspend(); void Suspend();
// Resume flushes the queue asynchronously, i.e. items in queue will be // Resume flushes the queue asynchronously, i.e. items in queue will be
// dispatched in a new event on the current thread. // dispatched in a new event on the current thread.
void Resume(); void Resume();
// Retargets delivery of events to the target thread specified.
nsresult RetargetDeliveryTo(nsIEventTarget* aTargetThread);
// Nulls out the delivery target so events are delivered to the main
// thread. Should only be called when the queue is known to be empty.
// Useful if the queue will be re-used.
nsresult ResetDeliveryTarget();
private: private:
// Private destructor, to discourage deletion outside of Release(): // Private destructor, to discourage deletion outside of Release():
~ChannelEventQueue() ~ChannelEventQueue()
{ {
} }
void SuspendInternal();
void ResumeInternal();
inline void MaybeFlushQueue(); inline void MaybeFlushQueue();
void FlushQueue(); void FlushQueue();
inline void CompleteResume(); inline void CompleteResume();
@ -97,17 +114,18 @@ class ChannelEventQueue final
nsTArray<UniquePtr<ChannelEvent>> mEventQueue; nsTArray<UniquePtr<ChannelEvent>> mEventQueue;
uint32_t mSuspendCount; uint32_t mSuspendCount;
bool mSuspended; bool mSuspended;
bool mForced; uint32_t mForcedCount; // Support ForcedQueueing on multiple thread.
bool mFlushing; bool mFlushing;
// Keep ptr to avoid refcount cycle: only grab ref during flushing. // Keep ptr to avoid refcount cycle: only grab ref during flushing.
nsISupports *mOwner; nsISupports *mOwner;
// For atomic mEventQueue operation and state update
Mutex mMutex; Mutex mMutex;
// EventTarget for delivery of events to the correct thread. // To guarantee event execution order among threads
nsCOMPtr<nsIEventTarget> mTargetThread; ReentrantMonitor mRunningMonitor;
friend class AutoEventEnqueuer; friend class AutoEventEnqueuer;
}; };
@ -118,20 +136,44 @@ ChannelEventQueue::RunOrEnqueue(ChannelEvent* aCallback,
{ {
MOZ_ASSERT(aCallback); MOZ_ASSERT(aCallback);
// Events execution could be a destruction of the channel (and our own
// destructor) unless we make sure its refcount doesn't drop to 0 while this
// method is running.
nsCOMPtr<nsISupports> kungFuDeathGrip(mOwner);
Unused << kungFuDeathGrip; // Not used in this function
// To avoid leaks. // To avoid leaks.
UniquePtr<ChannelEvent> event(aCallback); UniquePtr<ChannelEvent> event(aCallback);
// To guarantee that the running event and all the events generated within
// it will be finished before events on other threads.
ReentrantMonitorAutoEnter monitor(mRunningMonitor);
{ {
MutexAutoLock lock(mMutex); MutexAutoLock lock(mMutex);
bool enqueue = mForced || mSuspended || mFlushing; bool enqueue = !!mForcedCount || mSuspended || mFlushing || !mEventQueue.IsEmpty();
MOZ_ASSERT(enqueue == true || mEventQueue.IsEmpty(),
"Should always enqueue if ChannelEventQueue not empty");
if (enqueue) { if (enqueue) {
mEventQueue.AppendElement(Move(event)); mEventQueue.AppendElement(Move(event));
return; return;
} }
nsCOMPtr<nsIEventTarget> target = event->GetEventTarget();
MOZ_ASSERT(target);
bool isCurrentThread = false;
DebugOnly<nsresult> rv = target->IsOnCurrentThread(&isCurrentThread);
MOZ_ASSERT(NS_SUCCEEDED(rv));
if (!isCurrentThread) {
// Leverage Suspend/Resume mechanism to trigger flush procedure without
// creating a new one.
SuspendInternal();
mEventQueue.AppendElement(Move(event));
ResumeInternal();
return;
}
} }
MOZ_RELEASE_ASSERT(!aAssertionWhenNotQueued); MOZ_RELEASE_ASSERT(!aAssertionWhenNotQueued);
@ -142,18 +184,45 @@ inline void
ChannelEventQueue::StartForcedQueueing() ChannelEventQueue::StartForcedQueueing()
{ {
MutexAutoLock lock(mMutex); MutexAutoLock lock(mMutex);
mForced = true; ++mForcedCount;
} }
inline void inline void
ChannelEventQueue::EndForcedQueueing() ChannelEventQueue::EndForcedQueueing()
{ {
bool tryFlush = false;
{ {
MutexAutoLock lock(mMutex); MutexAutoLock lock(mMutex);
mForced = false; MOZ_ASSERT(mForcedCount > 0);
if(!--mForcedCount) {
tryFlush = true;
}
} }
MaybeFlushQueue(); if (tryFlush) {
MaybeFlushQueue();
}
}
inline nsresult
ChannelEventQueue::PrependEvent(UniquePtr<ChannelEvent>& aEvent)
{
MutexAutoLock lock(mMutex);
// Prepending event while no queue flush foreseen might cause the following
// channel events not run. This assertion here guarantee there must be a
// queue flush, either triggered by Resume or EndForcedQueueing, to execute
// the added event.
MOZ_ASSERT(mSuspended || !!mForcedCount);
UniquePtr<ChannelEvent>* newEvent =
mEventQueue.InsertElementAt(0, Move(aEvent));
if (!newEvent) {
return NS_ERROR_OUT_OF_MEMORY;
}
return NS_OK;
} }
inline nsresult inline nsresult
@ -161,6 +230,12 @@ ChannelEventQueue::PrependEvents(nsTArray<UniquePtr<ChannelEvent>>& aEvents)
{ {
MutexAutoLock lock(mMutex); MutexAutoLock lock(mMutex);
// Prepending event while no queue flush foreseen might cause the following
// channel events not run. This assertion here guarantee there must be a
// queue flush, either triggered by Resume or EndForcedQueueing, to execute
// the added events.
MOZ_ASSERT(mSuspended || !!mForcedCount);
UniquePtr<ChannelEvent>* newEvents = UniquePtr<ChannelEvent>* newEvents =
mEventQueue.InsertElementsAt(0, aEvents.Length()); mEventQueue.InsertElementsAt(0, aEvents.Length());
if (!newEvents) { if (!newEvents) {
@ -174,18 +249,10 @@ ChannelEventQueue::PrependEvents(nsTArray<UniquePtr<ChannelEvent>>& aEvents)
return NS_OK; return NS_OK;
} }
inline void
ChannelEventQueue::Suspend()
{
MutexAutoLock lock(mMutex);
mSuspended = true;
mSuspendCount++;
}
inline void inline void
ChannelEventQueue::CompleteResume() ChannelEventQueue::CompleteResume()
{ {
bool tryFlush = false;
{ {
MutexAutoLock lock(mMutex); MutexAutoLock lock(mMutex);
@ -196,10 +263,13 @@ ChannelEventQueue::CompleteResume()
// messages) until this point, else new incoming messages could run before // messages) until this point, else new incoming messages could run before
// queued ones. // queued ones.
mSuspended = false; mSuspended = false;
tryFlush = true;
} }
} }
MaybeFlushQueue(); if (tryFlush) {
MaybeFlushQueue();
}
} }
inline void inline void
@ -211,7 +281,7 @@ ChannelEventQueue::MaybeFlushQueue()
{ {
MutexAutoLock lock(mMutex); MutexAutoLock lock(mMutex);
flushQueue = !mForced && !mFlushing && !mSuspended && flushQueue = !mForcedCount && !mFlushing && !mSuspended &&
!mEventQueue.IsEmpty(); !mEventQueue.IsEmpty();
} }

Просмотреть файл

@ -263,6 +263,12 @@ public:
mLastModified, mEntityID, mURI); mLastModified, mEntityID, mURI);
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
FTPChannelChild* mChild; FTPChannelChild* mChild;
nsresult mChannelStatus; nsresult mChannelStatus;
@ -369,6 +375,12 @@ public:
mChild->DoOnDataAvailable(mChannelStatus, mData, mOffset, mCount); mChild->DoOnDataAvailable(mChannelStatus, mData, mOffset, mCount);
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
FTPChannelChild* mChild; FTPChannelChild* mChild;
nsresult mChannelStatus; nsresult mChannelStatus;
@ -412,6 +424,12 @@ class MaybeDivertOnDataFTPEvent : public ChannelEvent
mChild->MaybeDivertOnData(mData, mOffset, mCount); mChild->MaybeDivertOnData(mData, mOffset, mCount);
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
FTPChannelChild* mChild; FTPChannelChild* mChild;
nsCString mData; nsCString mData;
@ -498,6 +516,10 @@ public:
mChild->DoOnStopRequest(mChannelStatus, mErrorMsg, mUseUTF8); mChild->DoOnStopRequest(mChannelStatus, mErrorMsg, mUseUTF8);
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
return do_GetMainThread();
}
private: private:
FTPChannelChild* mChild; FTPChannelChild* mChild;
nsresult mChannelStatus; nsresult mChannelStatus;
@ -559,6 +581,12 @@ class MaybeDivertOnStopFTPEvent : public ChannelEvent
mChild->MaybeDivertOnStop(mChannelStatus); mChild->MaybeDivertOnStop(mChannelStatus);
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
FTPChannelChild* mChild; FTPChannelChild* mChild;
nsresult mChannelStatus; nsresult mChannelStatus;
@ -646,6 +674,13 @@ class FTPFailedAsyncOpenEvent : public ChannelEvent
FTPFailedAsyncOpenEvent(FTPChannelChild* aChild, nsresult aStatus) FTPFailedAsyncOpenEvent(FTPChannelChild* aChild, nsresult aStatus)
: mChild(aChild), mStatus(aStatus) {} : mChild(aChild), mStatus(aStatus) {}
void Run() { mChild->DoFailedAsyncOpen(mStatus); } void Run() { mChild->DoFailedAsyncOpen(mStatus); }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
FTPChannelChild* mChild; FTPChannelChild* mChild;
nsresult mStatus; nsresult mStatus;
@ -698,6 +733,13 @@ class FTPFlushedForDiversionEvent : public ChannelEvent
{ {
mChild->FlushedForDiversion(); mChild->FlushedForDiversion();
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
FTPChannelChild* mChild; FTPChannelChild* mChild;
}; };
@ -747,6 +789,13 @@ class FTPDeleteSelfEvent : public ChannelEvent
explicit FTPDeleteSelfEvent(FTPChannelChild* aChild) explicit FTPDeleteSelfEvent(FTPChannelChild* aChild)
: mChild(aChild) {} : mChild(aChild) {}
void Run() { mChild->DoDeleteSelf(); } void Run() { mChild->DoDeleteSelf(); }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
FTPChannelChild* mChild; FTPChannelChild* mChild;
}; };
@ -961,7 +1010,19 @@ FTPChannelChild::EnsureDispatcher()
nsCOMPtr<nsIEventTarget> target = nsCOMPtr<nsIEventTarget> target =
mDispatcher->EventTargetFor(TaskCategory::Network); mDispatcher->EventTargetFor(TaskCategory::Network);
gNeckoChild->SetEventTargetForActor(this, target); gNeckoChild->SetEventTargetForActor(this, target);
mEventQ->RetargetDeliveryTo(target);
mNeckoTarget = target;
}
already_AddRefed<nsIEventTarget>
FTPChannelChild::GetNeckoTarget()
{
nsCOMPtr<nsIEventTarget> target = mNeckoTarget;
if (!target) {
target = do_GetMainThread();
}
return target.forget();
} }
} // namespace net } // namespace net

Просмотреть файл

@ -18,6 +18,7 @@
#include "nsIResumableChannel.h" #include "nsIResumableChannel.h"
#include "nsIChildChannel.h" #include "nsIChildChannel.h"
#include "nsIDivertableChannel.h" #include "nsIDivertableChannel.h"
#include "nsIEventTarget.h"
#include "nsIStreamListener.h" #include "nsIStreamListener.h"
#include "PrivateBrowsingChannel.h" #include "PrivateBrowsingChannel.h"
@ -123,9 +124,13 @@ protected:
friend class FTPStopRequestEvent; friend class FTPStopRequestEvent;
friend class MaybeDivertOnStopFTPEvent; friend class MaybeDivertOnStopFTPEvent;
friend class FTPFailedAsyncOpenEvent; friend class FTPFailedAsyncOpenEvent;
friend class FTPFlushedForDiversionEvent;
friend class FTPDeleteSelfEvent; friend class FTPDeleteSelfEvent;
private: private:
// Get event target for processing network events.
already_AddRefed<nsIEventTarget> GetNeckoTarget();
nsCOMPtr<nsIInputStream> mUploadStream; nsCOMPtr<nsIInputStream> mUploadStream;
bool mIPCOpen; bool mIPCOpen;
@ -154,6 +159,9 @@ private:
// diverting callbacks to parent. // diverting callbacks to parent.
bool mSuspendSent; bool mSuspendSent;
// EventTarget for labeling networking events.
nsCOMPtr<nsIEventTarget> mNeckoTarget;
RefPtr<Dispatcher> mDispatcher; RefPtr<Dispatcher> mDispatcher;
void EnsureDispatcher(); void EnsureDispatcher();

Просмотреть файл

@ -240,7 +240,7 @@ FTPChannelParent::RecvResume()
return IPC_OK(); return IPC_OK();
} }
class FTPDivertDataAvailableEvent : public ChannelEvent class FTPDivertDataAvailableEvent : public MainThreadChannelEvent
{ {
public: public:
FTPDivertDataAvailableEvent(FTPChannelParent* aParent, FTPDivertDataAvailableEvent(FTPChannelParent* aParent,
@ -331,7 +331,7 @@ FTPChannelParent::DivertOnDataAvailable(const nsCString& data,
} }
} }
class FTPDivertStopRequestEvent : public ChannelEvent class FTPDivertStopRequestEvent : public MainThreadChannelEvent
{ {
public: public:
FTPDivertStopRequestEvent(FTPChannelParent* aParent, FTPDivertStopRequestEvent(FTPChannelParent* aParent,
@ -391,7 +391,7 @@ FTPChannelParent::DivertOnStopRequest(const nsresult& statusCode)
OnStopRequest(mChannel, nullptr, status); OnStopRequest(mChannel, nullptr, status);
} }
class FTPDivertCompleteEvent : public ChannelEvent class FTPDivertCompleteEvent : public MainThreadChannelEvent
{ {
public: public:
explicit FTPDivertCompleteEvent(FTPChannelParent* aParent) explicit FTPDivertCompleteEvent(FTPChannelParent* aParent)

Просмотреть файл

@ -180,6 +180,7 @@ HttpChannelChild::HttpChannelChild()
, mPostRedirectChannelShouldUpgrade(false) , mPostRedirectChannelShouldUpgrade(false)
, mShouldParentIntercept(false) , mShouldParentIntercept(false)
, mSuspendParentAfterSynthesizeResponse(false) , mSuspendParentAfterSynthesizeResponse(false)
, mEventTargetMutex("HttpChannelChild::EventTargetMutex")
{ {
LOG(("Creating HttpChannelChild @%p\n", this)); LOG(("Creating HttpChannelChild @%p\n", this));
@ -293,6 +294,13 @@ class AssociateApplicationCacheEvent : public ChannelEvent
, clientID(aClientID) {} , clientID(aClientID) {}
void Run() { mChild->AssociateApplicationCache(groupID, clientID); } void Run() { mChild->AssociateApplicationCache(groupID, clientID); }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
HttpChannelChild* mChild; HttpChannelChild* mChild;
nsCString groupID; nsCString groupID;
@ -367,6 +375,13 @@ class StartRequestEvent : public ChannelEvent
mSecurityInfoSerialization, mSelfAddr, mPeerAddr, mSecurityInfoSerialization, mSelfAddr, mPeerAddr,
mCacheKey, mAltDataType, mAltDataLen); mCacheKey, mAltDataType, mAltDataLen);
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
HttpChannelChild* mChild; HttpChannelChild* mChild;
nsresult mChannelStatus; nsresult mChannelStatus;
@ -626,6 +641,13 @@ class TransportAndDataEvent : public ChannelEvent
mChild->OnTransportAndData(mChannelStatus, mTransportStatus, mChild->OnTransportAndData(mChannelStatus, mTransportStatus,
mOffset, mCount, mData); mOffset, mCount, mData);
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
HttpChannelChild* mChild; HttpChannelChild* mChild;
nsresult mChannelStatus; nsresult mChannelStatus;
@ -670,6 +692,12 @@ class MaybeDivertOnDataHttpEvent : public ChannelEvent
mChild->MaybeDivertOnData(mData, mOffset, mCount); mChild->MaybeDivertOnData(mData, mOffset, mCount);
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
HttpChannelChild* mChild; HttpChannelChild* mChild;
nsCString mData; nsCString mData;
@ -833,6 +861,13 @@ class StopRequestEvent : public ChannelEvent
, mTiming(timing) {} , mTiming(timing) {}
void Run() { mChild->OnStopRequest(mChannelStatus, mTiming); } void Run() { mChild->OnStopRequest(mChannelStatus, mTiming); }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
HttpChannelChild* mChild; HttpChannelChild* mChild;
nsresult mChannelStatus; nsresult mChannelStatus;
@ -866,6 +901,12 @@ class MaybeDivertOnStopHttpEvent : public ChannelEvent
mChild->MaybeDivertOnStop(mChannelStatus); mChild->MaybeDivertOnStop(mChannelStatus);
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
HttpChannelChild* mChild; HttpChannelChild* mChild;
nsresult mChannelStatus; nsresult mChannelStatus;
@ -1036,6 +1077,13 @@ class ProgressEvent : public ChannelEvent
, mProgressMax(progressMax) {} , mProgressMax(progressMax) {}
void Run() { mChild->OnProgress(mProgress, mProgressMax); } void Run() { mChild->OnProgress(mProgress, mProgressMax); }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
HttpChannelChild* mChild; HttpChannelChild* mChild;
int64_t mProgress, mProgressMax; int64_t mProgress, mProgressMax;
@ -1084,6 +1132,13 @@ class StatusEvent : public ChannelEvent
, mStatus(status) {} , mStatus(status) {}
void Run() { mChild->OnStatus(mStatus); } void Run() { mChild->OnStatus(mStatus); }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
HttpChannelChild* mChild; HttpChannelChild* mChild;
nsresult mStatus; nsresult mStatus;
@ -1131,6 +1186,13 @@ class FailedAsyncOpenEvent : public ChannelEvent
, mStatus(status) {} , mStatus(status) {}
void Run() { mChild->FailedAsyncOpen(mStatus); } void Run() { mChild->FailedAsyncOpen(mStatus); }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
HttpChannelChild* mChild; HttpChannelChild* mChild;
nsresult mStatus; nsresult mStatus;
@ -1185,6 +1247,13 @@ class DeleteSelfEvent : public ChannelEvent
public: public:
explicit DeleteSelfEvent(HttpChannelChild* child) : mChild(child) {} explicit DeleteSelfEvent(HttpChannelChild* child) : mChild(child) {}
void Run() { mChild->DeleteSelf(); } void Run() { mChild->DeleteSelf(); }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
HttpChannelChild* mChild; HttpChannelChild* mChild;
}; };
@ -1242,9 +1311,12 @@ HttpChannelChild::RecvFinishInterceptedRedirect()
RefPtr<HttpChannelChild> self(this); RefPtr<HttpChannelChild> self(this);
Send__delete__(this); Send__delete__(this);
// Reset the event target since the IPC actor is about to be destroyed. {
// Following channel event should be handled on main thread. // Reset the event target since the IPC actor is about to be destroyed.
mEventQ->ResetDeliveryTarget(); // Following channel event should be handled on main thread.
MutexAutoLock lock(mEventTargetMutex);
mNeckoTarget = nullptr;
}
// The IPDL connection was torn down by a interception logic in // The IPDL connection was torn down by a interception logic in
// CompleteRedirectSetup, and we need to call FinishInterceptedRedirect. // CompleteRedirectSetup, and we need to call FinishInterceptedRedirect.
@ -1315,6 +1387,13 @@ class Redirect1Event : public ChannelEvent
mResponseHead, mSecurityInfoSerialization, mResponseHead, mSecurityInfoSerialization,
mChannelId); mChannelId);
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
HttpChannelChild* mChild; HttpChannelChild* mChild;
uint32_t mRegistrarId; uint32_t mRegistrarId;
@ -1488,6 +1567,13 @@ class Redirect3Event : public ChannelEvent
public: public:
explicit Redirect3Event(HttpChannelChild* child) : mChild(child) {} explicit Redirect3Event(HttpChannelChild* child) : mChild(child) {}
void Run() { mChild->Redirect3Complete(nullptr); } void Run() { mChild->Redirect3Complete(nullptr); }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
HttpChannelChild* mChild; HttpChannelChild* mChild;
}; };
@ -1513,6 +1599,13 @@ class HttpFlushedForDiversionEvent : public ChannelEvent
{ {
mChild->FlushedForDiversion(); mChild->FlushedForDiversion();
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
MOZ_ASSERT(mChild);
nsCOMPtr<nsIEventTarget> target = mChild->GetNeckoTarget();
return target.forget();
}
private: private:
HttpChannelChild* mChild; HttpChannelChild* mChild;
}; };
@ -2106,7 +2199,26 @@ HttpChannelChild::SetEventTarget()
nsCOMPtr<nsIEventTarget> target = nsCOMPtr<nsIEventTarget> target =
dispatcher->EventTargetFor(TaskCategory::Network); dispatcher->EventTargetFor(TaskCategory::Network);
gNeckoChild->SetEventTargetForActor(this, target); gNeckoChild->SetEventTargetForActor(this, target);
mEventQ->RetargetDeliveryTo(target);
{
MutexAutoLock lock(mEventTargetMutex);
mNeckoTarget = target;
}
}
already_AddRefed<nsIEventTarget>
HttpChannelChild::GetNeckoTarget()
{
nsCOMPtr<nsIEventTarget> target;
{
MutexAutoLock lock(mEventTargetMutex);
target = mNeckoTarget;
}
if (!target) {
target = do_GetMainThread();
}
return target.forget();
} }
nsresult nsresult

Просмотреть файл

@ -8,6 +8,7 @@
#ifndef mozilla_net_HttpChannelChild_h #ifndef mozilla_net_HttpChannelChild_h
#define mozilla_net_HttpChannelChild_h #define mozilla_net_HttpChannelChild_h
#include "mozilla/Mutex.h"
#include "mozilla/UniquePtr.h" #include "mozilla/UniquePtr.h"
#include "mozilla/net/HttpBaseChannel.h" #include "mozilla/net/HttpBaseChannel.h"
#include "mozilla/net/PHttpChannelChild.h" #include "mozilla/net/PHttpChannelChild.h"
@ -197,6 +198,9 @@ private:
// before the constructor message is sent to the parent. // before the constructor message is sent to the parent.
void SetEventTarget(); void SetEventTarget();
// Get event target for processing network events.
already_AddRefed<nsIEventTarget> GetNeckoTarget();
MOZ_MUST_USE nsresult ContinueAsyncOpen(); MOZ_MUST_USE nsresult ContinueAsyncOpen();
void DoOnStartRequest(nsIRequest* aRequest, nsISupports* aContext); void DoOnStartRequest(nsIRequest* aRequest, nsISupports* aContext);
@ -295,6 +299,11 @@ private:
// Used to call OverrideWithSynthesizedResponse in FinishInterceptedRedirect // Used to call OverrideWithSynthesizedResponse in FinishInterceptedRedirect
RefPtr<OverrideRunnable> mOverrideRunnable; RefPtr<OverrideRunnable> mOverrideRunnable;
// EventTarget for labeling networking events.
nsCOMPtr<nsIEventTarget> mNeckoTarget;
// Used to ensure atomicity of mNeckoTarget;
Mutex mEventTargetMutex;
void FinishInterceptedRedirect(); void FinishInterceptedRedirect();
void CleanupRedirectingChannel(nsresult rv); void CleanupRedirectingChannel(nsresult rv);
@ -366,6 +375,7 @@ private:
friend class Redirect1Event; friend class Redirect1Event;
friend class Redirect3Event; friend class Redirect3Event;
friend class DeleteSelfEvent; friend class DeleteSelfEvent;
friend class HttpFlushedForDiversionEvent;
friend class HttpAsyncAborter<HttpChannelChild>; friend class HttpAsyncAborter<HttpChannelChild>;
friend class InterceptStreamListener; friend class InterceptStreamListener;
friend class InterceptedChannelContent; friend class InterceptedChannelContent;

Просмотреть файл

@ -838,7 +838,7 @@ HttpChannelParent::RecvMarkOfflineCacheEntryAsForeign()
return IPC_OK(); return IPC_OK();
} }
class DivertDataAvailableEvent : public ChannelEvent class DivertDataAvailableEvent : public MainThreadChannelEvent
{ {
public: public:
DivertDataAvailableEvent(HttpChannelParent* aParent, DivertDataAvailableEvent(HttpChannelParent* aParent,
@ -933,7 +933,7 @@ HttpChannelParent::DivertOnDataAvailable(const nsCString& data,
} }
} }
class DivertStopRequestEvent : public ChannelEvent class DivertStopRequestEvent : public MainThreadChannelEvent
{ {
public: public:
DivertStopRequestEvent(HttpChannelParent* aParent, DivertStopRequestEvent(HttpChannelParent* aParent,
@ -994,7 +994,7 @@ HttpChannelParent::DivertOnStopRequest(const nsresult& statusCode)
mParentListener->OnStopRequest(mChannel, nullptr, status); mParentListener->OnStopRequest(mChannel, nullptr, status);
} }
class DivertCompleteEvent : public ChannelEvent class DivertCompleteEvent : public MainThreadChannelEvent
{ {
public: public:
explicit DivertCompleteEvent(HttpChannelParent* aParent) explicit DivertCompleteEvent(HttpChannelParent* aParent)

Просмотреть файл

@ -176,6 +176,15 @@ public:
mChannelEvent->Run(); mChannelEvent->Run();
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
nsCOMPtr<nsIEventTarget> target = mEventTarget;
if (!target) {
target = do_GetMainThread();
}
return target.forget();
}
private: private:
nsAutoPtr<ChannelEvent> mChannelEvent; nsAutoPtr<ChannelEvent> mChannelEvent;
nsCOMPtr<nsIEventTarget> mEventTarget; nsCOMPtr<nsIEventTarget> mEventTarget;
@ -200,6 +209,13 @@ class StartEvent : public ChannelEvent
{ {
mChild->OnStart(mProtocol, mExtensions, mEffectiveURL, mEncrypted); mChild->OnStart(mProtocol, mExtensions, mEffectiveURL, mEncrypted);
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
nsCOMPtr<nsIEventTarget> target = do_GetCurrentThread();
return target.forget();
}
private: private:
RefPtr<WebSocketChannelChild> mChild; RefPtr<WebSocketChannelChild> mChild;
nsCString mProtocol; nsCString mProtocol;
@ -258,6 +274,13 @@ class StopEvent : public ChannelEvent
{ {
mChild->OnStop(mStatusCode); mChild->OnStop(mStatusCode);
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
nsCOMPtr<nsIEventTarget> target = do_GetCurrentThread();
return target.forget();
}
private: private:
RefPtr<WebSocketChannelChild> mChild; RefPtr<WebSocketChannelChild> mChild;
nsresult mStatusCode; nsresult mStatusCode;
@ -308,6 +331,13 @@ class MessageEvent : public ChannelEvent
mChild->OnBinaryMessageAvailable(mMessage); mChild->OnBinaryMessageAvailable(mMessage);
} }
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
nsCOMPtr<nsIEventTarget> target = do_GetCurrentThread();
return target.forget();
}
private: private:
RefPtr<WebSocketChannelChild> mChild; RefPtr<WebSocketChannelChild> mChild;
nsCString mMessage; nsCString mMessage;
@ -380,6 +410,13 @@ class AcknowledgeEvent : public ChannelEvent
{ {
mChild->OnAcknowledge(mSize); mChild->OnAcknowledge(mSize);
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
nsCOMPtr<nsIEventTarget> target = do_GetCurrentThread();
return target.forget();
}
private: private:
RefPtr<WebSocketChannelChild> mChild; RefPtr<WebSocketChannelChild> mChild;
uint32_t mSize; uint32_t mSize;
@ -426,6 +463,13 @@ class ServerCloseEvent : public ChannelEvent
{ {
mChild->OnServerClose(mCode, mReason); mChild->OnServerClose(mCode, mReason);
} }
already_AddRefed<nsIEventTarget> GetEventTarget()
{
nsCOMPtr<nsIEventTarget> target = do_GetCurrentThread();
return target.forget();
}
private: private:
RefPtr<WebSocketChannelChild> mChild; RefPtr<WebSocketChannelChild> mChild;
uint16_t mCode; uint16_t mCode;
@ -721,19 +765,6 @@ WebSocketChannelChild::GetSecurityInfo(nsISupports **aSecurityInfo)
return NS_ERROR_NOT_AVAILABLE; return NS_ERROR_NOT_AVAILABLE;
} }
//-----------------------------------------------------------------------------
// WebSocketChannelChild::nsIThreadRetargetableRequest
//-----------------------------------------------------------------------------
NS_IMETHODIMP
WebSocketChannelChild::RetargetDeliveryTo(nsIEventTarget* aTargetThread)
{
nsresult rv = BaseWebSocketChannel::RetargetDeliveryTo(aTargetThread);
MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv));
return mEventQ->RetargetDeliveryTo(aTargetThread);
}
bool bool
WebSocketChannelChild::IsOnTargetThread() WebSocketChannelChild::IsOnTargetThread()
{ {

Просмотреть файл

@ -25,7 +25,6 @@ class WebSocketChannelChild final : public BaseWebSocketChannel,
explicit WebSocketChannelChild(bool aSecure); explicit WebSocketChannelChild(bool aSecure);
NS_DECL_THREADSAFE_ISUPPORTS NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSITHREADRETARGETABLEREQUEST
// nsIWebSocketChannel methods BaseWebSocketChannel didn't implement for us // nsIWebSocketChannel methods BaseWebSocketChannel didn't implement for us
// //

Просмотреть файл

@ -130,7 +130,7 @@ WyciwygChannelChild::Init(nsIURI* uri)
// WyciwygChannelChild::PWyciwygChannelChild // WyciwygChannelChild::PWyciwygChannelChild
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
class WyciwygStartRequestEvent : public ChannelEvent class WyciwygStartRequestEvent : public MainThreadChannelEvent
{ {
public: public:
WyciwygStartRequestEvent(WyciwygChannelChild* child, WyciwygStartRequestEvent(WyciwygChannelChild* child,
@ -192,7 +192,7 @@ WyciwygChannelChild::OnStartRequest(const nsresult& statusCode,
Cancel(rv); Cancel(rv);
} }
class WyciwygDataAvailableEvent : public ChannelEvent class WyciwygDataAvailableEvent : public MainThreadChannelEvent
{ {
public: public:
WyciwygDataAvailableEvent(WyciwygChannelChild* child, WyciwygDataAvailableEvent(WyciwygChannelChild* child,
@ -253,7 +253,7 @@ WyciwygChannelChild::OnDataAvailable(const nsCString& data,
} }
} }
class WyciwygStopRequestEvent : public ChannelEvent class WyciwygStopRequestEvent : public MainThreadChannelEvent
{ {
public: public:
WyciwygStopRequestEvent(WyciwygChannelChild* child, WyciwygStopRequestEvent(WyciwygChannelChild* child,
@ -305,7 +305,7 @@ WyciwygChannelChild::OnStopRequest(const nsresult& statusCode)
PWyciwygChannelChild::Send__delete__(this); PWyciwygChannelChild::Send__delete__(this);
} }
class WyciwygCancelEvent : public ChannelEvent class WyciwygCancelEvent : public MainThreadChannelEvent
{ {
public: public:
WyciwygCancelEvent(WyciwygChannelChild* child, const nsresult& status) WyciwygCancelEvent(WyciwygChannelChild* child, const nsresult& status)