Mirror of https://github.com/mozilla/gecko-dev.git
Bug 1312960 - Associate each message in IPC queue with the runnable that will run it (r=dvander)
This commit is contained in:
Parent: a054d4a461
Commit: 50e4d05346
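Note for readers (not part of the original commit message): this patch replaces the single shared dequeue machinery (OnMaybeDequeueOne, DequeueTask, mDequeueOneTask) with one cancelable runnable per queued message. mPending becomes a LinkedList<RefPtr<MessageTask>>; each MessageTask owns its Message, posts itself to the worker loop, and removes itself from the list when it runs or is canceled, so an individual pending message can be compressed away or canceled without disturbing the rest of the queue. The sketch below illustrates that ownership pattern in plain standard C++; EventLoop, Message, and MessageTask here are simplified stand-ins, not the real Gecko classes.

// Simplified, self-contained sketch of the pattern this patch introduces:
// every queued message owns a task that can be posted, run, or canceled
// individually. Standard C++ only; these types are illustrative stand-ins.
#include <cstdio>
#include <deque>
#include <list>
#include <memory>
#include <string>
#include <utility>

struct Message {
    int seqno;
    std::string payload;
};

class MessageTask;
using TaskQueue = std::list<std::shared_ptr<MessageTask>>;  // analogue of mPending

// One task per pending message; the task knows how to run or cancel "its" message.
class MessageTask {
public:
    MessageTask(TaskQueue* aPending, Message&& aMsg)
        : mPending(aPending), mMsg(std::move(aMsg)) {}

    void Run() {
        if (!mPending) {
            return;              // already canceled or already run
        }
        RemoveFromPending();     // dequeue exactly this message
        std::printf("dispatch seqno=%d: %s\n", mMsg.seqno, mMsg.payload.c_str());
    }

    void Cancel() {
        if (mPending) {
            RemoveFromPending();
        }
    }

    Message& Msg() { return mMsg; }

private:
    void RemoveFromPending() {
        mPending->remove_if([this](const std::shared_ptr<MessageTask>& t) {
            return t.get() == this;
        });
        mPending = nullptr;
    }

    TaskQueue* mPending;  // non-owning; stands in for LinkedListElement membership
    Message mMsg;
};

// Stand-in for the worker thread's event loop: it merely stores runnables.
struct EventLoop {
    std::deque<std::shared_ptr<MessageTask>> tasks;
    void Post(std::shared_ptr<MessageTask> aTask) { tasks.push_back(std::move(aTask)); }
};

int main() {
    EventLoop loop;
    TaskQueue pending;                    // analogue of MessageChannel::mPending

    for (int i = 0; i < 3; ++i) {
        auto task = std::make_shared<MessageTask>(&pending, Message{i, "hello"});
        pending.push_back(task);          // analogue of mPending.insertBack(task)
        loop.Post(task);                  // analogue of task->Post()
    }

    pending.back()->Cancel();             // canceling drops just that one message

    while (!loop.tasks.empty()) {         // the event loop later runs each task
        loop.tasks.front()->Run();
        loop.tasks.pop_front();
    }
    return 0;
}

In the real patch the queue is intrusive (LinkedListElement) and guarded by the channel monitor, but the ownership relation — the message lives inside the task that will dispatch it — is the same idea.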
@@ -489,10 +489,7 @@ MessageChannel::MessageChannel(MessageListener *aListener)
    mTransactionStack(nullptr),
    mTimedOutMessageSeqno(0),
    mTimedOutMessageNestedLevel(0),
#if defined(MOZ_CRASHREPORTER) && defined(OS_WIN)
    mPending(AnnotateAllocator<Message>(*this)),
#endif
    mRemoteStackDepthGuess(false),
    mRemoteStackDepthGuess(0),
    mSawInterruptOutMsg(false),
    mIsWaitingForIncoming(false),
    mAbortOnError(false),
@@ -508,10 +505,6 @@ MessageChannel::MessageChannel(MessageListener *aListener)
    mIsSyncWaitingOnNonMainThread = false;
#endif

    RefPtr<CancelableRunnable> runnable =
        NewNonOwningCancelableRunnableMethod(this, &MessageChannel::OnMaybeDequeueOne);
    mDequeueOneTask = new RefCountedTask(runnable.forget());

    mOnChannelConnectedTask =
        NewNonOwningCancelableRunnableMethod(this, &MessageChannel::DispatchOnChannelConnected);
@@ -635,8 +628,6 @@ MessageChannel::Clear()
        gParentProcessBlocker = nullptr;
    }

    mDequeueOneTask->Cancel();

    mWorkerLoop = nullptr;
    delete mLink;
    mLink = nullptr;
@@ -649,7 +640,11 @@ MessageChannel::Clear()
    }

    // Free up any memory used by pending messages.
    for (RefPtr<MessageTask> task : mPending) {
        task->Clear();
    }
    mPending.clear();

    mOutOfTurnReplies.clear();
    while (!mDeferred.empty()) {
        mDeferred.pop();
@@ -879,19 +874,6 @@ MessageChannel::ShouldDeferMessage(const Message& aMsg)
    return mSide == ParentSide && aMsg.transaction_id() != CurrentNestedInsideSyncTransaction();
}

// Predicate that is true for messages that should be consolidated if 'compress' is set.
class MatchingKinds {
    typedef IPC::Message Message;
    Message::msgid_t mType;
    int32_t mRoutingId;
public:
    MatchingKinds(Message::msgid_t aType, int32_t aRoutingId) :
        mType(aType), mRoutingId(aRoutingId) {}
    bool operator()(const Message &msg) {
        return msg.type() == mType && msg.routing_id() == mRoutingId;
    }
};

void
MessageChannel::OnMessageReceivedFromLink(Message&& aMsg)
{
@@ -925,31 +907,34 @@ MessageChannel::OnMessageReceivedFromLink(Message&& aMsg)
    MOZ_RELEASE_ASSERT(aMsg.compress_type() == IPC::Message::COMPRESSION_NONE ||
                       aMsg.nested_level() == IPC::Message::NOT_NESTED);

    bool compress = false;
    bool reuseTask = false;
    if (aMsg.compress_type() == IPC::Message::COMPRESSION_ENABLED) {
        compress = (!mPending.empty() &&
                    mPending.back().type() == aMsg.type() &&
                    mPending.back().routing_id() == aMsg.routing_id());
        bool compress = (!mPending.isEmpty() &&
                         mPending.getLast()->Msg().type() == aMsg.type() &&
                         mPending.getLast()->Msg().routing_id() == aMsg.routing_id());
        if (compress) {
            // This message type has compression enabled, and the back of the
            // queue was the same message type and routed to the same destination.
            // Replace it with the newer message.
            MOZ_RELEASE_ASSERT(mPending.back().compress_type() ==
                               IPC::Message::COMPRESSION_ENABLED);
            mPending.pop_back();
            MOZ_RELEASE_ASSERT(mPending.getLast()->Msg().compress_type() ==
                               IPC::Message::COMPRESSION_ENABLED);
            mPending.getLast()->Msg() = Move(aMsg);

            reuseTask = true;
        }
    } else if (aMsg.compress_type() == IPC::Message::COMPRESSION_ALL) {
        // Check the message queue for another message with this type/destination.
        auto it = std::find_if(mPending.rbegin(), mPending.rend(),
                               MatchingKinds(aMsg.type(), aMsg.routing_id()));
        if (it != mPending.rend()) {
            // This message type has compression enabled, and the queue holds
            // a message with the same message type and routed to the same destination.
            // Erase it. Note that, since we always compress these redundancies, There Can
            // Be Only One.
            compress = true;
            MOZ_RELEASE_ASSERT((*it).compress_type() == IPC::Message::COMPRESSION_ALL);
            mPending.erase((++it).base());
    } else if (aMsg.compress_type() == IPC::Message::COMPRESSION_ALL && !mPending.isEmpty()) {
        for (RefPtr<MessageTask> p = mPending.getLast(); p; p = p->getPrevious()) {
            if (p->Msg().type() == aMsg.type() &&
                p->Msg().routing_id() == aMsg.routing_id())
            {
                // This message type has compression enabled, and the queue
                // holds a message with the same message type and routed to the
                // same destination. Erase it. Note that, since we always
                // compress these redundancies, There Can Be Only One.
                MOZ_RELEASE_ASSERT(p->Msg().compress_type() == IPC::Message::COMPRESSION_ALL);
                p->remove();
                break;
            }
        }
    }
@@ -959,7 +944,7 @@ MessageChannel::OnMessageReceivedFromLink(Message&& aMsg)
                        wakeUpSyncSend ||
                        AwaitingIncomingMessage();

    // Although we usually don't need to post an OnMaybeDequeueOne task if
    // Although we usually don't need to post a message task if
    // shouldWakeUp is true, it's easier to post anyway than to have to
    // guarantee that every Send call processes everything it's supposed to
    // before returning.
@@ -968,6 +953,10 @@ MessageChannel::OnMessageReceivedFromLink(Message&& aMsg)
    IPC_LOG("Receive on link thread; seqno=%d, xid=%d, shouldWakeUp=%d",
            aMsg.seqno(), aMsg.transaction_id(), shouldWakeUp);

    if (reuseTask) {
        return;
    }

    // There are three cases we're concerned about, relating to the state of the
    // main thread:
    //
@@ -990,29 +979,26 @@ MessageChannel::OnMessageReceivedFromLink(Message&& aMsg)
    // blocked. This is okay, since we always check for pending events before
    // blocking again.

    mPending.push_back(Move(aMsg));
    RefPtr<MessageTask> task = new MessageTask(this, Move(aMsg));
    mPending.insertBack(task);

    if (shouldWakeUp) {
        NotifyWorkerThread();
    }

    if (shouldPostTask) {
        if (!compress) {
            // If we compressed away the previous message, we'll re-use
            // its pending task.
            RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
            mWorkerLoop->PostTask(task.forget());
        }
        task->Post();
    }
}

void
MessageChannel::PeekMessages(mozilla::function<bool(const Message& aMsg)> aInvoke)
{
    // FIXME: We shouldn't be holding the lock for aInvoke!
    MonitorAutoLock lock(*mMonitor);

    for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); it++) {
        Message &msg = *it;
    for (RefPtr<MessageTask> it : mPending) {
        const Message &msg = it->Msg();
        if (!aInvoke(msg)) {
            break;
        }
@@ -1022,6 +1008,8 @@ MessageChannel::PeekMessages(mozilla::function<bool(const Message& aMsg)> aInvoke)
void
MessageChannel::ProcessPendingRequests(AutoEnterTransaction& aTransaction)
{
    mMonitor->AssertCurrentThreadOwns();

    IPC_LOG("ProcessPendingRequests for seqno=%d, xid=%d",
            aTransaction.SequenceNumber(), aTransaction.TransactionID());
@@ -1038,8 +1026,8 @@ MessageChannel::ProcessPendingRequests(AutoEnterTransaction& aTransaction)
        mozilla::Vector<Message> toProcess;

        for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); ) {
            Message &msg = *it;
        for (RefPtr<MessageTask> p = mPending.getFirst(); p; ) {
            Message &msg = p->Msg();

            MOZ_RELEASE_ASSERT(!aTransaction.IsCanceled(),
                               "Calling ShouldDeferMessage when cancelled");
@@ -1053,10 +1041,11 @@ MessageChannel::ProcessPendingRequests(AutoEnterTransaction& aTransaction)
            if (!defer) {
                if (!toProcess.append(Move(msg)))
                    MOZ_CRASH();
                it = mPending.erase(it);

                p = p->removeAndGetNext();
                continue;
            }
            it++;
            p = p->getNext();
        }

        if (toProcess.empty()) {
@@ -1358,9 +1347,9 @@ MessageChannel::Call(Message* aMsg, Message* aReply)
        {
            recvd = Move(it->second);
            mOutOfTurnReplies.erase(it);
        } else if (!mPending.empty()) {
            recvd = Move(mPending.front());
            mPending.pop_front();
        } else if (!mPending.isEmpty()) {
            RefPtr<MessageTask> task = mPending.popFirst();
            recvd = Move(task->Msg());
        } else {
            // because of subtleties with nested event loops, it's possible
            // that we got here and nothing happened. or, we might have a
@@ -1449,18 +1438,19 @@ MessageChannel::WaitForIncomingMessage()
    NeuteredWindowRegion neuteredRgn(mFlags & REQUIRE_DEFERRED_MESSAGE_PROTECTION);
#endif

    { // Scope for lock
        MonitorAutoLock lock(*mMonitor);
        AutoEnterWaitForIncoming waitingForIncoming(*this);
        if (mChannelState != ChannelConnected) {
            return false;
        }
        if (!HasPendingEvents()) {
            return WaitForInterruptNotify();
        }
    MonitorAutoLock lock(*mMonitor);
    AutoEnterWaitForIncoming waitingForIncoming(*this);
    if (mChannelState != ChannelConnected) {
        return false;
    }
    if (!HasPendingEvents()) {
        return WaitForInterruptNotify();
    }

    return OnMaybeDequeueOne();
    MOZ_RELEASE_ASSERT(!mPending.isEmpty());
    RefPtr<MessageTask> task = mPending.getFirst();
    RunMessage(*task);
    return true;
}

bool
@@ -1468,7 +1458,7 @@ MessageChannel::HasPendingEvents()
{
    AssertWorkerThread();
    mMonitor->AssertCurrentThreadOwns();
    return Connected() && !mPending.empty();
    return Connected() && !mPending.isEmpty();
}

bool
@@ -1479,7 +1469,7 @@ MessageChannel::InterruptEventOccurred()
    IPC_ASSERT(InterruptStackDepth() > 0, "not in wait loop");

    return (!Connected() ||
            !mPending.empty() ||
            !mPending.isEmpty() ||
            (!mOutOfTurnReplies.empty() &&
             mOutOfTurnReplies.find(mInterruptStack.top().seqno()) !=
             mOutOfTurnReplies.end()));
@@ -1503,19 +1493,12 @@ MessageChannel::ProcessPendingRequest(Message &&aUrgent)
}

bool
MessageChannel::DequeueOne(Message *recvd)
MessageChannel::ShouldRunMessage(const Message& aMsg)
{
    AssertWorkerThread();
    mMonitor->AssertCurrentThreadOwns();

    if (!Connected()) {
        ReportConnectionError("OnMaybeDequeueOne");
        return false;
    if (!mTimedOutMessageSeqno) {
        return true;
    }

    if (!mDeferred.empty())
        MaybeUndeferIncall();

    // If we've timed out a message and we're awaiting the reply to the timed
    // out message, we have to be careful what messages we process. Here's what
    // can go wrong:
@@ -1532,56 +1515,131 @@ MessageChannel::DequeueOne(Message *recvd)
    // message unless the child would need the response to that message in order
    // to process M. Those messages are the ones that have a higher nested level
    // than M or that are part of the same transaction as M.
    if (mTimedOutMessageSeqno) {
        for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); it++) {
            Message &msg = *it;
            if (msg.nested_level() > mTimedOutMessageNestedLevel ||
                (msg.nested_level() == mTimedOutMessageNestedLevel
                 && msg.transaction_id() == mTimedOutMessageSeqno))
            {
                *recvd = Move(msg);
                mPending.erase(it);
                return true;
            }
        }
    if (aMsg.nested_level() < mTimedOutMessageNestedLevel ||
        (aMsg.nested_level() == mTimedOutMessageNestedLevel
         && aMsg.transaction_id() != mTimedOutMessageSeqno))
    {
        return false;
    }

    if (mPending.empty())
        return false;

    *recvd = Move(mPending.front());
    mPending.pop_front();
    return true;
}

bool
MessageChannel::OnMaybeDequeueOne()
{
    AssertWorkerThread();
    mMonitor->AssertNotCurrentThreadOwns();

    Message recvd;

    MonitorAutoLock lock(*mMonitor);
    if (!DequeueOne(&recvd))
        return false;

    if (IsOnCxxStack() && recvd.is_interrupt() && recvd.is_reply()) {
        // We probably just received a reply in a nested loop for an
        // Interrupt call sent before entering that loop.
        mOutOfTurnReplies[recvd.seqno()] = Move(recvd);
        return false;
    }

    DispatchMessage(Move(recvd));

    return true;
}

void
MessageChannel::RunMessage(MessageTask& aTask)
{
    AssertWorkerThread();
    mMonitor->AssertCurrentThreadOwns();

    Message& msg = aTask.Msg();

    if (!Connected()) {
        ReportConnectionError("RunMessage");
        return;
    }

    // Check that we're going to run the first message that's valid to run.
#ifdef DEBUG
    for (RefPtr<MessageTask> task : mPending) {
        if (task == &aTask) {
            break;
        }
        MOZ_ASSERT(!ShouldRunMessage(task->Msg()));
    }
#endif

    if (!mDeferred.empty()) {
        MaybeUndeferIncall();
    }

    if (!ShouldRunMessage(msg)) {
        return;
    }

    MOZ_RELEASE_ASSERT(aTask.isInList());
    aTask.remove();

    if (IsOnCxxStack() && msg.is_interrupt() && msg.is_reply()) {
        // We probably just received a reply in a nested loop for an
        // Interrupt call sent before entering that loop.
        mOutOfTurnReplies[msg.seqno()] = Move(msg);
        return;
    }

    DispatchMessage(Move(msg));
}

nsresult
MessageChannel::MessageTask::Run()
{
    if (!mChannel) {
        return NS_OK;
    }

    mChannel->AssertWorkerThread();
    mChannel->mMonitor->AssertNotCurrentThreadOwns();

    MonitorAutoLock lock(*mChannel->mMonitor);

    // In case we choose not to run this message, we may need to be able to Post
    // it again.
    mScheduled = false;

    if (!isInList()) {
        return NS_OK;
    }

    mChannel->RunMessage(*this);
    return NS_OK;
}

// Warning: This method removes the receiver from whatever list it might be in.
nsresult
MessageChannel::MessageTask::Cancel()
{
    if (!mChannel) {
        return NS_OK;
    }

    mChannel->AssertWorkerThread();
    mChannel->mMonitor->AssertNotCurrentThreadOwns();

    MonitorAutoLock lock(*mChannel->mMonitor);

    if (!isInList()) {
        return NS_OK;
    }
    remove();

    return NS_OK;
}

void
MessageChannel::MessageTask::Post()
{
    MOZ_RELEASE_ASSERT(!mScheduled);
    MOZ_RELEASE_ASSERT(isInList());

    mScheduled = true;

    RefPtr<MessageTask> self = this;
    mChannel->mWorkerLoop->PostTask(self.forget());
}

void
MessageChannel::MessageTask::Clear()
{
    mChannel->AssertWorkerThread();

    mChannel = nullptr;
}

void
MessageChannel::DispatchMessage(Message &&aMsg)
{
    AssertWorkerThread();
    mMonitor->AssertCurrentThreadOwns();

    Maybe<AutoNoJSAPI> nojsapi;
    if (ScriptSettingsInitialized() && NS_IsMainThread())
        nojsapi.emplace();
@@ -1780,7 +1838,9 @@ MessageChannel::MaybeUndeferIncall()
    --mRemoteStackDepthGuess;

    MOZ_RELEASE_ASSERT(call.nested_level() == IPC::Message::NOT_NESTED);
    mPending.push_back(Move(call));
    RefPtr<MessageTask> task = new MessageTask(this, Move(call));
    mPending.insertBack(task);
    task->Post();
}

void
@@ -1803,18 +1863,10 @@ MessageChannel::EnqueuePendingMessages()
    MaybeUndeferIncall();

    for (size_t i = 0; i < mDeferred.size(); ++i) {
        RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
        mWorkerLoop->PostTask(task.forget());
    }

    // XXX performance tuning knob: could process all or k pending
    // messages here, rather than enqueuing for later processing

    for (size_t i = 0; i < mPending.size(); ++i) {
        RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
        mWorkerLoop->PostTask(task.forget());
    }
    RepostAllMessages();
}

static inline bool
@@ -2269,16 +2321,14 @@ MessageChannel::DebugAbort(const char* file, int line, const char* cond,
                  mDeferred.size());
    printf_stderr(" out-of-turn Interrupt replies stack size: %" PRIuSIZE "\n",
                  mOutOfTurnReplies.size());
    printf_stderr(" Pending queue size: %" PRIuSIZE ", front to back:\n",
                  mPending.size());

    MessageQueue pending = Move(mPending);
    while (!pending.empty()) {
    while (!pending.isEmpty()) {
        printf_stderr(" [ %s%s ]\n",
                      pending.front().is_interrupt() ? "intr" :
                      (pending.front().is_sync() ? "sync" : "async"),
                      pending.front().is_reply() ? "reply" : "");
        pending.pop_front();
                      pending.getFirst()->Msg().is_interrupt() ? "intr" :
                      (pending.getFirst()->Msg().is_sync() ? "sync" : "async"),
                      pending.getFirst()->Msg().is_reply() ? "reply" : "");
        pending.popFirst();
    }

    NS_RUNTIMEABORT(why);
@@ -2326,13 +2376,33 @@ MessageChannel::EndTimeout()
    mTimedOutMessageSeqno = 0;
    mTimedOutMessageNestedLevel = 0;

    for (size_t i = 0; i < mPending.size(); i++) {
        // There may be messages in the queue that we expected to process from
        // OnMaybeDequeueOne. But during the timeout, that function will skip
        // some messages. Now they're ready to be processed, so we enqueue more
        // tasks.
        RefPtr<DequeueTask> task = new DequeueTask(mDequeueOneTask);
        mWorkerLoop->PostTask(task.forget());
    RepostAllMessages();
}

void
MessageChannel::RepostAllMessages()
{
    bool needRepost = false;
    for (RefPtr<MessageTask> task : mPending) {
        if (!task->IsScheduled()) {
            needRepost = true;
        }
    }
    if (!needRepost) {
        // If everything is already scheduled to run, do nothing.
        return;
    }

    // In some cases we may have deferred dispatch of some messages in the
    // queue. Now we want to run them again. However, we can't just re-post
    // those messages since the messages after them in mPending would then be
    // before them in the event queue. So instead we cancel everything and
    // re-post all messages in the correct order.
    MessageQueue queue = Move(mPending);
    while (RefPtr<MessageTask> task = queue.popFirst()) {
        RefPtr<MessageTask> newTask = new MessageTask(this, Move(task->Msg()));
        mPending.insertBack(newTask);
        newTask->Post();
    }
}
@@ -2375,8 +2445,8 @@ MessageChannel::CancelTransaction(int transaction)
    }

    bool foundSync = false;
    for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); ) {
        Message &msg = *it;
    for (RefPtr<MessageTask> p = mPending.getFirst(); p; ) {
        Message &msg = p->Msg();

        // If there was a race between the parent and the child, then we may
        // have a queued sync message. We want to drop this message from the
@@ -2387,11 +2457,11 @@ MessageChannel::CancelTransaction(int transaction)
            MOZ_RELEASE_ASSERT(msg.transaction_id() != transaction);
            IPC_LOG("Removing msg from queue seqno=%d xid=%d", msg.seqno(), msg.transaction_id());
            foundSync = true;
            it = mPending.erase(it);
            p = p->removeAndGetNext();
            continue;
        }

        it++;
        p = p->getNext();
    }
}
@@ -274,10 +274,6 @@ class MessageChannel : HasResultCodes
    void MaybeUndeferIncall();
    void EnqueuePendingMessages();

    // Executed on the worker thread. Dequeues one pending message.
    bool OnMaybeDequeueOne();
    bool DequeueOne(Message *recvd);

    // Dispatches an incoming message to its appropriate handler.
    void DispatchMessage(Message &&aMsg);
@@ -309,6 +305,8 @@ class MessageChannel : HasResultCodes
    void EndTimeout();
    void CancelTransaction(int transaction);

    void RepostAllMessages();

    // The "remote view of stack depth" can be different than the
    // actual stack depth when there are out-of-turn replies. When we
    // receive one, our actual Interrupt stack depth doesn't decrease, but
@@ -454,119 +452,41 @@ class MessageChannel : HasResultCodes
    }

  private:
#if defined(MOZ_CRASHREPORTER) && defined(OS_WIN)
    // TODO: Remove the condition OS_WIN above once we move to GCC 5 or higher,
    // the code will be able to get compiled as std::deque will meet C++11
    // allocator requirements.
    template<class T>
    struct AnnotateAllocator
    class MessageTask :
        public CancelableRunnable,
        public LinkedListElement<RefPtr<MessageTask>>
    {
        typedef T value_type;
        AnnotateAllocator(MessageChannel& channel) : mChannel(channel) {}
        template<class U> AnnotateAllocator(const AnnotateAllocator<U>& other) :
            mChannel(other.mChannel) {}
        template<class U> bool operator==(const AnnotateAllocator<U>&) { return true; }
        template<class U> bool operator!=(const AnnotateAllocator<U>&) { return false; }
        T* allocate(size_t n) {
            void* p = ::operator new(n * sizeof(T), std::nothrow);
            if (!p && n) {
                // Sort the pending messages by its type, note the sorting algorithm
                // has to be in-place to avoid memory allocation.
                MessageQueue& q = mChannel.mPending;
                std::sort(q.begin(), q.end(), [](const Message& a, const Message& b) {
                    return a.type() < b.type();
                });
    public:
        explicit MessageTask(MessageChannel* aChannel, Message&& aMessage)
            : mChannel(aChannel), mMessage(Move(aMessage)), mScheduled(false)
        {}

                // Iterate over the sorted queue to find the message that has the
                // highest number of count.
                const char* topName = nullptr;
                const char* curName = nullptr;
                msgid_t topType = 0, curType = 0;
                uint32_t topCount = 0, curCount = 0;
                for (MessageQueue::iterator it = q.begin(); it != q.end(); ++it) {
                    Message &msg = *it;
                    if (msg.type() == curType) {
                        ++curCount;
                    } else {
                        if (curCount > topCount) {
                            topName = curName;
                            topType = curType;
                            topCount = curCount;
                        }
                        curName = StringFromIPCMessageType(msg.type());
                        curType = msg.type();
                        curCount = 1;
                    }
                }
                // In case the last type is the top one.
                if (curCount > topCount) {
                    topName = curName;
                    topType = curType;
                    topCount = curCount;
                }
        NS_IMETHOD Run() override;
        nsresult Cancel() override;
        void Post();
        void Clear();

                CrashReporter::AnnotatePendingIPC(q.size(), topCount, topName, topType);
        bool IsScheduled() const { return mScheduled; }

                mozalloc_handle_oom(n * sizeof(T));
            }
            return static_cast<T*>(p);
        }
        void deallocate(T* p, size_t n) {
            ::operator delete(p);
        }
        MessageChannel& mChannel;
        Message& Msg() { return mMessage; }
        const Message& Msg() const { return mMessage; }

    private:
        MessageTask() = delete;
        MessageTask(const MessageTask&) = delete;

        MessageChannel* mChannel;
        Message mMessage;
        bool mScheduled : 1;
    };
    typedef std::deque<Message, AnnotateAllocator<Message>> MessageQueue;
#else
    typedef std::deque<Message> MessageQueue;
#endif

    bool ShouldRunMessage(const Message& aMsg);
    void RunMessage(MessageTask& aTask);

    typedef LinkedList<RefPtr<MessageTask>> MessageQueue;
    typedef std::map<size_t, Message> MessageMap;
    typedef IPC::Message::msgid_t msgid_t;

    // XXXkhuey this can almost certainly die.
    // All dequeuing tasks require a single point of cancellation,
    // which is handled via a reference-counted task.
    class RefCountedTask
    {
    public:
        explicit RefCountedTask(already_AddRefed<CancelableRunnable> aTask)
            : mTask(aTask)
        { }
    private:
        ~RefCountedTask() { }
    public:
        void Run() { mTask->Run(); }
        void Cancel() { mTask->Cancel(); }

        NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedTask)

    private:
        RefPtr<CancelableRunnable> mTask;
    };

    // Wrap an existing task which can be cancelled at any time
    // without the wrapper's knowledge.
    class DequeueTask : public CancelableRunnable
    {
    public:
        explicit DequeueTask(RefCountedTask* aTask)
            : mTask(aTask)
        { }
        NS_IMETHOD Run() override {
            if (mTask) {
                mTask->Run();
            }
            return NS_OK;
        }
        nsresult Cancel() override {
            mTask = nullptr;
            return NS_OK;
        }

    private:
        RefPtr<RefCountedTask> mTask;
    };

  private:
    // Based on presumption the listener owns and overlives the channel,
    // this is never nullified.
@@ -582,9 +502,6 @@ class MessageChannel : HasResultCodes
    // during channel shutdown.
    int mWorkerLoopID;

    // A task encapsulating dequeuing one pending message.
    RefPtr<RefCountedTask> mDequeueOneTask;

    // Timeout periods are broken up in two to prevent system suspension from
    // triggering an abort. This method (called by WaitForEvent with a 'did
    // timeout' flag) decides if we should wait again for half of mTimeoutMs
@@ -671,9 +588,7 @@ class MessageChannel : HasResultCodes
    int32_t mTimedOutMessageSeqno;
    int mTimedOutMessageNestedLevel;

    // Queue of all incoming messages, except for replies to sync and urgent
    // messages, which are delivered directly to mRecvd, and any pending urgent
    // incall, which is stored in mPendingUrgentRequest.
    // Queue of all incoming messages.
    //
    // If both this side and the other side are functioning correctly, the queue
    // can only be in certain configurations. Let
@@ -685,7 +600,7 @@ class MessageChannel : HasResultCodes
    //
    // The queue can only match this configuration
    //
    // A<* (S< | C< | R< (?{mStack.size() == 1} A<* (S< | C<)))
    // A<* (S< | C< | R< (?{mInterruptStack.size() == 1} A<* (S< | C<)))
    //
    // The other side can send as many async messages |A<*| as it wants before
    // sending us a blocking message.
@@ -700,7 +615,7 @@ class MessageChannel : HasResultCodes
    // |mRemoteStackDepth|, and races don't matter to the queue.)
    //
    // Final case, the other side replied to our most recent out-call |R<|.
    // If that was the *only* out-call on our stack, |?{mStack.size() == 1}|,
    // If that was the *only* out-call on our stack, |?{mInterruptStack.size() == 1}|,
    // then other side "finished with us," and went back to its own business.
    // That business might have included sending any number of async message
    // |A<*| until sending a blocking message |(S< | C<)|. If we had more than
@@ -725,7 +640,7 @@ class MessageChannel : HasResultCodes
    //
    // Then when processing an in-call |c|, it must be true that
    //
    // mStack.size() == c.remoteDepth
    // mInterruptStack.size() == c.remoteDepth
    //
    // I.e., my depth is actually the same as what the other side thought it
    // was when it sent in-call |c|. If this fails to hold, we have detected