Bug 893242 - Part 1: Add Unsound_IsClosed() and Unsound_NumQueuedMessages() to AsyncChannel. r=bent

This commit is contained in:
Justin Lebar 2013-07-17 14:31:10 -07:00
Родитель c56bcbc911
Коммит eacb45e46d
7 изменённых файлов: 172 добавлений и 10 удалений

Просмотреть файл

@ -100,6 +100,12 @@ class Channel : public Message::Sender {
// immediately.
virtual bool Send(Message* message);
// Unsound_IsClosed() and Unsound_NumQueuedMessages() are safe to call from
// any thread, but the value returned may be out of date, because we don't
// use any synchronization when reading or writing it.
bool Unsound_IsClosed() const;
uint32_t Unsound_NumQueuedMessages() const;
#if defined(OS_POSIX)
// On POSIX an IPC::Channel wraps a socketpair(), this method returns the
// FD # for the client end of the socket and the equivalent FD# to use for

Просмотреть файл

@ -293,6 +293,7 @@ void Channel::ChannelImpl::Init(Mode mode, Listener* listener) {
#if defined(OS_MACOSX)
last_pending_fd_id_ = 0;
#endif
output_queue_length_ = 0;
}
bool Channel::ChannelImpl::CreatePipe(const std::wstring& channel_id,
@ -364,7 +365,7 @@ bool Channel::ChannelImpl::EnqueueHelloMessage() {
return false;
}
output_queue_.push(msg.release());
OutputQueuePush(msg.release());
return true;
}
@ -574,7 +575,7 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() {
IPC::Message::PRIORITY_NORMAL);
DCHECK(m.fd_cookie() != 0);
fdAck->set_fd_cookie(m.fd_cookie());
output_queue_.push(fdAck);
OutputQueuePush(fdAck);
#endif
m.file_descriptor_set()->SetDescriptors(
@ -730,7 +731,7 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() {
DLOG(INFO) << "sent message @" << msg << " on channel @" << this <<
" with type " << msg->type();
#endif
output_queue_.pop();
OutputQueuePop();
delete msg;
}
}
@ -761,7 +762,7 @@ bool Channel::ChannelImpl::Send(Message* message) {
return false;
}
output_queue_.push(message);
OutputQueuePush(message);
if (!waiting_connect_) {
if (!is_blocked_on_write_) {
if (!ProcessOutgoingMessages())
@ -850,6 +851,18 @@ void Channel::ChannelImpl::CloseDescriptors(uint32_t pending_fd_id)
}
#endif
void Channel::ChannelImpl::OutputQueuePush(Message* msg)
{
output_queue_.push(msg);
output_queue_length_++;
}
void Channel::ChannelImpl::OutputQueuePop()
{
output_queue_.pop();
output_queue_length_--;
}
// Called by libevent when we can write to the pipe without blocking.
void Channel::ChannelImpl::OnFileCanWriteWithoutBlocking(int fd) {
if (!ProcessOutgoingMessages()) {
@ -859,7 +872,7 @@ void Channel::ChannelImpl::OnFileCanWriteWithoutBlocking(int fd) {
}
void Channel::ChannelImpl::Close() {
// Close can be called multiple time, so we need to make sure we're
// Close can be called multiple times, so we need to make sure we're
// idempotent.
// Unregister libevent for the listening socket and close it.
@ -890,7 +903,7 @@ void Channel::ChannelImpl::Close() {
while (!output_queue_.empty()) {
Message* m = output_queue_.front();
output_queue_.pop();
OutputQueuePop();
delete m;
}
@ -913,6 +926,16 @@ void Channel::ChannelImpl::Close() {
closed_ = true;
}
bool Channel::ChannelImpl::Unsound_IsClosed() const
{
return closed_;
}
uint32_t Channel::ChannelImpl::Unsound_NumQueuedMessages() const
{
return output_queue_length_;
}
//------------------------------------------------------------------------------
// Channel's methods simply call through to ChannelImpl.
Channel::Channel(const std::wstring& channel_id, Mode mode,
@ -956,4 +979,12 @@ void Channel::CloseClientFileDescriptor() {
channel_impl_->CloseClientFileDescriptor();
}
bool Channel::Unsound_IsClosed() const {
return channel_impl_->Unsound_IsClosed();
}
uint32_t Channel::Unsound_NumQueuedMessages() const {
return channel_impl_->Unsound_NumQueuedMessages();
}
} // namespace IPC

Просмотреть файл

@ -42,6 +42,11 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
}
void CloseClientFileDescriptor();
// See the comment in ipc_channel.h for info on Unsound_IsClosed() and
// Unsound_NumQueuedMessages().
bool Unsound_IsClosed() const;
uint32_t Unsound_NumQueuedMessages() const;
private:
void Init(Mode mode, Listener* listener);
bool CreatePipe(const std::wstring& channel_id, Mode mode);
@ -58,6 +63,9 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
void CloseDescriptors(uint32_t pending_fd_id);
#endif
void OutputQueuePush(Message* msg);
void OutputQueuePop();
Mode mode_;
// After accepting one client connection on our server socket we want to
@ -144,6 +152,12 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
uint32_t last_pending_fd_id_;
#endif
// This variable is updated so it matches output_queue_.size(), except we can
// read output_queue_length_ from any thread (if we're OK getting an
// occasional out-of-date or bogus value). We use output_queue_length_ to
// implement Unsound_NumQueuedMessages.
size_t output_queue_length_;
ScopedRunnableMethodFactory<ChannelImpl> factory_;
DISALLOW_COPY_AND_ASSIGN(ChannelImpl);

Просмотреть файл

@ -70,6 +70,19 @@ void Channel::ChannelImpl::Init(Mode mode, Listener* listener) {
waiting_connect_ = (mode == MODE_SERVER);
processing_incoming_ = false;
closed_ = false;
output_queue_length_ = 0;
}
void Channel::ChannelImpl::OutputQueuePush(Message* msg)
{
output_queue_.push(msg);
output_queue_length_++;
}
void Channel::ChannelImpl::OutputQueuePop()
{
output_queue_.pop();
output_queue_length_--;
}
HANDLE Channel::ChannelImpl::GetServerPipeHandle() const {
@ -100,7 +113,7 @@ void Channel::ChannelImpl::Close() {
while (!output_queue_.empty()) {
Message* m = output_queue_.front();
output_queue_.pop();
OutputQueuePop();
delete m;
}
@ -134,7 +147,7 @@ bool Channel::ChannelImpl::Send(Message* message) {
return false;
}
output_queue_.push(message);
OutputQueuePush(message);
// ensure waiting to write
if (!waiting_connect_) {
if (!output_state_.is_pending) {
@ -200,7 +213,7 @@ bool Channel::ChannelImpl::EnqueueHelloMessage() {
return false;
}
output_queue_.push(m.release());
OutputQueuePush(m.release());
return true;
}
@ -369,7 +382,7 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages(
// Message was sent.
DCHECK(!output_queue_.empty());
Message* m = output_queue_.front();
output_queue_.pop();
OutputQueuePop();
delete m;
}
@ -442,6 +455,16 @@ void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context,
}
}
bool Channel::ChannelImpl::Unsound_IsClosed() const
{
return closed_;
}
uint32_t Channel::ChannelImpl::Unsound_NumQueuedMessages() const
{
return output_queue_length_;
}
//------------------------------------------------------------------------------
// Channel's methods simply call through to ChannelImpl.
Channel::Channel(const std::wstring& channel_id, Mode mode,
@ -478,4 +501,12 @@ bool Channel::Send(Message* message) {
return channel_impl_->Send(message);
}
bool Channel::Unsound_IsClosed() const {
return channel_impl_->Unsound_IsClosed();
}
uint32_t Channel::Unsound_NumQueuedMessages() const {
return channel_impl_->Unsound_NumQueuedMessages();
}
} // namespace IPC

Просмотреть файл

@ -36,8 +36,18 @@ class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
return old;
}
bool Send(Message* message);
// See the comment in ipc_channel.h for info on Unsound_IsClosed() and
// Unsound_NumQueuedMessages().
bool Unsound_IsClosed() const;
uint32_t Unsound_NumQueuedMessages() const;
private:
void Init(Mode mode, Listener* listener);
void OutputQueuePush(Message* msg);
void OutputQueuePop();
const std::wstring PipeName(const std::wstring& channel_id) const;
bool CreatePipe(const std::wstring& channel_id, Mode mode);
bool EnqueueHelloMessage();
@ -89,6 +99,12 @@ class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
// This flag is set after Close() is run on the channel.
bool closed_;
// This variable is updated so it matches output_queue_.size(), except we can
// read output_queue_length_ from any thread (if we're OK getting an
// occasional out-of-date or bogus value). We use output_queue_length_ to
// implement Unsound_NumQueuedMessages.
size_t output_queue_length_;
ScopedRunnableMethodFactory<ChannelImpl> factory_;
scoped_ptr<NonThreadSafe> thread_check_;

Просмотреть файл

@ -270,6 +270,20 @@ AsyncChannel::ThreadLink::SendClose()
mTargetChan->OnChannelErrorFromLink();
}
bool
AsyncChannel::ThreadLink::Unsound_IsClosed() const
{
MonitorAutoLock lock(*mChan->mMonitor);
return mChan->mChannelState == ChannelClosed;
}
uint32_t
AsyncChannel::ThreadLink::Unsound_NumQueuedMessages() const
{
// ThreadLinks don't have a message queue.
return 0;
}
AsyncChannel::AsyncChannel(AsyncListener* aListener)
: mListener(aListener->asWeakPtr()),
mChannelState(ChannelClosed),
@ -678,6 +692,26 @@ AsyncChannel::DispatchOnChannelConnected(int32_t peer_pid)
mListener->OnChannelConnected(peer_pid);
}
bool
AsyncChannel::Unsound_IsClosed() const
{
if (!mLink) {
return true;
}
return mLink->Unsound_IsClosed();
}
uint32_t
AsyncChannel::Unsound_NumQueuedMessages() const
{
if (!mLink) {
return 0;
}
return mLink->Unsound_NumQueuedMessages();
}
//
// The methods below run in the context of the IO thread
//
@ -787,6 +821,18 @@ AsyncChannel::ProcessLink::OnCloseChannel()
mChan->mMonitor->Notify();
}
bool
AsyncChannel::ProcessLink::Unsound_IsClosed() const
{
return mTransport->Unsound_IsClosed();
}
uint32_t
AsyncChannel::ProcessLink::Unsound_NumQueuedMessages() const
{
return mTransport->Unsound_NumQueuedMessages();
}
//
// The methods below run in the context of the link thread
//

Просмотреть файл

@ -123,6 +123,15 @@ public:
// Send OnChannelConnected notification to listeners.
void DispatchOnChannelConnected(int32_t peer_pid);
// Unsound_IsClosed and Unsound_NumQueuedMessages are safe to call from any
// thread, but they make no guarantees about whether you'll get an
// up-to-date value; the values are written on one thread and read without
// locking, on potentially different threads. Thus you should only use
// them when you don't particularly care about getting a recent value (e.g.
// in a memory report).
bool Unsound_IsClosed() const;
uint32_t Unsound_NumQueuedMessages() const;
//
// Each AsyncChannel is associated with either a ProcessLink or a
// ThreadLink via the field mLink. The type of link is determined
@ -146,6 +155,9 @@ public:
virtual void EchoMessage(Message *msg) = 0;
virtual void SendMessage(Message *msg) = 0;
virtual void SendClose() = 0;
virtual bool Unsound_IsClosed() const = 0;
virtual uint32_t Unsound_NumQueuedMessages() const = 0;
};
class ProcessLink : public Link, public Transport::Listener {
@ -181,6 +193,9 @@ public:
virtual void EchoMessage(Message *msg) MOZ_OVERRIDE;
virtual void SendMessage(Message *msg) MOZ_OVERRIDE;
virtual void SendClose() MOZ_OVERRIDE;
virtual bool Unsound_IsClosed() const MOZ_OVERRIDE;
virtual uint32_t Unsound_NumQueuedMessages() const MOZ_OVERRIDE;
};
class ThreadLink : public Link {
@ -194,6 +209,9 @@ public:
virtual void EchoMessage(Message *msg) MOZ_OVERRIDE;
virtual void SendMessage(Message *msg) MOZ_OVERRIDE;
virtual void SendClose() MOZ_OVERRIDE;
virtual bool Unsound_IsClosed() const MOZ_OVERRIDE;
virtual uint32_t Unsound_NumQueuedMessages() const MOZ_OVERRIDE;
};
protected: