Bug 1732343 - Part 3: Support directly attaching handles to IPC messages on windows, r=handyman

Handles which are directly attached to IPC messages will be transferred by the
parent process. This is handled either directly by the parent process (if it is
one of the participants), or by relaying the message via the parent process if
it is not. Ordering issues are avoided here thanks to support in the mojo ports
code for messages being delivered out-of-order.

The actual handle values are encoded in the message after the message payload,
and removed from the message before handing it off to existing code, so it
should be fully transparent.

In addition, a new flag is added to the message header to support marking a
message as a "relay" message, as well as support for deserializing these
messages with an extra NodeName (the real target/source) in the message footer.

Differential Revision: https://phabricator.services.mozilla.com/D126565
This commit is contained in:
Nika Layzell 2021-11-04 19:20:18 +00:00
Родитель b8279164db
Коммит 4bd83c851e
9 изменённых файлов: 405 добавлений и 43 удалений

Просмотреть файл

@ -29,6 +29,9 @@ bool ChildProcessHost::CreateChannel() {
channel_id_ = IPC::Channel::GenerateVerifiedChannelID();
channel_.reset(
new IPC::Channel(channel_id_, IPC::Channel::MODE_SERVER, &listener_));
#if defined(OS_WIN)
channel_->StartAcceptingHandles(IPC::Channel::MODE_SERVER);
#endif
if (!channel_->Connect()) return false;
opening_channel_ = true;

Просмотреть файл

@ -31,6 +31,9 @@ ChildThread* ChildThread::current() {
void ChildThread::Init() {
auto channel = mozilla::MakeUnique<IPC::Channel>(
channel_name_, IPC::Channel::MODE_CLIENT, nullptr);
#if defined(OS_WIN)
channel->StartAcceptingHandles(IPC::Channel::MODE_CLIENT);
#endif
initial_port_ =
mozilla::ipc::NodeController::InitChildProcess(std::move(channel));

Просмотреть файл

@ -159,6 +159,11 @@ class Channel {
#elif defined(OS_WIN)
// Return the server pipe handle.
void* GetServerPipeHandle() const;
// Tell this pipe to accept handles. Exactly one side of the IPC connection
// must be set as `MODE_SERVER`, and that side will be responsible for calling
// `DuplicateHandle` to transfer the handle between processes.
void StartAcceptingHandles(Mode mode);
#endif // defined(OS_POSIX)
// On Windows: Generates a channel ID that, if passed to the client

Просмотреть файл

@ -61,9 +61,7 @@ Channel::ChannelImpl::ChannelImpl(const ChannelId& channel_id, Mode mode,
Listener* listener)
: ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)),
ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)),
ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)),
shared_secret_(0),
waiting_for_shared_secret_(false) {
ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) {
Init(mode, listener);
if (!CreatePipe(channel_id, mode)) {
@ -79,9 +77,7 @@ Channel::ChannelImpl::ChannelImpl(const ChannelId& channel_id,
Listener* listener)
: ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)),
ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)),
ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)),
shared_secret_(0),
waiting_for_shared_secret_(false) {
ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) {
Init(mode, listener);
if (mode == MODE_SERVER) {
@ -111,6 +107,9 @@ void Channel::ChannelImpl::Init(Mode mode, Listener* listener) {
output_queue_length_ = 0;
input_buf_offset_ = 0;
input_buf_ = mozilla::MakeUnique<char[]>(Channel::kReadBufferSize);
accept_handles_ = false;
privileged_ = false;
other_process_ = INVALID_HANDLE_VALUE;
}
void Channel::ChannelImpl::OutputQueuePush(mozilla::UniquePtr<Message> msg) {
@ -141,6 +140,12 @@ void Channel::ChannelImpl::Close() {
pipe_ = INVALID_HANDLE_VALUE;
}
// If we have a connection to the other process, close the handle.
if (other_process_ != INVALID_HANDLE_VALUE) {
CloseHandle(other_process_);
other_process_ = INVALID_HANDLE_VALUE;
}
while (input_state_.is_pending || output_state_.is_pending) {
MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this);
}
@ -453,9 +458,24 @@ bool Channel::ChannelImpl::ProcessIncomingMessages(
return false;
}
waiting_for_shared_secret_ = false;
// Now that we know the remote pid, open a privileged handle to the
// child process if needed to transfer handles to/from it.
if (privileged_ && other_process_ == INVALID_HANDLE_VALUE) {
other_process_ = OpenProcess(PROCESS_DUP_HANDLE, false, other_pid_);
if (!other_process_) {
other_process_ = INVALID_HANDLE_VALUE;
CHROMIUM_LOG(ERROR) << "Failed to acquire privileged handle to "
<< other_pid_ << ", cannot accept handles";
}
}
listener_->OnChannelConnected(other_pid_);
} else {
mozilla::LogIPCMessage::Run run(&m);
if (!AcceptHandles(m)) {
return false;
}
listener_->OnMessageReceived(std::move(m));
}
@ -512,6 +532,9 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages(
if (partial_write_iter_.isNothing()) {
AddIPCProfilerMarker(*m, other_pid_, MessageDirection::eSending,
MessagePhase::TransferStart);
if (!TransferHandles(*m)) {
return false;
}
Pickle::BufferList::IterImpl iter(m->Buffers());
partial_write_iter_.emplace(iter);
}
@ -582,6 +605,171 @@ void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context,
}
}
void Channel::ChannelImpl::StartAcceptingHandles(Mode mode) {
if (accept_handles_) {
MOZ_ASSERT(privileged_ == (mode == MODE_SERVER));
return;
}
accept_handles_ = true;
privileged_ = mode == MODE_SERVER;
if (privileged_ && other_pid_ != -1 &&
other_process_ == INVALID_HANDLE_VALUE) {
other_process_ = OpenProcess(PROCESS_DUP_HANDLE, false, other_pid_);
if (!other_process_) {
other_process_ = INVALID_HANDLE_VALUE;
CHROMIUM_LOG(ERROR) << "Failed to acquire privileged handle to "
<< other_pid_ << ", cannot accept handles";
}
}
}
static uint32_t HandleToUint32(HANDLE h) {
// Cast through uintptr_t and then unsigned int to make the truncation to
// 32 bits explicit. Handles are size of-pointer but are always 32-bit values.
// https://docs.microsoft.com/en-ca/windows/win32/winprog64/interprocess-communication
// says: 64-bit versions of Windows use 32-bit handles for interoperability.
return static_cast<uint32_t>(reinterpret_cast<uintptr_t>(h));
}
static HANDLE Uint32ToHandle(uint32_t h) {
return reinterpret_cast<HANDLE>(
static_cast<uintptr_t>(static_cast<int32_t>(h)));
}
bool Channel::ChannelImpl::AcceptHandles(Message& msg) {
MOZ_ASSERT(msg.num_handles() == 0);
uint32_t num_handles = msg.header()->num_handles;
if (num_handles == 0) {
return true;
}
if (!accept_handles_) {
CHROMIUM_LOG(ERROR) << "invalid message: " << msg.name()
<< ". channel is not configured to accept handles";
return false;
}
// Seek to the start of our handle payload.
mozilla::CheckedInt<uint32_t> handles_payload_size(num_handles);
handles_payload_size *= sizeof(uint32_t);
if (!handles_payload_size.isValid() ||
handles_payload_size.value() > msg.header()->payload_size) {
CHROMIUM_LOG(ERROR) << "invalid handle count " << num_handles
<< " or payload size: " << msg.header()->payload_size;
return false;
}
uint32_t handles_offset =
msg.header()->payload_size - handles_payload_size.value();
PickleIterator handles_start{msg};
if (!msg.IgnoreBytes(&handles_start, handles_offset)) {
CHROMIUM_LOG(ERROR) << "IgnoreBytes failed";
return false;
}
// Read in the handles themselves, transferring ownership as required.
nsTArray<mozilla::UniqueFileHandle> handles;
{
PickleIterator iter{handles_start};
for (uint32_t i = 0; i < num_handles; ++i) {
uint32_t handleValue;
if (!msg.ReadUInt32(&iter, &handleValue)) {
CHROMIUM_LOG(ERROR) << "failed to read handle value";
return false;
}
HANDLE handle = Uint32ToHandle(handleValue);
// If we're the privileged process, the remote process will have leaked
// the sent handles in its local address space, and be relying on us to
// duplicate them, otherwise the remote privileged side will have
// transferred the handles to us already.
if (privileged_) {
if (other_process_ == INVALID_HANDLE_VALUE) {
CHROMIUM_LOG(ERROR) << "other_process_ is invalid in AcceptHandles";
return false;
}
if (!::DuplicateHandle(
other_process_, handle, GetCurrentProcess(), &handle, 0, FALSE,
DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE)) {
CHROMIUM_LOG(ERROR) << "DuplicateHandle failed for handle " << handle
<< " in AcceptHandles";
return false;
}
}
// The handle is directly owned by this process now, and can be added to
// our `handles` array.
handles.AppendElement(mozilla::UniqueFileHandle(handle));
}
}
// We're done with the handle footer, truncate the message at that point.
msg.Truncate(&handles_start);
msg.SetAttachedFileHandles(std::move(handles));
msg.header()->num_handles = 0;
MOZ_ASSERT(msg.header()->payload_size == handles_offset);
MOZ_ASSERT(msg.num_handles() == num_handles);
return true;
}
bool Channel::ChannelImpl::TransferHandles(Message& msg) {
MOZ_ASSERT(msg.header()->num_handles == 0);
uint32_t num_handles = msg.num_handles();
if (num_handles == 0) {
return true;
}
if (!accept_handles_) {
CHROMIUM_LOG(ERROR) << "cannot send message: " << msg.name()
<< ". channel is not configured to accept handles";
return false;
}
#ifdef DEBUG
uint32_t handles_offset = msg.header()->payload_size;
#endif
// Write handles from `attached_handles_` into the message payload.
for (uint32_t i = 0; i < num_handles; ++i) {
// Release ownership of the handle. It'll be cloned when the parent process
// transfers it with DuplicateHandle either in this process or the remote
// process.
HANDLE handle = msg.attached_handles_[i].release();
// If we're the privileged process, transfer the HANDLE to our remote before
// sending the message. Otherwise, the remote privileged process will
// transfer the handle for us, so leak it.
if (privileged_) {
if (other_process_ == INVALID_HANDLE_VALUE) {
CHROMIUM_LOG(ERROR) << "other_process_ is invalid in TransferHandles";
return false;
}
if (!::DuplicateHandle(GetCurrentProcess(), handle, other_process_,
&handle, 0, FALSE,
DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE)) {
CHROMIUM_LOG(ERROR) << "DuplicateHandle failed for handle " << handle
<< " in TransferHandles";
return false;
}
}
if (!msg.WriteUInt32(HandleToUint32(handle))) {
CHROMIUM_LOG(ERROR) << "failed to write handle value " << handle;
return false;
}
}
msg.attached_handles_.Clear();
msg.header()->num_handles = num_handles;
MOZ_ASSERT(msg.header()->payload_size ==
handles_offset + (sizeof(uint32_t) * num_handles),
"Unexpected number of bytes written for handles footer?");
return true;
}
bool Channel::ChannelImpl::Unsound_IsClosed() const { return closed_; }
uint32_t Channel::ChannelImpl::Unsound_NumQueuedMessages() const {
@ -614,6 +802,10 @@ void* Channel::GetServerPipeHandle() const {
return channel_impl_->GetServerPipeHandle();
}
void Channel::StartAcceptingHandles(Mode mode) {
channel_impl_->StartAcceptingHandles(mode);
}
Channel::Listener* Channel::set_listener(Listener* listener) {
return channel_impl_->set_listener(listener);
}

Просмотреть файл

@ -32,13 +32,15 @@ class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
ChannelImpl(const ChannelId& channel_id, HANDLE server_pipe, Mode mode,
Listener* listener);
~ChannelImpl() {
if (pipe_ != INVALID_HANDLE_VALUE) {
if (pipe_ != INVALID_HANDLE_VALUE ||
other_process_ != INVALID_HANDLE_VALUE) {
Close();
}
}
bool Connect();
void Close();
HANDLE GetServerPipeHandle() const;
void StartAcceptingHandles(Mode mode);
Listener* set_listener(Listener* listener) {
Listener* old = listener_;
listener_ = listener;
@ -69,6 +71,11 @@ class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
bool ProcessOutgoingMessages(MessageLoopForIO::IOContext* context,
DWORD bytes_written);
// Called on a Message immediately before it is sent/recieved to transfer
// handles to the remote process, or accept handles from the remote process.
bool AcceptHandles(Message& msg);
bool TransferHandles(Message& msg);
// MessageLoop::IOHandler implementation.
virtual void OnIOCompleted(MessageLoopForIO::IOContext* context,
DWORD bytes_transfered, DWORD error);
@ -78,15 +85,15 @@ class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
explicit State(ChannelImpl* channel);
~State();
MessageLoopForIO::IOContext context;
bool is_pending;
bool is_pending = false;
};
State input_state_;
State output_state_;
HANDLE pipe_;
HANDLE pipe_ = INVALID_HANDLE_VALUE;
Listener* listener_;
Listener* listener_ = nullptr;
// Messages to be sent are queued here.
mozilla::Queue<mozilla::UniquePtr<Message>, 64> output_queue_;
@ -98,7 +105,7 @@ class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
// We read from the pipe into this buffer
mozilla::UniquePtr<char[]> input_buf_;
size_t input_buf_offset_;
size_t input_buf_offset_ = 0;
// Large incoming messages that span multiple pipe buffers get built-up in the
// buffers of this message.
@ -107,15 +114,15 @@ class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
// In server-mode, we have to wait for the client to connect before we
// can begin reading. We make use of the input_state_ when performing
// the connect operation in overlapped mode.
bool waiting_connect_;
bool waiting_connect_ = false;
// This flag is set when processing incoming messages. It is used to
// avoid recursing through ProcessIncomingMessages, which could cause
// problems. TODO(darin): make this unnecessary
bool processing_incoming_;
bool processing_incoming_ = false;
// This flag is set after Close() is run on the channel.
std::atomic<bool> closed_;
std::atomic<bool> closed_ = false;
// We keep track of the PID of the other side of this channel so that we can
// record this when generating logs of IPC messages.
@ -125,7 +132,7 @@ class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
// read output_queue_length_ from any thread (if we're OK getting an
// occasional out-of-date or bogus value). We use output_queue_length_ to
// implement Unsound_NumQueuedMessages.
std::atomic<size_t> output_queue_length_;
std::atomic<size_t> output_queue_length_ = 0;
ScopedRunnableMethodFactory<ChannelImpl> factory_;
@ -133,11 +140,21 @@ class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
// a connection. If the value is non-zero, the client passes it in the hello
// and the host validates. (We don't send the zero value to preserve IPC
// compatibility with existing clients that don't validate the channel.)
int32_t shared_secret_;
int32_t shared_secret_ = 0;
// In server-mode, we wait for the channel at the other side of the pipe to
// send us back our shared secret, if we are using one.
bool waiting_for_shared_secret_;
bool waiting_for_shared_secret_ = false;
// Whether or not to accept handles from a remote process, and whether this
// process is the privileged side of a IPC::Channel which can transfer
// handles.
bool accept_handles_ = false;
bool privileged_ = false;
// A privileged process handle used to transfer HANDLEs to and from the remote
// process. This will only be used if `privileged_` is set.
HANDLE other_process_ = INVALID_HANDLE_VALUE;
#ifdef DEBUG
mozilla::UniquePtr<nsAutoOwningThread> _mOwningThread;

Просмотреть файл

@ -106,6 +106,7 @@ class Message : public mojo::core::ports::UserMessage, public Pickle {
COMPRESS_BIT = 0x0200,
COMPRESSALL_BIT = 0x0400,
CONSTRUCTOR_BIT = 0x0800,
RELAY_BIT = 0x1000,
};
public:
@ -146,12 +147,20 @@ class Message : public mojo::core::ports::UserMessage, public Pickle {
bool IsReply() const { return (mFlags & REPLY_BIT) != 0; }
bool IsReplyError() const { return (mFlags & REPLY_ERROR_BIT) != 0; }
bool IsRelay() const { return (mFlags & RELAY_BIT) != 0; }
private:
void SetSync() { mFlags |= SYNC_BIT; }
void SetInterrupt() { mFlags |= INTERRUPT_BIT; }
void SetReply() { mFlags |= REPLY_BIT; }
void SetReplyError() { mFlags |= REPLY_ERROR_BIT; }
void SetRelay(bool relay) {
if (relay) {
mFlags |= RELAY_BIT;
} else {
mFlags &= ~RELAY_BIT;
}
}
uint32_t mFlags;
};
@ -247,6 +256,9 @@ class Message : public mojo::core::ports::UserMessage, public Pickle {
uint32_t num_handles() const;
bool is_relay() const { return header()->flags.IsRelay(); }
void set_relay(bool new_relay) { header()->flags.SetRelay(new_relay); }
template <class T>
static bool Dispatch(const Message* msg, T* obj, void (T::*func)()) {
(obj->*func)();

Просмотреть файл

@ -393,9 +393,6 @@ struct ParamTraitsIPC : ParamTraitsWindows<P> {};
//
// A UniqueFileHandle may only be read once. After it has been read once, it
// will be consumed, and future reads will return an invalid handle.
//
// XXX: This is currently only implemented on POSIX, and will not work on
// windows yet!
template <>
struct ParamTraitsIPC<mozilla::UniqueFileHandle> {
typedef mozilla::UniqueFileHandle param_type;

Просмотреть файл

@ -163,6 +163,7 @@ bool NodeController::SendUserMessage(const PortRef& aPort,
}
auto NodeController::SerializeEventMessage(UniquePtr<Event> aEvent,
const NodeName* aRelayTarget,
uint32_t aType)
-> UniquePtr<IPC::Message> {
UniquePtr<IPC::Message> message;
@ -176,18 +177,30 @@ auto NodeController::SerializeEventMessage(UniquePtr<Event> aEvent,
message = MakeUnique<IPC::Message>(MSG_ROUTING_CONTROL, aType);
}
message->set_relay(aRelayTarget != nullptr);
size_t length = aEvent->GetSerializedSize();
if (aRelayTarget) {
length += sizeof(NodeName);
}
// Use an intermediate buffer to serialize to avoid potential issues with the
// segmented `IPC::Message` bufferlist. This should be fairly cheap, as the
// majority of events are fairly small.
Vector<char, 256, InfallibleAllocPolicy> buffer;
(void)buffer.initLengthUninitialized(aEvent->GetSerializedSize());
aEvent->Serialize(buffer.begin());
(void)buffer.initLengthUninitialized(length);
if (aRelayTarget) {
memcpy(buffer.begin(), aRelayTarget, sizeof(NodeName));
aEvent->Serialize(buffer.begin() + sizeof(NodeName));
} else {
aEvent->Serialize(buffer.begin());
}
message->WriteFooter(buffer.begin(), buffer.length());
#ifdef DEBUG
// Debug-assert that we can read the same data back out of the buffer.
MOZ_ASSERT(message->FooterSize() == aEvent->GetSerializedSize());
MOZ_ASSERT(message->FooterSize() == length);
Vector<char, 256, InfallibleAllocPolicy> buffer2;
(void)buffer2.initLengthUninitialized(message->FooterSize());
MOZ_ASSERT(message->ReadFooter(buffer2.begin(), buffer2.length(),
@ -198,8 +211,14 @@ auto NodeController::SerializeEventMessage(UniquePtr<Event> aEvent,
return message;
}
auto NodeController::DeserializeEventMessage(UniquePtr<IPC::Message> aMessage)
auto NodeController::DeserializeEventMessage(UniquePtr<IPC::Message> aMessage,
NodeName* aRelayTarget)
-> UniquePtr<Event> {
if (aMessage->is_relay() && !aRelayTarget) {
NODECONTROLLER_WARNING("Unexpected relay message '%s'", aMessage->name());
return nullptr;
}
Vector<char, 256, InfallibleAllocPolicy> buffer;
(void)buffer.initLengthUninitialized(aMessage->FooterSize());
// Truncate the message when reading the footer, so that the extra footer data
@ -212,7 +231,22 @@ auto NodeController::DeserializeEventMessage(UniquePtr<IPC::Message> aMessage)
return nullptr;
}
UniquePtr<Event> event = Event::Deserialize(buffer.begin(), buffer.length());
UniquePtr<Event> event;
if (aRelayTarget) {
MOZ_ASSERT(aMessage->is_relay());
if (buffer.length() < sizeof(NodeName)) {
NODECONTROLLER_WARNING(
"Insufficient space in message footer for message '%s'",
aMessage->name());
return nullptr;
}
memcpy(aRelayTarget, buffer.begin(), sizeof(NodeName));
event = Event::Deserialize(buffer.begin() + sizeof(NodeName),
buffer.length() - sizeof(NodeName));
} else {
event = Event::Deserialize(buffer.begin(), buffer.length());
}
if (!event) {
NODECONTROLLER_WARNING("Call to Event::Deserialize for message '%s' Failed",
aMessage->name());
@ -272,7 +306,22 @@ void NodeController::ForwardEvent(const NodeName& aNode,
if (aNode == mName) {
(void)mNode->AcceptEvent(std::move(aEvent));
} else {
UniquePtr<IPC::Message> message = SerializeEventMessage(std::move(aEvent));
// On Windows, messages holding HANDLEs must be relayed via the broker
// process so it can transfer handle ownership.
bool needsRelay = false;
#ifdef XP_WIN
if (!IsBroker() && aNode != kBrokerNodeName &&
aEvent->type() == Event::kUserMessage) {
auto* userEvent = static_cast<UserMessageEvent*>(aEvent.get());
needsRelay = userEvent->HasMessage() &&
userEvent->GetMessage<IPC::Message>()->num_handles() > 0;
}
#endif
UniquePtr<IPC::Message> message =
SerializeEventMessage(std::move(aEvent), needsRelay ? &aNode : nullptr);
MOZ_ASSERT(message->is_relay() == needsRelay,
"Message relay status set incorrectly");
RefPtr<NodeChannel> peer;
RefPtr<NodeChannel> broker;
@ -283,7 +332,7 @@ void NodeController::ForwardEvent(const NodeName& aNode,
// Check if we know this peer. If we don't, we'll need to request an
// introduction.
peer = state->mPeers.Get(aNode);
if (!peer) {
if (!peer || needsRelay) {
if (IsBroker()) {
NODECONTROLLER_WARNING("Ignoring message '%s' to unknown peer %s",
message->name(), ToString(aNode).c_str());
@ -298,18 +347,32 @@ void NodeController::ForwardEvent(const NodeName& aNode,
return;
}
auto& queue = state->mPendingMessages.LookupOrInsertWith(aNode, [&]() {
needsIntroduction = true;
return Queue<UniquePtr<IPC::Message>, 64>{};
});
queue.Push(std::move(message));
if (!needsRelay) {
auto& queue =
state->mPendingMessages.LookupOrInsertWith(aNode, [&]() {
needsIntroduction = true;
return Queue<UniquePtr<IPC::Message>, 64>{};
});
queue.Push(std::move(message));
}
}
}
// TODO: On windows, we can relay the message through the broker process
// here to handle transferring handles without using sync IPC.
MOZ_ASSERT(!needsIntroduction || !needsRelay,
"Only one of the two should ever be set");
if (needsIntroduction) {
if (needsRelay) {
#ifdef XP_WIN
NODECONTROLLER_LOG(LogLevel::Info,
"Relaying message '%s' for peer %s due to %lu handles",
message->name(), ToString(aNode).c_str(),
message->num_handles());
MOZ_ASSERT(broker);
broker->SendEventMessage(std::move(message));
#else
MOZ_ASSERT_UNREACHABLE("relaying messages is only supported on windows");
#endif
} else if (needsIntroduction) {
MOZ_ASSERT(broker);
broker->RequestIntroduction(aNode);
} else if (peer) {
@ -320,7 +383,7 @@ void NodeController::ForwardEvent(const NodeName& aNode,
void NodeController::BroadcastEvent(UniquePtr<Event> aEvent) {
UniquePtr<IPC::Message> message =
SerializeEventMessage(std::move(aEvent), BROADCAST_MESSAGE_TYPE);
SerializeEventMessage(std::move(aEvent), nullptr, BROADCAST_MESSAGE_TYPE);
if (IsBroker()) {
OnBroadcast(mName, std::move(message));
@ -352,7 +415,11 @@ void NodeController::OnEventMessage(const NodeName& aFromNode,
UniquePtr<IPC::Message> aMessage) {
AssertIOThread();
UniquePtr<Event> event = DeserializeEventMessage(std::move(aMessage));
bool isRelay = aMessage->is_relay();
NodeName relayTarget;
UniquePtr<Event> event = DeserializeEventMessage(
std::move(aMessage), isRelay ? &relayTarget : nullptr);
if (!event) {
NODECONTROLLER_WARNING("Invalid EventMessage from peer %s!",
ToString(aFromNode).c_str());
@ -360,6 +427,70 @@ void NodeController::OnEventMessage(const NodeName& aFromNode,
return;
}
NodeName fromNode = aFromNode;
#ifdef XP_WIN
if (isRelay) {
if (event->type() != Event::kUserMessage) {
NODECONTROLLER_WARNING(
"Unexpected relay of non-UserMessage event from peer %s!",
ToString(aFromNode).c_str());
DropPeer(aFromNode);
return;
}
// If we're the broker, then we'll need to forward this message on to the
// true recipient. To do this, we re-serialize the message, passing along
// the original source node, and send it to the final node.
if (IsBroker()) {
UniquePtr<IPC::Message> message =
SerializeEventMessage(std::move(event), &aFromNode);
if (!message) {
NODECONTROLLER_WARNING(
"Relaying EventMessage from peer %s failed to re-serialize!",
ToString(aFromNode).c_str());
DropPeer(aFromNode);
return;
}
MOZ_ASSERT(message->is_relay(), "Message stopped being a relay message?");
NODECONTROLLER_LOG(
LogLevel::Info,
"Relaying message '%s' from peer %s to peer %s (%lu handles)",
message->name(), ToString(aFromNode).c_str(),
ToString(relayTarget).c_str(), message->num_handles());
RefPtr<NodeChannel> peer;
{
auto state = mState.Lock();
peer = state->mPeers.Get(relayTarget);
}
if (!peer) {
NODECONTROLLER_WARNING(
"Dropping relayed message from %s to unknown peer %s",
ToString(aFromNode).c_str(), ToString(relayTarget).c_str());
return;
}
peer->SendEventMessage(std::move(message));
return;
}
// Otherwise, we're the final recipient, so we can continue & process the
// message as usual.
if (aFromNode != kBrokerNodeName) {
NODECONTROLLER_WARNING(
"Unexpected relayed EventMessage from non-broker peer %s!",
ToString(aFromNode).c_str());
DropPeer(aFromNode);
return;
}
fromNode = relayTarget;
NODECONTROLLER_LOG(LogLevel::Info, "Got relayed message from peer %s",
ToString(fromNode).c_str());
}
#endif
// If we're getting a requested port merge from another process, check to make
// sure that we're expecting the request, and record that the merge has
// arrived so we don't try to close the port on error.
@ -369,8 +500,8 @@ void NodeController::OnEventMessage(const NodeName& aFromNode,
if (!targetPort.is_valid()) {
NODECONTROLLER_WARNING(
"Unexpected MergePortEvent from peer %s for unknown port %s",
ToString(aFromNode).c_str(), ToString(event->port_name()).c_str());
DropPeer(aFromNode);
ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
DropPeer(fromNode);
return;
}
@ -393,8 +524,8 @@ void NodeController::OnEventMessage(const NodeName& aFromNode,
if (!expectingMerge) {
NODECONTROLLER_WARNING(
"Unexpected MergePortEvent from peer %s for port %s",
ToString(aFromNode).c_str(), ToString(event->port_name()).c_str());
DropPeer(aFromNode);
ToString(fromNode).c_str(), ToString(event->port_name()).c_str());
DropPeer(fromNode);
return;
}
}

Просмотреть файл

@ -105,8 +105,10 @@ class NodeController final : public mojo::core::ports::NodeDelegate,
~NodeController();
UniquePtr<IPC::Message> SerializeEventMessage(
UniquePtr<Event> aEvent, uint32_t aType = EVENT_MESSAGE_TYPE);
UniquePtr<Event> DeserializeEventMessage(UniquePtr<IPC::Message> aMessage);
UniquePtr<Event> aEvent, const NodeName* aRelayTarget = nullptr,
uint32_t aType = EVENT_MESSAGE_TYPE);
UniquePtr<Event> DeserializeEventMessage(UniquePtr<IPC::Message> aMessage,
NodeName* aRelayTarget = nullptr);
// Get the `NodeChannel` for the named node.
already_AddRefed<NodeChannel> GetNodeChannel(const NodeName& aName);