simplify some *Channel code. enforce more SyncChannel invariants

This commit is contained in:
Chris Jones 2009-08-19 10:44:56 -05:00
Родитель db2bde3f0d
Коммит 57c28a4b6c
6 изменённых файлов: 74 добавлений и 97 удалений

Просмотреть файл

@ -73,7 +73,7 @@ AsyncChannel::Open(Transport* aTransport, MessageLoop* aIOLoop)
::GetMessageLoop(BrowserProcessSubThread::IO);
// FIXME assuming that the parent waits for the OnConnected event.
// FIXME see GeckoChildProcessHost.cpp. bad assumption!
mChannelState = ChannelIdle;
mChannelState = ChannelConnected;
}
mIOLoop = aIOLoop;
@ -102,11 +102,10 @@ AsyncChannel::Close()
bool
AsyncChannel::Send(Message* msg)
{
NS_ASSERTION(ChannelIdle == mChannelState
|| ChannelWaiting == mChannelState,
NS_ASSERTION(ChannelConnected == mChannelState,
"trying to Send() to a channel not yet open");
NS_PRECONDITION(MSG_ROUTING_NONE != msg->routing_id(), "need a route");
mIOLoop->PostTask(FROM_HERE,
NewRunnableMethod(this, &AsyncChannel::OnSend, msg));
return true;
@ -154,7 +153,7 @@ AsyncChannel::OnMessageReceived(const Message& msg)
void
AsyncChannel::OnChannelConnected(int32 peer_pid)
{
mChannelState = ChannelIdle;
mChannelState = ChannelConnected;
}
void

Просмотреть файл

@ -66,8 +66,7 @@ protected:
enum ChannelState {
ChannelClosed,
ChannelOpening,
ChannelIdle, // => connected
ChannelWaiting, // => connected
ChannelConnected,
ChannelError
};

Просмотреть файл

@ -57,16 +57,15 @@ namespace ipc {
bool
RPCChannel::Call(Message* msg, Message* reply)
{
NS_ASSERTION(ChannelIdle == mChannelState
|| ChannelWaiting == mChannelState,
NS_ABORT_IF_FALSE(!ProcessingSyncMessage(),
"violation of sync handler invariant");
NS_ASSERTION(ChannelConnected == mChannelState,
"trying to Send() to a channel not yet open");
NS_PRECONDITION(msg->is_rpc(), "can only Call() RPC messages here");
NS_PRECONDITION(msg->is_rpc(),
"can only Call() RPC messages here");
mMutex.Lock();
mChannelState = ChannelWaiting;
msg->set_rpc_remote_stack_depth(mRemoteStackDepth);
mPending.push(*msg);
@ -108,10 +107,6 @@ RPCChannel::Call(Message* msg, Message* reply)
*reply = recvd;
}
if (!WaitingForReply()) {
mChannelState = ChannelIdle;
}
mMutex.Unlock();
return !isError;
}
@ -129,8 +124,6 @@ RPCChannel::Call(Message* msg, Message* reply)
}
}
delete msg;
return true;
}
@ -208,13 +201,22 @@ RPCChannel::OnMessageReceived(const Message& msg)
MutexAutoLock lock(mMutex);
if (0 == StackDepth()) {
// wake up the worker, there's work to do
// wake up the worker, there's a new in-call to process
// NB: the interaction between this and SyncChannel is rather
// subtle. We may be awaiting a synchronous reply when this
// code is executed. If that's the case, posting this in-call
// to the worker will defer processing of the in-call until
// after the synchronous reply is received.
// subtle. It's possible for us to send a sync message
// exactly when the other side sends an RPC in-call. A sync
// handler invariant is that the sync message must be replied
// to before sending any other blocking message, so we know
// that the other side must reply ASAP to the sync message we
// just sent. Thus by queuing this RPC in-call in that
// situation, we specify an order on the previously unordered
// messages and satisfy all invariants.
//
// It's not possible for us to otherwise receive an RPC
// in-call while awaiting a sync response in any case where
// both us and the other side are behaving legally. Is it
// worth trying to detect this oddball case?
mWorkerLoop->PostTask(FROM_HERE,
NewRunnableMethod(this,
&RPCChannel::OnIncall, msg));

Просмотреть файл

@ -82,12 +82,6 @@ public:
virtual void OnMessageReceived(const Message& msg);
private:
// Executed on worker thread
virtual bool WaitingForReply() {
mMutex.AssertCurrentThreadOwns();
return mPending.size() > 0 || SyncChannel::WaitingForReply();
}
void OnIncall(const Message& msg);
void ProcessIncall(const Message& call, size_t stackDepth);

Просмотреть файл

@ -57,64 +57,49 @@ namespace ipc {
bool
SyncChannel::Send(Message* msg, Message* reply)
{
NS_ASSERTION(ChannelIdle == mChannelState
|| ChannelWaiting == mChannelState,
NS_ABORT_IF_FALSE(!ProcessingSyncMessage(),
"violation of sync handler invariant");
NS_ASSERTION(ChannelConnected == mChannelState,
"trying to Send() to a channel not yet open");
NS_PRECONDITION(msg->is_sync(), "can only Send() sync messages here");
MutexAutoLock lock(mMutex);
mChannelState = ChannelWaiting;
mPendingReply = msg->type() + 1;
/*assert*/AsyncChannel::Send(msg);
while (1) {
// here we're waiting for something to happen. it may be either:
// (1) the reply we're waiting for (mPendingReply)
// or
// (2) any other message
//
// In case (1), we return this reply back to the caller.
// In case (2), we defer processing of the message until our reply
// comes back.
mCvar.Wait();
// wait for the next sync message to arrive
mCvar.Wait();
if (mRecvd.is_reply() && mPendingReply == mRecvd.type()) {
// case (1)
mPendingReply = 0;
*reply = mRecvd;
// we just received a synchronous message from the other side.
// If it's not the reply we were awaiting, there's a serious
// error: either a mistimed/malformed message or a sync in-message
// that raced with our sync out-message.
// (NB: IPDL prevents the latter from occuring in actor code)
if (!WaitingForReply()) {
mChannelState = ChannelIdle;
}
// FIXME/cjones: real error handling
NS_ABORT_IF_FALSE(mRecvd.is_reply() && mPendingReply == mRecvd.type(),
"unexpected sync message");
return true;
}
else {
// case (2)
NS_ASSERTION(!mRecvd.is_reply(), "can't process replies here");
// post a task to our own event loop; this delays processing
// of mRecvd
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &SyncChannel::OnDispatchMessage, mRecvd));
}
}
mPendingReply = 0;
*reply = mRecvd;
return true;
}
void
SyncChannel::OnDispatchMessage(const Message& msg)
{
NS_ASSERTION(!msg.is_reply(), "can't process replies here");
NS_ASSERTION(!msg.is_rpc(), "sync or async only here");
if (!msg.is_sync()) {
return AsyncChannel::OnDispatchMessage(msg);
}
NS_ABORT_IF_FALSE(msg.is_sync(), "only sync messages here");
Message* reply;
switch (static_cast<SyncListener*>(mListener)->OnMessageReceived(msg, reply)) {
mProcessingSyncMessage = true;
Result rv =
static_cast<SyncListener*>(mListener)->OnMessageReceived(msg, reply);
mProcessingSyncMessage = false;
switch (rv) {
case MsgProcessed:
mIOLoop->PostTask(FROM_HERE,
NewRunnableMethod(this,
@ -144,31 +129,23 @@ SyncChannel::OnDispatchMessage(const Message& msg)
void
SyncChannel::OnMessageReceived(const Message& msg)
{
mMutex.Lock();
if (ChannelIdle == mChannelState) {
// wake up the worker, there's work to do
if (msg.is_sync()) {
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &SyncChannel::OnDispatchMessage, msg));
}
else {
mMutex.Unlock();
return AsyncChannel::OnMessageReceived(msg);
}
if (!msg.is_sync()) {
return AsyncChannel::OnMessageReceived(msg);
}
else if (ChannelWaiting == mChannelState) {
// let the worker know something new has happened
MutexAutoLock lock(mMutex);
if (!AwaitingSyncReply()) {
// wake up the worker, there's work to do
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &SyncChannel::OnDispatchMessage, msg));
}
else {
// let the worker know a new sync message has arrived
mRecvd = msg;
mCvar.Notify();
}
else {
// FIXME/cjones: could reach here in error conditions. impl me
NOTREACHED();
}
mMutex.Unlock();
}
void

Просмотреть файл

@ -56,7 +56,6 @@ protected:
typedef mozilla::CondVar CondVar;
typedef mozilla::Mutex Mutex;
typedef uint16 MessageId;
typedef std::queue<Message> MessageQueue;
public:
class /*NS_INTERFACE_CLASS*/ SyncListener :
@ -72,7 +71,9 @@ public:
SyncChannel(SyncListener* aListener) :
AsyncChannel(aListener),
mMutex("mozilla.ipc.SyncChannel.mMutex"),
mCvar(mMutex, "mozilla.ipc.SyncChannel.mCvar")
mCvar(mMutex, "mozilla.ipc.SyncChannel.mCvar"),
mPendingReply(0),
mProcessingSyncMessage(false)
{
}
@ -81,7 +82,6 @@ public:
// FIXME/cjones: impl
}
bool Send(Message* msg) {
return AsyncChannel::Send(msg);
}
@ -94,9 +94,8 @@ public:
protected:
// Executed on the worker thread
virtual bool WaitingForReply() {
mMutex.AssertCurrentThreadOwns();
return mPendingReply != 0;
bool ProcessingSyncMessage() {
return mProcessingSyncMessage;
}
void OnDispatchMessage(const Message& aMsg);
@ -104,9 +103,16 @@ protected:
// Executed on the IO thread.
void OnSendReply(Message* msg);
// On both
bool AwaitingSyncReply() {
mMutex.AssertCurrentThreadOwns();
return mPendingReply != 0;
}
Mutex mMutex;
CondVar mCvar;
MessageId mPendingReply;
bool mProcessingSyncMessage;
Message mRecvd;
};