unblock RPC waiters for async messages, for realz

This commit is contained in:
Chris Jones 2009-09-11 02:28:09 -05:00
Родитель b332673a20
Коммит 334b5bb7d9
2 изменённых файлов: 172 добавлений и 55 удалений

Просмотреть файл

@ -65,30 +65,39 @@ RPCChannel::Call(Message* msg, Message* reply)
NS_PRECONDITION(msg->is_rpc(),
"can only Call() RPC messages here");
mMutex.Lock();
MutexAutoLock lock(mMutex);
msg->set_rpc_remote_stack_depth(mRemoteStackDepth);
mPending.push(*msg);
mStack.push(*msg);
// bypass |SyncChannel::Send| b/c RPCChannel implements its own
// waiting semantics
AsyncChannel::Send(msg);
while (1) {
// here we're waiting for something to happen. it may legally
// be either:
// (1) async msg
// (2) reply to an outstanding message (sync or rpc)
// (3) recursive call from the other side
// here we're waiting for something to happen. see long
// comment about the queue in RPCChannel.h
while (mPending.empty()) {
mCvar.Wait();
}
Message recvd = mPending.top();
Message recvd = mPending.front();
mPending.pop();
// async message. process it, go back to waiting
if (!recvd.is_sync() && !recvd.is_rpc()) {
MutexAutoUnlock unlock(mMutex);
AsyncChannel::OnDispatchMessage(recvd);
continue;
}
// something sync. Let the sync dispatcher take care of it
// (it may be an invalid message, but the sync handler will
// check that).
if (recvd.is_sync()) {
NS_ABORT_IF_FALSE(mPending.empty(),
"other side is malfunctioning");
MutexAutoUnlock unlock(mMutex);
SyncChannel::OnDispatchMessage(recvd);
@ -100,32 +109,85 @@ RPCChannel::Call(Message* msg, Message* reply)
// reply message
if (recvd.is_reply()) {
NS_ABORT_IF_FALSE(0 < mPending.size(), "invalid RPC stack");
NS_ABORT_IF_FALSE(0 < mStack.size(), "invalid RPC stack");
const Message& pending = mPending.top();
const Message& outcall = mStack.top();
if (recvd.type() != (pending.type()+1) && !recvd.is_reply_error()) {
if (recvd.type() != (outcall.type()+1) && !recvd.is_reply_error()) {
// FIXME/cjones: handle error
NS_ABORT_IF_FALSE(0, "somebody's misbehavin'");
}
// we received a reply to our most recent outstanding
// call. pop this frame and return the reply
mPending.pop();
mStack.pop();
bool isError = recvd.is_reply_error();
if (!isError) {
*reply = recvd;
}
mMutex.Unlock();
if (0 == StackDepth()) {
// this was the last outcall we were waiting on.
// flush the pending queue into the "regular" event
// queue, checking invariants along the way. see long
// comment in RPCChannel.h
bool seenBlocker = false;
// A<* (S< | C<)
while (!mPending.empty()) {
Message m = mPending.front();
mPending.pop();
if (m.is_sync()) {
NS_ABORT_IF_FALSE(!seenBlocker,
"other side is malfunctioning");
seenBlocker = true;
MessageLoop::current()->PostTask(
FROM_HERE,
NewRunnableMethod(this,
&RPCChannel::OnDelegate, m));
}
else if (m.is_rpc()) {
NS_ABORT_IF_FALSE(!seenBlocker,
"other side is malfunctioning");
seenBlocker = true;
MessageLoop::current()->PostTask(
FROM_HERE,
NewRunnableMethod(this,
&RPCChannel::OnIncall,
m));
}
else {
MessageLoop::current()->PostTask(
FROM_HERE,
NewRunnableMethod(this,
&RPCChannel::OnDelegate, m));
}
}
}
else {
// shouldn't have queued any more messages, since
// the other side is now supposed to be blocked on a
// reply from us!
if (mPending.size() > 0) {
NS_RUNTIMEABORT("other side should have been blocked");
}
}
// unlocks mMutex
return !isError;
}
// in-call
else {
// in-call. process in a new stack frame
NS_ABORT_IF_FALSE(mPending.empty(),
"other side is malfunctioning");
// "snapshot" the current stack depth while we own the Mutex
size_t stackDepth = StackDepth();
{
MutexAutoUnlock unlock(mMutex);
// someone called in to us from the other side. handle the call
ProcessIncall(recvd, stackDepth);
@ -136,12 +198,22 @@ RPCChannel::Call(Message* msg, Message* reply)
return true;
}
void
RPCChannel::OnDelegate(const Message& msg)
{
if (msg.is_sync())
return SyncChannel::OnDispatchMessage(msg);
else if (!msg.is_rpc())
return AsyncChannel::OnDispatchMessage(msg);
NS_RUNTIMEABORT("fatal logic error");
}
void
RPCChannel::OnIncall(const Message& call)
{
// We were called from the IO thread when StackDepth() == 0, and
// we were "idle". That's the "snapshot" of the state of
// the RPCChannel we use when processing this message.
// We only reach here from the "regular" event loop, when
// StackDepth() == 0. That's the "snapshot" of the state of the
// RPCChannel we use when processing this message.
ProcessIncall(call, 0);
}
@ -244,18 +316,18 @@ RPCChannel::OnMessageReceived(const Message& msg)
// NB some logic here is duplicated with SyncChannel. this is
// to allow more local reasoning
// harmless async message, enqueue for later processing.
// We might have been waiting for a sync reply, but receiving
// an async message here is allowed.
if (!msg.is_sync() && !msg.is_rpc()) {
// unlocks mutex
return AsyncChannel::OnMessageReceived(msg);
}
// NBB see the second-to-last long comment in RPCChannel.h
// describing legal queue states
// if we're waiting on a sync reply, and this message is sync,
// dispatch it to the sync message handler. It will check that
// it's a reply, and the right kind of reply, then do its
// thing.
//
// since we're waiting on an RPC answer in an older stack
// frame, we know we'll eventually pop back to the
// RPCChannel::Call frame where we're awaiting the RPC reply.
// so the queue won't be forgotten!
if (AwaitingSyncReply()
&& msg.is_sync()) {
// wake up worker thread (at SyncChannel::Send) awaiting
@ -265,6 +337,14 @@ RPCChannel::OnMessageReceived(const Message& msg)
return;
}
// waiting on a sync reply, but got an async message. that's OK,
// but we defer processing of it until the sync reply comes in.
if (AwaitingSyncReply()
&& !msg.is_sync() && !msg.is_rpc()) {
mPending.push(msg);
return;
}
// if this side and the other were functioning correctly, we'd
// never reach this case. RPCChannel::Call explicitly checks
// for and disallows this case. so if we reach here, the other
@ -275,10 +355,10 @@ RPCChannel::OnMessageReceived(const Message& msg)
return; // not reached
}
// otherwise, we (legally) either got (i) sync in-msg; (ii)
// re-entrant rpc in-call; (iii) rpc reply we were awaiting.
// Dispatch to the worker, where invariants are checked and
// the message processed.
// otherwise, we (legally) either got (i) async msg; (ii) sync
// in-msg; (iii) re-entrant rpc in-call; (iv) rpc reply we
// were awaiting. Dispatch to the worker, where invariants
// are checked and the message processed.
mPending.push(msg);
mCvar.Notify();
}

Просмотреть файл

@ -39,7 +39,8 @@
#ifndef ipc_glue_RPCChannel_h
#define ipc_glue_RPCChannel_h 1
// FIXME/cjones probably shouldn't depend on this
// FIXME/cjones probably shouldn't depend on STL
#include <queue>
#include <stack>
#include "mozilla/ipc/SyncChannel.h"
@ -81,37 +82,73 @@ public:
// Override the SyncChannel handler so we can dispatch RPC messages
virtual void OnMessageReceived(const Message& msg);
protected:
// Only exists because we can't schedule SyncChannel::OnDispatchMessage
// or AsyncChannel::OnDispatchMessage from within Call() when we flush
// the pending queue
void OnDelegate(const Message& msg);
private:
void OnIncall(const Message& msg);
void ProcessIncall(const Message& call, size_t stackDepth);
// Called from both threads
size_t StackDepth() {
mMutex.AssertCurrentThreadOwns();
NS_ABORT_IF_FALSE(
mPending.empty()
|| (mPending.top().is_rpc() && !mPending.top().is_reply()),
"StackDepth() called from an inconsistent state");
return mPending.size();
return mStack.size();
}
//
// In quiescent states, |mPending| is a stack of all the RPC
// out-calls on which this RPCChannel is awaiting a response.
// Stack of all the RPC out-calls on which this RPCChannel is
// awaiting a response.
//
// The stack is also used by the IO thread to transfer received
// messages to the worker thread, only when the worker thread is
// awaiting an RPC response. Until the worker pops the top of the
// stack, it may (legally) contain one of
std::stack<Message> mStack;
//
// - sync in-msg (msg.is_sync() && !msg.is_reply())
// - RPC in-call (msg.is_rpc() && !msg.is_reply())
// - RPC reply (msg.is_rpc() && msg.is_reply())
// After the worker thread is blocked on an RPC out-call
// (i.e. awaiting a reply), the IO thread uses this queue to
// transfer received messages to the worker thread for processing.
// If both this side and the other side are functioning correctly,
// the queue is only allowed to have certain configurations. Let
//
// In any cases, the worker will pop the message off the stack
// and process it ASAP, returning |mPending| to a quiescent state.
// |A<| be an async in-message,
// |S<| be a sync in-message,
// |C<| be an RPC in-call,
// |R<| be an RPC reply.
//
std::stack<Message> mPending;
// After the worker thread wakes us up to process the queue,
// the queue can only match this configuration
//
// A<* (S< | C< | R< (?{mStack.size() == 1} A<* (S< | C<)))
//
// After we send an RPC message, the other side can send as many
// async messages |A<*| as it wants before sending back any other
// message type.
//
// The first "other message type" case is |S<|, a sync in-msg.
// The other side must be blocked, and thus can't send us any more
// messages until we process the sync in-msg.
//
// The second case is |C<|, an RPC in-call; the other side
// re-entered us while processing our out-call. It therefore must
// be blocked. (There's a subtlety here: this in-call might have
// raced with our out-call, but we detect that with the mechanism
// below, |mRemoteStackDepth|, and races don't matter to the
// queue.)
//
// Final case, the other side replied to our most recent out-call
// |R<|. If that was the *only* out-call on our stack, |{
// mStack.size() == 1}|, then other side "finished with us," and
// went back to its own business. That business might have
// included sending any number of async message |A<*| until
// sending a blocking message |(S< | C<)|. We just flush these to
// the event loop to process in order, it will do the Right Thing,
// since only the last message can be a blocking message.
// HOWEVER, if we had more than one RPC call on our stack, the
// other side *better* not have sent us another blocking message,
// because it's blocked on a reply from us.
//
std::queue<Message> mPending;
//
// This is what we think the RPC stack depth is on the "other