From 334b5bb7d9e8a2462c1a9c381d8e2bc7fe98bf81 Mon Sep 17 00:00:00 2001
From: Chris Jones
Date: Fri, 11 Sep 2009 02:28:09 -0500
Subject: [PATCH] unblock RPC waiters for async messages, for realz

---
 ipc/glue/RPCChannel.cpp | 144 +++++++++++++++++++++++++++++++---------
 ipc/glue/RPCChannel.h   |  83 ++++++++++++++++-------
 2 files changed, 172 insertions(+), 55 deletions(-)

diff --git a/ipc/glue/RPCChannel.cpp b/ipc/glue/RPCChannel.cpp
index cfde38e804ff..a8674d92336c 100644
--- a/ipc/glue/RPCChannel.cpp
+++ b/ipc/glue/RPCChannel.cpp
@@ -65,30 +65,39 @@ RPCChannel::Call(Message* msg, Message* reply)
     NS_PRECONDITION(msg->is_rpc(), "can only Call() RPC messages here");
 
-    mMutex.Lock();
+    MutexAutoLock lock(mMutex);
 
     msg->set_rpc_remote_stack_depth(mRemoteStackDepth);
-    mPending.push(*msg);
+    mStack.push(*msg);
 
     // bypass |SyncChannel::Send| b/c RPCChannel implements its own
     // waiting semantics
     AsyncChannel::Send(msg);
 
     while (1) {
-        // here we're waiting for something to happen. it may legally
-        // be either:
-        //  (1) async msg
-        //  (2) reply to an outstanding message (sync or rpc)
-        //  (3) recursive call from the other side
-        mCvar.Wait();
+        // here we're waiting for something to happen. see long
+        // comment about the queue in RPCChannel.h
+        while (mPending.empty()) {
+            mCvar.Wait();
+        }
 
-        Message recvd = mPending.top();
+        Message recvd = mPending.front();
         mPending.pop();
 
+        // async message. process it, go back to waiting
+        if (!recvd.is_sync() && !recvd.is_rpc()) {
+            MutexAutoUnlock unlock(mMutex);
+
+            AsyncChannel::OnDispatchMessage(recvd);
+            continue;
+        }
+
         // something sync. Let the sync dispatcher take care of it
         // (it may be an invalid message, but the sync handler will
         // check that).
         if (recvd.is_sync()) {
+            NS_ABORT_IF_FALSE(mPending.empty(),
+                              "other side is malfunctioning");
             MutexAutoUnlock unlock(mMutex);
 
             SyncChannel::OnDispatchMessage(recvd);
@@ -100,32 +109,85 @@ RPCChannel::Call(Message* msg, Message* reply)
 
         // reply message
         if (recvd.is_reply()) {
-            NS_ABORT_IF_FALSE(0 < mPending.size(), "invalid RPC stack");
+            NS_ABORT_IF_FALSE(0 < mStack.size(), "invalid RPC stack");
 
-            const Message& pending = mPending.top();
+            const Message& outcall = mStack.top();
 
-            if (recvd.type() != (pending.type()+1) && !recvd.is_reply_error()) {
+            if (recvd.type() != (outcall.type()+1) && !recvd.is_reply_error()) {
                 // FIXME/cjones: handle error
                 NS_ABORT_IF_FALSE(0, "somebody's misbehavin'");
             }
 
             // we received a reply to our most recent outstanding
            // call. pop this frame and return the reply
-            mPending.pop();
 
             bool isError = recvd.is_reply_error();
             if (!isError) {
                 *reply = recvd;
             }
 
-            mMutex.Unlock();
+            if (0 == StackDepth()) {
+                // this was the last outcall we were waiting on.
+                // flush the pending queue into the "regular" event
+                // queue, checking invariants along the way. see long
+                // comment in RPCChannel.h
+                bool seenBlocker = false;
+
+                // A<* (S< | C<)
+                while (!mPending.empty()) {
+                    Message m = mPending.front();
+                    mPending.pop();
+
+                    if (m.is_sync()) {
+                        NS_ABORT_IF_FALSE(!seenBlocker,
+                                          "other side is malfunctioning");
+                        seenBlocker = true;
+
+                        MessageLoop::current()->PostTask(
+                            FROM_HERE,
+                            NewRunnableMethod(this,
+                                              &RPCChannel::OnDelegate, m));
+                    }
+                    else if (m.is_rpc()) {
+                        NS_ABORT_IF_FALSE(!seenBlocker,
+                                          "other side is malfunctioning");
+                        seenBlocker = true;
+
+                        MessageLoop::current()->PostTask(
+                            FROM_HERE,
+                            NewRunnableMethod(this,
+                                              &RPCChannel::OnIncall,
+                                              m));
+                    }
+                    else {
+                        MessageLoop::current()->PostTask(
+                            FROM_HERE,
+                            NewRunnableMethod(this,
+                                              &RPCChannel::OnDelegate, m));
+                    }
+                }
+            }
+            else {
+                // shouldn't have queued any more messages, since
+                // the other side is now supposed to be blocked on a
+                // reply from us!
+                if (mPending.size() > 0) {
+                    NS_RUNTIMEABORT("other side should have been blocked");
+                }
+            }
+
+            // unlocks mMutex
             return !isError;
         }
 
-        // in-call
-        else {
-            // "snapshot" the current stack depth while we own the Mutex
-            size_t stackDepth = StackDepth();
+        // in-call. process in a new stack frame
+        NS_ABORT_IF_FALSE(mPending.empty(),
+                          "other side is malfunctioning");
+
+        // "snapshot" the current stack depth while we own the Mutex
+        size_t stackDepth = StackDepth();
+        {
             MutexAutoUnlock unlock(mMutex);
             // someone called in to us from the other side. handle the call
             ProcessIncall(recvd, stackDepth);
@@ -136,12 +198,22 @@ RPCChannel::Call(Message* msg, Message* reply)
     return true;
 }
 
+void
+RPCChannel::OnDelegate(const Message& msg)
+{
+    if (msg.is_sync())
+        return SyncChannel::OnDispatchMessage(msg);
+    else if (!msg.is_rpc())
+        return AsyncChannel::OnDispatchMessage(msg);
+    NS_RUNTIMEABORT("fatal logic error");
+}
+
 void
 RPCChannel::OnIncall(const Message& call)
 {
-    // We were called from the IO thread when StackDepth() == 0, and
-    // we were "idle". That's the "snapshot" of the state of
-    // the RPCChannel we use when processing this message.
+    // We only reach here from the "regular" event loop, when
+    // StackDepth() == 0. That's the "snapshot" of the state of the
+    // RPCChannel we use when processing this message.
     ProcessIncall(call, 0);
 }
 
@@ -244,18 +316,18 @@ RPCChannel::OnMessageReceived(const Message& msg)
     // NB some logic here is duplicated with SyncChannel. this is
     // to allow more local reasoning
 
-    // harmless async message, enqueue for later processing.
-    // We might have been waiting for a sync reply, but receiving
-    // an async message here is allowed.
-    if (!msg.is_sync() && !msg.is_rpc()) {
-        // unlocks mutex
-        return AsyncChannel::OnMessageReceived(msg);
-    }
+    // NBB see the second-to-last long comment in RPCChannel.h
+    // describing legal queue states
 
     // if we're waiting on a sync reply, and this message is sync,
     // dispatch it to the sync message handler. It will check that
     // it's a reply, and the right kind of reply, then do its
     // thing.
+    //
+    // since we're waiting on an RPC answer in an older stack
+    // frame, we know we'll eventually pop back to the
+    // RPCChannel::Call frame where we're awaiting the RPC reply.
+    // so the queue won't be forgotten!
     if (AwaitingSyncReply() && msg.is_sync()) {
         // wake up worker thread (at SyncChannel::Send) awaiting
@@ -265,6 +337,14 @@ RPCChannel::OnMessageReceived(const Message& msg)
         return;
     }
 
+    // waiting on a sync reply, but got an async message. that's OK,
+    // but we defer processing of it until the sync reply comes in.
+    if (AwaitingSyncReply()
+        && !msg.is_sync() && !msg.is_rpc()) {
+        mPending.push(msg);
+        return;
+    }
+
     // if this side and the other were functioning correctly, we'd
     // never reach this case. RPCChannel::Call explicitly checks
     // for and disallows this case. so if we reach here, the other
@@ -275,10 +355,10 @@ RPCChannel::OnMessageReceived(const Message& msg)
         return; // not reached
     }
 
-    // otherwise, we (legally) either got (i) sync in-msg; (ii)
-    // re-entrant rpc in-call; (iii) rpc reply we were awaiting.
-    // Dispatch to the worker, where invariants are checked and
-    // the message processed.
+    // otherwise, we (legally) either got (i) async msg; (ii) sync
+    // in-msg; (iii) re-entrant rpc in-call; (iv) rpc reply we
+    // were awaiting. Dispatch to the worker, where invariants
+    // are checked and the message processed.
     mPending.push(msg);
     mCvar.Notify();
 }
diff --git a/ipc/glue/RPCChannel.h b/ipc/glue/RPCChannel.h
index 2c2acb2afdfe..85fc1a69f960 100644
--- a/ipc/glue/RPCChannel.h
+++ b/ipc/glue/RPCChannel.h
@@ -39,7 +39,8 @@
 #ifndef ipc_glue_RPCChannel_h
 #define ipc_glue_RPCChannel_h 1
 
-// FIXME/cjones probably shouldn't depend on this
+// FIXME/cjones probably shouldn't depend on STL
+#include <queue>
 #include <stack>
 
 #include "mozilla/ipc/SyncChannel.h"
@@ -81,37 +82,73 @@ public:
     // Override the SyncChannel handler so we can dispatch RPC messages
     virtual void OnMessageReceived(const Message& msg);
 
+protected:
+    // Only exists because we can't schedule SyncChannel::OnDispatchMessage
+    // or AsyncChannel::OnDispatchMessage from within Call() when we flush
+    // the pending queue
+    void OnDelegate(const Message& msg);
+
 private:
     void OnIncall(const Message& msg);
     void ProcessIncall(const Message& call, size_t stackDepth);
 
+    // Called from both threads
     size_t StackDepth() {
         mMutex.AssertCurrentThreadOwns();
-        NS_ABORT_IF_FALSE(
-            mPending.empty()
-            || (mPending.top().is_rpc() && !mPending.top().is_reply()),
-            "StackDepth() called from an inconsistent state");
-
-        return mPending.size();
+        return mStack.size();
     }
 
+    //
+    // Stack of all the RPC out-calls on which this RPCChannel is
+    // awaiting a response.
     //
-    // In quiescent states, |mPending| is a stack of all the RPC
-    // out-calls on which this RPCChannel is awaiting a response.
-    //
-    // The stack is also used by the IO thread to transfer received
-    // messages to the worker thread, only when the worker thread is
-    // awaiting an RPC response. Until the worker pops the top of the
-    // stack, it may (legally) contain one of
-    //
-    //  - sync in-msg (msg.is_sync() && !msg.is_reply())
-    //  - RPC in-call (msg.is_rpc() && !msg.is_reply())
-    //  - RPC reply (msg.is_rpc() && msg.is_reply())
-    //
-    // In any cases, the worker will pop the message off the stack
-    // and process it ASAP, returning |mPending| to a quiescent state.
-    //
-    std::stack<Message> mPending;
+    std::stack<Message> mStack;
+
+    //
+    // After the worker thread is blocked on an RPC out-call
+    // (i.e. awaiting a reply), the IO thread uses this queue to
+    // transfer received messages to the worker thread for processing.
+    // If both this side and the other side are functioning correctly,
+    // the queue is only allowed to have certain configurations. Let
+    //
+    //   |A<| be an async in-message,
+    //   |S<| be a sync in-message,
+    //   |C<| be an RPC in-call,
+    //   |R<| be an RPC reply.
+    //
+    // After the worker thread wakes us up to process the queue,
+    // the queue can only match this configuration
+    //
+    //   A<* (S< | C< | R< (?{mStack.size() == 1} A<* (S< | C<)))
+    //
+    // After we send an RPC message, the other side can send as many
+    // async messages |A<*| as it wants before sending back any other
+    // message type.
+    //
+    // The first "other message type" case is |S<|, a sync in-msg.
+    // The other side must be blocked, and thus can't send us any more
+    // messages until we process the sync in-msg.
+    //
+    // The second case is |C<|, an RPC in-call; the other side
+    // re-entered us while processing our out-call. It therefore must
+    // be blocked. (There's a subtlety here: this in-call might have
+    // raced with our out-call, but we detect that with the mechanism
+    // below, |mRemoteStackDepth|, and races don't matter to the
+    // queue.)
+    //
+    // Final case, the other side replied to our most recent out-call
+    // |R<|. If that was the *only* out-call on our stack, |{
+    // mStack.size() == 1}|, then other side "finished with us," and
+    // went back to its own business. That business might have
+    // included sending any number of async message |A<*| until
+    // sending a blocking message |(S< | C<)|. We just flush these to
+    // the event loop to process in order, it will do the Right Thing,
+    // since only the last message can be a blocking message.
+    // HOWEVER, if we had more than one RPC call on our stack, the
+    // other side *better* not have sent us another blocking message,
+    // because it's blocked on a reply from us.
+    //
+    std::queue<Message> mPending;
 
     //
     // This is what we think the RPC stack depth is on the "other