Bug 1248750 - Eliminate intentional IPC crashes (r=dvander)

This commit is contained in:
Bill McCloskey 2016-02-23 15:36:06 -08:00
Родитель 0bdb70d67c
Коммит b70d78b809
3 изменённых файлов: 367 добавлений и 301 удалений

Просмотреть файл

@ -281,6 +281,192 @@ private:
CxxStackFrame& operator=(const CxxStackFrame&) = delete; CxxStackFrame& operator=(const CxxStackFrame&) = delete;
}; };
class AutoEnterTransaction
{
public:
explicit AutoEnterTransaction(MessageChannel *aChan,
int32_t aMsgSeqno,
int32_t aTransactionID,
int aPriority)
: mChan(aChan),
mActive(true),
mOutgoing(true),
mPriority(aPriority),
mSeqno(aMsgSeqno),
mTransaction(aTransactionID),
mNext(mChan->mTransactionStack)
{
mChan->mMonitor->AssertCurrentThreadOwns();
mChan->mTransactionStack = this;
}
explicit AutoEnterTransaction(MessageChannel *aChan, const IPC::Message &aMessage)
: mChan(aChan),
mActive(true),
mOutgoing(false),
mPriority(aMessage.priority()),
mSeqno(aMessage.seqno()),
mTransaction(aMessage.transaction_id()),
mNext(mChan->mTransactionStack)
{
mChan->mMonitor->AssertCurrentThreadOwns();
if (!aMessage.is_sync()) {
mActive = false;
return;
}
mChan->mTransactionStack = this;
}
~AutoEnterTransaction() {
mChan->mMonitor->AssertCurrentThreadOwns();
if (mActive) {
mChan->mTransactionStack = mNext;
}
}
void Cancel() {
AutoEnterTransaction *cur = mChan->mTransactionStack;
MOZ_RELEASE_ASSERT(cur == this);
while (cur && cur->mPriority != IPC::Message::PRIORITY_NORMAL) {
// Note that, in the following situation, we will cancel multiple
// transactions:
// 1. Parent sends high prio message P1 to child.
// 2. Child sends high prio message C1 to child.
// 3. Child dispatches P1, parent blocks.
// 4. Child cancels.
// In this case, both P1 and C1 are cancelled. The parent will
// remove C1 from its queue when it gets the cancellation message.
MOZ_RELEASE_ASSERT(cur->mActive);
cur->mActive = false;
cur = cur->mNext;
}
mChan->mTransactionStack = cur;
MOZ_RELEASE_ASSERT(IsComplete());
}
bool AwaitingSyncReply() const {
MOZ_RELEASE_ASSERT(mActive);
if (mOutgoing) {
return true;
}
return mNext ? mNext->AwaitingSyncReply() : false;
}
int AwaitingSyncReplyPriority() const {
MOZ_RELEASE_ASSERT(mActive);
if (mOutgoing) {
return mPriority;
}
return mNext ? mNext->AwaitingSyncReplyPriority() : 0;
}
bool DispatchingSyncMessage() const {
MOZ_RELEASE_ASSERT(mActive);
if (!mOutgoing) {
return true;
}
return mNext ? mNext->DispatchingSyncMessage() : false;
}
int DispatchingSyncMessagePriority() const {
MOZ_RELEASE_ASSERT(mActive);
if (!mOutgoing) {
return mPriority;
}
return mNext ? mNext->DispatchingSyncMessagePriority() : 0;
}
int Priority() const {
MOZ_RELEASE_ASSERT(mActive);
return mPriority;
}
int32_t SequenceNumber() const {
MOZ_RELEASE_ASSERT(mActive);
return mSeqno;
}
int32_t TransactionID() const {
MOZ_RELEASE_ASSERT(mActive);
return mTransaction;
}
void ReceivedReply(const IPC::Message& aMessage) {
MOZ_RELEASE_ASSERT(aMessage.seqno() == mSeqno);
MOZ_RELEASE_ASSERT(aMessage.transaction_id() == mTransaction);
MOZ_RELEASE_ASSERT(!mReply);
IPC_LOG("Reply received on worker thread: seqno=%d", mSeqno);
mReply = new IPC::Message(aMessage);
MOZ_RELEASE_ASSERT(IsComplete());
}
void HandleReply(const IPC::Message& aMessage) {
AutoEnterTransaction *cur = mChan->mTransactionStack;
MOZ_RELEASE_ASSERT(cur == this);
while (cur) {
MOZ_RELEASE_ASSERT(cur->mActive);
if (aMessage.seqno() == cur->mSeqno) {
cur->ReceivedReply(aMessage);
break;
}
cur = cur->mNext;
MOZ_RELEASE_ASSERT(cur);
}
}
bool IsComplete() {
return !mActive || mReply;
}
bool IsOutgoing() {
return mOutgoing;
}
bool IsCanceled() {
return !mActive;
}
bool IsBottom() const {
return !mNext;
}
bool IsError() {
MOZ_RELEASE_ASSERT(mReply);
return mReply->is_reply_error();
}
nsAutoPtr<IPC::Message> GetReply() {
return Move(mReply);
}
private:
MessageChannel *mChan;
// Active is true if this transaction is on the mChan->mTransactionStack
// stack. Generally we're not on the stack if the transaction was canceled
// or if it was for a message that doesn't require transactions (an async
// message).
bool mActive;
// Is this stack frame for an outgoing message?
bool mOutgoing;
// Properties of the message being sent/received.
int mPriority;
int32_t mSeqno;
int32_t mTransaction;
// Next item in mChan->mTransactionStack.
AutoEnterTransaction *mNext;
// Pointer the a reply received for this message, if one was received.
nsAutoPtr<IPC::Message> mReply;
};
MessageChannel::MessageChannel(MessageListener *aListener) MessageChannel::MessageChannel(MessageListener *aListener)
: mListener(aListener), : mListener(aListener),
mChannelState(ChannelClosed), mChannelState(ChannelClosed),
@ -293,17 +479,11 @@ MessageChannel::MessageChannel(MessageListener *aListener)
mInTimeoutSecondHalf(false), mInTimeoutSecondHalf(false),
mNextSeqno(0), mNextSeqno(0),
mLastSendError(SyncSendError::SendSuccess), mLastSendError(SyncSendError::SendSuccess),
mAwaitingSyncReply(false),
mAwaitingSyncReplyPriority(0),
mDispatchingSyncMessage(false),
mDispatchingSyncMessagePriority(0),
mDispatchingAsyncMessage(false), mDispatchingAsyncMessage(false),
mDispatchingAsyncMessagePriority(0), mDispatchingAsyncMessagePriority(0),
mCurrentTransaction(0), mTransactionStack(nullptr),
mPendingSendPriorities(0),
mTimedOutMessageSeqno(0), mTimedOutMessageSeqno(0),
mTimedOutMessagePriority(0), mTimedOutMessagePriority(0),
mRecvdErrors(0),
mRemoteStackDepthGuess(false), mRemoteStackDepthGuess(false),
mSawInterruptOutMsg(false), mSawInterruptOutMsg(false),
mIsWaitingForIncoming(false), mIsWaitingForIncoming(false),
@ -344,6 +524,51 @@ MessageChannel::~MessageChannel()
Clear(); Clear();
} }
// This function returns the current transaction ID. Since the notion of a
// "current transaction" can be hard to define when messages race with each
// other and one gets canceled and the other doesn't, we require that this
// function is only called when the current transaction is known to be for a
// high priority message. In that case, we know for sure what the caller is
// looking for.
int32_t
MessageChannel::CurrentHighPriorityTransaction() const
{
mMonitor->AssertCurrentThreadOwns();
if (!mTransactionStack) {
return 0;
}
MOZ_RELEASE_ASSERT(mTransactionStack->Priority() == IPC::Message::PRIORITY_HIGH);
return mTransactionStack->TransactionID();
}
bool
MessageChannel::AwaitingSyncReply() const
{
mMonitor->AssertCurrentThreadOwns();
return mTransactionStack ? mTransactionStack->AwaitingSyncReply() : false;
}
int
MessageChannel::AwaitingSyncReplyPriority() const
{
mMonitor->AssertCurrentThreadOwns();
return mTransactionStack ? mTransactionStack->AwaitingSyncReplyPriority() : 0;
}
bool
MessageChannel::DispatchingSyncMessage() const
{
mMonitor->AssertCurrentThreadOwns();
return mTransactionStack ? mTransactionStack->DispatchingSyncMessage() : false;
}
int
MessageChannel::DispatchingSyncMessagePriority() const
{
mMonitor->AssertCurrentThreadOwns();
return mTransactionStack ? mTransactionStack->DispatchingSyncMessagePriority() : 0;
}
static void static void
PrintErrorMessage(Side side, const char* channelName, const char* msg) PrintErrorMessage(Side side, const char* channelName, const char* msg)
{ {
@ -405,7 +630,6 @@ MessageChannel::Clear()
// Free up any memory used by pending messages. // Free up any memory used by pending messages.
mPending.clear(); mPending.clear();
mRecvd = nullptr;
mOutOfTurnReplies.clear(); mOutOfTurnReplies.clear();
while (!mDeferred.empty()) { while (!mDeferred.empty()) {
mDeferred.pop(); mDeferred.pop();
@ -622,7 +846,7 @@ MessageChannel::ShouldDeferMessage(const Message& aMsg)
// child's message comes in, we can pretend the child hasn't quite // child's message comes in, we can pretend the child hasn't quite
// finished sending it yet. Since the message is sync, we know that the // finished sending it yet. Since the message is sync, we know that the
// child hasn't moved on yet. // child hasn't moved on yet.
return mSide == ParentSide && aMsg.transaction_id() != mCurrentTransaction; return mSide == ParentSide && aMsg.transaction_id() != CurrentHighPriorityTransaction();
} }
// Predicate that is true for messages that should be consolidated if 'compress' is set. // Predicate that is true for messages that should be consolidated if 'compress' is set.
@ -659,33 +883,17 @@ MessageChannel::OnMessageReceivedFromLink(const Message& aMsg)
return; return;
} }
MOZ_RELEASE_ASSERT(aMsg.transaction_id() == mCurrentTransaction);
MOZ_RELEASE_ASSERT(AwaitingSyncReply()); MOZ_RELEASE_ASSERT(AwaitingSyncReply());
MOZ_RELEASE_ASSERT(!mRecvd);
MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno); MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno);
// Rather than storing errors in mRecvd, we mark them in mTransactionStack->HandleReply(aMsg);
// mRecvdErrors. We need a counter because multiple replies can arrive
// when a timeout happens, as in the following example. Imagine the
// child is running slowly. The parent sends a sync message P1. It times
// out. The child eventually sends a sync message C1. While waiting for
// the C1 response, the child dispatches P1. In doing so, it sends sync
// message C2. At that point, it's valid for the parent to send error
// responses for both C1 and C2.
if (aMsg.is_reply_error()) {
mRecvdErrors++;
NotifyWorkerThread();
return;
}
mRecvd = new Message(aMsg);
NotifyWorkerThread(); NotifyWorkerThread();
return; return;
} }
// Prioritized messages cannot be compressed. // Prioritized messages cannot be compressed.
MOZ_RELEASE_ASSERT(aMsg.compress_type() == IPC::Message::COMPRESSION_NONE || MOZ_RELEASE_ASSERT(aMsg.compress_type() == IPC::Message::COMPRESSION_NONE ||
aMsg.priority() == IPC::Message::PRIORITY_NORMAL); aMsg.priority() == IPC::Message::PRIORITY_NORMAL);
bool compress = false; bool compress = false;
if (aMsg.compress_type() == IPC::Message::COMPRESSION_ENABLED) { if (aMsg.compress_type() == IPC::Message::COMPRESSION_ENABLED) {
@ -768,8 +976,11 @@ MessageChannel::OnMessageReceivedFromLink(const Message& aMsg)
} }
void void
MessageChannel::ProcessPendingRequests(int seqno, int transaction) MessageChannel::ProcessPendingRequests(AutoEnterTransaction& aTransaction)
{ {
int32_t seqno = aTransaction.SequenceNumber();
int32_t transaction = aTransaction.TransactionID();
IPC_LOG("ProcessPendingRequests for seqno=%d, xid=%d", seqno, transaction); IPC_LOG("ProcessPendingRequests for seqno=%d, xid=%d", seqno, transaction);
// Loop until there aren't any more priority messages to process. // Loop until there aren't any more priority messages to process.
@ -779,7 +990,7 @@ MessageChannel::ProcessPendingRequests(int seqno, int transaction)
// operating with weird state (as if no Send is in progress). That could // operating with weird state (as if no Send is in progress). That could
// cause even normal priority sync messages to be processed (but not // cause even normal priority sync messages to be processed (but not
// normal priority async messages), which would break message ordering. // normal priority async messages), which would break message ordering.
if (WasTransactionCanceled(transaction)) { if (aTransaction.IsCanceled()) {
return; return;
} }
@ -788,8 +999,8 @@ MessageChannel::ProcessPendingRequests(int seqno, int transaction)
for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); ) { for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); ) {
Message &msg = *it; Message &msg = *it;
MOZ_RELEASE_ASSERT(mCurrentTransaction == transaction, MOZ_RELEASE_ASSERT(!aTransaction.IsCanceled(),
"Calling ShouldDeferMessage when cancelled"); "Calling ShouldDeferMessage when cancelled");
bool defer = ShouldDeferMessage(msg); bool defer = ShouldDeferMessage(msg);
// Only log the interesting messages. // Only log the interesting messages.
@ -806,38 +1017,19 @@ MessageChannel::ProcessPendingRequests(int seqno, int transaction)
it++; it++;
} }
if (toProcess.empty()) if (toProcess.empty()) {
break; break;
}
// Processing these messages could result in more messages, so we // Processing these messages could result in more messages, so we
// loop around to check for more afterwards. // loop around to check for more afterwards.
for (auto it = toProcess.begin(); it != toProcess.end(); it++) for (auto it = toProcess.begin(); it != toProcess.end(); it++) {
ProcessPendingRequest(*it); ProcessPendingRequest(*it);
}
} }
} }
bool
MessageChannel::WasTransactionCanceled(int transaction)
{
if (transaction != mCurrentTransaction) {
// Imagine this scenario:
// 1. Child sends high prio sync message H1.
// 2. Parent sends reply to H1.
// 3. Parent sends high prio sync message H2.
// 4. Child's link thread receives H1 reply and H2 before worker wakes up.
// 5. Child dispatches H2 while still waiting for H1 reply.
// 6. Child cancels H2.
//
// In this case H1 will also be considered cancelled. However, its
// reply is still sitting in mRecvd, which can trip up later Sends. So
// we null it out here.
mRecvd = nullptr;
return true;
}
return false;
}
bool bool
MessageChannel::Send(Message* aMsg, Message* aReply) MessageChannel::Send(Message* aMsg, Message* aReply)
{ {
@ -866,8 +1058,7 @@ MessageChannel::Send(Message* aMsg, Message* aReply)
return false; return false;
} }
if (mCurrentTransaction && if (DispatchingSyncMessagePriority() == IPC::Message::PRIORITY_NORMAL &&
DispatchingSyncMessagePriority() == IPC::Message::PRIORITY_NORMAL &&
msg->priority() > IPC::Message::PRIORITY_NORMAL) msg->priority() > IPC::Message::PRIORITY_NORMAL)
{ {
// Don't allow sending CPOWs while we're dispatching a sync message. // Don't allow sending CPOWs while we're dispatching a sync message.
@ -877,9 +1068,8 @@ MessageChannel::Send(Message* aMsg, Message* aReply)
return false; return false;
} }
if (mCurrentTransaction && if (DispatchingSyncMessagePriority() == IPC::Message::PRIORITY_URGENT ||
(DispatchingSyncMessagePriority() == IPC::Message::PRIORITY_URGENT || DispatchingAsyncMessagePriority() == IPC::Message::PRIORITY_URGENT)
DispatchingAsyncMessagePriority() == IPC::Message::PRIORITY_URGENT))
{ {
// Generally only the parent dispatches urgent messages. And the only // Generally only the parent dispatches urgent messages. And the only
// sync messages it can send are high-priority. Mainly we want to ensure // sync messages it can send are high-priority. Mainly we want to ensure
@ -890,27 +1080,24 @@ MessageChannel::Send(Message* aMsg, Message* aReply)
return false; return false;
} }
if (mCurrentTransaction && if (msg->priority() < DispatchingSyncMessagePriority() ||
(msg->priority() < DispatchingSyncMessagePriority() || msg->priority() < AwaitingSyncReplyPriority())
msg->priority() < AwaitingSyncReplyPriority()))
{ {
MOZ_RELEASE_ASSERT(DispatchingSyncMessage() || DispatchingAsyncMessage()); MOZ_RELEASE_ASSERT(DispatchingSyncMessage() || DispatchingAsyncMessage());
IPC_LOG("Cancel from Send"); IPC_LOG("Cancel from Send");
CancelMessage *cancel = new CancelMessage(mCurrentTransaction); CancelMessage *cancel = new CancelMessage(CurrentHighPriorityTransaction());
CancelTransaction(mCurrentTransaction); CancelTransaction(CurrentHighPriorityTransaction());
mLink->SendMessage(cancel); mLink->SendMessage(cancel);
} }
IPC_ASSERT(msg->is_sync(), "can only Send() sync messages here"); IPC_ASSERT(msg->is_sync(), "can only Send() sync messages here");
if (mCurrentTransaction) { IPC_ASSERT(msg->priority() >= DispatchingSyncMessagePriority(),
IPC_ASSERT(msg->priority() >= DispatchingSyncMessagePriority(), "can't send sync message of a lesser priority than what's being dispatched");
"can't send sync message of a lesser priority than what's being dispatched"); IPC_ASSERT(AwaitingSyncReplyPriority() <= msg->priority(),
IPC_ASSERT(AwaitingSyncReplyPriority() <= msg->priority(), "nested sync message sends must be of increasing priority");
"nested sync message sends must be of increasing priority"); IPC_ASSERT(DispatchingSyncMessagePriority() != IPC::Message::PRIORITY_URGENT,
IPC_ASSERT(DispatchingSyncMessagePriority() != IPC::Message::PRIORITY_URGENT, "not allowed to send messages while dispatching urgent messages");
"not allowed to send messages while dispatching urgent messages");
}
IPC_ASSERT(DispatchingAsyncMessagePriority() != IPC::Message::PRIORITY_URGENT, IPC_ASSERT(DispatchingAsyncMessagePriority() != IPC::Message::PRIORITY_URGENT,
"not allowed to send messages while dispatching urgent messages"); "not allowed to send messages while dispatching urgent messages");
@ -927,27 +1114,28 @@ MessageChannel::Send(Message* aMsg, Message* aReply)
int prio = msg->priority(); int prio = msg->priority();
msgid_t replyType = msg->type() + 1; msgid_t replyType = msg->type() + 1;
AutoSetValue<bool> replies(mAwaitingSyncReply, true); AutoEnterTransaction *stackTop = mTransactionStack;
AutoSetValue<int> prioSet(mAwaitingSyncReplyPriority, prio);
AutoEnterTransaction transact(this, seqno);
int prios = mPendingSendPriorities | (1 << prio); // If the most recent message on the stack is high priority, then our
AutoSetValue<int> priosSet(mPendingSendPriorities, prios); // message should nest inside that and we use the same transaction
// ID. Otherwise we need a new transaction ID (so we use the seqno of the
int32_t transaction = mCurrentTransaction; // message we're sending).
bool nest = stackTop && stackTop->Priority() == IPC::Message::PRIORITY_HIGH;
int32_t transaction = nest ? stackTop->TransactionID() : seqno;
msg->set_transaction_id(transaction); msg->set_transaction_id(transaction);
IPC_LOG("Send seqno=%d, xid=%d, pending=%d", seqno, transaction, prios);
bool handleWindowsMessages = mListener->HandleWindowsMessages(*aMsg); bool handleWindowsMessages = mListener->HandleWindowsMessages(*aMsg);
AutoEnterTransaction transact(this, seqno, transaction, prio);
IPC_LOG("Send seqno=%d, xid=%d", seqno, transaction);
mLink->SendMessage(msg.forget()); mLink->SendMessage(msg.forget());
while (true) { while (true) {
ProcessPendingRequests(seqno, transaction); MOZ_RELEASE_ASSERT(!transact.IsCanceled());
if (WasTransactionCanceled(transaction)) { ProcessPendingRequests(transact);
IPC_LOG("Other side canceled seqno=%d, xid=%d", seqno, transaction); if (transact.IsComplete()) {
mLastSendError = SyncSendError::CancelledAfterSend; break;
return false;
} }
if (!Connected()) { if (!Connected()) {
ReportConnectionError("MessageChannel::Send"); ReportConnectionError("MessageChannel::Send");
@ -955,23 +1143,12 @@ MessageChannel::Send(Message* aMsg, Message* aReply)
return false; return false;
} }
// See if we've received a reply.
if (mRecvdErrors) {
IPC_LOG("Error: seqno=%d, xid=%d", seqno, transaction);
mRecvdErrors--;
mLastSendError = SyncSendError::ReplyError;
return false;
}
if (mRecvd) {
IPC_LOG("Got reply: seqno=%d, xid=%d", seqno, transaction);
break;
}
MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno); MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno);
MOZ_RELEASE_ASSERT(!transact.IsComplete());
MOZ_RELEASE_ASSERT(mTransactionStack == &transact);
MOZ_RELEASE_ASSERT(mCurrentTransaction == transaction);
bool maybeTimedOut = !WaitForSyncNotify(handleWindowsMessages); bool maybeTimedOut = !WaitForSyncNotify(handleWindowsMessages);
if (mListener->NeedArtificialSleep()) { if (mListener->NeedArtificialSleep()) {
MonitorAutoUnlock unlock(*mMonitor); MonitorAutoUnlock unlock(*mMonitor);
mListener->ArtificialSleep(); mListener->ArtificialSleep();
@ -983,31 +1160,21 @@ MessageChannel::Send(Message* aMsg, Message* aReply)
return false; return false;
} }
if (WasTransactionCanceled(transaction)) { if (transact.IsCanceled()) {
IPC_LOG("Other side canceled seqno=%d, xid=%d", seqno, transaction); break;
mLastSendError = SyncSendError::CancelledAfterSend;
return false;
} }
MOZ_RELEASE_ASSERT(mTransactionStack == &transact);
// We only time out a message if it initiated a new transaction (i.e., // We only time out a message if it initiated a new transaction (i.e.,
// if neither side has any other message Sends on the stack). // if neither side has any other message Sends on the stack).
bool canTimeOut = transaction == seqno; bool canTimeOut = transact.IsBottom();
if (maybeTimedOut && canTimeOut && !ShouldContinueFromTimeout()) { if (maybeTimedOut && canTimeOut && !ShouldContinueFromTimeout()) {
// Since ShouldContinueFromTimeout drops the lock, we need to // Since ShouldContinueFromTimeout drops the lock, we need to
// re-check all our conditions here. We shouldn't time out if any of // re-check all our conditions here. We shouldn't time out if any of
// these things happen because there won't be a reply to the timed // these things happen because there won't be a reply to the timed
// out message in these cases. // out message in these cases.
if (WasTransactionCanceled(transaction)) { if (transact.IsComplete()) {
IPC_LOG("Other side canceled seqno=%d, xid=%d", seqno, transaction);
mLastSendError = SyncSendError::CancelledAfterSend;
return false;
}
if (mRecvdErrors) {
mRecvdErrors--;
mLastSendError = SyncSendError::ReplyError;
return false;
}
if (mRecvd) {
break; break;
} }
@ -1018,18 +1185,36 @@ MessageChannel::Send(Message* aMsg, Message* aReply)
mLastSendError = SyncSendError::TimedOut; mLastSendError = SyncSendError::TimedOut;
return false; return false;
} }
if (transact.IsCanceled()) {
break;
}
} }
MOZ_RELEASE_ASSERT(mRecvd); if (transact.IsCanceled()) {
MOZ_RELEASE_ASSERT(mRecvd->is_reply(), "expected reply"); IPC_LOG("Other side canceled seqno=%d, xid=%d", seqno, transaction);
MOZ_RELEASE_ASSERT(!mRecvd->is_reply_error()); mLastSendError = SyncSendError::CancelledAfterSend;
MOZ_RELEASE_ASSERT(mRecvd->seqno() == seqno); return false;
MOZ_RELEASE_ASSERT(mRecvd->type() == replyType, "wrong reply type"); }
MOZ_RELEASE_ASSERT(mRecvd->is_sync());
*aReply = Move(*mRecvd); if (transact.IsError()) {
mRecvd = nullptr; IPC_LOG("Error: seqno=%d, xid=%d", seqno, transaction);
mLastSendError = SyncSendError::SendSuccess; mLastSendError = SyncSendError::ReplyError;
return false;
}
IPC_LOG("Got reply: seqno=%d, xid=%d", seqno, transaction);
nsAutoPtr<Message> reply = transact.GetReply();
MOZ_RELEASE_ASSERT(reply);
MOZ_RELEASE_ASSERT(reply->is_reply(), "expected reply");
MOZ_RELEASE_ASSERT(!reply->is_reply_error());
MOZ_RELEASE_ASSERT(reply->seqno() == seqno);
MOZ_RELEASE_ASSERT(reply->type() == replyType, "wrong reply type");
MOZ_RELEASE_ASSERT(reply->is_sync());
*aReply = Move(*reply);
return true; return true;
} }
@ -1252,16 +1437,6 @@ MessageChannel::ProcessPendingRequest(const Message &aUrgent)
AssertWorkerThread(); AssertWorkerThread();
mMonitor->AssertCurrentThreadOwns(); mMonitor->AssertCurrentThreadOwns();
// Note that it is possible we could have sent a sync message at
// the same time the parent process sent an urgent message, and
// therefore mPendingUrgentRequest is set *and* mRecvd is set as
// well, because the link thread received both before the worker
// thread woke up.
//
// In this case, we process the urgent message first, but we need
// to save the reply.
nsAutoPtr<Message> savedReply(mRecvd.forget());
IPC_LOG("Process pending: seqno=%d, xid=%d", aUrgent.seqno(), aUrgent.transaction_id()); IPC_LOG("Process pending: seqno=%d, xid=%d", aUrgent.seqno(), aUrgent.transaction_id());
DispatchMessage(aUrgent); DispatchMessage(aUrgent);
@ -1270,13 +1445,6 @@ MessageChannel::ProcessPendingRequest(const Message &aUrgent)
return false; return false;
} }
// In between having dispatched our reply to the parent process, and
// re-acquiring the monitor, the parent process could have already
// processed that reply and sent the reply to our sync message. If so,
// our saved reply should be empty.
IPC_ASSERT(!mRecvd || !savedReply, "unknown reply");
if (!mRecvd)
mRecvd = savedReply.forget();
return true; return true;
} }
@ -1352,8 +1520,6 @@ MessageChannel::OnMaybeDequeueOne()
return false; return false;
} }
// We should not be in a transaction yet if we're not blocked.
MOZ_RELEASE_ASSERT(mCurrentTransaction == 0);
DispatchMessage(recvd); DispatchMessage(recvd);
return true; return true;
@ -1374,7 +1540,7 @@ MessageChannel::DispatchMessage(const Message &aMsg)
AutoEnterTransaction transaction(this, aMsg); AutoEnterTransaction transaction(this, aMsg);
int id = aMsg.transaction_id(); int id = aMsg.transaction_id();
MOZ_RELEASE_ASSERT(!aMsg.is_sync() || id == mCurrentTransaction); MOZ_RELEASE_ASSERT(!aMsg.is_sync() || id == transaction.TransactionID());
{ {
MonitorAutoUnlock unlock(*mMonitor); MonitorAutoUnlock unlock(*mMonitor);
@ -1392,7 +1558,7 @@ MessageChannel::DispatchMessage(const Message &aMsg)
mListener->ArtificialSleep(); mListener->ArtificialSleep();
} }
if (mCurrentTransaction != id) { if (reply && transaction.IsCanceled()) {
// The transaction has been canceled. Don't send a reply. // The transaction has been canceled. Don't send a reply.
IPC_LOG("Nulling out reply due to cancellation, seqno=%d, xid=%d", aMsg.seqno(), id); IPC_LOG("Nulling out reply due to cancellation, seqno=%d, xid=%d", aMsg.seqno(), id);
reply = nullptr; reply = nullptr;
@ -1420,8 +1586,6 @@ MessageChannel::DispatchSyncMessage(const Message& aMsg, Message*& aReply)
Result rv; Result rv;
{ {
AutoSetValue<MessageChannel*> blocked(blockingVar, this); AutoSetValue<MessageChannel*> blocked(blockingVar, this);
AutoSetValue<bool> sync(mDispatchingSyncMessage, true);
AutoSetValue<int> prioSet(mDispatchingSyncMessagePriority, prio);
rv = mListener->OnMessageReceived(aMsg, aReply); rv = mListener->OnMessageReceived(aMsg, aReply);
} }
@ -2123,38 +2287,7 @@ MessageChannel::CancelTransaction(int transaction)
// tampered with (by us). If so, they don't reset the variable to the old // tampered with (by us). If so, they don't reset the variable to the old
// value. // value.
IPC_LOG("CancelTransaction: xid=%d prios=%d", transaction, mPendingSendPriorities); IPC_LOG("CancelTransaction: xid=%d", transaction);
if (mPendingSendPriorities & (1 << IPC::Message::PRIORITY_NORMAL)) {
// This isn't an assert so much as an intentional crash because we're in
// a situation that we don't know how to recover from: The child is
// awaiting a reply to a normal-priority sync message. The transaction
// that this message initiated has now been canceled. That could only
// happen if a CPOW raced with the sync message and was dispatched by
// the child while the child was awaiting the sync reply; at some point
// while dispatching the CPOW, the transaction was canceled.
//
// Notes:
//
// 1. We don't want to cancel the normal-priority sync message along
// with the CPOWs because the browser relies on these messages working
// reliably.
//
// 2. Ideally we would like to avoid dispatching CPOWs while awaiting a
// sync response. This isn't possible though. To avoid deadlock, the
// parent would have to dispatch the sync message while waiting for the
// CPOW response. However, it wouldn't have dispatched async messages at
// that time, so we would have a message ordering bug. Dispatching the
// async messages first causes other hard-to-handle situations (what if
// they send CPOWs?).
//
// 3. We would like to be able to cancel the CPOWs but not the sync
// message. However, that would leave both the parent and the child
// running code at the same time, all while the sync message is still
// outstanding. That can cause a problem where message replies are
// received out of order.
mListener->IntentionalCrash();
}
// An unusual case: We timed out a transaction which the other side then // An unusual case: We timed out a transaction which the other side then
// cancelled. In this case we just leave the timedout state and try to // cancelled. In this case we just leave the timedout state and try to
@ -2168,17 +2301,13 @@ MessageChannel::CancelTransaction(int transaction)
// 2. Parent times out H. // 2. Parent times out H.
// 3. Child dispatches H and sends nested message H' (same transaction). // 3. Child dispatches H and sends nested message H' (same transaction).
// 4. Parent dispatches H' and cancels. // 4. Parent dispatches H' and cancels.
MOZ_RELEASE_ASSERT(!mCurrentTransaction || mCurrentTransaction == transaction); MOZ_RELEASE_ASSERT(!mTransactionStack || mTransactionStack->TransactionID() == transaction);
mCurrentTransaction = 0; if (mTransactionStack) {
mTransactionStack->Cancel();
// During a timeout Send should always fail. }
MOZ_RELEASE_ASSERT(!mAwaitingSyncReply);
} else { } else {
MOZ_RELEASE_ASSERT(mCurrentTransaction == transaction); MOZ_RELEASE_ASSERT(mTransactionStack->TransactionID() == transaction);
mCurrentTransaction = 0; mTransactionStack->Cancel();
mAwaitingSyncReply = false;
mAwaitingSyncReplyPriority = 0;
} }
bool foundSync = false; bool foundSync = false;
@ -2187,11 +2316,8 @@ MessageChannel::CancelTransaction(int transaction)
// If there was a race between the parent and the child, then we may // If there was a race between the parent and the child, then we may
// have a queued sync message. We want to drop this message from the // have a queued sync message. We want to drop this message from the
// queue since it will get cancelled along with the transaction being // queue since if will get cancelled along with the transaction being
// cancelled. We don't bother doing this for normal priority messages // cancelled. This happens if the message in the queue is high priority.
// because the child is just going to crash in that case, and we want to
// avoid processing messages out of order in the short time before it
// crashes.
if (msg.is_sync() && msg.priority() != IPC::Message::PRIORITY_NORMAL) { if (msg.is_sync() && msg.priority() != IPC::Message::PRIORITY_NORMAL) {
MOZ_RELEASE_ASSERT(!foundSync); MOZ_RELEASE_ASSERT(!foundSync);
MOZ_RELEASE_ASSERT(msg.transaction_id() != transaction); MOZ_RELEASE_ASSERT(msg.transaction_id() != transaction);
@ -2201,36 +2327,32 @@ MessageChannel::CancelTransaction(int transaction)
continue; continue;
} }
// There may be messages in the queue that we expected to process from
// ProcessPendingRequests. However, Send will no longer call that
// function once it's been canceled. So we may need to process these
// messages in the normal event loop instead.
mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
it++; it++;
} }
}
// We could also zero out mDispatchingSyncMessage here. However, that would bool
// cause a race because mDispatchingSyncMessage is a worker-thread-only MessageChannel::IsInTransaction() const
// field and we can be called on the I/O thread. Luckily, we can check to {
// see if mCurrentTransaction is 0 before examining DispatchSyncMessage. MonitorAutoLock lock(*mMonitor);
return !!mTransactionStack;
} }
void void
MessageChannel::CancelCurrentTransaction() MessageChannel::CancelCurrentTransaction()
{ {
MonitorAutoLock lock(*mMonitor); MonitorAutoLock lock(*mMonitor);
if (mCurrentTransaction) { if (DispatchingSyncMessagePriority() >= IPC::Message::PRIORITY_HIGH) {
if (DispatchingSyncMessagePriority() == IPC::Message::PRIORITY_URGENT || if (DispatchingSyncMessagePriority() == IPC::Message::PRIORITY_URGENT ||
DispatchingAsyncMessagePriority() == IPC::Message::PRIORITY_URGENT) DispatchingAsyncMessagePriority() == IPC::Message::PRIORITY_URGENT)
{ {
mListener->IntentionalCrash(); mListener->IntentionalCrash();
} }
IPC_LOG("Cancel requested: current xid=%d", mCurrentTransaction); IPC_LOG("Cancel requested: current xid=%d", CurrentHighPriorityTransaction());
MOZ_RELEASE_ASSERT(DispatchingSyncMessage()); MOZ_RELEASE_ASSERT(DispatchingSyncMessage());
CancelMessage *cancel = new CancelMessage(mCurrentTransaction); CancelMessage *cancel = new CancelMessage(CurrentHighPriorityTransaction());
CancelTransaction(mCurrentTransaction); CancelTransaction(CurrentHighPriorityTransaction());
mLink->SendMessage(cancel); mLink->SendMessage(cancel);
} }
} }

