first cut at AsyncChannel and SyncChannel. only RPCChannel is currently under warranty.

This commit is contained in:
Chris Jones 2009-07-13 16:55:04 -05:00
Родитель 2e18128825
Коммит dc50039a28
9 изменённых файлов: 728 добавлений и 188 удалений

Просмотреть файл

@ -18,6 +18,10 @@
#include "base/ref_counted.h"
#endif
#if defined(CHROMIUM_MOZILLA_BUILD)
#define IPC_MESSAGE_ENABLE_RPC
#endif
namespace base {
class FileDescriptor;
}
@ -77,6 +81,13 @@ class Message : public Pickle {
return (header()->flags & SYNC_BIT) != 0;
}
#if defined(IPC_MESSAGE_ENABLE_RPC)
// True if this is a synchronous message.
bool is_rpc() const {
return (header()->flags & RPC_BIT) != 0;
}
#endif
// Set this on a reply to a synchronous message.
void set_reply() {
header()->flags |= REPLY_BIT;
@ -196,7 +207,9 @@ class Message : public Pickle {
bool dont_log() const { return dont_log_; }
#endif
#if !defined(CHROMIUM_MOZILLA_BUILD)
protected:
#endif
friend class Channel;
friend class MessageReplyDeserializer;
friend class SyncMessage;
@ -205,6 +218,16 @@ class Message : public Pickle {
header()->flags |= SYNC_BIT;
}
#if defined(IPC_MESSAGE_ENABLE_RPC)
void set_rpc() {
header()->flags |= RPC_BIT;
}
#endif
#if defined(CHROMIUM_MOZILLA_BUILD)
protected:
#endif
// flags
enum {
PRIORITY_MASK = 0x0003,
@ -214,6 +237,9 @@ class Message : public Pickle {
UNBLOCK_BIT = 0x0020,
PUMPING_MSGS_BIT= 0x0040,
HAS_SENT_TIME_BIT = 0x0080,
#if defined(IPC_MESSAGE_ENABLE_RPC)
RPC_BIT = 0x0100,
#endif
};
#pragma pack(push, 2)

175
ipc/glue/AsyncChannel.cpp Normal file
Просмотреть файл

@ -0,0 +1,175 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: sw=4 ts=4 et :
*/
/* ***** BEGIN LICENSE BLOCK *****
* Version: MPL 1.1/GPL 2.0/LGPL 2.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is Mozilla Plugin App.
*
* The Initial Developer of the Original Code is
* Chris Jones <jones.chris.g@gmail.com>
* Portions created by the Initial Developer are Copyright (C) 2009
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
*
* Alternatively, the contents of this file may be used under the terms of
* either the GNU General Public License Version 2 or later (the "GPL"), or
* the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
* in which case the provisions of the GPL or the LGPL are applicable instead
* of those above. If you wish to allow use of your version of this file only
* under the terms of either the GPL or the LGPL, and not to allow others to
* use your version of this file under the terms of the MPL, indicate your
* decision by deleting the provisions above and replace them with the notice
* and other provisions required by the GPL or the LGPL. If you do not delete
* the provisions above, a recipient may use your version of this file under
* the terms of any one of the MPL, the GPL or the LGPL.
*
* ***** END LICENSE BLOCK ***** */
#include "mozilla/ipc/AsyncChannel.h"
#include "mozilla/ipc/GeckoThread.h"
#include "nsDebug.h"
template<>
struct RunnableMethodTraits<mozilla::ipc::AsyncChannel>
{
static void RetainCallee(mozilla::ipc::AsyncChannel* obj) { }
static void ReleaseCallee(mozilla::ipc::AsyncChannel* obj) { }
};
namespace mozilla {
namespace ipc {
bool
AsyncChannel::Open(Transport* aTransport, MessageLoop* aIOLoop)
{
NS_PRECONDITION(!mTransport, "Open() called > once");
NS_PRECONDITION(aTransport, "need transport layer");
// FIXME need to check for valid channel
mTransport = aTransport;
mTransport->set_listener(this);
// FIXME do away with this
bool needOpen = true;
if(!aIOLoop) {
needOpen = false;
aIOLoop = BrowserProcessSubThread
::GetMessageLoop(BrowserProcessSubThread::IO);
}
mIOLoop = aIOLoop;
mWorkerLoop = MessageLoop::current();
NS_ASSERTION(mIOLoop, "need an IO loop");
NS_ASSERTION(mWorkerLoop, "need a worker loop");
if (needOpen) {
mIOLoop->PostTask(FROM_HERE,
NewRunnableMethod(this,
&AsyncChannel::OnChannelOpened));
}
return true;
}
void
AsyncChannel::Close()
{
// FIXME impl
mChannelState = ChannelClosed;
}
bool
AsyncChannel::Send(Message* msg)
{
NS_PRECONDITION(MSG_ROUTING_NONE != msg->routing_id(), "need a route");
mIOLoop->PostTask(FROM_HERE,
NewRunnableMethod(this, &AsyncChannel::OnSend, msg));
return true;
}
void
AsyncChannel::OnDispatchMessage(const Message& msg)
{
NS_ASSERTION(!msg.is_reply(), "can't process replies here");
NS_ASSERTION(!(msg.is_sync() || msg.is_rpc()), "async dispatch only");
switch (mListener->OnMessageReceived(msg)) {
case Listener::MsgProcessed:
return;
case Listener::MsgNotKnown:
case Listener::MsgNotAllowed:
case Listener::MsgPayloadError:
case Listener::MsgRouteError:
case Listener::MsgValueError:
// FIXME/cjones: error handling; OnError()?
return;
default:
NOTREACHED();
return;
}
}
//
// The methods below run in the context of the IO thread, and can proxy
// back to the methods above
//
void
AsyncChannel::OnMessageReceived(const Message& msg)
{
// wake up the worker, there's work to do
mWorkerLoop->PostTask(FROM_HERE,
NewRunnableMethod(this,
&AsyncChannel::OnDispatchMessage,
msg));
}
void
AsyncChannel::OnChannelConnected(int32 peer_pid)
{
mChannelState = ChannelIdle;
}
void
AsyncChannel::OnChannelError()
{
// FIXME/cjones impl
mChannelState = ChannelError;
}
void
AsyncChannel::OnChannelOpened()
{
mChannelState = ChannelOpening;
/*assert*/mTransport->Connect();
}
void
AsyncChannel::OnSend(Message* aMsg)
{
mTransport->Send(aMsg);
// mTransport deletes aMsg
}
} // namespace ipc
} // namespace mozilla

132
ipc/glue/AsyncChannel.h Normal file
Просмотреть файл

@ -0,0 +1,132 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: sw=4 ts=4 et :
*/
/* ***** BEGIN LICENSE BLOCK *****
* Version: MPL 1.1/GPL 2.0/LGPL 2.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is Mozilla Plugin App.
*
* The Initial Developer of the Original Code is
* Chris Jones <jones.chris.g@gmail.com>
* Portions created by the Initial Developer are Copyright (C) 2009
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
*
* Alternatively, the contents of this file may be used under the terms of
* either the GNU General Public License Version 2 or later (the "GPL"), or
* the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
* in which case the provisions of the GPL or the LGPL are applicable instead
* of those above. If you wish to allow use of your version of this file only
* under the terms of either the GPL or the LGPL, and not to allow others to
* use your version of this file under the terms of the MPL, indicate your
* decision by deleting the provisions above and replace them with the notice
* and other provisions required by the GPL or the LGPL. If you do not delete
* the provisions above, a recipient may use your version of this file under
* the terms of any one of the MPL, the GPL or the LGPL.
*
* ***** END LICENSE BLOCK ***** */
#ifndef ipc_glue_AsyncChannel_h
#define ipc_glue_AsyncChannel_h 1
#include "base/basictypes.h"
#include "base/message_loop.h"
#include "chrome/common/ipc_channel.h"
namespace mozilla {
namespace ipc {
//-----------------------------------------------------------------------------
class AsyncChannel : public IPC::Channel::Listener
{
protected:
enum ChannelState {
ChannelClosed,
ChannelOpening,
ChannelIdle, // => connected
ChannelWaiting, // => connected
ChannelError
};
public:
typedef IPC::Channel Transport;
typedef IPC::Message Message;
class /*NS_INTERFACE_CLASS*/ Listener
{
public:
enum Result {
MsgProcessed,
MsgNotKnown,
MsgNotAllowed,
MsgPayloadError,
MsgRouteError,
MsgValueError,
};
virtual ~Listener() { }
virtual Result OnMessageReceived(const Message& aMessage) = 0;
};
AsyncChannel(Listener* aListener) :
mTransport(0),
mListener(aListener),
mChannelState(ChannelClosed),
mIOLoop(),
mWorkerLoop()
{
}
virtual ~AsyncChannel()
{
if (mTransport)
Close();
mTransport = 0;
}
// Open from the perspective of the RPC layer; the transport
// should already be connected, or ready to connect.
bool Open(Transport* aTransport, MessageLoop* aIOLoop=0);
// Close from the perspective of the RPC layer; leaves the
// underlying transport channel open, however.
void Close();
// Asynchronously send a message to the other side of the channel
bool Send(Message* msg);
// Implement the IPC::Channel::Listener interface
virtual void OnMessageReceived(const Message& msg);
virtual void OnChannelConnected(int32 peer_pid);
virtual void OnChannelError();
protected:
// Additional methods that execute on the worker thread
void OnDispatchMessage(const Message& aMsg);
// Additional methods that execute on the IO thread
void OnChannelOpened();
void OnSend(Message* aMsg);
Transport* mTransport;
Listener* mListener;
ChannelState mChannelState;
MessageLoop* mIOLoop; // thread where IO happens
MessageLoop* mWorkerLoop; // thread where work is done
};
} // namespace ipc
} // namespace mozilla
#endif // ifndef ipc_glue_AsyncChannel_h

Просмотреть файл

@ -54,23 +54,27 @@ EXPORTS_IPC = \
$(NULL)
EXPORTS_mozilla/ipc = \
AsyncChannel.h \
GeckoChildProcessHost.h \
GeckoThread.h \
MessageTypes.h \
ProtocolUtils.h \
RPCChannel.h \
SyncChannel.h \
ScopedXREEmbed.h \
$(NULL)
ENABLE_CXX_EXCEPTIONS = 1
CPPSRCS += \
GeckoChildProcessHost.cpp \
GeckoThread.cpp \
MessagePump.cpp \
RPCChannel.cpp \
ScopedXREEmbed.cpp \
StringUtil.cpp \
CPPSRCS += \
AsyncChannel.cpp \
GeckoChildProcessHost.cpp \
GeckoThread.cpp \
MessagePump.cpp \
RPCChannel.cpp \
ScopedXREEmbed.cpp \
StringUtil.cpp \
SyncChannel.cpp \
$(NULL)
include $(topsrcdir)/ipc/app/defs.mk

Просмотреть файл

@ -1,6 +1,7 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: sw=4 ts=4 et :
* ***** BEGIN LICENSE BLOCK *****
*/
/* ***** BEGIN LICENSE BLOCK *****
* Version: MPL 1.1/GPL 2.0/LGPL 2.1
*
* The contents of this file are subject to the Mozilla Public License Version
@ -54,84 +55,62 @@ namespace mozilla {
namespace ipc {
bool
RPCChannel::Open(Transport* aTransport, MessageLoop* aIOLoop)
{
NS_PRECONDITION(!mTransport, "Open() called > once");
NS_PRECONDITION(aTransport, "need transport layer");
// FIXME need to check for valid channel
mTransport = aTransport;
mTransport->set_listener(this);
// FIXME do away with this
bool needOpen = true;
if(!aIOLoop) {
needOpen = false;
aIOLoop = BrowserProcessSubThread
::GetMessageLoop(BrowserProcessSubThread::IO);
}
mIOLoop = aIOLoop;
mWorkerLoop = MessageLoop::current();
NS_ASSERTION(mIOLoop, "need an IO loop");
NS_ASSERTION(mWorkerLoop, "need a worker loop");
if (needOpen) {
mIOLoop->PostTask(FROM_HERE,
NewRunnableMethod(this,
&RPCChannel::OnChannelOpened));
}
return true;
}
void
RPCChannel::Close()
{
// FIXME impl
mChannelState = ChannelClosed;
}
bool
RPCChannel::Call(Message* msg, Message* reply)
{
NS_PRECONDITION(MSG_ROUTING_NONE != msg->routing_id(), "need a route");
NS_PRECONDITION(msg->is_rpc(), "can only Call() RPC messages here");
mMutex.Lock();
mChannelState = ChannelWaiting;
mPending.push(*msg);
mIOLoop->PostTask(FROM_HERE, NewRunnableMethod(this,
&RPCChannel::SendCall,
msg));
AsyncChannel::Send(msg);
while (1) {
// here we're waiting for something to happen. it may either
// be a reply to an outstanding message, or a recursive call
// from the other side
// here we're waiting for something to happen. it may either:
// (1) a reply to an outstanding message
// (2) a recursive call from the other side
// or
// (3) any other message
mCvar.Wait();
Message recvd = mPending.top();
mPending.pop();
if (recvd.is_reply()) {
if (!recvd.is_rpc()) {
SyncChannel::OnDispatchMessage(recvd);
// FIXME/cjones: error handling
}
// RPC reply message
else if (recvd.is_reply()) {
NS_ASSERTION(0 < mPending.size(), "invalid RPC stack");
const Message& pending = mPending.top();
if (recvd.type() != (pending.type()+1)) {
// FIXME/cjones: handle error
NS_ASSERTION(0, "somebody's misbehavin'");
}
// we received a reply to our most recent message. pop this
// frame and return the reply
NS_ASSERTION(0 < mPending.size(), "invalid RPC stack");
mPending.pop();
*reply = recvd;
if (!WaitingForReply()) {
mChannelState = ChannelIdle;
}
mMutex.Unlock();
return true;
}
// RPC in-call
else {
mMutex.Unlock();
// someone called in to us from the other side. handle the call
if (!ProcessIncomingCall(recvd))
return false;
OnDispatchMessage(recvd);
// FIXME/cjones: error handling
mMutex.Lock();
}
@ -142,41 +121,36 @@ RPCChannel::Call(Message* msg, Message* reply)
return true;
}
bool
RPCChannel::ProcessIncomingCall(Message call)
void
RPCChannel::OnDispatchMessage(const Message& call)
{
Message* reply;
if (!call.is_rpc()) {
return SyncChannel::OnDispatchMessage(call);
}
switch (mListener->OnCallReceived(call, reply)) {
Message* reply;
switch (static_cast<Listener*>(mListener)->OnCallReceived(call, reply)) {
case Listener::MsgProcessed:
mIOLoop->PostTask(FROM_HERE,
NewRunnableMethod(this,
&RPCChannel::SendReply,
&RPCChannel::OnSendReply,
reply));
return true;
return;
case Listener::MsgNotKnown:
case Listener::MsgNotAllowed:
case Listener::MsgPayloadError:
case Listener::MsgRouteError:
case Listener::MsgValueError:
//OnError()?
return false;
// FIXME/cjones: error handling; OnError()?
return;
default:
NOTREACHED();
return false;
return;
}
}
void
RPCChannel::OnIncomingCall(Message msg)
{
NS_ASSERTION(0 == mPending.size(),
"woke up the worker thread when it had outstanding work!");
ProcessIncomingCall(msg);
}
//
// The methods below run in the context of the IO thread, and can proxy
// back to the methods above
@ -184,12 +158,14 @@ RPCChannel::OnIncomingCall(Message msg)
void
RPCChannel::OnMessageReceived(const Message& msg)
{MutexAutoLock lock(mMutex);
{
MutexAutoLock lock(mMutex);
if (0 == mPending.size()) {
// wake up the worker, there's work to do
mWorkerLoop->PostTask(FROM_HERE,
NewRunnableMethod(this,
&RPCChannel::OnIncomingCall,
&RPCChannel::OnDispatchMessage,
msg));
}
else {
@ -199,38 +175,6 @@ RPCChannel::OnMessageReceived(const Message& msg)
}
}
void
RPCChannel::OnChannelConnected(int32 peer_pid)
{
mChannelState = ChannelConnected;
}
void
RPCChannel::OnChannelError()
{
// FIXME/cjones impl
mChannelState = ChannelError;
}
void
RPCChannel::OnChannelOpened()
{
mChannelState = ChannelOpening;
/*assert*/mTransport->Connect();
}
void
RPCChannel::SendCall(Message* aCall)
{
mTransport->Send(aCall);
}
void
RPCChannel::SendReply(Message* aReply)
{
mTransport->Send(aReply);
}
} // namespace ipc
} // namespace mozilla

