Back out bug 1248750 on a CLOSED TREE

This commit is contained in:
Bill McCloskey 2016-03-04 16:04:41 -08:00
Родитель e8e7fc3268
Коммит f8784ed28b
3 изменённых файлов: 301 добавлений и 366 удалений

Просмотреть файл

@ -281,191 +281,6 @@ private:
CxxStackFrame& operator=(const CxxStackFrame&) = delete;
};
class AutoEnterTransaction
{
public:
explicit AutoEnterTransaction(MessageChannel *aChan,
int32_t aMsgSeqno,
int32_t aTransactionID,
int aPriority)
: mChan(aChan),
mActive(true),
mOutgoing(true),
mPriority(aPriority),
mSeqno(aMsgSeqno),
mTransaction(aTransactionID),
mNext(mChan->mTransactionStack)
{
mChan->mMonitor->AssertCurrentThreadOwns();
mChan->mTransactionStack = this;
}
explicit AutoEnterTransaction(MessageChannel *aChan, const IPC::Message &aMessage)
: mChan(aChan),
mActive(true),
mOutgoing(false),
mPriority(aMessage.priority()),
mSeqno(aMessage.seqno()),
mTransaction(aMessage.transaction_id()),
mNext(mChan->mTransactionStack)
{
mChan->mMonitor->AssertCurrentThreadOwns();
if (!aMessage.is_sync()) {
mActive = false;
return;
}
mChan->mTransactionStack = this;
}
~AutoEnterTransaction() {
mChan->mMonitor->AssertCurrentThreadOwns();
if (mActive) {
mChan->mTransactionStack = mNext;
}
}
void Cancel() {
AutoEnterTransaction *cur = mChan->mTransactionStack;
MOZ_RELEASE_ASSERT(cur == this);
while (cur && cur->mPriority != IPC::Message::PRIORITY_NORMAL) {
// Note that, in the following situation, we will cancel multiple
// transactions:
// 1. Parent sends high prio message P1 to child.
// 2. Child sends high prio message C1 to child.
// 3. Child dispatches P1, parent blocks.
// 4. Child cancels.
// In this case, both P1 and C1 are cancelled. The parent will
// remove C1 from its queue when it gets the cancellation message.
MOZ_RELEASE_ASSERT(cur->mActive);
cur->mActive = false;
cur = cur->mNext;
}
mChan->mTransactionStack = cur;
MOZ_RELEASE_ASSERT(IsComplete());
}
bool AwaitingSyncReply() const {
MOZ_RELEASE_ASSERT(mActive);
if (mOutgoing) {
return true;
}
return mNext ? mNext->AwaitingSyncReply() : false;
}
int AwaitingSyncReplyPriority() const {
MOZ_RELEASE_ASSERT(mActive);
if (mOutgoing) {
return mPriority;
}
return mNext ? mNext->AwaitingSyncReplyPriority() : 0;
}
bool DispatchingSyncMessage() const {
MOZ_RELEASE_ASSERT(mActive);
if (!mOutgoing) {
return true;
}
return mNext ? mNext->DispatchingSyncMessage() : false;
}
int DispatchingSyncMessagePriority() const {
MOZ_RELEASE_ASSERT(mActive);
if (!mOutgoing) {
return mPriority;
}
return mNext ? mNext->DispatchingSyncMessagePriority() : 0;
}
int Priority() const {
MOZ_RELEASE_ASSERT(mActive);
return mPriority;
}
int32_t SequenceNumber() const {
MOZ_RELEASE_ASSERT(mActive);
return mSeqno;
}
int32_t TransactionID() const {
MOZ_RELEASE_ASSERT(mActive);
return mTransaction;
}
void ReceivedReply(const IPC::Message& aMessage) {
MOZ_RELEASE_ASSERT(aMessage.seqno() == mSeqno);
MOZ_RELEASE_ASSERT(aMessage.transaction_id() == mTransaction);
MOZ_RELEASE_ASSERT(!mReply);
IPC_LOG("Reply received on worker thread: seqno=%d", mSeqno);
mReply = new IPC::Message(aMessage);
MOZ_RELEASE_ASSERT(IsComplete());
}
void HandleReply(const IPC::Message& aMessage) {
AutoEnterTransaction *cur = mChan->mTransactionStack;
MOZ_RELEASE_ASSERT(cur == this);
while (cur) {
MOZ_RELEASE_ASSERT(cur->mActive);
if (aMessage.seqno() == cur->mSeqno) {
cur->ReceivedReply(aMessage);
}
cur = cur->mNext;
MOZ_RELEASE_ASSERT(cur);
}
}
bool IsComplete() {
return !mActive || mReply;
}
bool IsOutgoing() {
return mOutgoing;
}
bool IsCanceled() {
return !mActive;
}
bool IsBottom() const {
return !mNext;
}
bool IsError() {
MOZ_RELEASE_ASSERT(mReply);
return mReply->is_reply_error();
}
nsAutoPtr<IPC::Message> GetReply() {
return Move(mReply);
}
private:
MessageChannel *mChan;
// Active is true if this transaction is on the mChan->mTransactionStack
// stack. Generally we're not on the stack if the transaction was canceled
// or if it was for a message that doesn't require transactions (an async
// message).
bool mActive;
// Is this stack frame for an outgoing message?
bool mOutgoing;
// Properties of the message being sent/received.
int mPriority;
int32_t mSeqno;
int32_t mTransaction;
// Next item in mChan->mTransactionStack.
AutoEnterTransaction *mNext;
// Pointer the a reply received for this message, if one was received.
nsAutoPtr<IPC::Message> mReply;
};
MessageChannel::MessageChannel(MessageListener *aListener)
: mListener(aListener),
mChannelState(ChannelClosed),
@ -478,11 +293,17 @@ MessageChannel::MessageChannel(MessageListener *aListener)
mInTimeoutSecondHalf(false),
mNextSeqno(0),
mLastSendError(SyncSendError::SendSuccess),
mAwaitingSyncReply(false),
mAwaitingSyncReplyPriority(0),
mDispatchingSyncMessage(false),
mDispatchingSyncMessagePriority(0),
mDispatchingAsyncMessage(false),
mDispatchingAsyncMessagePriority(0),
mTransactionStack(nullptr),
mCurrentTransaction(0),
mPendingSendPriorities(0),
mTimedOutMessageSeqno(0),
mTimedOutMessagePriority(0),
mRecvdErrors(0),
mRemoteStackDepthGuess(false),
mSawInterruptOutMsg(false),
mIsWaitingForIncoming(false),
@ -523,51 +344,6 @@ MessageChannel::~MessageChannel()
Clear();
}
// This function returns the current transaction ID. Since the notion of a
// "current transaction" can be hard to define when messages race with each
// other and one gets canceled and the other doesn't, we require that this
// function is only called when the current transaction is known to be for a
// high priority message. In that case, we know for sure what the caller is
// looking for.
int32_t
MessageChannel::CurrentHighPriorityTransaction() const
{
mMonitor->AssertCurrentThreadOwns();
if (!mTransactionStack) {
return 0;
}
MOZ_RELEASE_ASSERT(mTransactionStack->Priority() == IPC::Message::PRIORITY_HIGH);
return mTransactionStack->TransactionID();
}
bool
MessageChannel::AwaitingSyncReply() const
{
mMonitor->AssertCurrentThreadOwns();
return mTransactionStack ? mTransactionStack->AwaitingSyncReply() : false;
}
int
MessageChannel::AwaitingSyncReplyPriority() const
{
mMonitor->AssertCurrentThreadOwns();
return mTransactionStack ? mTransactionStack->AwaitingSyncReplyPriority() : 0;
}
bool
MessageChannel::DispatchingSyncMessage() const
{
mMonitor->AssertCurrentThreadOwns();
return mTransactionStack ? mTransactionStack->DispatchingSyncMessage() : false;
}
int
MessageChannel::DispatchingSyncMessagePriority() const
{
mMonitor->AssertCurrentThreadOwns();
return mTransactionStack ? mTransactionStack->DispatchingSyncMessagePriority() : 0;
}
static void
PrintErrorMessage(Side side, const char* channelName, const char* msg)
{
@ -629,6 +405,7 @@ MessageChannel::Clear()
// Free up any memory used by pending messages.
mPending.clear();
mRecvd = nullptr;
mOutOfTurnReplies.clear();
while (!mDeferred.empty()) {
mDeferred.pop();
@ -845,7 +622,7 @@ MessageChannel::ShouldDeferMessage(const Message& aMsg)
// child's message comes in, we can pretend the child hasn't quite
// finished sending it yet. Since the message is sync, we know that the
// child hasn't moved on yet.
return mSide == ParentSide && aMsg.transaction_id() != CurrentHighPriorityTransaction();
return mSide == ParentSide && aMsg.transaction_id() != mCurrentTransaction;
}
// Predicate that is true for messages that should be consolidated if 'compress' is set.
@ -882,17 +659,33 @@ MessageChannel::OnMessageReceivedFromLink(const Message& aMsg)
return;
}
MOZ_RELEASE_ASSERT(aMsg.transaction_id() == mCurrentTransaction);
MOZ_RELEASE_ASSERT(AwaitingSyncReply());
MOZ_RELEASE_ASSERT(!mRecvd);
MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno);
mTransactionStack->HandleReply(aMsg);
// Rather than storing errors in mRecvd, we mark them in
// mRecvdErrors. We need a counter because multiple replies can arrive
// when a timeout happens, as in the following example. Imagine the
// child is running slowly. The parent sends a sync message P1. It times
// out. The child eventually sends a sync message C1. While waiting for
// the C1 response, the child dispatches P1. In doing so, it sends sync
// message C2. At that point, it's valid for the parent to send error
// responses for both C1 and C2.
if (aMsg.is_reply_error()) {
mRecvdErrors++;
NotifyWorkerThread();
return;
}
mRecvd = new Message(aMsg);
NotifyWorkerThread();
return;
}
// Prioritized messages cannot be compressed.
MOZ_RELEASE_ASSERT(aMsg.compress_type() == IPC::Message::COMPRESSION_NONE ||
aMsg.priority() == IPC::Message::PRIORITY_NORMAL);
aMsg.priority() == IPC::Message::PRIORITY_NORMAL);
bool compress = false;
if (aMsg.compress_type() == IPC::Message::COMPRESSION_ENABLED) {
@ -975,11 +768,8 @@ MessageChannel::OnMessageReceivedFromLink(const Message& aMsg)
}
void
MessageChannel::ProcessPendingRequests(AutoEnterTransaction& aTransaction)
MessageChannel::ProcessPendingRequests(int seqno, int transaction)
{
int32_t seqno = aTransaction.SequenceNumber();
int32_t transaction = aTransaction.TransactionID();
IPC_LOG("ProcessPendingRequests for seqno=%d, xid=%d", seqno, transaction);
// Loop until there aren't any more priority messages to process.
@ -989,7 +779,7 @@ MessageChannel::ProcessPendingRequests(AutoEnterTransaction& aTransaction)
// operating with weird state (as if no Send is in progress). That could
// cause even normal priority sync messages to be processed (but not
// normal priority async messages), which would break message ordering.
if (aTransaction.IsCanceled()) {
if (WasTransactionCanceled(transaction)) {
return;
}
@ -998,8 +788,8 @@ MessageChannel::ProcessPendingRequests(AutoEnterTransaction& aTransaction)
for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); ) {
Message &msg = *it;
MOZ_RELEASE_ASSERT(!aTransaction.IsCanceled(),
"Calling ShouldDeferMessage when cancelled");
MOZ_RELEASE_ASSERT(mCurrentTransaction == transaction,
"Calling ShouldDeferMessage when cancelled");
bool defer = ShouldDeferMessage(msg);
// Only log the interesting messages.
@ -1016,19 +806,38 @@ MessageChannel::ProcessPendingRequests(AutoEnterTransaction& aTransaction)
it++;
}
if (toProcess.empty()) {
if (toProcess.empty())
break;
}
// Processing these messages could result in more messages, so we
// loop around to check for more afterwards.
for (auto it = toProcess.begin(); it != toProcess.end(); it++) {
for (auto it = toProcess.begin(); it != toProcess.end(); it++)
ProcessPendingRequest(*it);
}
}
}
bool
MessageChannel::WasTransactionCanceled(int transaction)
{
if (transaction != mCurrentTransaction) {
// Imagine this scenario:
// 1. Child sends high prio sync message H1.
// 2. Parent sends reply to H1.
// 3. Parent sends high prio sync message H2.
// 4. Child's link thread receives H1 reply and H2 before worker wakes up.
// 5. Child dispatches H2 while still waiting for H1 reply.
// 6. Child cancels H2.
//
// In this case H1 will also be considered cancelled. However, its
// reply is still sitting in mRecvd, which can trip up later Sends. So
// we null it out here.
mRecvd = nullptr;
return true;
}
return false;
}
bool
MessageChannel::Send(Message* aMsg, Message* aReply)
{
@ -1057,7 +866,8 @@ MessageChannel::Send(Message* aMsg, Message* aReply)
return false;
}
if (DispatchingSyncMessagePriority() == IPC::Message::PRIORITY_NORMAL &&
if (mCurrentTransaction &&
DispatchingSyncMessagePriority() == IPC::Message::PRIORITY_NORMAL &&
msg->priority() > IPC::Message::PRIORITY_NORMAL)
{
// Don't allow sending CPOWs while we're dispatching a sync message.
@ -1067,8 +877,9 @@ MessageChannel::Send(Message* aMsg, Message* aReply)
return false;
}
if (DispatchingSyncMessagePriority() == IPC::Message::PRIORITY_URGENT ||
DispatchingAsyncMessagePriority() == IPC::Message::PRIORITY_URGENT)
if (mCurrentTransaction &&
(DispatchingSyncMessagePriority() == IPC::Message::PRIORITY_URGENT ||
DispatchingAsyncMessagePriority() == IPC::Message::PRIORITY_URGENT))
{
// Generally only the parent dispatches urgent messages. And the only
// sync messages it can send are high-priority. Mainly we want to ensure
@ -1079,24 +890,27 @@ MessageChannel::Send(Message* aMsg, Message* aReply)
return false;
}
if (msg->priority() < DispatchingSyncMessagePriority() ||
msg->priority() < AwaitingSyncReplyPriority())
if (mCurrentTransaction &&
(msg->priority() < DispatchingSyncMessagePriority() ||
msg->priority() < AwaitingSyncReplyPriority()))
{
MOZ_RELEASE_ASSERT(DispatchingSyncMessage() || DispatchingAsyncMessage());
IPC_LOG("Cancel from Send");
CancelMessage *cancel = new CancelMessage(CurrentHighPriorityTransaction());
CancelTransaction(CurrentHighPriorityTransaction());
CancelMessage *cancel = new CancelMessage(mCurrentTransaction);
CancelTransaction(mCurrentTransaction);
mLink->SendMessage(cancel);
}
IPC_ASSERT(msg->is_sync(), "can only Send() sync messages here");
IPC_ASSERT(msg->priority() >= DispatchingSyncMessagePriority(),
"can't send sync message of a lesser priority than what's being dispatched");
IPC_ASSERT(AwaitingSyncReplyPriority() <= msg->priority(),
"nested sync message sends must be of increasing priority");
IPC_ASSERT(DispatchingSyncMessagePriority() != IPC::Message::PRIORITY_URGENT,
"not allowed to send messages while dispatching urgent messages");
if (mCurrentTransaction) {
IPC_ASSERT(msg->priority() >= DispatchingSyncMessagePriority(),
"can't send sync message of a lesser priority than what's being dispatched");
IPC_ASSERT(AwaitingSyncReplyPriority() <= msg->priority(),
"nested sync message sends must be of increasing priority");
IPC_ASSERT(DispatchingSyncMessagePriority() != IPC::Message::PRIORITY_URGENT,
"not allowed to send messages while dispatching urgent messages");
}
IPC_ASSERT(DispatchingAsyncMessagePriority() != IPC::Message::PRIORITY_URGENT,
"not allowed to send messages while dispatching urgent messages");
@ -1113,28 +927,27 @@ MessageChannel::Send(Message* aMsg, Message* aReply)
int prio = msg->priority();
msgid_t replyType = msg->type() + 1;
AutoEnterTransaction *stackTop = mTransactionStack;
AutoSetValue<bool> replies(mAwaitingSyncReply, true);
AutoSetValue<int> prioSet(mAwaitingSyncReplyPriority, prio);
AutoEnterTransaction transact(this, seqno);
// If the most recent message on the stack is high priority, then our
// message should nest inside that and we use the same transaction
// ID. Otherwise we need a new transaction ID (so we use the seqno of the
// message we're sending).
bool nest = stackTop && stackTop->Priority() == IPC::Message::PRIORITY_HIGH;
int32_t transaction = nest ? stackTop->TransactionID() : seqno;
int prios = mPendingSendPriorities | (1 << prio);
AutoSetValue<int> priosSet(mPendingSendPriorities, prios);
int32_t transaction = mCurrentTransaction;
msg->set_transaction_id(transaction);
IPC_LOG("Send seqno=%d, xid=%d, pending=%d", seqno, transaction, prios);
bool handleWindowsMessages = mListener->HandleWindowsMessages(*aMsg);
AutoEnterTransaction transact(this, seqno, transaction, prio);
IPC_LOG("Send seqno=%d, xid=%d", seqno, transaction);
mLink->SendMessage(msg.forget());
while (true) {
MOZ_RELEASE_ASSERT(!transact.IsCanceled());
ProcessPendingRequests(transact);
if (transact.IsComplete()) {
break;
ProcessPendingRequests(seqno, transaction);
if (WasTransactionCanceled(transaction)) {
IPC_LOG("Other side canceled seqno=%d, xid=%d", seqno, transaction);
mLastSendError = SyncSendError::CancelledAfterSend;
return false;
}
if (!Connected()) {
ReportConnectionError("MessageChannel::Send");
@ -1142,12 +955,23 @@ MessageChannel::Send(Message* aMsg, Message* aReply)
return false;
}
// See if we've received a reply.
if (mRecvdErrors) {
IPC_LOG("Error: seqno=%d, xid=%d", seqno, transaction);
mRecvdErrors--;
mLastSendError = SyncSendError::ReplyError;
return false;
}
if (mRecvd) {
IPC_LOG("Got reply: seqno=%d, xid=%d", seqno, transaction);
break;
}
MOZ_RELEASE_ASSERT(!mTimedOutMessageSeqno);
MOZ_RELEASE_ASSERT(!transact.IsComplete());
MOZ_RELEASE_ASSERT(mTransactionStack == &transact);
MOZ_RELEASE_ASSERT(mCurrentTransaction == transaction);
bool maybeTimedOut = !WaitForSyncNotify(handleWindowsMessages);
if (mListener->NeedArtificialSleep()) {
MonitorAutoUnlock unlock(*mMonitor);
mListener->ArtificialSleep();
@ -1159,21 +983,31 @@ MessageChannel::Send(Message* aMsg, Message* aReply)
return false;
}
if (transact.IsCanceled()) {
break;
if (WasTransactionCanceled(transaction)) {
IPC_LOG("Other side canceled seqno=%d, xid=%d", seqno, transaction);
mLastSendError = SyncSendError::CancelledAfterSend;
return false;
}
MOZ_RELEASE_ASSERT(mTransactionStack == &transact);
// We only time out a message if it initiated a new transaction (i.e.,
// if neither side has any other message Sends on the stack).
bool canTimeOut = transact.IsBottom();
bool canTimeOut = transaction == seqno;
if (maybeTimedOut && canTimeOut && !ShouldContinueFromTimeout()) {
// Since ShouldContinueFromTimeout drops the lock, we need to
// re-check all our conditions here. We shouldn't time out if any of
// these things happen because there won't be a reply to the timed
// out message in these cases.
if (transact.IsComplete()) {
if (WasTransactionCanceled(transaction)) {
IPC_LOG("Other side canceled seqno=%d, xid=%d", seqno, transaction);
mLastSendError = SyncSendError::CancelledAfterSend;
return false;
}
if (mRecvdErrors) {
mRecvdErrors--;
mLastSendError = SyncSendError::ReplyError;
return false;
}
if (mRecvd) {
break;
}
@ -1184,36 +1018,18 @@ MessageChannel::Send(Message* aMsg, Message* aReply)
mLastSendError = SyncSendError::TimedOut;
return false;
}
if (transact.IsCanceled()) {
break;
}
}
if (transact.IsCanceled()) {
IPC_LOG("Other side canceled seqno=%d, xid=%d", seqno, transaction);
mLastSendError = SyncSendError::CancelledAfterSend;
return false;
}
MOZ_RELEASE_ASSERT(mRecvd);
MOZ_RELEASE_ASSERT(mRecvd->is_reply(), "expected reply");
MOZ_RELEASE_ASSERT(!mRecvd->is_reply_error());
MOZ_RELEASE_ASSERT(mRecvd->seqno() == seqno);
MOZ_RELEASE_ASSERT(mRecvd->type() == replyType, "wrong reply type");
MOZ_RELEASE_ASSERT(mRecvd->is_sync());
if (transact.IsError()) {
IPC_LOG("Error: seqno=%d, xid=%d", seqno, transaction);
mLastSendError = SyncSendError::ReplyError;
return false;
}
IPC_LOG("Got reply: seqno=%d, xid=%d", seqno, transaction);
nsAutoPtr<Message> reply = transact.GetReply();
MOZ_RELEASE_ASSERT(reply);
MOZ_RELEASE_ASSERT(reply->is_reply(), "expected reply");
MOZ_RELEASE_ASSERT(!reply->is_reply_error());
MOZ_RELEASE_ASSERT(reply->seqno() == seqno);
MOZ_RELEASE_ASSERT(reply->type() == replyType, "wrong reply type");
MOZ_RELEASE_ASSERT(reply->is_sync());
*aReply = Move(*reply);
*aReply = Move(*mRecvd);
mRecvd = nullptr;
mLastSendError = SyncSendError::SendSuccess;
return true;
}
@ -1436,6 +1252,16 @@ MessageChannel::ProcessPendingRequest(const Message &aUrgent)
AssertWorkerThread();
mMonitor->AssertCurrentThreadOwns();
// Note that it is possible we could have sent a sync message at
// the same time the parent process sent an urgent message, and
// therefore mPendingUrgentRequest is set *and* mRecvd is set as
// well, because the link thread received both before the worker
// thread woke up.
//
// In this case, we process the urgent message first, but we need
// to save the reply.
nsAutoPtr<Message> savedReply(mRecvd.forget());
IPC_LOG("Process pending: seqno=%d, xid=%d", aUrgent.seqno(), aUrgent.transaction_id());
DispatchMessage(aUrgent);
@ -1444,6 +1270,13 @@ MessageChannel::ProcessPendingRequest(const Message &aUrgent)
return false;
}
// In between having dispatched our reply to the parent process, and
// re-acquiring the monitor, the parent process could have already
// processed that reply and sent the reply to our sync message. If so,
// our saved reply should be empty.
IPC_ASSERT(!mRecvd || !savedReply, "unknown reply");
if (!mRecvd)
mRecvd = savedReply.forget();
return true;
}
@ -1519,6 +1352,8 @@ MessageChannel::OnMaybeDequeueOne()
return false;
}
// We should not be in a transaction yet if we're not blocked.
MOZ_RELEASE_ASSERT(mCurrentTransaction == 0);
DispatchMessage(recvd);
return true;
@ -1539,7 +1374,7 @@ MessageChannel::DispatchMessage(const Message &aMsg)
AutoEnterTransaction transaction(this, aMsg);
int id = aMsg.transaction_id();
MOZ_RELEASE_ASSERT(!aMsg.is_sync() || id == transaction.TransactionID());
MOZ_RELEASE_ASSERT(!aMsg.is_sync() || id == mCurrentTransaction);
{
MonitorAutoUnlock unlock(*mMonitor);
@ -1557,7 +1392,7 @@ MessageChannel::DispatchMessage(const Message &aMsg)
mListener->ArtificialSleep();
}
if (reply && transaction.IsCanceled()) {
if (mCurrentTransaction != id) {
// The transaction has been canceled. Don't send a reply.
IPC_LOG("Nulling out reply due to cancellation, seqno=%d, xid=%d", aMsg.seqno(), id);
reply = nullptr;
@ -1585,6 +1420,8 @@ MessageChannel::DispatchSyncMessage(const Message& aMsg, Message*& aReply)
Result rv;
{
AutoSetValue<MessageChannel*> blocked(blockingVar, this);
AutoSetValue<bool> sync(mDispatchingSyncMessage, true);
AutoSetValue<int> prioSet(mDispatchingSyncMessagePriority, prio);
rv = mListener->OnMessageReceived(aMsg, aReply);
}
@ -2286,7 +2123,38 @@ MessageChannel::CancelTransaction(int transaction)
// tampered with (by us). If so, they don't reset the variable to the old
// value.
IPC_LOG("CancelTransaction: xid=%d", transaction);
IPC_LOG("CancelTransaction: xid=%d prios=%d", transaction, mPendingSendPriorities);
if (mPendingSendPriorities & (1 << IPC::Message::PRIORITY_NORMAL)) {
// This isn't an assert so much as an intentional crash because we're in
// a situation that we don't know how to recover from: The child is
// awaiting a reply to a normal-priority sync message. The transaction
// that this message initiated has now been canceled. That could only
// happen if a CPOW raced with the sync message and was dispatched by
// the child while the child was awaiting the sync reply; at some point
// while dispatching the CPOW, the transaction was canceled.
//
// Notes:
//
// 1. We don't want to cancel the normal-priority sync message along
// with the CPOWs because the browser relies on these messages working
// reliably.
//
// 2. Ideally we would like to avoid dispatching CPOWs while awaiting a
// sync response. This isn't possible though. To avoid deadlock, the
// parent would have to dispatch the sync message while waiting for the
// CPOW response. However, it wouldn't have dispatched async messages at
// that time, so we would have a message ordering bug. Dispatching the
// async messages first causes other hard-to-handle situations (what if
// they send CPOWs?).
//
// 3. We would like to be able to cancel the CPOWs but not the sync
// message. However, that would leave both the parent and the child
// running code at the same time, all while the sync message is still
// outstanding. That can cause a problem where message replies are
// received out of order.
mListener->IntentionalCrash();
}
// An unusual case: We timed out a transaction which the other side then
// cancelled. In this case we just leave the timedout state and try to
@ -2300,13 +2168,17 @@ MessageChannel::CancelTransaction(int transaction)
// 2. Parent times out H.
// 3. Child dispatches H and sends nested message H' (same transaction).
// 4. Parent dispatches H' and cancels.
MOZ_RELEASE_ASSERT(!mTransactionStack || mTransactionStack->TransactionID() == transaction);
if (mTransactionStack) {
mTransactionStack->Cancel();
}
MOZ_RELEASE_ASSERT(!mCurrentTransaction || mCurrentTransaction == transaction);
mCurrentTransaction = 0;
// During a timeout Send should always fail.
MOZ_RELEASE_ASSERT(!mAwaitingSyncReply);
} else {
MOZ_RELEASE_ASSERT(mTransactionStack->TransactionID() == transaction);
mTransactionStack->Cancel();
MOZ_RELEASE_ASSERT(mCurrentTransaction == transaction);
mCurrentTransaction = 0;
mAwaitingSyncReply = false;
mAwaitingSyncReplyPriority = 0;
}
bool foundSync = false;
@ -2315,8 +2187,11 @@ MessageChannel::CancelTransaction(int transaction)
// If there was a race between the parent and the child, then we may
// have a queued sync message. We want to drop this message from the
// queue since if will get cancelled along with the transaction being
// cancelled. This happens if the message in the queue is high priority.
// queue since it will get cancelled along with the transaction being
// cancelled. We don't bother doing this for normal priority messages
// because the child is just going to crash in that case, and we want to
// avoid processing messages out of order in the short time before it
// crashes.
if (msg.is_sync() && msg.priority() != IPC::Message::PRIORITY_NORMAL) {
MOZ_RELEASE_ASSERT(!foundSync);
MOZ_RELEASE_ASSERT(msg.transaction_id() != transaction);
@ -2326,32 +2201,36 @@ MessageChannel::CancelTransaction(int transaction)
continue;
}
// There may be messages in the queue that we expected to process from
// ProcessPendingRequests. However, Send will no longer call that
// function once it's been canceled. So we may need to process these
// messages in the normal event loop instead.
mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
it++;
}
}
bool
MessageChannel::IsInTransaction() const
{
MonitorAutoLock lock(*mMonitor);
return !!mTransactionStack;
// We could also zero out mDispatchingSyncMessage here. However, that would
// cause a race because mDispatchingSyncMessage is a worker-thread-only
// field and we can be called on the I/O thread. Luckily, we can check to
// see if mCurrentTransaction is 0 before examining DispatchSyncMessage.
}
void
MessageChannel::CancelCurrentTransaction()
{
MonitorAutoLock lock(*mMonitor);
if (DispatchingSyncMessagePriority() >= IPC::Message::PRIORITY_HIGH) {
if (mCurrentTransaction) {
if (DispatchingSyncMessagePriority() == IPC::Message::PRIORITY_URGENT ||
DispatchingAsyncMessagePriority() == IPC::Message::PRIORITY_URGENT)
{
mListener->IntentionalCrash();
}
IPC_LOG("Cancel requested: current xid=%d", CurrentHighPriorityTransaction());
IPC_LOG("Cancel requested: current xid=%d", mCurrentTransaction);
MOZ_RELEASE_ASSERT(DispatchingSyncMessage());
CancelMessage *cancel = new CancelMessage(CurrentHighPriorityTransaction());
CancelTransaction(CurrentHighPriorityTransaction());
CancelMessage *cancel = new CancelMessage(mCurrentTransaction);
CancelTransaction(mCurrentTransaction);
mLink->SendMessage(cancel);
}
}

Просмотреть файл

@ -57,8 +57,6 @@ enum class SyncSendError {
ReplyError,
};
class AutoEnterTransaction;
class MessageChannel : HasResultCodes
{
friend class ProcessLink;
@ -158,7 +156,7 @@ class MessageChannel : HasResultCodes
return !mCxxStackFrames.empty();
}
bool IsInTransaction() const;
bool IsInTransaction() const { return mCurrentTransaction != 0; }
void CancelCurrentTransaction();
/**
@ -265,7 +263,7 @@ class MessageChannel : HasResultCodes
bool InterruptEventOccurred();
bool HasPendingEvents();
void ProcessPendingRequests(AutoEnterTransaction& aTransaction);
void ProcessPendingRequests(int seqno, int transaction);
bool ProcessPendingRequest(const Message &aUrgent);
void MaybeUndeferIncall();
@ -368,6 +366,15 @@ class MessageChannel : HasResultCodes
return mInterruptStack.size();
}
// Returns true if we're blocking waiting for a reply.
bool AwaitingSyncReply() const {
mMonitor->AssertCurrentThreadOwns();
return mAwaitingSyncReply;
}
int AwaitingSyncReplyPriority() const {
mMonitor->AssertCurrentThreadOwns();
return mAwaitingSyncReplyPriority;
}
bool AwaitingInterruptReply() const {
mMonitor->AssertCurrentThreadOwns();
return !mInterruptStack.empty();
@ -397,7 +404,17 @@ class MessageChannel : HasResultCodes
};
friend class AutoEnterWaitForIncoming;
// Returns true if we're dispatching an async message's callback.
// Returns true if we're dispatching a sync message's callback.
bool DispatchingSyncMessage() const {
AssertWorkerThread();
return mDispatchingSyncMessage;
}
int DispatchingSyncMessagePriority() const {
AssertWorkerThread();
return mDispatchingSyncMessagePriority;
}
bool DispatchingAsyncMessage() const {
AssertWorkerThread();
return mDispatchingAsyncMessage;
@ -543,6 +560,15 @@ class MessageChannel : HasResultCodes
T mNew;
};
// Worker thread only.
bool mAwaitingSyncReply;
int mAwaitingSyncReplyPriority;
// Set while we are dispatching a synchronous message. Only for use on the
// worker thread.
bool mDispatchingSyncMessage;
int mDispatchingSyncMessagePriority;
bool mDispatchingAsyncMessage;
int mDispatchingAsyncMessagePriority;
@ -564,16 +590,56 @@ class MessageChannel : HasResultCodes
// To ensure IDs are unique, we use sequence numbers for transaction IDs,
// which grow in opposite directions from child to parent.
friend class AutoEnterTransaction;
AutoEnterTransaction *mTransactionStack;
// The current transaction ID.
int32_t mCurrentTransaction;
int32_t CurrentHighPriorityTransaction() const;
// This field describes the priorities of the sync Send calls that are
// currently on stack. If a Send call for a message with priority P is on
// the C stack, then mPendingSendPriorities & (1 << P) will be
// non-zero. Note that cancelled Send calls are not removed from this
// bitfield (until they return).
int mPendingSendPriorities;
bool AwaitingSyncReply() const;
int AwaitingSyncReplyPriority() const;
class AutoEnterTransaction
{
public:
explicit AutoEnterTransaction(MessageChannel *aChan, int32_t aMsgSeqno)
: mChan(aChan),
mNewTransaction(INT32_MAX),
mOldTransaction(mChan->mCurrentTransaction)
{
mChan->mMonitor->AssertCurrentThreadOwns();
if (mChan->mCurrentTransaction == 0) {
mNewTransaction = aMsgSeqno;
mChan->mCurrentTransaction = aMsgSeqno;
}
}
explicit AutoEnterTransaction(MessageChannel *aChan, const Message &aMessage)
: mChan(aChan),
mNewTransaction(aMessage.transaction_id()),
mOldTransaction(mChan->mCurrentTransaction)
{
mChan->mMonitor->AssertCurrentThreadOwns();
bool DispatchingSyncMessage() const;
int DispatchingSyncMessagePriority() const;
if (!aMessage.is_sync())
return;
MOZ_DIAGNOSTIC_ASSERT(
!(mChan->mSide == ParentSide && mOldTransaction != aMessage.transaction_id()) ||
!mOldTransaction || aMessage.priority() > mChan->AwaitingSyncReplyPriority());
mChan->mCurrentTransaction = aMessage.transaction_id();
}
~AutoEnterTransaction() {
mChan->mMonitor->AssertCurrentThreadOwns();
if (mChan->mCurrentTransaction == mNewTransaction) {
mChan->mCurrentTransaction = mOldTransaction;
}
}
private:
MessageChannel *mChan;
int32_t mNewTransaction, mOldTransaction;
};
// If a sync message times out, we store its sequence number here. Any
// future sync messages will fail immediately. Once the reply for original
@ -591,6 +657,14 @@ class MessageChannel : HasResultCodes
int32_t mTimedOutMessageSeqno;
int mTimedOutMessagePriority;
// If waiting for the reply to a sync out-message, it will be saved here
// on the I/O thread and then read and cleared by the worker thread.
nsAutoPtr<Message> mRecvd;
// If a sync message reply that is an error arrives, we increment this
// counter rather than storing it in mRecvd.
size_t mRecvdErrors;
// Queue of all incoming messages, except for replies to sync and urgent
// messages, which are delivered directly to mRecvd, and any pending urgent
// incall, which is stored in mPendingUrgentRequest.

Просмотреть файл

@ -207,31 +207,17 @@ TestDemonParent::RunLimitedSequence(int flags)
gStackHeight--;
}
static bool
AllowAsync(int outgoing, int incoming)
{
return incoming >= outgoing - 5;
}
bool
TestDemonParent::DoAction(int flags)
{
if (flags & ASYNC_ONLY) {
if (AllowAsync(mOutgoing[0], mIncoming[0])) {
DEMON_LOG("SendAsyncMessage [%d]", mOutgoing[0]);
return SendAsyncMessage(mOutgoing[0]++);
} else {
return true;
}
DEMON_LOG("SendAsyncMessage [%d]", mOutgoing[0]);
return SendAsyncMessage(mOutgoing[0]++);
} else {
switch (Choose(3)) {
case 0:
if (AllowAsync(mOutgoing[0], mIncoming[0])) {
DEMON_LOG("SendAsyncMessage [%d]", mOutgoing[0]);
return SendAsyncMessage(mOutgoing[0]++);
} else {
return true;
}
DEMON_LOG("SendAsyncMessage [%d]", mOutgoing[0]);
return SendAsyncMessage(mOutgoing[0]++);
case 1: {
DEMON_LOG("Start SendHiPrioSyncMessage");
@ -353,12 +339,8 @@ TestDemonChild::DoAction()
{
switch (Choose(6)) {
case 0:
if (AllowAsync(mOutgoing[0], mIncoming[0])) {
DEMON_LOG("SendAsyncMessage [%d]", mOutgoing[0]);
return SendAsyncMessage(mOutgoing[0]++);
} else {
return true;
}
DEMON_LOG("SendAsyncMessage [%d]", mOutgoing[0]);
return SendAsyncMessage(mOutgoing[0]++);
case 1: {
DEMON_LOG("Start SendHiPrioSyncMessage");