Bug 1634846 - P2. Make ipc's MessageChannel works with TaskQueue, r=nika

We no longer rely of having a message loop for the worker thread.

Differential Revision: https://phabricator.services.mozilla.com/D80655
This commit is contained in:
Jean-Yves Avenard 2020-07-02 00:26:41 +00:00
Родитель 8ad7f1ba8e
Коммит b2cf09ec3e
4 изменённых файлов: 44 добавлений и 68 удалений

Просмотреть файл

@ -567,9 +567,7 @@ MessageChannel::MessageChannel(const char* aName, IToplevelProtocol* aListener)
mChannelState(ChannelClosed),
mSide(UnknownSide),
mIsCrossProcess(false),
mWorkerLoop(nullptr),
mChannelErrorTask(nullptr),
mWorkerThread(nullptr),
mTimeoutMs(kNoTimeout),
mInTimeoutSecondHalf(false),
mNextSeqno(0),
@ -709,19 +707,6 @@ bool MessageChannel::CanSend() const {
return Connected();
}
void MessageChannel::WillDestroyCurrentMessageLoop() {
#if defined(DEBUG)
CrashReporter::AnnotateCrashReport(
CrashReporter::Annotation::IPCFatalErrorProtocol,
nsDependentCString(mName));
MOZ_CRASH("MessageLoop destroyed before MessageChannel that's bound to it");
#endif
// Clear mWorkerThread to avoid posting to it in the future.
MonitorAutoLock lock(*mMonitor);
mWorkerLoop = nullptr;
}
void MessageChannel::Clear() {
// Don't clear mWorkerThread; we use it in AssertLinkThread() and
// AssertWorkerThread().
@ -774,17 +759,12 @@ void MessageChannel::Clear() {
gParentProcessBlocker = nullptr;
}
if (mWorkerLoop) {
mWorkerLoop->RemoveDestructionObserver(this);
}
gUnresolvedResponses -= mPendingResponses.size();
for (auto& pair : mPendingResponses) {
pair.second.get()->Reject(ResponseRejectReason::ChannelClosed);
}
mPendingResponses.clear();
mWorkerLoop = nullptr;
if (mLink != nullptr && mIsCrossProcess) {
ChannelCountReporter::Decrement(mName);
}
@ -820,9 +800,8 @@ bool MessageChannel::Open(mozilla::UniquePtr<Transport> aTransport,
MOZ_ASSERT(!mLink, "Open() called > once");
mMonitor = new RefCountedMonitor();
mWorkerLoop = MessageLoop::current();
mWorkerThread = PR_GetCurrentThread();
mWorkerLoop->AddDestructionObserver(this);
mWorkerThread = GetCurrentSerialEventTarget();
MOZ_ASSERT(mWorkerThread, "We should always be on a nsISerialEventTarget");
mListener->OnIPCChannelOpened();
auto link = MakeUnique<ProcessLink>(this);
@ -835,7 +814,7 @@ bool MessageChannel::Open(mozilla::UniquePtr<Transport> aTransport,
}
bool MessageChannel::Open(MessageChannel* aTargetChan,
nsIEventTarget* aEventTarget, Side aSide) {
nsISerialEventTarget* aEventTarget, Side aSide) {
// Opens a connection to another thread in the same process.
// This handshake proceeds as follows:
@ -854,7 +833,7 @@ bool MessageChannel::Open(MessageChannel* aTargetChan,
MOZ_ASSERT(aTargetChan, "Need a target channel");
MOZ_ASSERT(ChannelClosed == mChannelState, "Not currently closed");
CommonThreadOpenInit(aTargetChan, aSide);
CommonThreadOpenInit(aTargetChan, GetCurrentSerialEventTarget(), aSide);
Side oppSide = UnknownSide;
switch (aSide) {
@ -872,10 +851,10 @@ bool MessageChannel::Open(MessageChannel* aTargetChan,
MonitorAutoLock lock(*mMonitor);
mChannelState = ChannelOpening;
MOZ_ALWAYS_SUCCEEDS(
aEventTarget->Dispatch(NewNonOwningRunnableMethod<MessageChannel*, Side>(
MOZ_ALWAYS_SUCCEEDS(aEventTarget->Dispatch(
NewNonOwningRunnableMethod<MessageChannel*, nsISerialEventTarget*, Side>(
"ipc::MessageChannel::OpenAsOtherThread", aTargetChan,
&MessageChannel::OpenAsOtherThread, this, oppSide)));
&MessageChannel::OpenAsOtherThread, this, aEventTarget, oppSide)));
while (ChannelOpening == mChannelState) mMonitor->Wait();
MOZ_RELEASE_ASSERT(ChannelConnected == mChannelState,
@ -884,13 +863,14 @@ bool MessageChannel::Open(MessageChannel* aTargetChan,
}
void MessageChannel::OpenAsOtherThread(MessageChannel* aTargetChan,
nsISerialEventTarget* aThread,
Side aSide) {
// Invoked when the other side has begun the open.
MOZ_ASSERT(ChannelClosed == mChannelState, "Not currently closed");
MOZ_ASSERT(ChannelOpening == aTargetChan->mChannelState,
"Target channel not in the process of opening");
CommonThreadOpenInit(aTargetChan, aSide);
CommonThreadOpenInit(aTargetChan, aThread, aSide);
mMonitor = aTargetChan->mMonitor;
MonitorAutoLock lock(*mMonitor);
@ -902,10 +882,10 @@ void MessageChannel::OpenAsOtherThread(MessageChannel* aTargetChan,
}
void MessageChannel::CommonThreadOpenInit(MessageChannel* aTargetChan,
nsISerialEventTarget* aThread,
Side aSide) {
mWorkerLoop = MessageLoop::current();
mWorkerThread = PR_GetCurrentThread();
mWorkerLoop->AddDestructionObserver(this);
MOZ_ASSERT(aThread);
mWorkerThread = aThread;
mListener->OnIPCChannelOpened();
mLink = MakeUnique<ThreadLink>(this, aTargetChan);
@ -914,7 +894,8 @@ void MessageChannel::CommonThreadOpenInit(MessageChannel* aTargetChan,
bool MessageChannel::OpenOnSameThread(MessageChannel* aTargetChan,
mozilla::ipc::Side aSide) {
CommonThreadOpenInit(aTargetChan, aSide);
nsCOMPtr<nsISerialEventTarget> currentThread = GetCurrentSerialEventTarget();
CommonThreadOpenInit(aTargetChan, currentThread, aSide);
Side oppSide = UnknownSide;
switch (aSide) {
@ -934,7 +915,7 @@ bool MessageChannel::OpenOnSameThread(MessageChannel* aTargetChan,
mMonitor = new RefCountedMonitor();
mChannelState = ChannelOpening;
aTargetChan->CommonThreadOpenInit(this, oppSide);
aTargetChan->CommonThreadOpenInit(this, currentThread, oppSide);
aTargetChan->mIsSameThreadChannel = true;
aTargetChan->mMonitor = mMonitor;
@ -2003,13 +1984,13 @@ void MessageChannel::MessageTask::Post() {
mScheduled = true;
RefPtr<MessageTask> self = this;
nsCOMPtr<nsIEventTarget> eventTarget =
nsCOMPtr<nsISerialEventTarget> eventTarget =
mChannel->mListener->GetMessageEventTarget(mMessage);
if (eventTarget) {
eventTarget->Dispatch(self.forget(), NS_DISPATCH_NORMAL);
} else if (mChannel->mWorkerLoop) {
mChannel->mWorkerLoop->PostTask(self.forget());
} else {
mChannel->mWorkerThread->Dispatch(self.forget());
}
}
@ -2412,9 +2393,7 @@ void MessageChannel::OnChannelConnected(int32_t peer_id) {
mPeerPidSet = true;
mPeerPid = peer_id;
RefPtr<CancelableRunnable> task = mOnChannelConnectedTask;
if (mWorkerLoop) {
mWorkerLoop->PostTask(task.forget());
}
mWorkerThread->Dispatch(task.forget());
}
void MessageChannel::DispatchOnChannelConnected() {
@ -2604,10 +2583,10 @@ void MessageChannel::OnNotifyMaybeChannelError() {
"ipc::MessageChannel::OnNotifyMaybeChannelError", this,
&MessageChannel::OnNotifyMaybeChannelError);
RefPtr<Runnable> task = mChannelErrorTask;
// 10 ms delay is completely arbitrary
if (mWorkerLoop) {
mWorkerLoop->PostDelayedTask(task.forget(), 10);
}
// This used to post a 10ms delayed patch; however not all
// nsISerialEventTarget implementations support delayed dispatch.
// The delay being completely arbitrary, we may not as well have any.
mWorkerThread->Dispatch(task.forget());
return;
}
@ -2617,14 +2596,14 @@ void MessageChannel::OnNotifyMaybeChannelError() {
void MessageChannel::PostErrorNotifyTask() {
mMonitor->AssertCurrentThreadOwns();
if (mChannelErrorTask || !mWorkerLoop) return;
if (mChannelErrorTask) return;
// This must be the last code that runs on this thread!
mChannelErrorTask = NewNonOwningCancelableRunnableMethod(
"ipc::MessageChannel::OnNotifyMaybeChannelError", this,
&MessageChannel::OnNotifyMaybeChannelError);
RefPtr<Runnable> task = mChannelErrorTask;
mWorkerLoop->PostTask(task.forget());
mWorkerThread->Dispatch(task.forget());
}
// Special async message.
@ -2785,7 +2764,7 @@ void MessageChannel::DebugAbort(const char* file, int line, const char* cond,
}
void MessageChannel::DumpInterruptStack(const char* const pfx) const {
NS_WARNING_ASSERTION(MessageLoop::current() != mWorkerLoop,
NS_WARNING_ASSERTION(!mWorkerThread->IsOnCurrentThread(),
"The worker thread had better be paused in a debugger!");
printf_stderr("%sMessageChannel 'backtrace':\n", pfx);
@ -2818,7 +2797,8 @@ void MessageChannel::AddProfilerMarker(const IPC::Message& aMessage,
}
int32_t MessageChannel::GetTopmostMessageRoutingId() const {
MOZ_RELEASE_ASSERT(MessageLoop::current() == mWorkerLoop);
AssertWorkerThread();
if (mCxxStackFrames.empty()) {
return MSG_ROUTING_NONE;
}

Просмотреть файл

@ -10,7 +10,6 @@
#include "base/basictypes.h"
#include "base/message_loop.h"
#include "mozilla/Atomics.h"
#include "mozilla/DebugOnly.h"
#include "mozilla/Monitor.h"
@ -19,17 +18,17 @@
#if defined(OS_WIN)
# include "mozilla/ipc/Neutering.h"
#endif // defined(OS_WIN)
#include "mozilla/ipc/Transport.h"
#include "MessageLink.h"
#include "nsThreadUtils.h"
#include <math.h>
#include <deque>
#include <functional>
#include <map>
#include <math.h>
#include <stack>
#include <vector>
#include "MessageLink.h"
#include "mozilla/ipc/Transport.h"
class nsIEventTarget;
namespace mozilla {
@ -97,7 +96,7 @@ enum ChannelState {
class AutoEnterTransaction;
class MessageChannel : HasResultCodes, MessageLoop::DestructionObserver {
class MessageChannel : HasResultCodes {
friend class ProcessLink;
friend class ThreadLink;
#ifdef FUZZING
@ -171,7 +170,7 @@ class MessageChannel : HasResultCodes, MessageLoop::DestructionObserver {
// For more details on the process of opening a channel between
// threads, see the extended comment on this function
// in MessageChannel.cpp.
bool Open(MessageChannel* aTargetChan, nsIEventTarget* aEventTarget,
bool Open(MessageChannel* aTargetChan, nsISerialEventTarget* aEventTarget,
Side aSide);
// "Open" a connection to an actor on the current thread.
@ -376,8 +375,10 @@ class MessageChannel : HasResultCodes, MessageLoop::DestructionObserver {
#endif // defined(OS_WIN)
private:
void CommonThreadOpenInit(MessageChannel* aTargetChan, Side aSide);
void OpenAsOtherThread(MessageChannel* aTargetChan, Side aSide);
void CommonThreadOpenInit(MessageChannel* aTargetChan,
nsISerialEventTarget* aThread, Side aSide);
void OpenAsOtherThread(MessageChannel* aTargetChan,
nsISerialEventTarget* aThread, Side aSide);
void PostErrorNotifyTask();
void OnNotifyMaybeChannelError();
@ -552,10 +553,9 @@ class MessageChannel : HasResultCodes, MessageLoop::DestructionObserver {
void NotifyMaybeChannelError();
private:
// Can be run on either thread
void AssertWorkerThread() const {
MOZ_ASSERT(mWorkerThread, "Channel hasn't been opened yet");
MOZ_RELEASE_ASSERT(mWorkerThread == PR_GetCurrentThread(),
MOZ_RELEASE_ASSERT(mWorkerThread && mWorkerThread->IsOnCurrentThread(),
"not on worker thread!");
}
@ -573,7 +573,7 @@ class MessageChannel : HasResultCodes, MessageLoop::DestructionObserver {
// If we aren't a same-thread channel, our "link" thread is _not_ our
// worker thread!
MOZ_ASSERT(mWorkerThread, "Channel hasn't been opened yet");
MOZ_RELEASE_ASSERT(mWorkerThread != PR_GetCurrentThread(),
MOZ_RELEASE_ASSERT(mWorkerThread && !mWorkerThread->IsOnCurrentThread(),
"on worker thread but should not be!");
}
@ -617,8 +617,6 @@ class MessageChannel : HasResultCodes, MessageLoop::DestructionObserver {
typedef std::map<size_t, UniquePtr<UntypedCallbackHolder>> CallbackMap;
typedef IPC::Message::msgid_t msgid_t;
void WillDestroyCurrentMessageLoop() override;
private:
// This will be a string literal, so lifetime is not an issue.
const char* mName;
@ -631,13 +629,11 @@ class MessageChannel : HasResultCodes, MessageLoop::DestructionObserver {
Side mSide;
bool mIsCrossProcess;
UniquePtr<MessageLink> mLink;
MessageLoop* mWorkerLoop; // thread where work is done
RefPtr<CancelableRunnable>
mChannelErrorTask; // NotifyMaybeChannelError runnable
// Thread we are allowed to send and receive on. This persists even after
// mWorkerLoop is cleared during channel shutdown.
PRThread* mWorkerThread;
// Thread we are allowed to send and receive on.
nsCOMPtr<nsISerialEventTarget> mWorkerThread;
// Timeout periods are broken up in two to prevent system suspension from
// triggering an abort. This method (called by WaitForEvent with a 'did

Просмотреть файл

@ -91,7 +91,7 @@ void ProcessLink::Open(UniquePtr<Transport> aTransport, MessageLoop* aIOLoop,
mIOLoop = aIOLoop;
NS_ASSERTION(mIOLoop, "need an IO loop");
NS_ASSERTION(mChan->mWorkerLoop, "need a worker loop");
NS_ASSERTION(mChan->mWorkerThread, "need a worker thread");
// If we were never able to open the transport, immediately post an error
// message.

Просмотреть файл

@ -24,7 +24,7 @@ mozilla::dom::ContentParent* ProtocolFuzzerHelper::CreateContentParent(
const nsAString& aRemoteType) {
auto* cp = new mozilla::dom::ContentParent(aRemoteType);
// TODO: this duplicates MessageChannel::Open
cp->GetIPCChannel()->mWorkerThread = PR_GetCurrentThread();
cp->GetIPCChannel()->mWorkerThread = GetCurrentSerialEventTarget();
cp->GetIPCChannel()->mMonitor = new RefCountedMonitor();
return cp;
}