Просмотреть файл

@ -42,108 +42,51 @@
// FIXME/cjones probably shouldn't depend on this
#include <stack>
#include "base/basictypes.h"
#include "base/message_loop.h"
#include "chrome/common/ipc_channel.h"
#include "mozilla/CondVar.h"
#include "mozilla/Mutex.h"
#include "mozilla/ipc/SyncChannel.h"
namespace mozilla {
namespace ipc {
//-----------------------------------------------------------------------------
class RPCChannel : public IPC::Channel::Listener
class RPCChannel : public SyncChannel
{
private:
typedef mozilla::CondVar CondVar;
typedef mozilla::Mutex Mutex;
enum ChannelState {
ChannelClosed,
ChannelOpening,
ChannelConnected,
ChannelError
};
public:
typedef IPC::Channel Transport;
typedef IPC::Message Message;
class Listener
class Listener : public SyncChannel::Listener
{
public:
enum Result {
MsgProcessed,
MsgNotKnown,
MsgNotAllowed,
MsgPayloadError,
MsgRouteError,
MsgValueError,
};
virtual ~Listener() { }
virtual Result OnMessageReceived(const Message& aMessage) = 0;
virtual Result OnMessageReceived(const Message& aMessage,
Message*& aReply) = 0;
virtual Result OnCallReceived(const Message& aMessage,
Message*& aReply) = 0;
};
/**
* Convert the asynchronous channel |aChannel| into a channel with
* RPC semantics. Received messages are passed down to
* |aListener|.
*
* FIXME do away with |aMode|
*/
RPCChannel(Listener* aListener) :
mTransport(0),
mListener(aListener),
mChannelState(ChannelClosed),
mMutex("mozilla.ipc.RPCChannel.mMutex"),
mCvar(mMutex, "mozilla.ipc.RPCChannel.mCvar")
SyncChannel(aListener)
{
}
virtual ~RPCChannel()
{
if (mTransport)
Close();
mTransport = 0;
// FIXME/cjones: impl
}
// Open from the perspective of the RPC layer; the transport
// should already be connected, or ready to connect.
bool Open(Transport* aTransport, MessageLoop* aIOLoop=0);
// Close from the perspective of the RPC layer; leaves the
// underlying transport channel open, however.
void Close();
// Implement the IPC::Channel::Listener interface
virtual void OnMessageReceived(const Message& msg);
virtual void OnChannelConnected(int32 peer_pid);
virtual void OnChannelError();
// Make an RPC to the other side of the channel
virtual bool Call(Message* msg, Message* reply);
bool Call(Message* msg, Message* reply);
// Override the SyncChannel handler so we can dispatch RPC messages
virtual void OnMessageReceived(const Message& msg);
private:
// Task created when we're idle (wrt this channel), and the other
// side has made an RPC to us
void OnIncomingCall(Message msg);
// Process an RPC made from the other side to here
bool ProcessIncomingCall(Message msg);
// Executed on worker thread
virtual bool WaitingForReply() {
mMutex.AssertCurrentThreadOwns();
return mPending.size() > 0 || SyncChannel::WaitingForReply();
}
void OnChannelOpened();
void SendCall(Message* aCall);
void SendReply(Message* aReply);
void OnDispatchMessage(const Message& msg);
Transport* mTransport;
Listener* mListener;
ChannelState mChannelState;
MessageLoop* mIOLoop; // thread where IO happens
MessageLoop* mWorkerLoop; // thread where work is done
Mutex mMutex;
CondVar mCvar;
std::stack<Message> mPending;
};

174
ipc/glue/SyncChannel.cpp Normal file
Просмотреть файл

@ -0,0 +1,174 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: sw=4 ts=4 et :
*/
/* ***** BEGIN LICENSE BLOCK *****
* Version: MPL 1.1/GPL 2.0/LGPL 2.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is Mozilla Plugin App.
*
* The Initial Developer of the Original Code is
* Chris Jones <jones.chris.g@gmail.com>
* Portions created by the Initial Developer are Copyright (C) 2009
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
*
* Alternatively, the contents of this file may be used under the terms of
* either the GNU General Public License Version 2 or later (the "GPL"), or
* the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
* in which case the provisions of the GPL or the LGPL are applicable instead
* of those above. If you wish to allow use of your version of this file only
* under the terms of either the GPL or the LGPL, and not to allow others to
* use your version of this file under the terms of the MPL, indicate your
* decision by deleting the provisions above and replace them with the notice
* and other provisions required by the GPL or the LGPL. If you do not delete
* the provisions above, a recipient may use your version of this file under
* the terms of any one of the MPL, the GPL or the LGPL.
*
* ***** END LICENSE BLOCK ***** */
#include "mozilla/ipc/SyncChannel.h"
#include "mozilla/ipc/GeckoThread.h"
#include "nsDebug.h"
using mozilla::MutexAutoLock;
template<>
struct RunnableMethodTraits<mozilla::ipc::SyncChannel>
{
static void RetainCallee(mozilla::ipc::SyncChannel* obj) { }
static void ReleaseCallee(mozilla::ipc::SyncChannel* obj) { }
};
namespace mozilla {
namespace ipc {
bool
SyncChannel::Send(Message* msg, Message* reply)
{
NS_PRECONDITION(msg->is_sync(), "can only Send() sync messages here");
MutexAutoLock lock(mMutex);
mChannelState = ChannelWaiting;
mPendingReply = msg->type() + 1;
/*assert*/AsyncChannel::Send(msg);
while (1) {
// here we're waiting for something to happen. it may be either:
// (1) the reply we're waiting for (mPendingReply)
// or
// (2) any other message
//
// In case (1), we return this reply back to the caller.
// In case (2), we defer processing of the message until our reply
// comes back.
mCvar.Wait();
if (mRecvd.is_reply() && mPendingReply == mRecvd.type()) {
// case (1)
mPendingReply = 0;
*reply = mRecvd;
if (!WaitingForReply()) {
mChannelState = ChannelIdle;
}
return true;
}
else {
// case (2)
NS_ASSERTION(!mRecvd.is_reply(), "can't process replies here");
// post a task to our own event loop
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &SyncChannel::OnDispatchMessage, mRecvd));
}
}
}
void
SyncChannel::OnDispatchMessage(const Message& msg)
{
NS_ASSERTION(!msg.is_reply(), "can't process replies here");
NS_ASSERTION(!msg.is_rpc(), "sync or async only here");
if (!msg.is_sync()) {
return AsyncChannel::OnDispatchMessage(msg);
}
Message* reply;
switch (static_cast<Listener*>(mListener)->OnMessageReceived(msg, reply)) {
case Listener::MsgProcessed:
mIOLoop->PostTask(FROM_HERE,
NewRunnableMethod(this,
&SyncChannel::OnSendReply,
reply));
return;
case Listener::MsgNotKnown:
case Listener::MsgNotAllowed:
case Listener::MsgPayloadError:
case Listener::MsgRouteError:
case Listener::MsgValueError:
// FIXME/cjones: error handling; OnError()?
return;
default:
NOTREACHED();
return;
}
}
//
// The methods below run in the context of the IO thread, and can proxy
// back to the methods above
//
void
SyncChannel::OnMessageReceived(const Message& msg)
{
MutexAutoLock lock(mMutex);
if (ChannelIdle == mChannelState) {
// wake up the worker, there's work to do
if (msg.is_sync()) {
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &SyncChannel::OnDispatchMessage, msg));
}
else {
return AsyncChannel::OnMessageReceived(msg);
}
}
else if (ChannelWaiting == mChannelState) {
// let the worker know something new has happened
mRecvd = msg;
mCvar.Notify();
}
else {
// FIXME/cjones: could reach here in error conditions. impl me
NOTREACHED();
}
}
void
SyncChannel::OnSendReply(Message* aReply)
{
mTransport->Send(aReply);
}
} // namespace ipc
} // namespace mozilla

