зеркало из https://github.com/mozilla/pjs.git
add racy RPC resolution. also add better debugging info and fix two shared-memory-race bugs.
This commit is contained in:
Родитель
104a830005
Коммит
a1bf0da8f0
|
@ -141,14 +141,23 @@ class Message : public Pickle {
|
|||
}
|
||||
|
||||
#if defined(CHROMIUM_MOZILLA_BUILD)
|
||||
size_t rpc_remote_stack_depth() const {
|
||||
size_t rpc_remote_stack_depth_guess() const {
|
||||
return header()->rpc_remote_stack_depth_guess;
|
||||
}
|
||||
|
||||
void set_rpc_remote_stack_depth(size_t depth) {
|
||||
void set_rpc_remote_stack_depth_guess(size_t depth) {
|
||||
DCHECK(is_rpc());
|
||||
header()->rpc_remote_stack_depth_guess = depth;
|
||||
}
|
||||
|
||||
size_t rpc_local_stack_depth() const {
|
||||
return header()->rpc_local_stack_depth;
|
||||
}
|
||||
|
||||
void set_rpc_local_stack_depth(size_t depth) {
|
||||
DCHECK(is_rpc());
|
||||
header()->rpc_local_stack_depth = depth;
|
||||
}
|
||||
#endif
|
||||
|
||||
template<class T>
|
||||
|
@ -262,9 +271,10 @@ class Message : public Pickle {
|
|||
uint32 num_fds; // the number of descriptors included with this message
|
||||
#endif
|
||||
#if defined(CHROMIUM_MOZILLA_BUILD)
|
||||
// For RPC messages, what *this* side of the channel thinks the
|
||||
// *other* side's RPC stack depth is.
|
||||
// For RPC messages, a guess at what the *other* side's stack depth is.
|
||||
size_t rpc_remote_stack_depth_guess;
|
||||
// The actual local stack depth.
|
||||
size_t rpc_local_stack_depth;
|
||||
#endif
|
||||
};
|
||||
#pragma pack(pop)
|
||||
|
|
|
@ -79,6 +79,8 @@ AsyncChannel::Open(Transport* aTransport, MessageLoop* aIOLoop)
|
|||
mChannelState = ChannelConnected;
|
||||
}
|
||||
|
||||
mChild = needOpen;
|
||||
|
||||
mIOLoop = aIOLoop;
|
||||
mWorkerLoop = MessageLoop::current();
|
||||
|
||||
|
|
|
@ -158,6 +158,7 @@ protected:
|
|||
CondVar mCvar;
|
||||
MessageLoop* mIOLoop; // thread where IO happens
|
||||
MessageLoop* mWorkerLoop; // thread where work is done
|
||||
bool mChild; // am I the child or parent?
|
||||
};
|
||||
|
||||
|
||||
|
|
|
@ -64,14 +64,15 @@ RPCChannel::Call(Message* msg, Message* reply)
|
|||
NS_ABORT_IF_FALSE(msg->is_rpc(),
|
||||
"can only Call() RPC messages here");
|
||||
|
||||
MutexAutoLock lock(mMutex);
|
||||
|
||||
if (!Connected())
|
||||
// trying to Send() to a closed or error'd channel
|
||||
return false;
|
||||
|
||||
MutexAutoLock lock(mMutex);
|
||||
|
||||
msg->set_rpc_remote_stack_depth(mRemoteStackDepth);
|
||||
mStack.push(*msg);
|
||||
msg->set_rpc_remote_stack_depth_guess(mRemoteStackDepthGuess);
|
||||
msg->set_rpc_local_stack_depth(StackDepth());
|
||||
|
||||
// bypass |SyncChannel::Send| b/c RPCChannel implements its own
|
||||
// waiting semantics
|
||||
|
@ -103,8 +104,8 @@ RPCChannel::Call(Message* msg, Message* reply)
|
|||
// (it may be an invalid message, but the sync handler will
|
||||
// check that).
|
||||
if (recvd.is_sync()) {
|
||||
NS_ABORT_IF_FALSE(mPending.empty(),
|
||||
"other side is malfunctioning");
|
||||
if (!mPending.empty())
|
||||
RPC_DEBUGABORT("other side is malfunctioning");
|
||||
MutexAutoUnlock unlock(mMutex);
|
||||
|
||||
SyncChannel::OnDispatchMessage(recvd);
|
||||
|
@ -116,13 +117,14 @@ RPCChannel::Call(Message* msg, Message* reply)
|
|||
|
||||
// reply message
|
||||
if (recvd.is_reply()) {
|
||||
NS_ABORT_IF_FALSE(0 < mStack.size(), "invalid RPC stack");
|
||||
if (0 == mStack.size())
|
||||
RPC_DEBUGABORT("invalid RPC stack");
|
||||
|
||||
const Message& outcall = mStack.top();
|
||||
|
||||
if (recvd.type() != (outcall.type()+1) && !recvd.is_reply_error()) {
|
||||
// FIXME/cjones: handle error
|
||||
NS_ABORT_IF_FALSE(0, "somebody's misbehavin'");
|
||||
RPC_DEBUGABORT("somebody's misbehavin'", "rpc", true);
|
||||
}
|
||||
|
||||
// we received a reply to our most recent outstanding
|
||||
|
@ -147,8 +149,9 @@ RPCChannel::Call(Message* msg, Message* reply)
|
|||
mPending.pop();
|
||||
|
||||
if (m.is_sync()) {
|
||||
NS_ABORT_IF_FALSE(!seenBlocker,
|
||||
"other side is malfunctioning");
|
||||
if (seenBlocker)
|
||||
RPC_DEBUGABORT("other side is malfunctioning",
|
||||
"sync", m.is_reply());
|
||||
seenBlocker = true;
|
||||
|
||||
MessageLoop::current()->PostTask(
|
||||
|
@ -157,8 +160,9 @@ RPCChannel::Call(Message* msg, Message* reply)
|
|||
&RPCChannel::OnDelegate, m));
|
||||
}
|
||||
else if (m.is_rpc()) {
|
||||
NS_ABORT_IF_FALSE(!seenBlocker,
|
||||
"other side is malfunctioning");
|
||||
if (seenBlocker)
|
||||
RPC_DEBUGABORT("other side is malfunctioning",
|
||||
"rpc", m.is_reply());
|
||||
seenBlocker = true;
|
||||
|
||||
MessageLoop::current()->PostTask(
|
||||
|
@ -179,18 +183,25 @@ RPCChannel::Call(Message* msg, Message* reply)
|
|||
// shouldn't have queued any more messages, since
|
||||
// the other side is now supposed to be blocked on a
|
||||
// reply from us!
|
||||
if (mPending.size() > 0) {
|
||||
NS_RUNTIMEABORT("other side should have been blocked");
|
||||
}
|
||||
if (!mPending.empty())
|
||||
RPC_DEBUGABORT("other side should have been blocked");
|
||||
}
|
||||
|
||||
// unlocks mMutex
|
||||
return !isError;
|
||||
}
|
||||
|
||||
// in-call. process in a new stack frame
|
||||
NS_ABORT_IF_FALSE(mPending.empty(),
|
||||
"other side is malfunctioning");
|
||||
// in-call. process in a new stack frame. the other side
|
||||
// should be blocked on us, hence an empty queue, except for
|
||||
// the case where this side "won" an RPC race and the other
|
||||
// side already replied back
|
||||
|
||||
if (!(mPending.empty()
|
||||
|| (1 == mPending.size()
|
||||
&& mPending.front().is_rpc()
|
||||
&& mPending.front().is_reply()
|
||||
&& 1 == StackDepth())))
|
||||
RPC_DEBUGABORT("other side is malfunctioning", "rpc");
|
||||
|
||||
// "snapshot" the current stack depth while we own the Mutex
|
||||
size_t stackDepth = StackDepth();
|
||||
|
@ -213,7 +224,7 @@ RPCChannel::OnDelegate(const Message& msg)
|
|||
return SyncChannel::OnDispatchMessage(msg);
|
||||
else if (!msg.is_rpc())
|
||||
return AsyncChannel::OnDispatchMessage(msg);
|
||||
NS_RUNTIMEABORT("fatal logic error");
|
||||
RPC_DEBUGABORT("fatal logic error");
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -227,13 +238,17 @@ RPCChannel::OnMaybeDequeueOne()
|
|||
if (mPending.empty())
|
||||
return;
|
||||
|
||||
NS_ABORT_IF_FALSE(mPending.size() == 1, "should only have one msg");
|
||||
NS_ABORT_IF_FALSE(mPending.front().is_sync(), "msg should be sync");
|
||||
if (mPending.size() != 1)
|
||||
RPC_DEBUGABORT("should only have one msg");
|
||||
if (!(mPending.front().is_rpc() || mPending.front().is_sync()))
|
||||
RPC_DEBUGABORT("msg should be RPC or sync", "async");
|
||||
|
||||
recvd = mPending.front();
|
||||
mPending.pop();
|
||||
}
|
||||
return SyncChannel::OnDispatchMessage(recvd);
|
||||
return recvd.is_sync() ?
|
||||
SyncChannel::OnDispatchMessage(recvd)
|
||||
: RPCChannel::OnIncall(recvd);
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -254,17 +269,65 @@ RPCChannel::ProcessIncall(const Message& call, size_t stackDepth)
|
|||
NS_ABORT_IF_FALSE(call.is_rpc(),
|
||||
"should have been handled by SyncChannel");
|
||||
|
||||
// Race detection: see the long comment near mRemoteStackDepth
|
||||
// in RPCChannel.h
|
||||
NS_ASSERTION(stackDepth == call.rpc_remote_stack_depth(),
|
||||
"RPC in-calls have raced!");
|
||||
// Race detection: see the long comment near
|
||||
// mRemoteStackDepthGuess in RPCChannel.h. "Remote" stack depth
|
||||
// means our side, and "local" means other side.
|
||||
if (call.rpc_remote_stack_depth_guess() != stackDepth) {
|
||||
NS_WARNING("RPC in-calls have raced!");
|
||||
|
||||
// assumption: at this point, we have one call on our stack,
|
||||
// as does the other side. But both we and the other side
|
||||
// think that the opposite side *doesn't* have any calls on
|
||||
// its stack. We need to verify this because race resolution
|
||||
// depends on this fact.
|
||||
if (!((1 == stackDepth && 0 == mRemoteStackDepthGuess)
|
||||
&& (1 == call.rpc_local_stack_depth()
|
||||
&& 0 == call.rpc_remote_stack_depth_guess())))
|
||||
// TODO this /could/ be construed as evidence of a
|
||||
// misbehaving process, so should probably go through
|
||||
// regular error-handling channels
|
||||
RPC_DEBUGABORT("fatal logic error");
|
||||
|
||||
// the "winner", if there is one, gets to defer processing of
|
||||
// the other side's in-call
|
||||
bool defer;
|
||||
const char* winner;
|
||||
switch (mRacePolicy) {
|
||||
case RRPChildWins:
|
||||
winner = "child";
|
||||
defer = mChild;
|
||||
break;
|
||||
case RRPParentWins:
|
||||
winner = "parent";
|
||||
defer = !mChild;
|
||||
break;
|
||||
case RRPError:
|
||||
NS_RUNTIMEABORT("NYI: 'Error' RPC race policy");
|
||||
return;
|
||||
default:
|
||||
NS_RUNTIMEABORT("not reached");
|
||||
return;
|
||||
}
|
||||
|
||||
printf(" (%s won, so we're%sdeferring)\n",
|
||||
winner, defer ? " " : " not ");
|
||||
|
||||
if (defer) {
|
||||
mWorkerLoop->PostTask(
|
||||
FROM_HERE,
|
||||
NewRunnableMethod(this, &RPCChannel::OnIncall, call));
|
||||
return;
|
||||
}
|
||||
|
||||
// we "lost" and need to process the other side's in-call
|
||||
}
|
||||
|
||||
Message* reply = nsnull;
|
||||
|
||||
++mRemoteStackDepth;
|
||||
++mRemoteStackDepthGuess;
|
||||
Result rv =
|
||||
static_cast<RPCListener*>(mListener)->OnCallReceived(call, reply);
|
||||
--mRemoteStackDepth;
|
||||
--mRemoteStackDepthGuess;
|
||||
|
||||
switch (rv) {
|
||||
case MsgProcessed:
|
||||
|
@ -358,19 +421,31 @@ RPCChannel::OnMessageReceived(const Message& msg)
|
|||
|
||||
mWorkerLoop->PostTask(
|
||||
FROM_HERE,
|
||||
NewRunnableMethod(this,
|
||||
&RPCChannel::OnMaybeDequeueOne));
|
||||
NewRunnableMethod(this, &RPCChannel::OnMaybeDequeueOne));
|
||||
return;
|
||||
}
|
||||
|
||||
// OK: the RPC channel is idle, and we received an in-call.
|
||||
// wake up the worker thread
|
||||
// wake up the worker thread.
|
||||
//
|
||||
// Because of RPC race resolution, there's another sublety
|
||||
// here. We can't enqueue an OnInCall() event directly,
|
||||
// because while this code is executing, the worker thread
|
||||
// concurrently might be making (or preparing to make) an
|
||||
// out-call. If so, and race resolution is ParentWins or
|
||||
// ChildWins, and this side is the "losing" side, then this
|
||||
// side needs to "unblock" and process the new in-call. If
|
||||
// the in-call were to go into the main event queue, then it
|
||||
// would be lost. So it needs to go into mPending queue along
|
||||
// with a OnMaybeDequeueOne() event. The OnMaybeDequeueOne()
|
||||
// event handles the non-racy case.
|
||||
|
||||
NS_ABORT_IF_FALSE(msg.is_rpc(), "should be RPC");
|
||||
|
||||
mWorkerLoop->PostTask(FROM_HERE,
|
||||
NewRunnableMethod(this,
|
||||
&RPCChannel::OnIncall, msg));
|
||||
mPending.push(msg);
|
||||
mWorkerLoop->PostTask(
|
||||
FROM_HERE,
|
||||
NewRunnableMethod(this, &RPCChannel::OnMaybeDequeueOne));
|
||||
}
|
||||
else {
|
||||
// we're waiting on an RPC reply
|
||||
|
@ -403,7 +478,8 @@ RPCChannel::OnMessageReceived(const Message& msg)
|
|||
// side is malfunctioning (compromised?).
|
||||
if (AwaitingSyncReply() /* msg.is_rpc() */) {
|
||||
// FIXME other error handling?
|
||||
NS_RUNTIMEABORT("the other side is malfunctioning");
|
||||
RPC_DEBUGABORT("the other side is malfunctioning",
|
||||
"rpc", msg.is_reply());
|
||||
return; // not reached
|
||||
}
|
||||
|
||||
|
|
|
@ -64,10 +64,18 @@ public:
|
|||
Message*& aReply) = 0;
|
||||
};
|
||||
|
||||
RPCChannel(RPCListener* aListener) :
|
||||
// What happens if RPC calls race?
|
||||
enum RacyRPCPolicy {
|
||||
RRPError,
|
||||
RRPChildWins,
|
||||
RRPParentWins
|
||||
};
|
||||
|
||||
RPCChannel(RPCListener* aListener, RacyRPCPolicy aPolicy=RRPChildWins) :
|
||||
SyncChannel(aListener),
|
||||
mPending(),
|
||||
mRemoteStackDepth(0)
|
||||
mRemoteStackDepthGuess(0),
|
||||
mRacePolicy(aPolicy)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -105,6 +113,37 @@ private:
|
|||
return mStack.size();
|
||||
}
|
||||
|
||||
#define RPC_DEBUGABORT(...) \
|
||||
DebugAbort(__FILE__, __LINE__,## __VA_ARGS__)
|
||||
|
||||
void DebugAbort(const char* file, int line,
|
||||
const char* why,
|
||||
const char* type="rpc", bool reply=false)
|
||||
{
|
||||
fprintf(stderr,
|
||||
"[RPCChannel][%s][%s:%d] Aborting: %s (triggered by %s%s)\n",
|
||||
mChild ? "Child" : "Parent",
|
||||
file, line,
|
||||
why,
|
||||
type, reply ? "reply" : "");
|
||||
// technically we need the mutex for this, but we're dying anyway
|
||||
fprintf(stderr, " local RPC stack size: %zu\n",
|
||||
mStack.size());
|
||||
fprintf(stderr, " remote RPC stack guess: %zd\n",
|
||||
mRemoteStackDepthGuess);
|
||||
fprintf(stderr, " Pending queue size: %zu, front to back:\n",
|
||||
mPending.size());
|
||||
while (!mPending.empty()) {
|
||||
fprintf(stderr, " [ %s%s ]\n",
|
||||
mPending.front().is_rpc() ? "rpc" :
|
||||
(mPending.front().is_sync() ? "sync" : "async"),
|
||||
mPending.front().is_reply() ? "reply" : "");
|
||||
mPending.pop();
|
||||
}
|
||||
|
||||
NS_RUNTIMEABORT(why);
|
||||
}
|
||||
|
||||
//
|
||||
// Stack of all the RPC out-calls on which this RPCChannel is
|
||||
// awaiting a response.
|
||||
|
@ -187,7 +226,9 @@ private:
|
|||
//
|
||||
// TODO: and when we detect a race, what should we actually *do* ... ?
|
||||
//
|
||||
size_t mRemoteStackDepth;
|
||||
size_t mRemoteStackDepthGuess;
|
||||
|
||||
RacyRPCPolicy mRacePolicy;
|
||||
};
|
||||
|
||||
|
||||
|
|
|
@ -62,12 +62,12 @@ SyncChannel::Send(Message* msg, Message* reply)
|
|||
"violation of sync handler invariant");
|
||||
NS_ABORT_IF_FALSE(msg->is_sync(), "can only Send() sync messages here");
|
||||
|
||||
MutexAutoLock lock(mMutex);
|
||||
|
||||
if (!Connected())
|
||||
// trying to Send() to a closed or error'd channel
|
||||
return false;
|
||||
|
||||
MutexAutoLock lock(mMutex);
|
||||
|
||||
mPendingReply = msg->type() + 1;
|
||||
if (!AsyncChannel::Send(msg))
|
||||
// FIXME more sophisticated error handling
|
||||
|
|
Загрузка…
Ссылка в новой задаче