зеркало из https://github.com/mozilla/gecko-dev.git
Bug 699319 - Part 1: Abstract out the mTransport and I/O thread into the Link abstraction. r=cjones
This commit is contained in:
Родитель
f16092aa21
Коммит
fce15e5c44
|
@ -54,6 +54,13 @@ struct RunnableMethodTraits<mozilla::ipc::AsyncChannel>
|
|||
static void ReleaseCallee(mozilla::ipc::AsyncChannel* obj) { }
|
||||
};
|
||||
|
||||
template<>
|
||||
struct RunnableMethodTraits<mozilla::ipc::AsyncChannel::ProcessLink>
|
||||
{
|
||||
static void RetainCallee(mozilla::ipc::AsyncChannel::ProcessLink* obj) { }
|
||||
static void ReleaseCallee(mozilla::ipc::AsyncChannel::ProcessLink* obj) { }
|
||||
};
|
||||
|
||||
// We rely on invariants about the lifetime of the transport:
|
||||
//
|
||||
// - outlives this AsyncChannel
|
||||
|
@ -102,30 +109,39 @@ public:
|
|||
namespace mozilla {
|
||||
namespace ipc {
|
||||
|
||||
AsyncChannel::AsyncChannel(AsyncListener* aListener)
|
||||
: mTransport(0),
|
||||
mListener(aListener),
|
||||
mChannelState(ChannelClosed),
|
||||
mMonitor("mozilla.ipc.AsyncChannel.mMonitor"),
|
||||
mIOLoop(),
|
||||
mWorkerLoop(),
|
||||
mChild(false),
|
||||
mChannelErrorTask(NULL),
|
||||
mExistingListener(NULL)
|
||||
AsyncChannel::Link::Link(AsyncChannel *aChan)
|
||||
: mChan(aChan)
|
||||
{
|
||||
MOZ_COUNT_CTOR(AsyncChannel);
|
||||
}
|
||||
|
||||
AsyncChannel::~AsyncChannel()
|
||||
AsyncChannel::Link::~Link()
|
||||
{
|
||||
MOZ_COUNT_DTOR(AsyncChannel);
|
||||
Clear();
|
||||
mChan = 0;
|
||||
}
|
||||
|
||||
bool
|
||||
AsyncChannel::Open(Transport* aTransport, MessageLoop* aIOLoop, Side aSide)
|
||||
AsyncChannel::ProcessLink::ProcessLink(AsyncChannel *aChan)
|
||||
: Link(aChan)
|
||||
, mExistingListener(NULL)
|
||||
{
|
||||
}
|
||||
|
||||
AsyncChannel::ProcessLink::~ProcessLink()
|
||||
{
|
||||
mIOLoop = 0;
|
||||
if (mTransport) {
|
||||
mTransport->set_listener(0);
|
||||
|
||||
// we only hold a weak ref to the transport, which is "owned"
|
||||
// by GeckoChildProcess/GeckoThread
|
||||
mTransport = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::ProcessLink::Open(mozilla::ipc::Transport* aTransport,
|
||||
MessageLoop *aIOLoop,
|
||||
Side aSide)
|
||||
{
|
||||
NS_PRECONDITION(!mTransport, "Open() called > once");
|
||||
NS_PRECONDITION(aTransport, "need transport layer");
|
||||
|
||||
// FIXME need to check for valid channel
|
||||
|
@ -140,38 +156,96 @@ AsyncChannel::Open(Transport* aTransport, MessageLoop* aIOLoop, Side aSide)
|
|||
// We're a child or using the new arguments. Either way, we
|
||||
// need an open.
|
||||
needOpen = true;
|
||||
mChild = (aSide == Unknown) || (aSide == Child);
|
||||
mChan->mChild = (aSide == AsyncChannel::Unknown) || (aSide == AsyncChannel::Child);
|
||||
} else {
|
||||
NS_PRECONDITION(aSide == Unknown, "expected default side arg");
|
||||
|
||||
// parent
|
||||
mChild = false;
|
||||
mChan->mChild = false;
|
||||
needOpen = false;
|
||||
aIOLoop = XRE_GetIOMessageLoop();
|
||||
// FIXME assuming that the parent waits for the OnConnected event.
|
||||
// FIXME see GeckoChildProcessHost.cpp. bad assumption!
|
||||
mChannelState = ChannelConnected;
|
||||
mChan->mChannelState = ChannelConnected;
|
||||
}
|
||||
|
||||
mIOLoop = aIOLoop;
|
||||
mWorkerLoop = MessageLoop::current();
|
||||
|
||||
NS_ASSERTION(mIOLoop, "need an IO loop");
|
||||
NS_ASSERTION(mWorkerLoop, "need a worker loop");
|
||||
NS_ASSERTION(mChan->mWorkerLoop, "need a worker loop");
|
||||
|
||||
if (needOpen) { // child process
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mChan->mMonitor);
|
||||
|
||||
mIOLoop->PostTask(FROM_HERE,
|
||||
NewRunnableMethod(this,
|
||||
&AsyncChannel::OnChannelOpened));
|
||||
NewRunnableMethod(this, &ProcessLink::OnChannelOpened));
|
||||
|
||||
// FIXME/cjones: handle errors
|
||||
while (mChannelState != ChannelConnected) {
|
||||
mMonitor.Wait();
|
||||
while (mChan->mChannelState != ChannelConnected) {
|
||||
mChan->mMonitor->Wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::ProcessLink::EchoMessage(Message *msg)
|
||||
{
|
||||
// NB: Go through this OnMessageReceived indirection so that
|
||||
// echoing this message does the right thing for SyncChannel
|
||||
// and RPCChannel too
|
||||
mIOLoop->PostTask(
|
||||
FROM_HERE,
|
||||
NewRunnableMethod(this, &ProcessLink::OnEchoMessage, msg));
|
||||
// OnEchoMessage takes ownership of |msg|
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::ProcessLink::SendMessage(Message *msg)
|
||||
{
|
||||
mChan->AssertWorkerThread();
|
||||
mIOLoop->PostTask(
|
||||
FROM_HERE,
|
||||
NewRunnableMethod(mTransport, &Transport::Send, msg));
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::ProcessLink::SendClose()
|
||||
{
|
||||
mChan->AssertWorkerThread();
|
||||
mChan->mMonitor->AssertCurrentThreadOwns();
|
||||
|
||||
mIOLoop->PostTask(
|
||||
FROM_HERE, NewRunnableMethod(this, &ProcessLink::OnCloseChannel));
|
||||
}
|
||||
|
||||
AsyncChannel::AsyncChannel(AsyncListener* aListener)
|
||||
: mListener(aListener),
|
||||
mChannelState(ChannelClosed),
|
||||
mWorkerLoop(),
|
||||
mChild(false),
|
||||
mChannelErrorTask(NULL),
|
||||
mLink(NULL)
|
||||
{
|
||||
MOZ_COUNT_CTOR(AsyncChannel);
|
||||
}
|
||||
|
||||
AsyncChannel::~AsyncChannel()
|
||||
{
|
||||
MOZ_COUNT_DTOR(AsyncChannel);
|
||||
Clear();
|
||||
}
|
||||
|
||||
bool
|
||||
AsyncChannel::Open(Transport* aTransport,
|
||||
MessageLoop* aIOLoop,
|
||||
AsyncChannel::Side aSide)
|
||||
{
|
||||
ProcessLink *link;
|
||||
NS_PRECONDITION(!mLink, "Open() called > once");
|
||||
mMonitor = new RefCountedMonitor();
|
||||
mWorkerLoop = MessageLoop::current();
|
||||
mLink = link = new ProcessLink(this);
|
||||
link->Open(aTransport, aIOLoop, aSide); // n.b.: sets mChild
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -181,7 +255,12 @@ AsyncChannel::Close()
|
|||
AssertWorkerThread();
|
||||
|
||||
{
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
// n.b.: We increase the ref count of monitor temporarily
|
||||
// for the duration of this block. Otherwise, the
|
||||
// function NotifyMaybeChannelError() will call
|
||||
// ::Clear() which can free the monitor.
|
||||
nsRefPtr<RefCountedMonitor> monitor(mMonitor);
|
||||
MonitorAutoLock lock(*monitor);
|
||||
|
||||
if (ChannelError == mChannelState ||
|
||||
ChannelTimeout == mChannelState) {
|
||||
|
@ -191,7 +270,7 @@ AsyncChannel::Close()
|
|||
// also be deleted and the listener will never be notified
|
||||
// of the channel error.
|
||||
if (mListener) {
|
||||
MonitorAutoUnlock unlock(mMonitor);
|
||||
MonitorAutoUnlock unlock(*monitor);
|
||||
NotifyMaybeChannelError();
|
||||
}
|
||||
return;
|
||||
|
@ -217,13 +296,10 @@ void
|
|||
AsyncChannel::SynchronouslyClose()
|
||||
{
|
||||
AssertWorkerThread();
|
||||
mMonitor.AssertCurrentThreadOwns();
|
||||
|
||||
mIOLoop->PostTask(
|
||||
FROM_HERE, NewRunnableMethod(this, &AsyncChannel::OnCloseChannel));
|
||||
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
mLink->SendClose();
|
||||
while (ChannelClosed != mChannelState)
|
||||
mMonitor.Wait();
|
||||
mMonitor->Wait();
|
||||
}
|
||||
|
||||
bool
|
||||
|
@ -231,18 +307,18 @@ AsyncChannel::Send(Message* _msg)
|
|||
{
|
||||
nsAutoPtr<Message> msg(_msg);
|
||||
AssertWorkerThread();
|
||||
mMonitor.AssertNotCurrentThreadOwns();
|
||||
mMonitor->AssertNotCurrentThreadOwns();
|
||||
NS_ABORT_IF_FALSE(MSG_ROUTING_NONE != msg->routing_id(), "need a route");
|
||||
|
||||
{
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
|
||||
if (!Connected()) {
|
||||
ReportConnectionError("AsyncChannel");
|
||||
return false;
|
||||
}
|
||||
|
||||
SendThroughTransport(msg.forget());
|
||||
mLink->SendMessage(msg.forget());
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -253,24 +329,18 @@ AsyncChannel::Echo(Message* _msg)
|
|||
{
|
||||
nsAutoPtr<Message> msg(_msg);
|
||||
AssertWorkerThread();
|
||||
mMonitor.AssertNotCurrentThreadOwns();
|
||||
mMonitor->AssertNotCurrentThreadOwns();
|
||||
NS_ABORT_IF_FALSE(MSG_ROUTING_NONE != msg->routing_id(), "need a route");
|
||||
|
||||
{
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
|
||||
if (!Connected()) {
|
||||
ReportConnectionError("AsyncChannel");
|
||||
return false;
|
||||
}
|
||||
|
||||
// NB: Go through this OnMessageReceived indirection so that
|
||||
// echoing this message does the right thing for SyncChannel
|
||||
// and RPCChannel too
|
||||
mIOLoop->PostTask(
|
||||
FROM_HERE,
|
||||
NewRunnableMethod(this, &AsyncChannel::OnEchoMessage, msg.forget()));
|
||||
// OnEchoMessage takes ownership of |msg|
|
||||
mLink->EchoMessage(msg.forget());
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -306,31 +376,21 @@ void
|
|||
AsyncChannel::SendSpecialMessage(Message* msg) const
|
||||
{
|
||||
AssertWorkerThread();
|
||||
SendThroughTransport(msg);
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::SendThroughTransport(Message* msg) const
|
||||
{
|
||||
AssertWorkerThread();
|
||||
|
||||
mIOLoop->PostTask(
|
||||
FROM_HERE,
|
||||
NewRunnableMethod(mTransport, &Transport::Send, msg));
|
||||
mLink->SendMessage(msg);
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::OnNotifyMaybeChannelError()
|
||||
{
|
||||
AssertWorkerThread();
|
||||
mMonitor.AssertNotCurrentThreadOwns();
|
||||
mMonitor->AssertNotCurrentThreadOwns();
|
||||
|
||||
// OnChannelError holds mMonitor when it posts this task and this
|
||||
// task cannot be allowed to run until OnChannelError has
|
||||
// exited. We enforce that order by grabbing the mutex here which
|
||||
// should only continue once OnChannelError has completed.
|
||||
{
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
// nothing to do here
|
||||
}
|
||||
|
||||
|
@ -348,7 +408,7 @@ AsyncChannel::OnNotifyMaybeChannelError()
|
|||
void
|
||||
AsyncChannel::NotifyChannelClosed()
|
||||
{
|
||||
mMonitor.AssertNotCurrentThreadOwns();
|
||||
mMonitor->AssertNotCurrentThreadOwns();
|
||||
|
||||
if (ChannelClosed != mChannelState)
|
||||
NS_RUNTIMEABORT("channel should have been closed!");
|
||||
|
@ -363,7 +423,7 @@ AsyncChannel::NotifyChannelClosed()
|
|||
void
|
||||
AsyncChannel::NotifyMaybeChannelError()
|
||||
{
|
||||
mMonitor.AssertNotCurrentThreadOwns();
|
||||
mMonitor->AssertNotCurrentThreadOwns();
|
||||
|
||||
// TODO sort out Close() on this side racing with Close() on the
|
||||
// other side
|
||||
|
@ -386,16 +446,12 @@ void
|
|||
AsyncChannel::Clear()
|
||||
{
|
||||
mListener = 0;
|
||||
mIOLoop = 0;
|
||||
mWorkerLoop = 0;
|
||||
|
||||
if (mTransport) {
|
||||
mTransport->set_listener(0);
|
||||
delete mLink;
|
||||
mLink = 0;
|
||||
mMonitor = 0;
|
||||
|
||||
// we only hold a weak ref to the transport, which is "owned"
|
||||
// by GeckoChildProcess/GeckoThread
|
||||
mTransport = 0;
|
||||
}
|
||||
if (mChannelErrorTask) {
|
||||
mChannelErrorTask->Cancel();
|
||||
mChannelErrorTask = NULL;
|
||||
|
@ -480,17 +536,96 @@ AsyncChannel::ReportConnectionError(const char* channelName) const
|
|||
mListener->OnProcessingError(MsgDropped);
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::DispatchOnChannelConnected(int32 peer_pid)
|
||||
{
|
||||
AssertWorkerThread();
|
||||
if (mListener)
|
||||
mListener->OnChannelConnected(peer_pid);
|
||||
}
|
||||
|
||||
//
|
||||
// The methods below run in the context of the IO thread
|
||||
//
|
||||
|
||||
void
|
||||
AsyncChannel::OnMessageReceived(const Message& msg)
|
||||
AsyncChannel::ProcessLink::OnMessageReceived(const Message& msg)
|
||||
{
|
||||
AssertIOThread();
|
||||
NS_ASSERTION(mChannelState != ChannelError, "Shouldn't get here!");
|
||||
NS_ASSERTION(mChan->mChannelState != ChannelError, "Shouldn't get here!");
|
||||
MonitorAutoLock lock(*mChan->mMonitor);
|
||||
mChan->OnMessageReceivedFromLink(msg);
|
||||
}
|
||||
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
void
|
||||
AsyncChannel::ProcessLink::OnEchoMessage(Message* msg)
|
||||
{
|
||||
AssertIOThread();
|
||||
OnMessageReceived(*msg);
|
||||
delete msg;
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::ProcessLink::OnChannelOpened()
|
||||
{
|
||||
mChan->AssertLinkThread();
|
||||
{
|
||||
MonitorAutoLock lock(*mChan->mMonitor);
|
||||
mChan->mChannelState = ChannelOpening;
|
||||
}
|
||||
/*assert*/mTransport->Connect();
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::ProcessLink::OnChannelConnected(int32 peer_pid)
|
||||
{
|
||||
AssertIOThread();
|
||||
|
||||
{
|
||||
MonitorAutoLock lock(*mChan->mMonitor);
|
||||
mChan->mChannelState = ChannelConnected;
|
||||
mChan->mMonitor->Notify();
|
||||
}
|
||||
|
||||
if(mExistingListener)
|
||||
mExistingListener->OnChannelConnected(peer_pid);
|
||||
|
||||
mChan->mWorkerLoop->PostTask(
|
||||
FROM_HERE,
|
||||
NewRunnableMethod(mChan,
|
||||
&AsyncChannel::DispatchOnChannelConnected,
|
||||
peer_pid));
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::ProcessLink::OnChannelError()
|
||||
{
|
||||
AssertIOThread();
|
||||
MonitorAutoLock lock(*mChan->mMonitor);
|
||||
mChan->OnChannelErrorFromLink();
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::ProcessLink::OnCloseChannel()
|
||||
{
|
||||
AssertIOThread();
|
||||
|
||||
mTransport->Close();
|
||||
|
||||
MonitorAutoLock lock(*mChan->mMonitor);
|
||||
mChan->mChannelState = ChannelClosed;
|
||||
mChan->mMonitor->Notify();
|
||||
}
|
||||
|
||||
//
|
||||
// The methods below run in the context of the link thread
|
||||
//
|
||||
|
||||
void
|
||||
AsyncChannel::OnMessageReceivedFromLink(const Message& msg)
|
||||
{
|
||||
AssertLinkThread();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
|
||||
if (!MaybeInterceptSpecialIOMessage(msg))
|
||||
// wake up the worker, there's work to do
|
||||
|
@ -500,57 +635,10 @@ AsyncChannel::OnMessageReceived(const Message& msg)
|
|||
}
|
||||
|
||||
void
|
||||
AsyncChannel::OnEchoMessage(Message* msg)
|
||||
AsyncChannel::OnChannelErrorFromLink()
|
||||
{
|
||||
AssertIOThread();
|
||||
OnMessageReceived(*msg);
|
||||
delete msg;
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::OnChannelOpened()
|
||||
{
|
||||
AssertIOThread();
|
||||
{
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
mChannelState = ChannelOpening;
|
||||
}
|
||||
/*assert*/mTransport->Connect();
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::DispatchOnChannelConnected(int32 peer_pid)
|
||||
{
|
||||
AssertWorkerThread();
|
||||
if (mListener)
|
||||
mListener->OnChannelConnected(peer_pid);
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::OnChannelConnected(int32 peer_pid)
|
||||
{
|
||||
AssertIOThread();
|
||||
|
||||
{
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
mChannelState = ChannelConnected;
|
||||
mMonitor.Notify();
|
||||
}
|
||||
|
||||
if(mExistingListener)
|
||||
mExistingListener->OnChannelConnected(peer_pid);
|
||||
|
||||
mWorkerLoop->PostTask(FROM_HERE, NewRunnableMethod(this,
|
||||
&AsyncChannel::DispatchOnChannelConnected,
|
||||
peer_pid));
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::OnChannelError()
|
||||
{
|
||||
AssertIOThread();
|
||||
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
AssertLinkThread();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
|
||||
if (ChannelClosing != mChannelState)
|
||||
mChannelState = ChannelError;
|
||||
|
@ -561,8 +649,8 @@ AsyncChannel::OnChannelError()
|
|||
void
|
||||
AsyncChannel::PostErrorNotifyTask()
|
||||
{
|
||||
AssertIOThread();
|
||||
mMonitor.AssertCurrentThreadOwns();
|
||||
AssertLinkThread();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
|
||||
NS_ASSERTION(!mChannelErrorTask, "OnChannelError called twice?");
|
||||
|
||||
|
@ -572,23 +660,11 @@ AsyncChannel::PostErrorNotifyTask()
|
|||
mWorkerLoop->PostTask(FROM_HERE, mChannelErrorTask);
|
||||
}
|
||||
|
||||
void
|
||||
AsyncChannel::OnCloseChannel()
|
||||
{
|
||||
AssertIOThread();
|
||||
|
||||
mTransport->Close();
|
||||
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
mChannelState = ChannelClosed;
|
||||
mMonitor.Notify();
|
||||
}
|
||||
|
||||
bool
|
||||
AsyncChannel::MaybeInterceptSpecialIOMessage(const Message& msg)
|
||||
{
|
||||
AssertIOThread();
|
||||
mMonitor.AssertCurrentThreadOwns();
|
||||
AssertLinkThread();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
|
||||
if (MSG_ROUTING_NONE == msg.routing_id()
|
||||
&& GOODBYE_MESSAGE_TYPE == msg.type()) {
|
||||
|
@ -601,8 +677,8 @@ AsyncChannel::MaybeInterceptSpecialIOMessage(const Message& msg)
|
|||
void
|
||||
AsyncChannel::ProcessGoodbyeMessage()
|
||||
{
|
||||
AssertIOThread();
|
||||
mMonitor.AssertCurrentThreadOwns();
|
||||
AssertLinkThread();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
|
||||
// TODO sort out Close() on this side racing with Close() on the
|
||||
// other side
|
||||
|
|
|
@ -65,7 +65,18 @@ struct HasResultCodes
|
|||
};
|
||||
};
|
||||
|
||||
class AsyncChannel : public Transport::Listener, protected HasResultCodes
|
||||
|
||||
class RefCountedMonitor : public Monitor
|
||||
{
|
||||
public:
|
||||
RefCountedMonitor()
|
||||
: Monitor("mozilla.ipc.AsyncChannel.mMonitor")
|
||||
{}
|
||||
|
||||
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedMonitor)
|
||||
};
|
||||
|
||||
class AsyncChannel : protected HasResultCodes
|
||||
{
|
||||
protected:
|
||||
typedef mozilla::Monitor Monitor;
|
||||
|
@ -125,15 +136,74 @@ public:
|
|||
void DispatchOnChannelConnected(int32 peer_pid);
|
||||
|
||||
//
|
||||
// These methods are called on the "IO" thread
|
||||
// Each AsyncChannel is associated with either a ProcessLink or a
|
||||
// ThreadLink via the field mLink. The type of link is determined
|
||||
// by whether this AsyncChannel is communicating with another
|
||||
// process or another thread. In the former case, file
|
||||
// descriptors or a socket are used via the I/O queue. In the
|
||||
// latter case, messages are enqueued directly onto the target
|
||||
// thread's work queue.
|
||||
//
|
||||
|
||||
// Implement the Transport::Listener interface
|
||||
NS_OVERRIDE virtual void OnMessageReceived(const Message& msg);
|
||||
NS_OVERRIDE virtual void OnChannelConnected(int32 peer_pid);
|
||||
NS_OVERRIDE virtual void OnChannelError();
|
||||
class Link {
|
||||
protected:
|
||||
AsyncChannel *mChan;
|
||||
|
||||
public:
|
||||
Link(AsyncChannel *aChan);
|
||||
virtual ~Link();
|
||||
|
||||
// n.b.: These methods all require that the channel monitor is
|
||||
// held when they are invoked.
|
||||
virtual void EchoMessage(Message *msg) = 0;
|
||||
virtual void SendMessage(Message *msg) = 0;
|
||||
virtual void SendClose() = 0;
|
||||
};
|
||||
|
||||
class ProcessLink : public Link, public Transport::Listener {
|
||||
protected:
|
||||
Transport* mTransport;
|
||||
MessageLoop* mIOLoop; // thread where IO happens
|
||||
Transport::Listener* mExistingListener; // channel's previous listener
|
||||
|
||||
void OnCloseChannel();
|
||||
void OnChannelOpened();
|
||||
void OnEchoMessage(Message* msg);
|
||||
|
||||
void AssertIOThread() const
|
||||
{
|
||||
NS_ABORT_IF_FALSE(mIOLoop == MessageLoop::current(),
|
||||
"not on I/O thread!");
|
||||
}
|
||||
|
||||
public:
|
||||
ProcessLink(AsyncChannel *chan);
|
||||
virtual ~ProcessLink();
|
||||
void Open(Transport* aTransport, MessageLoop *aIOLoop, Side aSide);
|
||||
|
||||
// Run on the I/O thread, only when using inter-process link.
|
||||
// These methods acquire the monitor and forward to the
|
||||
// similarly named methods in AsyncChannel below
|
||||
// (OnMessageReceivedFromLink(), etc)
|
||||
NS_OVERRIDE virtual void OnMessageReceived(const Message& msg);
|
||||
NS_OVERRIDE virtual void OnChannelConnected(int32 peer_pid);
|
||||
NS_OVERRIDE virtual void OnChannelError();
|
||||
|
||||
virtual void EchoMessage(Message *msg);
|
||||
virtual void SendMessage(Message *msg);
|
||||
virtual void SendClose();
|
||||
};
|
||||
|
||||
protected:
|
||||
// The "link" thread is either the I/O thread (ProcessLink) or the
|
||||
// other actor's work thread (ThreadLink). In either case, it is
|
||||
// NOT our worker thread.
|
||||
void AssertLinkThread() const
|
||||
{
|
||||
NS_ABORT_IF_FALSE(mWorkerLoop != MessageLoop::current(),
|
||||
"on worker thread but should not be!");
|
||||
}
|
||||
|
||||
// Can be run on either thread
|
||||
void AssertWorkerThread() const
|
||||
{
|
||||
|
@ -141,17 +211,27 @@ protected:
|
|||
"not on worker thread!");
|
||||
}
|
||||
|
||||
void AssertIOThread() const
|
||||
{
|
||||
NS_ABORT_IF_FALSE(mIOLoop == MessageLoop::current(),
|
||||
"not on IO thread!");
|
||||
}
|
||||
|
||||
bool Connected() const {
|
||||
mMonitor.AssertCurrentThreadOwns();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
return ChannelConnected == mChannelState;
|
||||
}
|
||||
|
||||
// Return true if |msg| is a special message targeted at the IO
|
||||
// thread, in which case it shouldn't be delivered to the worker.
|
||||
virtual bool MaybeInterceptSpecialIOMessage(const Message& msg);
|
||||
void ProcessGoodbyeMessage();
|
||||
|
||||
// Runs on the link thread. Invoked either from the I/O thread methods above
|
||||
// or directly from the other actor if using a thread-based link.
|
||||
//
|
||||
// n.b.: mMonitor is always held when these methods are invoked.
|
||||
// In the case of a ProcessLink, it is acquired by the ProcessLink.
|
||||
// In the case of a ThreadLink, it is acquired by the other actor,
|
||||
// which then invokes these methods directly.
|
||||
virtual void OnMessageReceivedFromLink(const Message& msg);
|
||||
virtual void OnChannelErrorFromLink();
|
||||
void PostErrorNotifyTask();
|
||||
|
||||
// Run on the worker thread
|
||||
void OnDispatchMessage(const Message& aMsg);
|
||||
virtual bool OnSpecialMessage(uint16 id, const Message& msg);
|
||||
|
@ -165,8 +245,6 @@ protected:
|
|||
|
||||
// Run on the worker thread
|
||||
|
||||
void SendThroughTransport(Message* msg) const;
|
||||
|
||||
void OnNotifyMaybeChannelError();
|
||||
virtual bool ShouldDeferNotifyMaybeError() const {
|
||||
return false;
|
||||
|
@ -176,30 +254,15 @@ protected:
|
|||
|
||||
virtual void Clear();
|
||||
|
||||
// Run on the IO thread
|
||||
|
||||
void OnChannelOpened();
|
||||
void OnCloseChannel();
|
||||
void PostErrorNotifyTask();
|
||||
void OnEchoMessage(Message* msg);
|
||||
|
||||
// Return true if |msg| is a special message targeted at the IO
|
||||
// thread, in which case it shouldn't be delivered to the worker.
|
||||
bool MaybeInterceptSpecialIOMessage(const Message& msg);
|
||||
void ProcessGoodbyeMessage();
|
||||
|
||||
Transport* mTransport;
|
||||
AsyncListener* mListener;
|
||||
ChannelState mChannelState;
|
||||
Monitor mMonitor;
|
||||
MessageLoop* mIOLoop; // thread where IO happens
|
||||
nsRefPtr<RefCountedMonitor> mMonitor;
|
||||
MessageLoop* mWorkerLoop; // thread where work is done
|
||||
bool mChild; // am I the child or parent?
|
||||
CancelableTask* mChannelErrorTask; // NotifyMaybeChannelError runnable
|
||||
Transport::Listener* mExistingListener; // channel's previous listener
|
||||
Link *mLink; // link to other thread/process
|
||||
};
|
||||
|
||||
|
||||
} // namespace ipc
|
||||
} // namespace mozilla
|
||||
#endif // ifndef ipc_glue_AsyncChannel_h
|
||||
|
|
|
@ -124,7 +124,7 @@ bool
|
|||
RPCChannel::EventOccurred() const
|
||||
{
|
||||
AssertWorkerThread();
|
||||
mMonitor.AssertCurrentThreadOwns();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
RPC_ASSERT(StackDepth() > 0, "not in wait loop");
|
||||
|
||||
return (!Connected() ||
|
||||
|
@ -155,7 +155,7 @@ RPCChannel::Call(Message* _msg, Message* reply)
|
|||
{
|
||||
nsAutoPtr<Message> msg(_msg);
|
||||
AssertWorkerThread();
|
||||
mMonitor.AssertNotCurrentThreadOwns();
|
||||
mMonitor->AssertNotCurrentThreadOwns();
|
||||
RPC_ASSERT(!ProcessingSyncMessage(),
|
||||
"violation of sync handler invariant");
|
||||
RPC_ASSERT(msg->is_rpc(), "can only Call() RPC messages here");
|
||||
|
@ -167,7 +167,7 @@ RPCChannel::Call(Message* _msg, Message* reply)
|
|||
Message copy = *msg;
|
||||
CxxStackFrame f(*this, OUT_MESSAGE, ©);
|
||||
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
|
||||
if (!Connected()) {
|
||||
ReportConnectionError("RPCChannel");
|
||||
|
@ -179,7 +179,7 @@ RPCChannel::Call(Message* _msg, Message* reply)
|
|||
msg->set_rpc_local_stack_depth(1 + StackDepth());
|
||||
mStack.push(*msg);
|
||||
|
||||
SendThroughTransport(msg.forget());
|
||||
mLink->SendMessage(msg.forget());
|
||||
|
||||
while (1) {
|
||||
// if a handler invoked by *Dispatch*() spun a nested event
|
||||
|
@ -239,7 +239,7 @@ RPCChannel::Call(Message* _msg, Message* reply)
|
|||
}
|
||||
|
||||
if (!recvd.is_sync() && !recvd.is_rpc()) {
|
||||
MonitorAutoUnlock unlock(mMonitor);
|
||||
MonitorAutoUnlock unlock(*mMonitor);
|
||||
|
||||
CxxStackFrame f(*this, IN_MESSAGE, &recvd);
|
||||
AsyncChannel::OnDispatchMessage(recvd);
|
||||
|
@ -250,7 +250,7 @@ RPCChannel::Call(Message* _msg, Message* reply)
|
|||
if (recvd.is_sync()) {
|
||||
RPC_ASSERT(mPending.empty(),
|
||||
"other side should have been blocked");
|
||||
MonitorAutoUnlock unlock(mMonitor);
|
||||
MonitorAutoUnlock unlock(*mMonitor);
|
||||
|
||||
CxxStackFrame f(*this, IN_MESSAGE, &recvd);
|
||||
SyncChannel::OnDispatchMessage(recvd);
|
||||
|
@ -305,7 +305,7 @@ RPCChannel::Call(Message* _msg, Message* reply)
|
|||
// "snapshot" the current stack depth while we own the Monitor
|
||||
size_t stackDepth = StackDepth();
|
||||
{
|
||||
MonitorAutoUnlock unlock(mMonitor);
|
||||
MonitorAutoUnlock unlock(*mMonitor);
|
||||
// someone called in to us from the other side. handle the call
|
||||
CxxStackFrame f(*this, IN_MESSAGE, &recvd);
|
||||
Incall(recvd, stackDepth);
|
||||
|
@ -320,7 +320,7 @@ void
|
|||
RPCChannel::MaybeUndeferIncall()
|
||||
{
|
||||
AssertWorkerThread();
|
||||
mMonitor.AssertCurrentThreadOwns();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
|
||||
if (mDeferred.empty())
|
||||
return;
|
||||
|
@ -349,7 +349,7 @@ void
|
|||
RPCChannel::EnqueuePendingMessages()
|
||||
{
|
||||
AssertWorkerThread();
|
||||
mMonitor.AssertCurrentThreadOwns();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
|
||||
MaybeUndeferIncall();
|
||||
|
||||
|
@ -371,10 +371,10 @@ void
|
|||
RPCChannel::FlushPendingRPCQueue()
|
||||
{
|
||||
AssertWorkerThread();
|
||||
mMonitor.AssertNotCurrentThreadOwns();
|
||||
mMonitor->AssertNotCurrentThreadOwns();
|
||||
|
||||
{
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
|
||||
if (mDeferred.empty()) {
|
||||
if (mPending.empty())
|
||||
|
@ -396,11 +396,11 @@ RPCChannel::OnMaybeDequeueOne()
|
|||
// messages here
|
||||
|
||||
AssertWorkerThread();
|
||||
mMonitor.AssertNotCurrentThreadOwns();
|
||||
mMonitor->AssertNotCurrentThreadOwns();
|
||||
|
||||
Message recvd;
|
||||
{
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
|
||||
if (!Connected()) {
|
||||
ReportConnectionError("RPCChannel");
|
||||
|
@ -447,7 +447,7 @@ void
|
|||
RPCChannel::Incall(const Message& call, size_t stackDepth)
|
||||
{
|
||||
AssertWorkerThread();
|
||||
mMonitor.AssertNotCurrentThreadOwns();
|
||||
mMonitor->AssertNotCurrentThreadOwns();
|
||||
RPC_ASSERT(call.is_rpc() && !call.is_reply(), "wrong message type");
|
||||
|
||||
// Race detection: see the long comment near
|
||||
|
@ -507,7 +507,7 @@ void
|
|||
RPCChannel::DispatchIncall(const Message& call)
|
||||
{
|
||||
AssertWorkerThread();
|
||||
mMonitor.AssertNotCurrentThreadOwns();
|
||||
mMonitor->AssertNotCurrentThreadOwns();
|
||||
RPC_ASSERT(call.is_rpc() && !call.is_reply(),
|
||||
"wrong message type");
|
||||
|
||||
|
@ -528,9 +528,9 @@ RPCChannel::DispatchIncall(const Message& call)
|
|||
reply->set_seqno(call.seqno());
|
||||
|
||||
{
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
if (ChannelConnected == mChannelState)
|
||||
SendThroughTransport(reply);
|
||||
mLink->SendMessage(reply);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -541,6 +541,8 @@ RPCChannel::BlockChild()
|
|||
|
||||
if (mChild)
|
||||
NS_RUNTIMEABORT("child tried to block parent");
|
||||
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
SendSpecialMessage(new BlockChildMessage());
|
||||
return true;
|
||||
}
|
||||
|
@ -552,6 +554,8 @@ RPCChannel::UnblockChild()
|
|||
|
||||
if (mChild)
|
||||
NS_RUNTIMEABORT("child tried to unblock parent");
|
||||
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
SendSpecialMessage(new UnblockChildMessage());
|
||||
return true;
|
||||
}
|
||||
|
@ -583,7 +587,7 @@ RPCChannel::BlockOnParent()
|
|||
if (!mChild)
|
||||
NS_RUNTIMEABORT("child tried to block parent");
|
||||
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
|
||||
if (mBlockedOnParent || AwaitingSyncReply() || 0 < StackDepth())
|
||||
NS_RUNTIMEABORT("attempt to block child when it's already blocked");
|
||||
|
@ -607,7 +611,7 @@ RPCChannel::BlockOnParent()
|
|||
Message recvd = mPending.front();
|
||||
mPending.pop();
|
||||
|
||||
MonitorAutoUnlock unlock(mMonitor);
|
||||
MonitorAutoUnlock unlock(*mMonitor);
|
||||
|
||||
CxxStackFrame f(*this, IN_MESSAGE, &recvd);
|
||||
if (recvd.is_rpc()) {
|
||||
|
@ -633,7 +637,7 @@ RPCChannel::UnblockFromParent()
|
|||
|
||||
if (!mChild)
|
||||
NS_RUNTIMEABORT("child tried to block parent");
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
mBlockedOnParent = false;
|
||||
}
|
||||
|
||||
|
@ -642,7 +646,7 @@ RPCChannel::ExitedCxxStack()
|
|||
{
|
||||
Listener()->OnExitedCxxStack();
|
||||
if (mSawRPCOutMsg) {
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
// see long comment in OnMaybeDequeueOne()
|
||||
EnqueuePendingMessages();
|
||||
mSawRPCOutMsg = false;
|
||||
|
@ -707,15 +711,15 @@ RPCChannel::DumpRPCStack(FILE* outfile, const char* const pfx) const
|
|||
}
|
||||
|
||||
//
|
||||
// The methods below run in the context of the IO thread, and can proxy
|
||||
// The methods below run in the context of the link thread, and can proxy
|
||||
// back to the methods above
|
||||
//
|
||||
|
||||
void
|
||||
RPCChannel::OnMessageReceived(const Message& msg)
|
||||
RPCChannel::OnMessageReceivedFromLink(const Message& msg)
|
||||
{
|
||||
AssertIOThread();
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
AssertLinkThread();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
|
||||
if (MaybeInterceptSpecialIOMessage(msg))
|
||||
return;
|
||||
|
@ -733,7 +737,8 @@ RPCChannel::OnMessageReceived(const Message& msg)
|
|||
|
||||
if (0 == StackDepth() && !mBlockedOnParent) {
|
||||
// the worker thread might be idle, make sure it wakes up
|
||||
mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
|
||||
mWorkerLoop->PostTask(FROM_HERE,
|
||||
new DequeueTask(mDequeueOneTask));
|
||||
}
|
||||
else if (!AwaitingSyncReply())
|
||||
NotifyWorkerThread();
|
||||
|
@ -741,20 +746,15 @@ RPCChannel::OnMessageReceived(const Message& msg)
|
|||
|
||||
|
||||
void
|
||||
RPCChannel::OnChannelError()
|
||||
RPCChannel::OnChannelErrorFromLink()
|
||||
{
|
||||
AssertIOThread();
|
||||
AssertLinkThread();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
|
||||
if (ChannelClosing != mChannelState)
|
||||
mChannelState = ChannelError;
|
||||
|
||||
// skip SyncChannel::OnError(); we subsume its duties
|
||||
if (AwaitingSyncReply() || 0 < StackDepth())
|
||||
if (0 < StackDepth())
|
||||
NotifyWorkerThread();
|
||||
|
||||
PostErrorNotifyTask();
|
||||
SyncChannel::OnChannelErrorFromLink();
|
||||
}
|
||||
|
||||
} // namespace ipc
|
||||
|
|
|
@ -162,12 +162,6 @@ public:
|
|||
NS_OVERRIDE
|
||||
virtual bool OnSpecialMessage(uint16 id, const Message& msg);
|
||||
|
||||
// Override the SyncChannel handler so we can dispatch RPC
|
||||
// messages. Called on the IO thread only.
|
||||
NS_OVERRIDE
|
||||
virtual void OnMessageReceived(const Message& msg);
|
||||
NS_OVERRIDE
|
||||
virtual void OnChannelError();
|
||||
|
||||
/**
|
||||
* If there is a pending RPC message, process all pending messages.
|
||||
|
@ -186,7 +180,11 @@ protected:
|
|||
void SpinInternalEventLoop();
|
||||
#endif
|
||||
|
||||
private:
|
||||
protected:
|
||||
NS_OVERRIDE virtual void OnMessageReceivedFromLink(const Message& msg);
|
||||
NS_OVERRIDE virtual void OnChannelErrorFromLink();
|
||||
|
||||
private:
|
||||
// Called on worker thread only
|
||||
|
||||
RPCListener* Listener() const {
|
||||
|
@ -325,7 +323,7 @@ protected:
|
|||
|
||||
// Called from both threads
|
||||
size_t StackDepth() const {
|
||||
mMonitor.AssertCurrentThreadOwns();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
return mStack.size();
|
||||
}
|
||||
|
||||
|
|
|
@ -88,7 +88,7 @@ bool
|
|||
SyncChannel::EventOccurred()
|
||||
{
|
||||
AssertWorkerThread();
|
||||
mMonitor.AssertCurrentThreadOwns();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
NS_ABORT_IF_FALSE(AwaitingSyncReply(), "not in wait loop");
|
||||
|
||||
return (!Connected() || 0 != mRecvd.type() || mRecvd.is_reply_error());
|
||||
|
@ -100,7 +100,7 @@ SyncChannel::Send(Message* _msg, Message* reply)
|
|||
nsAutoPtr<Message> msg(_msg);
|
||||
|
||||
AssertWorkerThread();
|
||||
mMonitor.AssertNotCurrentThreadOwns();
|
||||
mMonitor->AssertNotCurrentThreadOwns();
|
||||
NS_ABORT_IF_FALSE(!ProcessingSyncMessage(),
|
||||
"violation of sync handler invariant");
|
||||
NS_ABORT_IF_FALSE(msg->is_sync(), "can only Send() sync messages here");
|
||||
|
@ -111,7 +111,7 @@ SyncChannel::Send(Message* _msg, Message* reply)
|
|||
|
||||
msg->set_seqno(NextSeqno());
|
||||
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
|
||||
if (!Connected()) {
|
||||
ReportConnectionError("SyncChannel");
|
||||
|
@ -120,7 +120,7 @@ SyncChannel::Send(Message* _msg, Message* reply)
|
|||
|
||||
mPendingReply = msg->type() + 1;
|
||||
int32 msgSeqno = msg->seqno();
|
||||
SendThroughTransport(msg.forget());
|
||||
mLink->SendMessage(msg.forget());
|
||||
|
||||
while (1) {
|
||||
bool maybeTimedOut = !SyncChannel::WaitForNotify();
|
||||
|
@ -186,26 +186,27 @@ SyncChannel::OnDispatchMessage(const Message& msg)
|
|||
reply->set_seqno(msg.seqno());
|
||||
|
||||
{
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
if (ChannelConnected == mChannelState)
|
||||
SendThroughTransport(reply);
|
||||
mLink->SendMessage(reply);
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// The methods below run in the context of the IO thread, and can proxy
|
||||
// The methods below run in the context of the link thread, and can proxy
|
||||
// back to the methods above
|
||||
//
|
||||
|
||||
void
|
||||
SyncChannel::OnMessageReceived(const Message& msg)
|
||||
SyncChannel::OnMessageReceivedFromLink(const Message& msg)
|
||||
{
|
||||
AssertIOThread();
|
||||
if (!msg.is_sync()) {
|
||||
return AsyncChannel::OnMessageReceived(msg);
|
||||
}
|
||||
AssertLinkThread();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
if (!msg.is_sync()) {
|
||||
AsyncChannel::OnMessageReceivedFromLink(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
if (MaybeInterceptSpecialIOMessage(msg))
|
||||
return;
|
||||
|
@ -224,19 +225,15 @@ SyncChannel::OnMessageReceived(const Message& msg)
|
|||
}
|
||||
|
||||
void
|
||||
SyncChannel::OnChannelError()
|
||||
SyncChannel::OnChannelErrorFromLink()
|
||||
{
|
||||
AssertIOThread();
|
||||
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
|
||||
if (ChannelClosing != mChannelState)
|
||||
mChannelState = ChannelError;
|
||||
AssertLinkThread();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
|
||||
if (AwaitingSyncReply())
|
||||
NotifyWorkerThread();
|
||||
|
||||
PostErrorNotifyTask();
|
||||
AsyncChannel::OnChannelErrorFromLink();
|
||||
}
|
||||
|
||||
//
|
||||
|
@ -258,11 +255,11 @@ bool
|
|||
SyncChannel::ShouldContinueFromTimeout()
|
||||
{
|
||||
AssertWorkerThread();
|
||||
mMonitor.AssertCurrentThreadOwns();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
|
||||
bool cont;
|
||||
{
|
||||
MonitorAutoUnlock unlock(mMonitor);
|
||||
MonitorAutoUnlock unlock(*mMonitor);
|
||||
cont = static_cast<SyncListener*>(mListener)->OnReplyTimeout();
|
||||
}
|
||||
|
||||
|
@ -300,7 +297,7 @@ SyncChannel::WaitForNotify()
|
|||
// XXX could optimize away this syscall for "no timeout" case if desired
|
||||
PRIntervalTime waitStart = PR_IntervalNow();
|
||||
|
||||
mMonitor.Wait(timeout);
|
||||
mMonitor->Wait(timeout);
|
||||
|
||||
// if the timeout didn't expire, we know we received an event.
|
||||
// The converse is not true.
|
||||
|
@ -310,7 +307,7 @@ SyncChannel::WaitForNotify()
|
|||
void
|
||||
SyncChannel::NotifyWorkerThread()
|
||||
{
|
||||
mMonitor.Notify();
|
||||
mMonitor->Notify();
|
||||
}
|
||||
|
||||
#endif // ifndef OS_WIN
|
||||
|
|
|
@ -86,10 +86,6 @@ public:
|
|||
mTimeoutMs = (aTimeoutMs <= 0) ? kNoTimeout : aTimeoutMs;
|
||||
}
|
||||
|
||||
// Override the AsyncChannel handler so we can dispatch sync messages
|
||||
NS_OVERRIDE virtual void OnMessageReceived(const Message& msg);
|
||||
NS_OVERRIDE virtual void OnChannelError();
|
||||
|
||||
static bool IsPumpingMessages() {
|
||||
return sIsPumpingMessages;
|
||||
}
|
||||
|
@ -98,6 +94,7 @@ public:
|
|||
}
|
||||
|
||||
#ifdef OS_WIN
|
||||
public:
|
||||
struct NS_STACK_CLASS SyncStackFrame
|
||||
{
|
||||
SyncStackFrame(SyncChannel* channel, bool rpc);
|
||||
|
@ -135,6 +132,11 @@ protected:
|
|||
#endif // OS_WIN
|
||||
|
||||
protected:
|
||||
// Executed on the link thread
|
||||
// Override the AsyncChannel handler so we can dispatch sync messages
|
||||
NS_OVERRIDE virtual void OnMessageReceivedFromLink(const Message& msg);
|
||||
NS_OVERRIDE virtual void OnChannelErrorFromLink();
|
||||
|
||||
// Executed on the worker thread
|
||||
bool ProcessingSyncMessage() const {
|
||||
return mProcessingSyncMessage;
|
||||
|
@ -142,13 +144,6 @@ protected:
|
|||
|
||||
void OnDispatchMessage(const Message& aMsg);
|
||||
|
||||
NS_OVERRIDE
|
||||
bool OnSpecialMessage(uint16 id, const Message& msg)
|
||||
{
|
||||
// SyncChannel doesn't care about any special messages yet
|
||||
return AsyncChannel::OnSpecialMessage(id, msg);
|
||||
}
|
||||
|
||||
//
|
||||
// Return true if the wait ended because a notification was
|
||||
// received. That is, true => event received.
|
||||
|
@ -172,7 +167,7 @@ protected:
|
|||
|
||||
// On both
|
||||
bool AwaitingSyncReply() const {
|
||||
mMonitor.AssertCurrentThreadOwns();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
return mPendingReply != 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -694,7 +694,7 @@ RPCChannel::SpinInternalEventLoop()
|
|||
|
||||
// Don't get wrapped up in here if the child connection dies.
|
||||
{
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
if (!Connected()) {
|
||||
return;
|
||||
}
|
||||
|
@ -731,7 +731,7 @@ RPCChannel::SpinInternalEventLoop()
|
|||
bool
|
||||
SyncChannel::WaitForNotify()
|
||||
{
|
||||
mMonitor.AssertCurrentThreadOwns();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
|
||||
// Initialize global objects used in deferred messaging.
|
||||
Init();
|
||||
|
@ -739,7 +739,7 @@ SyncChannel::WaitForNotify()
|
|||
NS_ASSERTION(mTopFrame && !mTopFrame->mRPC,
|
||||
"Top frame is not a sync frame!");
|
||||
|
||||
MonitorAutoUnlock unlock(mMonitor);
|
||||
MonitorAutoUnlock unlock(*mMonitor);
|
||||
|
||||
bool retval = true;
|
||||
|
||||
|
@ -769,7 +769,7 @@ SyncChannel::WaitForNotify()
|
|||
MSG msg = { 0 };
|
||||
// Don't get wrapped up in here if the child connection dies.
|
||||
{
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
if (!Connected()) {
|
||||
break;
|
||||
}
|
||||
|
@ -853,7 +853,7 @@ SyncChannel::WaitForNotify()
|
|||
bool
|
||||
RPCChannel::WaitForNotify()
|
||||
{
|
||||
mMonitor.AssertCurrentThreadOwns();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
|
||||
if (!StackDepth() && !mBlockedOnParent) {
|
||||
// There is currently no way to recover from this condition.
|
||||
|
@ -866,7 +866,7 @@ RPCChannel::WaitForNotify()
|
|||
NS_ASSERTION(mTopFrame && mTopFrame->mRPC,
|
||||
"Top frame is not a sync frame!");
|
||||
|
||||
MonitorAutoUnlock unlock(mMonitor);
|
||||
MonitorAutoUnlock unlock(*mMonitor);
|
||||
|
||||
bool retval = true;
|
||||
|
||||
|
@ -929,7 +929,7 @@ RPCChannel::WaitForNotify()
|
|||
|
||||
// Don't get wrapped up in here if the child connection dies.
|
||||
{
|
||||
MonitorAutoLock lock(mMonitor);
|
||||
MonitorAutoLock lock(*mMonitor);
|
||||
if (!Connected()) {
|
||||
break;
|
||||
}
|
||||
|
@ -993,7 +993,7 @@ RPCChannel::WaitForNotify()
|
|||
void
|
||||
SyncChannel::NotifyWorkerThread()
|
||||
{
|
||||
mMonitor.AssertCurrentThreadOwns();
|
||||
mMonitor->AssertCurrentThreadOwns();
|
||||
NS_ASSERTION(mEvent, "No signal event to set, this is really bad!");
|
||||
if (!SetEvent(mEvent)) {
|
||||
NS_WARNING("Failed to set NotifyWorkerThread event!");
|
||||
|
|
Загрузка…
Ссылка в новой задаче