116
ipc/glue/SyncChannel.h Normal file
Просмотреть файл

@ -0,0 +1,116 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: sw=4 ts=4 et :
*/
/* ***** BEGIN LICENSE BLOCK *****
* Version: MPL 1.1/GPL 2.0/LGPL 2.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is Mozilla Plugin App.
*
* The Initial Developer of the Original Code is
* Chris Jones <jones.chris.g@gmail.com>
* Portions created by the Initial Developer are Copyright (C) 2009
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
*
* Alternatively, the contents of this file may be used under the terms of
* either the GNU General Public License Version 2 or later (the "GPL"), or
* the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
* in which case the provisions of the GPL or the LGPL are applicable instead
* of those above. If you wish to allow use of your version of this file only
* under the terms of either the GPL or the LGPL, and not to allow others to
* use your version of this file under the terms of the MPL, indicate your
* decision by deleting the provisions above and replace them with the notice
* and other provisions required by the GPL or the LGPL. If you do not delete
* the provisions above, a recipient may use your version of this file under
* the terms of any one of the MPL, the GPL or the LGPL.
*
* ***** END LICENSE BLOCK ***** */
#ifndef ipc_glue_SyncChannel_h
#define ipc_glue_SyncChannel_h 1
#include <queue>
#include "mozilla/CondVar.h"
#include "mozilla/Mutex.h"
#include "mozilla/ipc/AsyncChannel.h"
namespace mozilla {
namespace ipc {
//-----------------------------------------------------------------------------
class SyncChannel : public AsyncChannel
{
protected:
typedef mozilla::CondVar CondVar;
typedef mozilla::Mutex Mutex;
typedef uint16 MessageId;
typedef std::queue<Message> MessageQueue;
public:
class /*NS_INTERFACE_CLASS*/ Listener :
public AsyncChannel::Listener
{
public:
virtual ~Listener() { }
virtual Result OnMessageReceived(const Message& aMessage) = 0;
virtual Result OnMessageReceived(const Message& aMessage,
Message*& aReply) = 0;
};
SyncChannel(Listener* aListener) :
AsyncChannel(aListener),
mMutex("mozilla.ipc.SyncChannel.mMutex"),
mCvar(mMutex, "mozilla.ipc.SyncChannel.mCvar")
{
}
virtual ~SyncChannel()
{
// FIXME/cjones: impl
}
bool Send(Message* msg) {
return AsyncChannel::Send(msg);
}
// Synchronously send |msg| (i.e., wait for |reply|)
bool Send(Message* msg, Message* reply);
// Override the AsyncChannel handler so we can dispatch sync messages
virtual void OnMessageReceived(const Message& msg);
protected:
// Executed on the worker thread
virtual bool WaitingForReply() {
mMutex.AssertCurrentThreadOwns();
return mPendingReply != 0;
}
void OnDispatchMessage(const Message& aMsg);
// Executed on the IO thread.
void OnSendReply(Message* msg);
Mutex mMutex;
CondVar mCvar;
MessageId mPendingReply;
Message mRecvd;
};
} // namespace ipc
} // namespace mozilla
#endif // ifndef ipc_glue_SyncChannel_h

