Bug 1792474 - Part 4: Avoid the IO thread hop when sending IPC messages, r=ipc-reviewers,jld

This involves some changes to IPC::Channel::ChannelImpl on all platforms in
order to ensure that they are threadsafe.

1. The ChannelImpl is now internally refcounted, making the correctness
   of parts of its lifecycle more clear.
2. Members of the channel are all annotated with `MOZ_GUARDED_BY` for either
   the `io_thread_` or `mutex_` depending on if they are required in order to
   send a message. This gives us some static checks that we won't deadlock.
3. The `closed_` field is removed, as thanks to the mutex, `pipe_` can now be
   checked directly from any thread instead. This reduces the risk of
   forgetting to update `closed_`.
4. `NodeChannel` now calls `Send()` without dispatching, which also required
   updating some other members to also be accessible from any thread, including
   changes to allow asynchronously reporting a channel error when `Send()`
   fails.
5. The Windows handling for `Connect()` was made more thread-safe to queue
   calls to `Send()` performed before `Connect()` returns. The posix
   `Connect()` handler already did this.

Differential Revision: https://phabricator.services.mozilla.com/D158162
This commit is contained in:
Nika Layzell 2022-10-07 01:51:29 +00:00
Родитель f51f359297
Коммит f5d6df8915
8 изменённых файлов: 384 добавлений и 261 удалений

Просмотреть файл