Просмотреть файл

@ -57,6 +57,8 @@ enum class SyncSendError {
ReplyError, ReplyError,
}; };
class AutoEnterTransaction;
class MessageChannel : HasResultCodes class MessageChannel : HasResultCodes
{ {
friend class ProcessLink; friend class ProcessLink;
@ -156,7 +158,7 @@ class MessageChannel : HasResultCodes
return !mCxxStackFrames.empty(); return !mCxxStackFrames.empty();
} }
bool IsInTransaction() const { return mCurrentTransaction != 0; } bool IsInTransaction() const;
void CancelCurrentTransaction(); void CancelCurrentTransaction();
/** /**
@ -263,7 +265,7 @@ class MessageChannel : HasResultCodes
bool InterruptEventOccurred(); bool InterruptEventOccurred();
bool HasPendingEvents(); bool HasPendingEvents();
void ProcessPendingRequests(int seqno, int transaction); void ProcessPendingRequests(AutoEnterTransaction& aTransaction);
bool ProcessPendingRequest(const Message &aUrgent); bool ProcessPendingRequest(const Message &aUrgent);
void MaybeUndeferIncall(); void MaybeUndeferIncall();
@ -366,15 +368,6 @@ class MessageChannel : HasResultCodes
return mInterruptStack.size(); return mInterruptStack.size();
} }
// Returns true if we're blocking waiting for a reply.
bool AwaitingSyncReply() const {
mMonitor->AssertCurrentThreadOwns();
return mAwaitingSyncReply;
}
int AwaitingSyncReplyPriority() const {
mMonitor->AssertCurrentThreadOwns();
return mAwaitingSyncReplyPriority;
}
bool AwaitingInterruptReply() const { bool AwaitingInterruptReply() const {
mMonitor->AssertCurrentThreadOwns(); mMonitor->AssertCurrentThreadOwns();
return !mInterruptStack.empty(); return !mInterruptStack.empty();
@ -404,17 +397,7 @@ class MessageChannel : HasResultCodes
}; };
friend class AutoEnterWaitForIncoming; friend class AutoEnterWaitForIncoming;
// Returns true if we're dispatching a sync message's callback. // Returns true if we're dispatching an async message's callback.
bool DispatchingSyncMessage() const {
AssertWorkerThread();
return mDispatchingSyncMessage;
}
int DispatchingSyncMessagePriority() const {
AssertWorkerThread();
return mDispatchingSyncMessagePriority;
}
bool DispatchingAsyncMessage() const { bool DispatchingAsyncMessage() const {
AssertWorkerThread(); AssertWorkerThread();
return mDispatchingAsyncMessage; return mDispatchingAsyncMessage;
@ -560,15 +543,6 @@ class MessageChannel : HasResultCodes
T mNew; T mNew;
}; };
// Worker thread only.
bool mAwaitingSyncReply;
int mAwaitingSyncReplyPriority;
// Set while we are dispatching a synchronous message. Only for use on the
// worker thread.
bool mDispatchingSyncMessage;
int mDispatchingSyncMessagePriority;
bool mDispatchingAsyncMessage; bool mDispatchingAsyncMessage;
int mDispatchingAsyncMessagePriority; int mDispatchingAsyncMessagePriority;
@ -590,56 +564,16 @@ class MessageChannel : HasResultCodes
// To ensure IDs are unique, we use sequence numbers for transaction IDs, // To ensure IDs are unique, we use sequence numbers for transaction IDs,
// which grow in opposite directions from child to parent. // which grow in opposite directions from child to parent.
// The current transaction ID. friend class AutoEnterTransaction;
int32_t mCurrentTransaction; AutoEnterTransaction *mTransactionStack;
// This field describes the priorities of the sync Send calls that are int32_t CurrentHighPriorityTransaction() const;
// currently on stack. If a Send call for a message with priority P is on
// the C stack, then mPendingSendPriorities & (1 << P) will be
// non-zero. Note that cancelled Send calls are not removed from this
// bitfield (until they return).
int mPendingSendPriorities;
class AutoEnterTransaction bool AwaitingSyncReply() const;
{ int AwaitingSyncReplyPriority() const;
public:
explicit AutoEnterTransaction(MessageChannel *aChan, int32_t aMsgSeqno)
: mChan(aChan),
mNewTransaction(INT32_MAX),
mOldTransaction(mChan->mCurrentTransaction)
{
mChan->mMonitor->AssertCurrentThreadOwns();
if (mChan->mCurrentTransaction == 0) {
mNewTransaction = aMsgSeqno;
mChan->mCurrentTransaction = aMsgSeqno;
}
}
explicit AutoEnterTransaction(MessageChannel *aChan, const Message &aMessage)
: mChan(aChan),
mNewTransaction(aMessage.transaction_id()),
mOldTransaction(mChan->mCurrentTransaction)
{
mChan->mMonitor->AssertCurrentThreadOwns();
if (!aMessage.is_sync()) bool DispatchingSyncMessage() const;
return; int DispatchingSyncMessagePriority() const;
MOZ_DIAGNOSTIC_ASSERT(
!(mChan->mSide == ParentSide && mOldTransaction != aMessage.transaction_id()) ||
!mOldTransaction || aMessage.priority() > mChan->AwaitingSyncReplyPriority());
mChan->mCurrentTransaction = aMessage.transaction_id();
}
~AutoEnterTransaction() {
mChan->mMonitor->AssertCurrentThreadOwns();
if (mChan->mCurrentTransaction == mNewTransaction) {
mChan->mCurrentTransaction = mOldTransaction;
}
}
private:
MessageChannel *mChan;
int32_t mNewTransaction, mOldTransaction;
};
// If a sync message times out, we store its sequence number here. Any // If a sync message times out, we store its sequence number here. Any
// future sync messages will fail immediately. Once the reply for original // future sync messages will fail immediately. Once the reply for original
@ -657,14 +591,6 @@ class MessageChannel : HasResultCodes
int32_t mTimedOutMessageSeqno; int32_t mTimedOutMessageSeqno;
int mTimedOutMessagePriority; int mTimedOutMessagePriority;
// If waiting for the reply to a sync out-message, it will be saved here
// on the I/O thread and then read and cleared by the worker thread.
nsAutoPtr<Message> mRecvd;
// If a sync message reply that is an error arrives, we increment this
// counter rather than storing it in mRecvd.
size_t mRecvdErrors;
// Queue of all incoming messages, except for replies to sync and urgent // Queue of all incoming messages, except for replies to sync and urgent
// messages, which are delivered directly to mRecvd, and any pending urgent // messages, which are delivered directly to mRecvd, and any pending urgent
// incall, which is stored in mPendingUrgentRequest. // incall, which is stored in mPendingUrgentRequest.

Просмотреть файл

@ -207,17 +207,31 @@ TestDemonParent::RunLimitedSequence(int flags)
gStackHeight--; gStackHeight--;
} }
static bool
AllowAsync(int outgoing, int incoming)
{
return incoming >= outgoing - 5;
}
bool bool
TestDemonParent::DoAction(int flags) TestDemonParent::DoAction(int flags)
{ {
if (flags & ASYNC_ONLY) { if (flags & ASYNC_ONLY) {
DEMON_LOG("SendAsyncMessage [%d]", mOutgoing[0]); if (AllowAsync(mOutgoing[0], mIncoming[0])) {
return SendAsyncMessage(mOutgoing[0]++); DEMON_LOG("SendAsyncMessage [%d]", mOutgoing[0]);
return SendAsyncMessage(mOutgoing[0]++);
} else {
return true;
}
} else { } else {
switch (Choose(3)) { switch (Choose(3)) {
case 0: case 0:
DEMON_LOG("SendAsyncMessage [%d]", mOutgoing[0]); if (AllowAsync(mOutgoing[0], mIncoming[0])) {
return SendAsyncMessage(mOutgoing[0]++); DEMON_LOG("SendAsyncMessage [%d]", mOutgoing[0]);
return SendAsyncMessage(mOutgoing[0]++);
} else {
return true;
}
case 1: { case 1: {
DEMON_LOG("Start SendHiPrioSyncMessage"); DEMON_LOG("Start SendHiPrioSyncMessage");
@ -339,8 +353,12 @@ TestDemonChild::DoAction()
{ {
switch (Choose(6)) { switch (Choose(6)) {
case 0: case 0:
DEMON_LOG("SendAsyncMessage [%d]", mOutgoing[0]); if (AllowAsync(mOutgoing[0], mIncoming[0])) {
return SendAsyncMessage(mOutgoing[0]++); DEMON_LOG("SendAsyncMessage [%d]", mOutgoing[0]);
return SendAsyncMessage(mOutgoing[0]++);
} else {
return true;
}
case 1: { case 1: {
DEMON_LOG("Start SendHiPrioSyncMessage"); DEMON_LOG("Start SendHiPrioSyncMessage");