Просмотреть файл

@ -827,9 +827,24 @@ class GenerateProtocolActorHeader(Visitor):
cxx.ExprCall(cxx.ExprSelect(msgvar, '->', 'set_routing_id'),
[ route ])))
if md.decl.type.isAsync():
sendmethod = 'Send'
elif md.decl.type.isSync():
sendmethod = 'Send'
impl.addstmt(cxx.StmtExpr(
cxx.ExprCall(cxx.ExprSelect(msgvar, '->', 'set_sync'),
[ ])))
elif md.decl.type.isRpc():
sendmethod = 'Call'
impl.addstmt(cxx.StmtExpr(
cxx.ExprCall(cxx.ExprSelect(msgvar, '->', 'set_rpc'),
[ ])))
else:
assert 0
sendcall = cxx.ExprCall(
cxx.ExprSelect(
cxx.ExprVar('mChannel'), self.channelsel, 'Call'),
cxx.ExprVar('mChannel'), self.channelsel, sendmethod),
[ msgvar ])
if hasreply:
sendcall.args.append(cxx.ExprAddrOf(replyvar))
@ -1018,6 +1033,17 @@ class GenerateProtocolActorHeader(Visitor):
cxx.ExprSelect(replyvar, '->', 'set_reply'),
[ ])))
if md.decl.type.isSync():
block.addstmt(cxx.StmtExpr(cxx.ExprCall(
cxx.ExprSelect(replyvar, '->', 'set_sync'),
[ ])))
elif md.decl.type.isRpc():
block.addstmt(cxx.StmtExpr(cxx.ExprCall(
cxx.ExprSelect(replyvar, '->', 'set_rpc'),
[ ])))
else:
assert 0
block.addstmt(cxx.StmtReturn(cxx.ExprVar('MsgProcessed')))
if md.decl.type.isAsync():