@ -101,6 +101,9 @@ class Channel {
// |listener| receives a callback on the current thread for each newly
// received message.
//
// The Channel must be created and destroyed on the IO thread, and all
// methods, unless otherwise noted, are only safe to call on the I/O thread.
//
Channel(const ChannelId& channel_id, Mode mode, Listener* listener);
// Initialize a pre-created channel |pipe| as |mode|.
@ -123,8 +126,8 @@ class Channel {
// Send a message over the Channel to the listener on the other end.
//
// |message| must be allocated using operator new. This object will be
// deleted once the contents of the Message have been sent.
// This method may be called from any thread, so long as the `Channel` is not
// destroyed before it returns.
//
// If you Send() a message on a Close()'d channel, we delete the message
// immediately.
@ -189,7 +192,7 @@ class Channel {
private:
// PIMPL to which all channel calls are delegated.
class ChannelImpl;
ChannelImpl* channel_impl_;
RefPtr<ChannelImpl> channel_impl_;
enum {
#if defined(OS_MACOSX)

Просмотреть файл

@ -142,14 +142,14 @@ void Channel::SetClientChannelFd(int fd) { gClientChannelFd = fd; }
#endif // defined(MOZ_WIDGET_ANDROID)
Channel::ChannelImpl::ChannelImpl(const ChannelId& channel_id, Mode mode,
Listener* listener) {
Listener* listener)
: io_thread_(MessageLoopForIO::current()->SerialEventTarget()) {
Init(mode, listener);
if (!CreatePipe(mode)) {
CHROMIUM_LOG(WARNING) << "Unable to create pipe in "
<< (mode == MODE_SERVER ? "server" : "client")
<< " mode error(" << strerror(errno) << ").";
closed_ = true;
return;
}
@ -157,7 +157,8 @@ Channel::ChannelImpl::ChannelImpl(const ChannelId& channel_id, Mode mode,
}
Channel::ChannelImpl::ChannelImpl(ChannelHandle pipe, Mode mode,
Listener* listener) {
Listener* listener)
: io_thread_(MessageLoopForIO::current()->SerialEventTarget()) {
Init(mode, listener);
SetPipe(pipe.release());
@ -165,6 +166,8 @@ Channel::ChannelImpl::ChannelImpl(ChannelHandle pipe, Mode mode,
}
void Channel::ChannelImpl::SetPipe(int fd) {
io_thread_.AssertOnCurrentThread();
pipe_ = fd;
pipe_buf_len_ = 0;
if (fd >= 0) {
@ -206,7 +209,6 @@ void Channel::ChannelImpl::Init(Mode mode, Listener* listener) {
client_pipe_ = -1;
listener_ = listener;
waiting_connect_ = true;
closed_ = false;
#if defined(OS_MACOSX)
last_pending_fd_id_ = 0;
other_task_ = nullptr;
@ -238,7 +240,7 @@ bool Channel::ChannelImpl::EnqueueHelloMessage() {
mozilla::UniquePtr<Message> msg(
new Message(MSG_ROUTING_NONE, HELLO_MESSAGE_TYPE));
if (!msg->WriteInt(base::GetCurrentProcId())) {
Close();
CloseLocked();
return false;
}
@ -247,6 +249,12 @@ bool Channel::ChannelImpl::EnqueueHelloMessage() {
}
bool Channel::ChannelImpl::Connect() {
io_thread_.AssertOnCurrentThread();
mozilla::MutexAutoLock lock(mutex_);
return ConnectLocked();
}
bool Channel::ChannelImpl::ConnectLocked() {
if (pipe_ == -1) {
return false;
}
@ -302,7 +310,7 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() {
}
} else if (bytes_read == 0) {
// The pipe has closed...
Close();
CloseLocked();
return false;
}
DCHECK(bytes_read);
@ -392,7 +400,9 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() {
// stored in incoming_message_ followed by data in input_buf_ (followed by
// other messages).
while (p < end) {
// NOTE: We re-check `pipe_` after each message to make sure we weren't
// closed while calling `OnMessageReceived` or `OnChannelConnected`.
while (p < end && pipe_ != -1) {
// Try to figure out how big the message is. Size is 0 if we haven't read
// enough of the header to know the size.
uint32_t message_length = 0;
@ -514,7 +524,9 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() {
m.type() == HELLO_MESSAGE_TYPE) {
// The Hello message contains only the process id.
other_pid_ = MessageIterator(m).NextInt();
listener_->OnChannelConnected(other_pid_);
int32_t other_pid = other_pid_;
mozilla::MutexAutoUnlock unlock(mutex_);
listener_->OnChannelConnected(other_pid);
#if defined(OS_MACOSX)
} else if (m.routing_id() == MSG_ROUTING_NONE &&
m.type() == RECEIVED_FDS_MESSAGE_TYPE) {
@ -528,6 +540,7 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() {
return false;
}
#endif
mozilla::MutexAutoUnlock unlock(mutex_);
listener_->OnMessageReceived(std::move(incoming_message_));
}
@ -548,6 +561,7 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() {
}
bool Channel::ChannelImpl::ProcessOutgoingMessages() {
// NOTE: This method may be called on threads other than `io_thread_`.
DCHECK(!waiting_connect_); // Why are we trying to send messages if there's
// no connection?
is_blocked_on_write_ = false;
@ -691,8 +705,11 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() {
// FD over atomically.
case EMSGSIZE:
// Because this is likely to result in a busy-wait, we'll try to make
// it easier for the receiver to make progress.
sched_yield();
// it easier for the receiver to make progress, but only if we're on
// the I/O thread already.
if (io_thread_.IsOnCurrentThread()) {
sched_yield();
}
break;
#endif
default:
@ -717,12 +734,22 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() {
MOZ_DIAGNOSTIC_ASSERT(!partial_write_->iter_.Done());
}
// Tell libevent to call us back once things are unblocked.
is_blocked_on_write_ = true;
MessageLoopForIO::current()->WatchFileDescriptor(
pipe_,
false, // One shot
MessageLoopForIO::WATCH_WRITE, &write_watcher_, this);
if (io_thread_.IsOnCurrentThread()) {
// If we're on the I/O thread already, tell libevent to call us back
// when things are unblocked.
MessageLoopForIO::current()->WatchFileDescriptor(
pipe_,
false, // One shot
MessageLoopForIO::WATCH_WRITE, &write_watcher_, this);
} else {
// Otherwise, emulate being called back from libevent on the I/O thread,
// which will re-try the write, and then potentially start watching if
// still necessary.
io_thread_.Dispatch(mozilla::NewRunnableMethod<int>(
"ChannelImpl::ContinueProcessOutgoing", this,
&ChannelImpl::OnFileCanWriteWithoutBlocking, -1));
}
return true;
} else {
MOZ_ASSERT(partial_write_->handles_.Length() == num_fds,
@ -758,6 +785,9 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() {
}
bool Channel::ChannelImpl::Send(mozilla::UniquePtr<Message> message) {
// NOTE: This method may be called on threads other than `io_thread_`.
mozilla::MutexAutoLock lock(mutex_);
#ifdef IPC_MESSAGE_DEBUG_EXTRA
DLOG(INFO) << "sending message @" << message.get() << " on channel @" << this
<< " with type " << message->type() << " ("
@ -773,7 +803,7 @@ bool Channel::ChannelImpl::Send(mozilla::UniquePtr<Message> message) {
// to pop anything off output_queue; output_queue will only get emptied when
// the channel is destructed. We might as well delete message now, instead
// of waiting for the channel to be destructed.
if (closed_) {
if (pipe_ == -1) {
if (mozilla::ipc::LoggingEnabled()) {
fprintf(stderr,
"Can't send message %s, because this channel is closed.\n",
@ -794,12 +824,14 @@ bool Channel::ChannelImpl::Send(mozilla::UniquePtr<Message> message) {
void Channel::ChannelImpl::GetClientFileDescriptorMapping(int* src_fd,
int* dest_fd) const {
io_thread_.AssertOnCurrentThread();
DCHECK(mode_ == MODE_SERVER);
*src_fd = client_pipe_;
*dest_fd = gClientChannelFd;
}
void Channel::ChannelImpl::CloseClientFileDescriptor() {
io_thread_.AssertOnCurrentThread();
if (client_pipe_ != -1) {
IGNORE_EINTR(close(client_pipe_));
client_pipe_ = -1;
@ -808,9 +840,13 @@ void Channel::ChannelImpl::CloseClientFileDescriptor() {
// Called by libevent when we can read from th pipe without blocking.
void Channel::ChannelImpl::OnFileCanReadWithoutBlocking(int fd) {
if (!waiting_connect_ && fd == pipe_) {
io_thread_.AssertOnCurrentThread();
mozilla::ReleasableMutexAutoLock lock(mutex_);
if (!waiting_connect_ && fd == pipe_ && pipe_ != -1) {
if (!ProcessIncomingMessages()) {
Close();
CloseLocked();
lock.Unlock();
listener_->OnChannelError();
// The OnChannelError() call may delete this, so we need to exit now.
return;
@ -835,7 +871,7 @@ void Channel::ChannelImpl::CloseDescriptors(uint32_t pending_fd_id) {
void Channel::ChannelImpl::OutputQueuePush(mozilla::UniquePtr<Message> msg) {
mozilla::LogIPCMessage::LogDispatchWithPid(msg.get(), other_pid_);
MOZ_DIAGNOSTIC_ASSERT(!closed_);
MOZ_DIAGNOSTIC_ASSERT(pipe_ != -1);
msg->AssertAsLargeAsHeader();
output_queue_.Push(std::move(msg));
}
@ -849,13 +885,23 @@ void Channel::ChannelImpl::OutputQueuePop() {
// Called by libevent when we can write to the pipe without blocking.
void Channel::ChannelImpl::OnFileCanWriteWithoutBlocking(int fd) {
if (!ProcessOutgoingMessages()) {
Close();
RefPtr<ChannelImpl> grip(this);
io_thread_.AssertOnCurrentThread();
mozilla::ReleasableMutexAutoLock lock(mutex_);
if (pipe_ != -1 && !ProcessOutgoingMessages()) {
CloseLocked();
lock.Unlock();
listener_->OnChannelError();
}
}
void Channel::ChannelImpl::Close() {
io_thread_.AssertOnCurrentThread();
mozilla::MutexAutoLock lock(mutex_);
CloseLocked();
}
void Channel::ChannelImpl::CloseLocked() {
// Close can be called multiple times, so we need to make sure we're
// idempotent.
@ -887,23 +933,27 @@ void Channel::ChannelImpl::Close() {
other_task_ = nullptr;
#endif
closed_ = true;
}
#if defined(OS_MACOSX)
void Channel::ChannelImpl::SetOtherMachTask(task_t task) {
if (NS_WARN_IF(closed_)) {
io_thread_.AssertOnCurrentThread();
mozilla::MutexAutoLock lock(mutex_);
if (NS_WARN_IF(pipe_ == -1)) {
return;
}
MOZ_ASSERT(accept_mach_ports_ && privileged_ && waiting_connect_);
other_task_ = mozilla::RetainMachSendRight(task);
// Now that `other_task_` is provided, we can continue connecting.
Connect();
ConnectLocked();
}
void Channel::ChannelImpl::StartAcceptingMachPorts(Mode mode) {
io_thread_.AssertOnCurrentThread();
mozilla::MutexAutoLock lock(mutex_);
if (accept_mach_ports_) {
MOZ_ASSERT(privileged_ == (MODE_SERVER == mode));
return;
@ -1152,8 +1202,6 @@ bool Channel::ChannelImpl::TransferMachPorts(Message& msg) {
}
#endif
bool Channel::ChannelImpl::IsClosed() const { return closed_; }
//------------------------------------------------------------------------------
// Channel's methods simply call through to ChannelImpl.
Channel::Channel(const ChannelId& channel_id, Mode mode, Listener* listener)
@ -1166,10 +1214,7 @@ Channel::Channel(ChannelHandle pipe, Mode mode, Listener* listener)
MOZ_COUNT_CTOR(IPC::Channel);
}
Channel::~Channel() {
MOZ_COUNT_DTOR(IPC::Channel);
delete channel_impl_;
}
Channel::~Channel() { MOZ_COUNT_DTOR(IPC::Channel); }
bool Channel::Connect() { return channel_impl_->Connect(); }

Просмотреть файл

@ -19,10 +19,13 @@
#include "base/message_loop.h"
#include "base/task.h"
#include "mozilla/EventTargetCapability.h"
#include "mozilla/Maybe.h"
#include "mozilla/Mutex.h"
#include "mozilla/Queue.h"
#include "mozilla/UniquePtr.h"
#include "mozilla/UniquePtrExtensions.h"
#include "nsISupports.h"
namespace IPC {
@ -30,70 +33,92 @@ namespace IPC {
// socketpairs. See the .cc file for an overview of the implementation.
class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING_WITH_DELETE_ON_EVENT_TARGET(
ChannelImpl, io_thread_.GetEventTarget());
using ChannelId = Channel::ChannelId;
// Mirror methods of Channel, see ipc_channel.h for description.
ChannelImpl(const ChannelId& channel_id, Mode mode, Listener* listener);
ChannelImpl(ChannelHandle pipe, Mode mode, Listener* listener);
~ChannelImpl() { Close(); }
bool Connect();
void Close();
bool Connect() MOZ_EXCLUDES(mutex_);
void Close() MOZ_EXCLUDES(mutex_);
Listener* set_listener(Listener* listener) {
io_thread_.AssertOnCurrentThread();
Listener* old = listener_;
listener_ = listener;
return old;
}
bool Send(mozilla::UniquePtr<Message> message);
// NOTE: `Send` may be called on threads other than the I/O thread.
bool Send(mozilla::UniquePtr<Message> message) MOZ_EXCLUDES(mutex_);
void GetClientFileDescriptorMapping(int* src_fd, int* dest_fd) const;
void CloseClientFileDescriptor();
int32_t OtherPid() const { return other_pid_; }
int32_t OtherPid() MOZ_EXCLUDES(mutex_) {
io_thread_.AssertOnCurrentThread();
mozilla::MutexAutoLock lock(mutex_);
return other_pid_;
}
// See the comment in ipc_channel.h for info on IsClosed()
bool IsClosed() const;
// NOTE: `IsClosed` may be called on threads other than the I/O thread.
bool IsClosed() MOZ_EXCLUDES(mutex_) {
mozilla::MutexAutoLock lock(mutex_);
return pipe_ == -1;
}
#if defined(OS_MACOSX)
void SetOtherMachTask(task_t task);
void SetOtherMachTask(task_t task) MOZ_EXCLUDES(mutex_);
void StartAcceptingMachPorts(Mode mode);
void StartAcceptingMachPorts(Mode mode) MOZ_EXCLUDES(mutex_);
#endif
private:
void Init(Mode mode, Listener* listener);
bool CreatePipe(Mode mode);
void SetPipe(int fd);
bool PipeBufHasSpaceAfter(size_t already_written);
bool EnqueueHelloMessage();
~ChannelImpl() { Close(); }
bool ProcessIncomingMessages();
bool ProcessOutgoingMessages();
void Init(Mode mode, Listener* listener) MOZ_REQUIRES(mutex_, io_thread_);
bool CreatePipe(Mode mode) MOZ_REQUIRES(mutex_, io_thread_);
void SetPipe(int fd) MOZ_REQUIRES(mutex_, io_thread_);
bool PipeBufHasSpaceAfter(size_t already_written) MOZ_REQUIRES(mutex_);
bool EnqueueHelloMessage() MOZ_REQUIRES(mutex_, io_thread_);
bool ConnectLocked() MOZ_REQUIRES(mutex_, io_thread_);
void CloseLocked() MOZ_REQUIRES(mutex_, io_thread_);
bool ProcessIncomingMessages() MOZ_REQUIRES(mutex_, io_thread_);
bool ProcessOutgoingMessages() MOZ_REQUIRES(mutex_);
// MessageLoopForIO::Watcher implementation.
virtual void OnFileCanReadWithoutBlocking(int fd) override;
virtual void OnFileCanWriteWithoutBlocking(int fd) override;
#if defined(OS_MACOSX)
void CloseDescriptors(uint32_t pending_fd_id);
void CloseDescriptors(uint32_t pending_fd_id)
MOZ_REQUIRES(mutex_, io_thread_);
// Called on a Message immediately before it is sent/recieved to transfer
// handles to the remote process, or accept handles from the remote process.
bool AcceptMachPorts(Message& msg);
bool TransferMachPorts(Message& msg);
bool AcceptMachPorts(Message& msg) MOZ_REQUIRES(mutex_, io_thread_);
bool TransferMachPorts(Message& msg) MOZ_REQUIRES(mutex_);
#endif
void OutputQueuePush(mozilla::UniquePtr<Message> msg);
void OutputQueuePop();
void OutputQueuePush(mozilla::UniquePtr<Message> msg) MOZ_REQUIRES(mutex_);
void OutputQueuePop() MOZ_REQUIRES(mutex_);
Mode mode_;
mozilla::Mutex mutex_{"ChannelImpl"};
const mozilla::EventTargetCapability<nsISerialEventTarget> io_thread_;
Mode mode_ MOZ_GUARDED_BY(io_thread_);
// After accepting one client connection on our server socket we want to
// stop listening.
MessageLoopForIO::FileDescriptorWatcher read_watcher_;
MessageLoopForIO::FileDescriptorWatcher write_watcher_;
MessageLoopForIO::FileDescriptorWatcher read_watcher_
MOZ_GUARDED_BY(io_thread_);
MessageLoopForIO::FileDescriptorWatcher write_watcher_
MOZ_GUARDED_BY(io_thread_);
// Indicates whether we're currently blocked waiting for a write to complete.
bool is_blocked_on_write_;
bool is_blocked_on_write_ MOZ_GUARDED_BY(mutex_);
// If sending a message blocks then we use this iterator to keep track of
// where in the message we are. It gets reset when the message is finished
@ -102,21 +127,24 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
Pickle::BufferList::IterImpl iter_;
mozilla::Span<const mozilla::UniqueFileHandle> handles_;
};
mozilla::Maybe<PartialWrite> partial_write_;
mozilla::Maybe<PartialWrite> partial_write_ MOZ_GUARDED_BY(mutex_);
int pipe_;
int client_pipe_; // The client end of our socketpair().
unsigned pipe_buf_len_; // The SO_SNDBUF value of pipe_, or 0 if unknown.
int pipe_ MOZ_GUARDED_BY(mutex_);
// The client end of our socketpair().
int client_pipe_ MOZ_GUARDED_BY(io_thread_);
// The SO_SNDBUF value of pipe_, or 0 if unknown.
unsigned pipe_buf_len_ MOZ_GUARDED_BY(mutex_);
Listener* listener_;
Listener* listener_ MOZ_GUARDED_BY(io_thread_);
// Messages to be sent are queued here.
mozilla::Queue<mozilla::UniquePtr<Message>, 64> output_queue_;
mozilla::Queue<mozilla::UniquePtr<Message>, 64> output_queue_
MOZ_GUARDED_BY(mutex_);
// We read from the pipe into these buffers.
size_t input_buf_offset_;
mozilla::UniquePtr<char[]> input_buf_;
mozilla::UniquePtr<char[]> input_cmsg_buf_;
size_t input_buf_offset_ MOZ_GUARDED_BY(io_thread_);
mozilla::UniquePtr<char[]> input_buf_ MOZ_GUARDED_BY(io_thread_);
mozilla::UniquePtr<char[]> input_cmsg_buf_ MOZ_GUARDED_BY(io_thread_);
// The control message buffer will hold all of the file descriptors that will
// be read in during a single recvmsg call. Message::WriteFileDescriptor
@ -137,20 +165,18 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
// Large incoming messages that span multiple pipe buffers get built-up in the
// buffers of this message.
mozilla::UniquePtr<Message> incoming_message_;
std::vector<int> input_overflow_fds_;
mozilla::UniquePtr<Message> incoming_message_ MOZ_GUARDED_BY(io_thread_);
std::vector<int> input_overflow_fds_ MOZ_GUARDED_BY(io_thread_);
// In server-mode, we have to wait for the client to connect before we
// can begin reading. We make use of the input_state_ when performing
// the connect operation in overlapped mode.
bool waiting_connect_;
// This flag is set after we've closed the channel.
std::atomic<bool> closed_;
// Will be set to `true` until `Connect()` has been called and communication
// is ready. For privileged connections on macOS, this will not be cleared
// until the peer mach port has been provided to allow transferring mach
// ports.
bool waiting_connect_ MOZ_GUARDED_BY(mutex_) = true;
// We keep track of the PID of the other side of this channel so that we can
// record this when generating logs of IPC messages.
int32_t other_pid_ = -1;
int32_t other_pid_ MOZ_GUARDED_BY(mutex_) = -1;
#if defined(OS_MACOSX)
struct PendingDescriptors {
@ -158,19 +184,19 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
nsTArray<mozilla::UniqueFileHandle> handles;
};
std::list<PendingDescriptors> pending_fds_;
std::list<PendingDescriptors> pending_fds_ MOZ_GUARDED_BY(mutex_);
// A generation ID for RECEIVED_FD messages.
uint32_t last_pending_fd_id_;
uint32_t last_pending_fd_id_ MOZ_GUARDED_BY(mutex_);
// Whether or not to accept mach ports from a remote process, and whether this
// process is the privileged side of a IPC::Channel which can transfer mach
// ports.
bool accept_mach_ports_ = false;
bool privileged_ = false;
bool accept_mach_ports_ MOZ_GUARDED_BY(mutex_) = false;
bool privileged_ MOZ_GUARDED_BY(mutex_) = false;
// If available, the task port for the remote process.
mozilla::UniqueMachSendRight other_task_;
mozilla::UniqueMachSendRight other_task_ MOZ_GUARDED_BY(mutex_);
#endif
DISALLOW_COPY_AND_ASSIGN(ChannelImpl);

Просмотреть файл

@ -31,22 +31,10 @@
using namespace mozilla::ipc;
// ChannelImpl is used on the IPC thread, but constructed on a different thread,
// so it has to hold the nsAutoOwningThread as a pointer, and we need a slightly
// different macro.
#ifdef DEBUG
# define ASSERT_OWNINGTHREAD(_class) \
if (nsAutoOwningThread* owningThread = _mOwningThread.get()) { \
owningThread->AssertOwnership(#_class " not thread-safe"); \
}
#else
# define ASSERT_OWNINGTHREAD(_class) ((void)0)
#endif
namespace IPC {
//------------------------------------------------------------------------------
Channel::ChannelImpl::State::State(ChannelImpl* channel) : is_pending(false) {
Channel::ChannelImpl::State::State(ChannelImpl* channel) {
memset(&context.overlapped, 0, sizeof(context.overlapped));
context.handler = channel;
}
@ -60,9 +48,9 @@ Channel::ChannelImpl::State::~State() {
Channel::ChannelImpl::ChannelImpl(const ChannelId& channel_id, Mode mode,
Listener* listener)
: ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)),
ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)),
ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) {
: io_thread_(MessageLoopForIO::current()->SerialEventTarget()),
ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)),
ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)) {
Init(mode, listener);
if (!CreatePipe(channel_id, mode)) {
@ -75,13 +63,12 @@ Channel::ChannelImpl::ChannelImpl(const ChannelId& channel_id, Mode mode,
Channel::ChannelImpl::ChannelImpl(ChannelHandle pipe, Mode mode,
Listener* listener)
: ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)),
ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)),
ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) {
: io_thread_(MessageLoopForIO::current()->SerialEventTarget()),
ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)),
ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)) {
Init(mode, listener);
if (!pipe) {
closed_ = true;
return;
}
@ -95,11 +82,11 @@ void Channel::ChannelImpl::Init(Mode mode, Listener* listener) {
// Verify that we fit in a "quantum-spaced" jemalloc bucket.
static_assert(sizeof(*this) <= 512, "Exceeded expected size class");
mode_ = mode;
pipe_ = INVALID_HANDLE_VALUE;
listener_ = listener;
waiting_connect_ = (mode == MODE_SERVER);
waiting_connect_ = true;
processing_incoming_ = false;
closed_ = false;
input_buf_offset_ = 0;
input_buf_ = mozilla::MakeUnique<char[]>(Channel::kReadBufferSize);
accept_handles_ = false;
@ -118,8 +105,15 @@ void Channel::ChannelImpl::OutputQueuePop() {
}
void Channel::ChannelImpl::Close() {
ASSERT_OWNINGTHREAD(ChannelImpl);
io_thread_.AssertOnCurrentThread();
mozilla::MutexAutoLock lock(mutex_);
CloseLocked();
}
void Channel::ChannelImpl::CloseLocked() {
// If we still have pending I/O, cancel it. The references inside
// `input_state_` and `output_state_` will keep the buffers alive until they
// complete.
if (input_state_.is_pending || output_state_.is_pending) {
CancelIo(pipe_);
}
@ -137,22 +131,24 @@ void Channel::ChannelImpl::Close() {
other_process_ = INVALID_HANDLE_VALUE;
}
// Don't return from `CloseLocked()` until the IO has been completed,
// otherwise the IO thread may exit with outstanding IO, leaking the
// ChannelImpl.
//
// It's OK to unlock here, as calls to `Send` from other threads will be
// rejected, due to `pipe_` having been cleared.
while (input_state_.is_pending || output_state_.is_pending) {
mozilla::MutexAutoUnlock unlock(mutex_);
MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this);
}
while (!output_queue_.IsEmpty()) {
OutputQueuePop();
}
#ifdef DEBUG
_mOwningThread = nullptr;
#endif
closed_ = true;
}
bool Channel::ChannelImpl::Send(mozilla::UniquePtr<Message> message) {
ASSERT_OWNINGTHREAD(ChannelImpl);
mozilla::MutexAutoLock lock(mutex_);
#ifdef IPC_MESSAGE_DEBUG_EXTRA
DLOG(INFO) << "sending message @" << message.get() << " on channel @" << this
@ -165,7 +161,7 @@ bool Channel::ChannelImpl::Send(mozilla::UniquePtr<Message> message) {
"Channel::ChannelImpl::Send", std::move(message));
#endif
if (closed_) {
if (pipe_ == INVALID_HANDLE_VALUE) {
if (mozilla::ipc::LoggingEnabled()) {
fprintf(stderr,
"Can't send message %s, because this channel is closed.\n",
@ -178,7 +174,9 @@ bool Channel::ChannelImpl::Send(mozilla::UniquePtr<Message> message) {
// ensure waiting to write
if (!waiting_connect_) {
if (!output_state_.is_pending) {
if (!ProcessOutgoingMessages(NULL, 0)) return false;
if (!ProcessOutgoingMessages(NULL, 0, false)) {
return false;
}
}
}
@ -230,7 +228,6 @@ bool Channel::ChannelImpl::CreatePipe(const ChannelId& channel_id, Mode mode) {
if (pipe_ == INVALID_HANDLE_VALUE) {
// If this process is being closed, the pipe may be gone already.
CHROMIUM_LOG(WARNING) << "failed to create pipe: " << GetLastError();
closed_ = true;
return false;
}
@ -258,38 +255,43 @@ bool Channel::ChannelImpl::EnqueueHelloMessage() {
}
bool Channel::ChannelImpl::Connect() {
#ifdef DEBUG
if (!_mOwningThread) {
_mOwningThread = mozilla::MakeUnique<nsAutoOwningThread>();
}
#endif
io_thread_.AssertOnCurrentThread();
mozilla::MutexAutoLock lock(mutex_);
if (pipe_ == INVALID_HANDLE_VALUE) return false;
MessageLoopForIO::current()->RegisterIOHandler(pipe_, this);
// Check to see if there is a client connected to our pipe...
if (waiting_connect_) {
if (mode_ == MODE_SERVER) {
DCHECK(!input_state_.is_pending);
if (!ProcessConnection()) {
return false;
}
} else {
waiting_connect_ = false;
}
if (!input_state_.is_pending) {
// Complete setup asynchronously. By not setting input_state_.is_pending
// to true, we indicate to OnIOCompleted that this is the special
// initialization signal.
MessageLoopForIO::current()->PostTask(factory_.NewRunnableMethod(
&Channel::ChannelImpl::OnIOCompleted, &input_state_.context, 0, 0));
// to `this`, we indicate to OnIOCompleted that this is the special
// initialization signal, while keeping a reference through the
// `RunnableMethod`.
io_thread_.Dispatch(
mozilla::NewRunnableMethod<MessageLoopForIO::IOContext*, DWORD, DWORD>(
"ContinueConnect", this, &ChannelImpl::OnIOCompleted,
&input_state_.context, 0, 0));
}
if (!waiting_connect_) ProcessOutgoingMessages(NULL, 0);
if (!waiting_connect_) {
DCHECK(!output_state_.is_pending);
ProcessOutgoingMessages(NULL, 0, false);
}
return true;
}
bool Channel::ChannelImpl::ProcessConnection() {
ASSERT_OWNINGTHREAD(ChannelImpl);
if (input_state_.is_pending) input_state_.is_pending = false;
DCHECK(!input_state_.is_pending);
// Do we have a client connected to our pipe?
if (INVALID_HANDLE_VALUE == pipe_) return false;
@ -306,7 +308,7 @@ bool Channel::ChannelImpl::ProcessConnection() {
switch (err) {
case ERROR_IO_PENDING:
input_state_.is_pending = true;
input_state_.is_pending = this;
break;
case ERROR_PIPE_CONNECTED:
waiting_connect_ = false;
@ -323,10 +325,10 @@ bool Channel::ChannelImpl::ProcessConnection() {
}
bool Channel::ChannelImpl::ProcessIncomingMessages(
MessageLoopForIO::IOContext* context, DWORD bytes_read) {
ASSERT_OWNINGTHREAD(ChannelImpl);
if (input_state_.is_pending) {
input_state_.is_pending = false;
MessageLoopForIO::IOContext* context, DWORD bytes_read, bool was_pending) {
DCHECK(!input_state_.is_pending);
if (was_pending) {
DCHECK(context);
if (!context || !bytes_read) return false;
@ -346,7 +348,7 @@ bool Channel::ChannelImpl::ProcessIncomingMessages(
if (!ok) {
DWORD err = GetLastError();
if (err == ERROR_IO_PENDING) {
input_state_.is_pending = true;
input_state_.is_pending = this;
return true;
}
if (err != ERROR_BROKEN_PIPE) {
@ -354,7 +356,7 @@ bool Channel::ChannelImpl::ProcessIncomingMessages(
}
return false;
}
input_state_.is_pending = true;
input_state_.is_pending = this;
return true;
}
DCHECK(bytes_read);
@ -364,7 +366,9 @@ bool Channel::ChannelImpl::ProcessIncomingMessages(
const char* p = input_buf_.get();
const char* end = input_buf_.get() + input_buf_offset_ + bytes_read;
while (p < end) {
// NOTE: We re-check `pipe_` after each message to make sure we weren't
// closed while calling `OnMessageReceived` or `OnChannelConnected`.
while (p < end && INVALID_HANDLE_VALUE != pipe_) {
// Try to figure out how big the message is. Size is 0 if we haven't read
// enough of the header to know the size.
uint32_t message_length = 0;
@ -444,8 +448,7 @@ bool Channel::ChannelImpl::ProcessIncomingMessages(
if (waiting_for_shared_secret_ && (it.NextInt() != shared_secret_)) {
NOTREACHED();
// Something went wrong. Abort connection.
Close();
listener_->OnChannelError();
// NOTE: Caller will `Close()` and notify `OnChannelError`.
return false;
}
waiting_for_shared_secret_ = false;
@ -461,12 +464,15 @@ bool Channel::ChannelImpl::ProcessIncomingMessages(
}
}
listener_->OnChannelConnected(other_pid_);
int32_t other_pid = other_pid_;
mozilla::MutexAutoUnlock unlock(mutex_);
listener_->OnChannelConnected(other_pid);
} else {
mozilla::LogIPCMessage::Run run(&m);
if (!AcceptHandles(m)) {
return false;
}
mozilla::MutexAutoUnlock unlock(mutex_);
listener_->OnMessageReceived(std::move(incoming_message_));
}
@ -478,14 +484,13 @@ bool Channel::ChannelImpl::ProcessIncomingMessages(
}
bool Channel::ChannelImpl::ProcessOutgoingMessages(
MessageLoopForIO::IOContext* context, DWORD bytes_written) {
MessageLoopForIO::IOContext* context, DWORD bytes_written,
bool was_pending) {
DCHECK(!output_state_.is_pending);
DCHECK(!waiting_connect_); // Why are we trying to send messages if there's
// no connection?
ASSERT_OWNINGTHREAD(ChannelImpl);
if (output_state_.is_pending) {
if (was_pending) {
DCHECK(context);
output_state_.is_pending = false;
if (!context || bytes_written == 0) {
DWORD err = GetLastError();
if (err != ERROR_BROKEN_PIPE) {
@ -541,7 +546,7 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages(
if (!ok) {
DWORD err = GetLastError();
if (err == ERROR_IO_PENDING) {
output_state_.is_pending = true;
output_state_.is_pending = this;
#ifdef IPC_MESSAGE_DEBUG_EXTRA
DLOG(INFO) << "sent pending message @" << m << " on channel @" << this
@ -561,40 +566,58 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages(
<< " with type " << m->type();
#endif
output_state_.is_pending = true;
output_state_.is_pending = this;
return true;
}
void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context,
DWORD bytes_transfered, DWORD error) {
// NOTE: In case the pending reference was the last reference, release it
// outside of the lock.
RefPtr<ChannelImpl> was_pending;
io_thread_.AssertOnCurrentThread();
mozilla::ReleasableMutexAutoLock lock(mutex_);
bool ok;
ASSERT_OWNINGTHREAD(ChannelImpl);
if (context == &input_state_.context) {
if (waiting_connect_) {
if (!ProcessConnection()) return;
was_pending = input_state_.is_pending.forget();
bool was_waiting_connect = waiting_connect_;
if (was_waiting_connect) {
if (!ProcessConnection()) {
return;
}
// We may have some messages queued up to send...
if (!output_queue_.IsEmpty() && !output_state_.is_pending)
ProcessOutgoingMessages(NULL, 0);
if (input_state_.is_pending) return;
if (!output_queue_.IsEmpty() && !output_state_.is_pending) {
ProcessOutgoingMessages(NULL, 0, false);
}
if (input_state_.is_pending) {
return;
}
// else, fall-through and look for incoming messages...
}
// we don't support recursion through OnMessageReceived yet!
DCHECK(!processing_incoming_);
processing_incoming_ = true;
ok = ProcessIncomingMessages(context, bytes_transfered);
ok = ProcessIncomingMessages(context, bytes_transfered,
was_pending && !was_waiting_connect);
processing_incoming_ = false;
} else {
DCHECK(context == &output_state_.context);
ok = ProcessOutgoingMessages(context, bytes_transfered);
was_pending = output_state_.is_pending.forget();
ok = ProcessOutgoingMessages(context, bytes_transfered, was_pending);
}
if (!ok && INVALID_HANDLE_VALUE != pipe_) {
// We don't want to re-enter Close().
Close();
CloseLocked();
lock.Unlock();
listener_->OnChannelError();
}
}
void Channel::ChannelImpl::StartAcceptingHandles(Mode mode) {
io_thread_.AssertOnCurrentThread();
mozilla::MutexAutoLock lock(mutex_);
if (accept_handles_) {
MOZ_ASSERT(privileged_ == (mode == MODE_SERVER));
return;
@ -739,8 +762,6 @@ bool Channel::ChannelImpl::TransferHandles(Message& msg) {
return true;
}
bool Channel::ChannelImpl::IsClosed() const { return closed_; }
//------------------------------------------------------------------------------
// Channel's methods simply call through to ChannelImpl.
Channel::Channel(const ChannelId& channel_id, Mode mode, Listener* listener)
@ -753,10 +774,7 @@ Channel::Channel(ChannelHandle pipe, Mode mode, Listener* listener)
MOZ_COUNT_CTOR(IPC::Channel);
}
Channel::~Channel() {
MOZ_COUNT_DTOR(IPC::Channel);
delete channel_impl_;
}
Channel::~Channel() { MOZ_COUNT_DTOR(IPC::Channel); }
bool Channel::Connect() { return channel_impl_->Connect(); }

Просмотреть файл

@ -17,7 +17,9 @@
#include "base/task.h"
#include "nsISupportsImpl.h"
#include "mozilla/EventTargetCapability.h"
#include "mozilla/Maybe.h"
#include "mozilla/Mutex.h"
#include "mozilla/Queue.h"
#include "mozilla/UniquePtr.h"
@ -25,131 +27,150 @@ namespace IPC {
class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING_WITH_DELETE_ON_EVENT_TARGET(
ChannelImpl, io_thread_.GetEventTarget());
using ChannelId = Channel::ChannelId;
using ChannelHandle = Channel::ChannelHandle;
// Mirror methods of Channel, see ipc_channel.h for description.
ChannelImpl(const ChannelId& channel_id, Mode mode, Listener* listener);
ChannelImpl(ChannelHandle pipe, Mode mode, Listener* listener);
bool Connect() MOZ_EXCLUDES(mutex_);
void Close() MOZ_EXCLUDES(mutex_);
void StartAcceptingHandles(Mode mode) MOZ_EXCLUDES(mutex_);
Listener* set_listener(Listener* listener) {
io_thread_.AssertOnCurrentThread();
Listener* old = listener_;
listener_ = listener;
return old;
}
// NOTE: `Send` may be called on threads other than the I/O thread.
bool Send(mozilla::UniquePtr<Message> message) MOZ_EXCLUDES(mutex_);
int32_t OtherPid() MOZ_EXCLUDES(mutex_) {
io_thread_.AssertOnCurrentThread();
mozilla::MutexAutoLock lock(mutex_);
return other_pid_;
}
// See the comment in ipc_channel.h for info on IsClosed()
// NOTE: `IsClosed` may be called on threads other than the I/O thread.
bool IsClosed() MOZ_EXCLUDES(mutex_) {
mozilla::MutexAutoLock lock(mutex_);
return pipe_ == INVALID_HANDLE_VALUE;
}
private:
~ChannelImpl() {
io_thread_.AssertOnCurrentThread();
if (pipe_ != INVALID_HANDLE_VALUE ||
other_process_ != INVALID_HANDLE_VALUE) {
Close();
}
}
bool Connect();
void Close();
void StartAcceptingHandles(Mode mode);
Listener* set_listener(Listener* listener) {
Listener* old = listener_;
listener_ = listener;
return old;
}
bool Send(mozilla::UniquePtr<Message> message);
int32_t OtherPid() const { return other_pid_; }
void Init(Mode mode, Listener* listener) MOZ_REQUIRES(mutex_, io_thread_);
// See the comment in ipc_channel.h for info on IsClosed()
bool IsClosed() const;
private:
void Init(Mode mode, Listener* listener);
void OutputQueuePush(mozilla::UniquePtr<Message> msg);
void OutputQueuePop();
void OutputQueuePush(mozilla::UniquePtr<Message> msg) MOZ_REQUIRES(mutex_);
void OutputQueuePop() MOZ_REQUIRES(mutex_);
const ChannelId PipeName(const ChannelId& channel_id, int32_t* secret) const;
bool CreatePipe(const ChannelId& channel_id, Mode mode);
bool EnqueueHelloMessage();
bool CreatePipe(const ChannelId& channel_id, Mode mode)
MOZ_REQUIRES(mutex_, io_thread_);
bool EnqueueHelloMessage() MOZ_REQUIRES(mutex_, io_thread_);
void CloseLocked() MOZ_REQUIRES(mutex_, io_thread_);
bool ProcessConnection();
bool ProcessConnection() MOZ_REQUIRES(mutex_, io_thread_);
bool ProcessIncomingMessages(MessageLoopForIO::IOContext* context,
DWORD bytes_read);
DWORD bytes_read, bool was_pending)
MOZ_REQUIRES(mutex_, io_thread_);
bool ProcessOutgoingMessages(MessageLoopForIO::IOContext* context,
DWORD bytes_written);
DWORD bytes_written, bool was_pending)
MOZ_REQUIRES(mutex_);
// Called on a Message immediately before it is sent/recieved to transfer
// handles to the remote process, or accept handles from the remote process.
bool AcceptHandles(Message& msg);
bool TransferHandles(Message& msg);
bool AcceptHandles(Message& msg) MOZ_REQUIRES(mutex_, io_thread_);
bool TransferHandles(Message& msg) MOZ_REQUIRES(mutex_);
// MessageLoop::IOHandler implementation.
virtual void OnIOCompleted(MessageLoopForIO::IOContext* context,
DWORD bytes_transfered, DWORD error);
private:
mozilla::Mutex mutex_{"ChannelImpl"};
const mozilla::EventTargetCapability<nsISerialEventTarget> io_thread_;
Mode mode_ MOZ_GUARDED_BY(io_thread_);
struct State {
explicit State(ChannelImpl* channel);
~State();
MessageLoopForIO::IOContext context;
bool is_pending = false;
// When there is pending I/O, this holds a strong reference to the
// ChannelImpl to prevent it from going away.
RefPtr<ChannelImpl> is_pending;
};
State input_state_;
State output_state_;
State input_state_ MOZ_GUARDED_BY(io_thread_);
State output_state_ MOZ_GUARDED_BY(mutex_);
HANDLE pipe_ = INVALID_HANDLE_VALUE;
HANDLE pipe_ MOZ_GUARDED_BY(mutex_) = INVALID_HANDLE_VALUE;
Listener* listener_ = nullptr;
Listener* listener_ MOZ_GUARDED_BY(io_thread_) = nullptr;
// Messages to be sent are queued here.
mozilla::Queue<mozilla::UniquePtr<Message>, 64> output_queue_;
mozilla::Queue<mozilla::UniquePtr<Message>, 64> output_queue_
MOZ_GUARDED_BY(mutex_);
// If sending a message blocks then we use this iterator to keep track of
// where in the message we are. It gets reset when the message is finished
// sending.
mozilla::Maybe<Pickle::BufferList::IterImpl> partial_write_iter_;
mozilla::Maybe<Pickle::BufferList::IterImpl> partial_write_iter_
MOZ_GUARDED_BY(mutex_);
// We read from the pipe into this buffer
mozilla::UniquePtr<char[]> input_buf_;
size_t input_buf_offset_ = 0;
mozilla::UniquePtr<char[]> input_buf_ MOZ_GUARDED_BY(io_thread_);
size_t input_buf_offset_ MOZ_GUARDED_BY(io_thread_) = 0;
// Large incoming messages that span multiple pipe buffers get built-up in the
// buffers of this message.
mozilla::UniquePtr<Message> incoming_message_;
mozilla::UniquePtr<Message> incoming_message_ MOZ_GUARDED_BY(io_thread_);
// In server-mode, we have to wait for the client to connect before we
// can begin reading. We make use of the input_state_ when performing
// the connect operation in overlapped mode.
bool waiting_connect_ = false;
// Will be set to `true` until `Connect()` has been called, and, if in
// server-mode, the client has connected. The `input_state_` is used to wait
// for the client to connect in overlapped mode.
bool waiting_connect_ MOZ_GUARDED_BY(mutex_) = true;
// This flag is set when processing incoming messages. It is used to
// avoid recursing through ProcessIncomingMessages, which could cause
// problems. TODO(darin): make this unnecessary
bool processing_incoming_ = false;
// This flag is set after Close() is run on the channel.
std::atomic<bool> closed_ = false;
bool processing_incoming_ MOZ_GUARDED_BY(io_thread_) = false;
// We keep track of the PID of the other side of this channel so that we can
// record this when generating logs of IPC messages.
int32_t other_pid_ = -1;
ScopedRunnableMethodFactory<ChannelImpl> factory_;
int32_t other_pid_ MOZ_GUARDED_BY(mutex_) = -1;
// This is a unique per-channel value used to authenticate the client end of
// a connection. If the value is non-zero, the client passes it in the hello
// and the host validates. (We don't send the zero value to preserve IPC
// compatibility with existing clients that don't validate the channel.)
int32_t shared_secret_ = 0;
int32_t shared_secret_ MOZ_GUARDED_BY(io_thread_) = 0;
// In server-mode, we wait for the channel at the other side of the pipe to
// send us back our shared secret, if we are using one.
bool waiting_for_shared_secret_ = false;
bool waiting_for_shared_secret_ MOZ_GUARDED_BY(io_thread_) = false;
// Whether or not to accept handles from a remote process, and whether this
// process is the privileged side of a IPC::Channel which can transfer
// handles.
bool accept_handles_ = false;
bool privileged_ = false;
bool accept_handles_ MOZ_GUARDED_BY(mutex_) = false;
bool privileged_ MOZ_GUARDED_BY(mutex_) = false;
// A privileged process handle used to transfer HANDLEs to and from the remote
// process. This will only be used if `privileged_` is set.
HANDLE other_process_ = INVALID_HANDLE_VALUE;
#ifdef DEBUG
mozilla::UniquePtr<nsAutoOwningThread> _mOwningThread;
#endif
HANDLE other_process_ MOZ_GUARDED_BY(mutex_) = INVALID_HANDLE_VALUE;
DISALLOW_COPY_AND_ASSIGN(ChannelImpl);
};

Просмотреть файл

@ -47,12 +47,7 @@ NodeChannel::NodeChannel(const NodeName& aName,
mOtherPid(aPid),
mChannel(std::move(aChannel)) {}
NodeChannel::~NodeChannel() {
AssertIOThread();
if (!mClosed) {
mChannel->Close();
}
}
NodeChannel::~NodeChannel() { Close(); }
// Called when the NodeChannel's refcount drops to `0`.
void NodeChannel::Destroy() {
@ -108,7 +103,7 @@ void NodeChannel::Start(bool aCallConnect) {
// Handle any events the previous listener had queued up. Make sure to stop
// if an error causes our channel to become closed.
while (!pending.empty() && !mClosed) {
while (!pending.empty() && mState != State::Closed) {
OnMessageReceived(std::move(pending.front()));
pending.pop();
}
@ -118,11 +113,10 @@ void NodeChannel::Start(bool aCallConnect) {
void NodeChannel::Close() {
AssertIOThread();
if (!mClosed) {
if (mState.exchange(State::Closed) != State::Closed) {
mChannel->Close();
mChannel->set_listener(mExistingListener);
}
mClosed = true;
}
void NodeChannel::SetOtherPid(base::ProcessId aNewPid) {
@ -141,7 +135,7 @@ void NodeChannel::SetOtherPid(base::ProcessId aNewPid) {
void NodeChannel::SetMachTaskPort(task_t aTask) {
AssertIOThread();
if (!mClosed) {
if (mState != State::Closed) {
mChannel->SetOtherMachTask(aTask);
}
}
@ -205,28 +199,31 @@ void NodeChannel::SendMessage(UniquePtr<IPC::Message> aMessage) {
}
aMessage->AssertAsLargeAsHeader();
XRE_GetIOMessageLoop()->PostTask(
NewRunnableMethod<StoreCopyPassByRRef<UniquePtr<IPC::Message>>>(
"NodeChannel::DoSendMessage", this, &NodeChannel::DoSendMessage,
std::move(aMessage)));
}
void NodeChannel::DoSendMessage(UniquePtr<IPC::Message> aMessage) {
#ifdef FUZZING_SNAPSHOT
if (mBlockSendRecv) {
return;
}
#endif
AssertIOThread();
if (mClosed) {
if (mState != State::Active) {
NS_WARNING("Dropping message as channel has been closed");
return;
}
// NOTE: As this is not guaranteed to be running on the I/O thread, the
// channel may have become closed since we checked above. IPC::Channel will
// handle that and return `false` here, so we can re-check `mState`.
if (!mChannel->Send(std::move(aMessage))) {
NS_WARNING("Call to Send() failed");
OnChannelError();
// If we're still active, update `mState` to `State::Closing`, and dispatch
// a runnable to actually close our channel.
State expected = State::Active;
if (mState.compare_exchange_strong(expected, State::Closing)) {
XRE_GetIOMessageLoop()->PostTask(
NewRunnableMethod("NodeChannel::CloseForSendError", this,
&NodeChannel::OnChannelError));
}
}
}
@ -315,10 +312,14 @@ void NodeChannel::OnChannelConnected(base::ProcessId aPeerPid) {
void NodeChannel::OnChannelError() {
AssertIOThread();
State prev = mState.exchange(State::Closed);
if (prev == State::Closed) {
return;
}
// Clean up the channel and make sure we're no longer the active listener.
mChannel->Close();
MOZ_ALWAYS_TRUE(this == mChannel->set_listener(mExistingListener));
mClosed = true;
// Tell our listener about the error.
mListener->OnChannelError(mName);

Просмотреть файл

@ -108,6 +108,7 @@ class NodeChannel final : public IPC::Channel::Listener {
void SetName(const NodeName& aNewName) { mName = aNewName; }
#ifdef FUZZING_SNAPSHOT
// MUST BE CALLED FROM THE IO THREAD.
const NodeName& GetName() { return mName; }
#endif
@ -127,7 +128,6 @@ class NodeChannel final : public IPC::Channel::Listener {
void SetOtherPid(base::ProcessId aNewPid);
void SendMessage(UniquePtr<IPC::Message> aMessage);
void DoSendMessage(UniquePtr<IPC::Message> aMessage);
// IPC::Channel::Listener implementation
void OnMessageReceived(UniquePtr<IPC::Message> aMessage) override;
@ -150,15 +150,20 @@ class NodeChannel final : public IPC::Channel::Listener {
// be read from other threads.
std::atomic<base::ProcessId> mOtherPid;
// WARNING: This must only be accessed on the IO thread.
mozilla::UniquePtr<IPC::Channel> mChannel;
// WARNING: Most methods on the IPC::Channel are only safe to call on the IO
// thread, however it is safe to call `Send()` and `IsClosed()` from other
// threads. See IPC::Channel's documentation for details.
const mozilla::UniquePtr<IPC::Channel> mChannel;
// WARNING: This must only be accessed on the IO thread.
bool mClosed = false;
// The state will start out as `State::Active`, and will only transition to
// `State::Closed` on the IO thread. If a Send fails, the state will
// transition to `State::Closing`, and a runnable will be dispatched to the
// I/O thread to notify callbacks.
enum class State { Active, Closing, Closed };
std::atomic<State> mState = State::Active;
#ifdef FUZZING_SNAPSHOT
// WARNING: This must only be accessed on the IO thread.
bool mBlockSendRecv = false;
std::atomic<bool> mBlockSendRecv = false;
#endif
// WARNING: Must only be accessed on the IO thread.

Просмотреть файл

@ -563,23 +563,28 @@ void NodeController::OnBroadcast(const NodeName& aFromNode,
return;
}
nsTArray<RefPtr<NodeChannel>> peers;
{
auto state = mState.Lock();
peers.SetCapacity(state->mPeers.Count());
for (const auto& peer : state->mPeers.Values()) {
// NOTE: This `clone` operation is only supported for a limited number of
// message types by the ports API, which provides some extra security by
// only allowing those specific types of messages to be broadcasted.
// Messages which don't support `Clone` cannot be broadcast, and the ports
// library will not attempt to broadcast them.
auto clone = event->Clone();
if (!clone) {
NODECONTROLLER_WARNING("Attempt to broadcast unsupported message");
break;
}
peer->SendEventMessage(SerializeEventMessage(std::move(clone)));
peers.AppendElement(peer);
}
}
for (const auto& peer : peers) {
// NOTE: This `clone` operation is only supported for a limited number of
// message types by the ports API, which provides some extra security by
// only allowing those specific types of messages to be broadcasted.
// Messages which don't support `Clone` cannot be broadcast, and the ports
// library will not attempt to broadcast them.
auto clone = event->Clone();
if (!clone) {
NODECONTROLLER_WARNING("Attempt to broadcast unsupported message");
break;
}
peer->SendEventMessage(SerializeEventMessage(std::move(clone)));
}
}
void NodeController::OnIntroduce(const NodeName& aFromNode,
@ -627,13 +632,12 @@ void NodeController::OnIntroduce(const NodeName& aFromNode,
}
// Deliver any pending messages, then remove the entry from our table. We do
// this while `peersLock` is still held to ensure that these messages are
// this while `mState` is still held to ensure that these messages are
// all sent before another thread can observe the newly created channel.
// This is safe because `SendEventMessage` currently unconditionally
// dispatches to the IO thread, so we can't deadlock. We'll call
// `nodeChannel->Start()` with the lock not held, but on the IO thread,
// before any actual `Send` requests are processed so the channel will
// already be opened by that time.
// As the channel hasn't been `Connect()`-ed yet, this will only queue the
// messages up to be sent, so is OK to do with the mutex held. These
// messages will be processed to be sent during `Start()` below, which is
// performed outside of the lock.
if (auto pending = state->mPendingMessages.Lookup(aIntroduction.mName)) {
while (!pending->IsEmpty()) {
nodeChannel->SendEventMessage(pending->Pop());