Bug 1768476 - Part 1: Consistently pass IPC::Message around by UniquePtr, r=ipc-reviewers,mccr8

This makes passing around the type more consistent, and hopefully will make
changes to IPC::Message easier to work with in the future.

In addition, this should save us a few copies as we move the message type into
and out of UniquePtr, however I expect this won't make much of a difference.

Differential Revision: https://phabricator.services.mozilla.com/D145885
This commit is contained in:
Nika Layzell 2022-05-10 21:37:25 +00:00
Родитель 355dc69516
Коммит ed88dc4370
30 изменённых файлов: 265 добавлений и 308 удалений

Просмотреть файл

@ -3766,8 +3766,8 @@ PContentChild::Result ContentChild::OnMessageReceived(const Message& aMsg) {
}
#endif
PContentChild::Result ContentChild::OnMessageReceived(const Message& aMsg,
Message*& aReply) {
PContentChild::Result ContentChild::OnMessageReceived(
const Message& aMsg, UniquePtr<Message>& aReply) {
Result result = PContentChild::OnMessageReceived(aMsg, aReply);
if (aMsg.is_sync()) {

Просмотреть файл

@ -840,8 +840,8 @@ class ContentChild final : public PContentChild,
using PContentChild::OnMessageReceived;
#endif
virtual PContentChild::Result OnMessageReceived(const Message& aMsg,
Message*& aReply) override;
virtual PContentChild::Result OnMessageReceived(
const Message& aMsg, UniquePtr<Message>& aReply) override;
nsTArray<mozilla::UniquePtr<AlertObserver>> mAlertObservers;
RefPtr<ConsoleListener> mConsoleListener;

Просмотреть файл

@ -43,7 +43,8 @@ bool ChildProcessHost::CreateChannel() {
ChildProcessHost::ListenerHook::ListenerHook(ChildProcessHost* host)
: host_(host) {}
void ChildProcessHost::ListenerHook::OnMessageReceived(IPC::Message&& msg) {
void ChildProcessHost::ListenerHook::OnMessageReceived(
mozilla::UniquePtr<IPC::Message> msg) {
host_->OnMessageReceived(std::move(msg));
}
@ -59,6 +60,6 @@ void ChildProcessHost::ListenerHook::OnChannelError() {
}
void ChildProcessHost::ListenerHook::GetQueuedMessages(
std::queue<IPC::Message>& queue) {
std::queue<mozilla::UniquePtr<IPC::Message>>& queue) {
host_->GetQueuedMessages(queue);
}

Просмотреть файл

@ -39,7 +39,8 @@ class ChildProcessHost : public IPC::Channel::Listener {
bool CreateChannel();
// IPC::Channel::Listener implementation:
virtual void OnMessageReceived(IPC::Message&& msg) override {}
virtual void OnMessageReceived(
mozilla::UniquePtr<IPC::Message> msg) override {}
virtual void OnChannelConnected(base::ProcessId peer_pid) override {}
virtual void OnChannelError() override {}
@ -56,10 +57,12 @@ class ChildProcessHost : public IPC::Channel::Listener {
class ListenerHook : public IPC::Channel::Listener {
public:
explicit ListenerHook(ChildProcessHost* host);
virtual void OnMessageReceived(IPC::Message&& msg) override;
virtual void OnMessageReceived(
mozilla::UniquePtr<IPC::Message> msg) override;
virtual void OnChannelConnected(base::ProcessId peer_pid) override;
virtual void OnChannelError() override;
virtual void GetQueuedMessages(std::queue<IPC::Message>& queue) override;
virtual void GetQueuedMessages(
std::queue<mozilla::UniquePtr<IPC::Message>>& queue) override;
private:
ChildProcessHost* host_;

Просмотреть файл

@ -57,7 +57,7 @@ class Channel {
virtual ~Listener() = default;
// Called when a message is received.
virtual void OnMessageReceived(Message&& message) = 0;
virtual void OnMessageReceived(mozilla::UniquePtr<Message> message) = 0;
// Called when the channel is connected and we have received the internal
// Hello message from the peer.
@ -69,7 +69,8 @@ class Channel {
// If the listener has queued messages, swap them for |queue| like so
// swap(impl->my_queued_messages, queue);
virtual void GetQueuedMessages(std::queue<Message>& queue) {}
virtual void GetQueuedMessages(
std::queue<mozilla::UniquePtr<Message>>& queue) {}
};
enum Mode { MODE_SERVER, MODE_CLIENT };

Просмотреть файл

@ -399,15 +399,15 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() {
// Try to figure out how big the message is. Size is 0 if we haven't read
// enough of the header to know the size.
uint32_t message_length = 0;
if (incoming_message_.isSome()) {
message_length = incoming_message_.ref().size();
if (incoming_message_) {
message_length = incoming_message_->size();
} else {
message_length = Message::MessageSize(p, end);
}
if (!message_length) {
// We haven't seen the full message header.
MOZ_ASSERT(incoming_message_.isNothing());
MOZ_ASSERT(!incoming_message_);
// Move everything we have to the start of the buffer. We'll finish
// reading this message when we get more data. For now we leave it in
@ -421,10 +421,10 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() {
input_buf_offset_ = 0;
bool partial;
if (incoming_message_.isSome()) {
if (incoming_message_) {
// We already have some data for this message stored in
// incoming_message_. We want to append the new data there.
Message& m = incoming_message_.ref();
Message& m = *incoming_message_;
// How much data from this message remains to be added to
// incoming_message_?
@ -443,7 +443,7 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() {
// How much data from this message is stored in input_buf_?
uint32_t in_buf = std::min(message_length, uint32_t(end - p));
incoming_message_.emplace(p, in_buf);
incoming_message_ = mozilla::MakeUnique<Message>(p, in_buf);
p += in_buf;
// Are we done reading this message?
@ -454,7 +454,7 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() {
break;
}
Message& m = incoming_message_.ref();
Message& m = *incoming_message_;
if (m.header()->num_handles) {
// the message has file descriptors
@ -531,10 +531,10 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() {
return false;
}
#endif
listener_->OnMessageReceived(std::move(m));
listener_->OnMessageReceived(std::move(incoming_message_));
}
incoming_message_.reset();
incoming_message_ = nullptr;
}
input_overflow_fds_ = std::vector<int>(&fds[fds_i], &fds[num_fds]);
@ -542,14 +542,12 @@ bool Channel::ChannelImpl::ProcessIncomingMessages() {
// When the input data buffer is empty, the overflow fds should be too. If
// this is not the case, we probably have a rogue renderer which is trying
// to fill our descriptor table.
if (incoming_message_.isNothing() && input_buf_offset_ == 0 &&
if (!incoming_message_ && input_buf_offset_ == 0 &&
!input_overflow_fds_.empty()) {
// We close these descriptors in Close()
return false;
}
}
return true;
}
bool Channel::ChannelImpl::ProcessOutgoingMessages() {

Просмотреть файл

@ -135,7 +135,7 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
// Large incoming messages that span multiple pipe buffers get built-up in the
// buffers of this message.
mozilla::Maybe<Message> incoming_message_;
mozilla::UniquePtr<Message> incoming_message_;
std::vector<int> input_overflow_fds_;
// In server-mode, we have to wait for the client to connect before we

Просмотреть файл

@ -368,15 +368,15 @@ bool Channel::ChannelImpl::ProcessIncomingMessages(
// Try to figure out how big the message is. Size is 0 if we haven't read
// enough of the header to know the size.
uint32_t message_length = 0;
if (incoming_message_.isSome()) {
message_length = incoming_message_.ref().size();
if (incoming_message_) {
message_length = incoming_message_->size();
} else {
message_length = Message::MessageSize(p, end);
}
if (!message_length) {
// We haven't seen the full message header.
MOZ_ASSERT(incoming_message_.isNothing());
MOZ_ASSERT(!incoming_message_);
// Move everything we have to the start of the buffer. We'll finish
// reading this message when we get more data. For now we leave it in
@ -390,10 +390,10 @@ bool Channel::ChannelImpl::ProcessIncomingMessages(
input_buf_offset_ = 0;
bool partial;
if (incoming_message_.isSome()) {
if (incoming_message_) {
// We already have some data for this message stored in
// incoming_message_. We want to append the new data there.
Message& m = incoming_message_.ref();
Message& m = *incoming_message_;
// How much data from this message remains to be added to
// incoming_message_?
@ -412,7 +412,7 @@ bool Channel::ChannelImpl::ProcessIncomingMessages(
// How much data from this message is stored in input_buf_?
uint32_t in_buf = std::min(message_length, uint32_t(end - p));
incoming_message_.emplace(p, in_buf);
incoming_message_ = mozilla::MakeUnique<Message>(p, in_buf);
p += in_buf;
// Are we done reading this message?
@ -423,7 +423,7 @@ bool Channel::ChannelImpl::ProcessIncomingMessages(
break;
}
Message& m = incoming_message_.ref();
Message& m = *incoming_message_;
// Note: We set other_pid_ below when we receive a Hello message (which
// has no routing ID), but we only emit a profiler marker for messages
@ -467,10 +467,10 @@ bool Channel::ChannelImpl::ProcessIncomingMessages(
if (!AcceptHandles(m)) {
return false;
}
listener_->OnMessageReceived(std::move(m));
listener_->OnMessageReceived(std::move(incoming_message_));
}
incoming_message_.reset();
incoming_message_ = nullptr;
}
bytes_read = 0; // Get more data.

Просмотреть файл

@ -106,7 +106,7 @@ class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
// Large incoming messages that span multiple pipe buffers get built-up in the
// buffers of this message.
mozilla::Maybe<Message> incoming_message_;
mozilla::UniquePtr<Message> incoming_message_;
// In server-mode, we have to wait for the client to connect before we
// can begin reading. We make use of the input_state_ when performing

Просмотреть файл

@ -22,13 +22,6 @@ const mojo::core::ports::UserMessage::TypeInfo Message::kUserMessageTypeInfo{};
Message::~Message() { MOZ_COUNT_DTOR(IPC::Message); }
Message::Message()
: UserMessage(&kUserMessageTypeInfo), Pickle(sizeof(Header)) {
MOZ_COUNT_CTOR(IPC::Message);
header()->routing = header()->type = 0;
header()->num_handles = 0;
}
Message::Message(int32_t routing_id, msgid_t type, uint32_t segment_capacity,
HeaderFlags flags)
: UserMessage(&kUserMessageTypeInfo),
@ -53,30 +46,16 @@ Message::Message(const char* data, int data_len)
MOZ_COUNT_CTOR(IPC::Message);
}
Message::Message(Message&& other)
: UserMessage(&kUserMessageTypeInfo),
Pickle(std::move(other)),
attached_handles_(std::move(other.attached_handles_)),
attached_ports_(std::move(other.attached_ports_))
#if defined(OS_MACOSX)
,
attached_send_rights_(std::move(other.attached_send_rights_))
#endif
#ifdef FUZZING_SNAPSHOT
,
isFuzzMsg(other.isFuzzMsg)
#endif
{
MOZ_COUNT_CTOR(IPC::Message);
/*static*/ mozilla::UniquePtr<Message> Message::IPDLMessage(
int32_t routing_id, msgid_t type, uint32_t segment_capacity,
HeaderFlags flags) {
return mozilla::MakeUnique<Message>(routing_id, type, segment_capacity,
flags);
}
/*static*/ Message* Message::IPDLMessage(int32_t routing_id, msgid_t type,
HeaderFlags flags) {
return new Message(routing_id, type, 0, flags);
}
/*static*/ Message* Message::ForSyncDispatchError(NestedLevel level) {
auto* m = new Message(0, 0, 0, HeaderFlags(level));
/*static*/ mozilla::UniquePtr<Message> Message::ForSyncDispatchError(
NestedLevel level) {
auto m = mozilla::MakeUnique<Message>(0, 0, 0, HeaderFlags(level));
auto& flags = m->header()->flags;
flags.SetSync();
flags.SetReply();
@ -84,20 +63,6 @@ Message::Message(Message&& other)
return m;
}
Message& Message::operator=(Message&& other) {
*static_cast<Pickle*>(this) = std::move(other);
attached_handles_ = std::move(other.attached_handles_);
attached_ports_ = std::move(other.attached_ports_);
#if defined(OS_MACOSX)
attached_send_rights_ = std::move(other.attached_send_rights_);
#endif
#ifdef FUZZING_SNAPSHOT
isFuzzMsg = std::move(other.isFuzzMsg);
#endif
return *this;
}
void Message::WriteFooter(const void* data, uint32_t data_len) {
if (data_len == 0) {
return;

Просмотреть файл

@ -172,8 +172,6 @@ class Message : public mojo::core::ports::UserMessage, public Pickle {
virtual ~Message();
Message();
// Initialize a message with a user-defined type, priority value, and
// destination WebView ID.
Message(int32_t routing_id, msgid_t type,
@ -182,20 +180,22 @@ class Message : public mojo::core::ports::UserMessage, public Pickle {
Message(const char* data, int data_len);
Message(const Message& other) = delete;
Message(Message&& other);
Message& operator=(const Message& other) = delete;
Message& operator=(Message&& other);
Message(const Message&) = delete;
Message(Message&&) = delete;
Message& operator=(const Message&) = delete;
Message& operator=(Message&&) = delete;
// Helper method for the common case (default segmentCapacity, recording
// the write latency of messages) of IPDL message creation. This helps
// move the malloc and some of the parameter setting out of autogenerated
// code.
static Message* IPDLMessage(int32_t routing_id, msgid_t type,
HeaderFlags flags);
static mozilla::UniquePtr<Message> IPDLMessage(int32_t routing_id,
msgid_t type,
uint32_t segmentCapacity,
HeaderFlags flags);
// One-off constructors for special error-handling messages.
static Message* ForSyncDispatchError(NestedLevel level);
static mozilla::UniquePtr<Message> ForSyncDispatchError(NestedLevel level);
NestedLevel nested_level() const { return header()->flags.Level(); }
@ -214,8 +214,6 @@ class Message : public mojo::core::ports::UserMessage, public Pickle {
bool is_reply_error() const { return header()->flags.IsReplyError(); }
bool is_valid() const { return !!header(); }
msgid_t type() const { return header()->type; }
int32_t routing_id() const { return header()->routing; }

Просмотреть файл

@ -50,16 +50,16 @@ void ForkServer::InitProcess(int* aArgc, char*** aArgv) {
bool ForkServer::HandleMessages() {
// |sClientFd| is created by an instance of |IPC::Channel|.
// It sends a HELLO automatically.
IPC::Message hello;
UniquePtr<IPC::Message> hello;
mTcver->RecvInfallible(
hello, "Expect to receive a HELLO message from the parent process!");
MOZ_ASSERT(hello.type() == kHELLO_MESSAGE_TYPE);
MOZ_ASSERT(hello->type() == kHELLO_MESSAGE_TYPE);
// Send it back
mTcver->SendInfallible(hello, "Fail to ack the received HELLO!");
mTcver->SendInfallible(*hello, "Fail to ack the received HELLO!");
while (true) {
IPC::Message msg;
UniquePtr<IPC::Message> msg;
if (!mTcver->Recv(msg)) {
break;
}
@ -196,12 +196,10 @@ inline void SanitizeBuffers(IPC::Message& aMsg, std::vector<std::string>& aArgv,
* It will return in both the fork server process and the new content
* process. |mAppProcBuilder| is null for the fork server.
*/
void ForkServer::OnMessageReceived(IPC::Message&& message) {
IPC::Message msg(std::move(message));
void ForkServer::OnMessageReceived(UniquePtr<IPC::Message> message) {
std::vector<std::string> argv;
base::LaunchOptions options;
if (!ParseForkNewSubprocess(msg, argv, &options)) {
if (!ParseForkNewSubprocess(*message, argv, &options)) {
return;
}
@ -233,7 +231,7 @@ void ForkServer::OnMessageReceived(IPC::Message&& message) {
// Without this, the content processes that is forked later are
// able to read the content of buffers even the buffers have been
// released.
SanitizeBuffers(msg, argv, options);
SanitizeBuffers(*message, argv, options);
}
/**

Просмотреть файл

@ -24,7 +24,7 @@ class ForkServer {
bool HandleMessages();
// Called when a message is received.
void OnMessageReceived(IPC::Message&& message);
void OnMessageReceived(UniquePtr<IPC::Message> message);
static bool RunForkServer(int* aArgc, char*** aArgv);

Просмотреть файл

@ -64,9 +64,9 @@ bool ForkServiceChild::SendForkNewSubprocess(
// IPC::Channel created by the GeckoChildProcessHost has
// already send a HELLO. It is expected to receive a hello
// message from the fork server too.
IPC::Message hello;
UniquePtr<IPC::Message> hello;
mTcver->RecvInfallible(hello, "Fail to receive HELLO message");
MOZ_ASSERT(hello.type() == ForkServer::kHELLO_MESSAGE_TYPE);
MOZ_ASSERT(hello->type() == ForkServer::kHELLO_MESSAGE_TYPE);
mWaitForHello = false;
}
@ -84,7 +84,7 @@ bool ForkServiceChild::SendForkNewSubprocess(
return false;
}
IPC::Message reply;
UniquePtr<IPC::Message> reply;
if (!mTcver->Recv(reply)) {
MOZ_LOG(gForkServiceLog, LogLevel::Verbose,
("the pipe to the fork server is closed or having errors"));
@ -98,13 +98,13 @@ bool ForkServiceChild::SendForkNewSubprocess(
return true;
}
void ForkServiceChild::OnMessageReceived(IPC::Message&& message) {
if (message.type() != Reply_ForkNewSubprocess__ID) {
void ForkServiceChild::OnMessageReceived(UniquePtr<IPC::Message> message) {
if (message->type() != Reply_ForkNewSubprocess__ID) {
MOZ_LOG(gForkServiceLog, LogLevel::Verbose,
("unknown reply type %d", message.type()));
("unknown reply type %d", message->type()));
return;
}
IPC::MessageReader reader(message);
IPC::MessageReader reader(*message);
if (!ReadIPDLParam(&reader, nullptr, &mRecvPid)) {
MOZ_CRASH("Error deserializing 'pid_t'");

Просмотреть файл

@ -66,7 +66,7 @@ class ForkServiceChild {
private:
// Called when a message is received.
void OnMessageReceived(IPC::Message&& message);
void OnMessageReceived(UniquePtr<IPC::Message> message);
void OnError();
UniquePtr<MiniTransceiver> mTcver;

Просмотреть файл

@ -1585,7 +1585,7 @@ void GeckoChildProcessHost::OnChannelConnected(base::ProcessId peer_pid) {
lock.Notify();
}
void GeckoChildProcessHost::OnMessageReceived(IPC::Message&& aMsg) {
void GeckoChildProcessHost::OnMessageReceived(UniquePtr<IPC::Message> aMsg) {
// We never process messages ourself, just save them up for the next
// listener.
mQueue.push(std::move(aMsg));
@ -1609,7 +1609,7 @@ RefPtr<ProcessHandlePromise> GeckoChildProcessHost::WhenProcessHandleReady() {
return mHandlePromise;
}
void GeckoChildProcessHost::GetQueuedMessages(std::queue<IPC::Message>& queue) {
void GeckoChildProcessHost::GetQueuedMessages(std::queue<UniquePtr<IPC::Message>>& queue) {
// If this is called off the IO thread, bad things will happen.
DCHECK(MessageLoopForIO::current());
swap(queue, mQueue);

Просмотреть файл

@ -110,9 +110,9 @@ class GeckoChildProcessHost : public ChildProcessHost,
int32_t timeoutMs = 0);
virtual void OnChannelConnected(base::ProcessId peer_pid) override;
virtual void OnMessageReceived(IPC::Message&& aMsg) override;
virtual void OnMessageReceived(UniquePtr<IPC::Message> aMsg) override;
virtual void OnChannelError() override;
virtual void GetQueuedMessages(std::queue<IPC::Message>& queue) override;
virtual void GetQueuedMessages(std::queue<UniquePtr<IPC::Message>>& queue) override;
// Resolves to the process handle when it's available (see
// LaunchAndWaitForProcessHandle); use with AsyncLaunch.
@ -275,7 +275,7 @@ class GeckoChildProcessHost : public ChildProcessHost,
// them here until we hand off the eventual listener.
//
// FIXME/cjones: this strongly indicates bad design. Shame on us.
std::queue<IPC::Message> mQueue;
std::queue<UniquePtr<IPC::Message>> mQueue;
// Linux-Only. Set this up before we're called from a different thread.
nsCString mTmpDirName;

Просмотреть файл

@ -249,22 +249,22 @@ class AutoEnterTransaction {
return mTransaction;
}
void ReceivedReply(IPC::Message&& aMessage) {
MOZ_RELEASE_ASSERT(aMessage.seqno() == mSeqno);
MOZ_RELEASE_ASSERT(aMessage.transaction_id() == mTransaction);
void ReceivedReply(UniquePtr<IPC::Message> aMessage) {
MOZ_RELEASE_ASSERT(aMessage->seqno() == mSeqno);
MOZ_RELEASE_ASSERT(aMessage->transaction_id() == mTransaction);
MOZ_RELEASE_ASSERT(!mReply);
IPC_LOG("Reply received on worker thread: seqno=%d", mSeqno);
mReply = MakeUnique<IPC::Message>(std::move(aMessage));
mReply = std::move(aMessage);
MOZ_RELEASE_ASSERT(IsComplete());
}
void HandleReply(IPC::Message&& aMessage) {
void HandleReply(UniquePtr<IPC::Message> aMessage) {
mChan->mMonitor->AssertCurrentThreadOwns();
AutoEnterTransaction* cur = mChan->mTransactionStack;
MOZ_RELEASE_ASSERT(cur == this);
while (cur) {
MOZ_RELEASE_ASSERT(cur->mActive);
if (aMessage.seqno() == cur->mSeqno) {
if (aMessage->seqno() == cur->mSeqno) {
cur->ReceivedReply(std::move(aMessage));
break;
}
@ -515,7 +515,7 @@ void MessageChannel::AssertMaybeDeferredCountCorrect() {
size_t count = 0;
for (MessageTask* task : mPending) {
if (!IsAlwaysDeferred(task->Msg())) {
if (!IsAlwaysDeferred(*task->Msg())) {
count++;
}
}
@ -994,22 +994,22 @@ bool MessageChannel::ShouldDeferMessage(const Message& aMsg) {
aMsg.transaction_id() != CurrentNestedInsideSyncTransaction();
}
void MessageChannel::OnMessageReceivedFromLink(Message&& aMsg) {
void MessageChannel::OnMessageReceivedFromLink(UniquePtr<Message> aMsg) {
mMonitor->AssertCurrentThreadOwns();
if (MaybeInterceptSpecialIOMessage(aMsg)) {
if (MaybeInterceptSpecialIOMessage(*aMsg)) {
return;
}
mListener->OnChannelReceivedMessage(aMsg);
mListener->OnChannelReceivedMessage(*aMsg);
// If we're awaiting a sync reply, we know that it needs to be immediately
// handled to unblock us.
if (aMsg.is_sync() && aMsg.is_reply()) {
IPC_LOG("Received reply seqno=%d xid=%d", aMsg.seqno(),
aMsg.transaction_id());
if (aMsg->is_sync() && aMsg->is_reply()) {
IPC_LOG("Received reply seqno=%d xid=%d", aMsg->seqno(),
aMsg->transaction_id());
if (aMsg.seqno() == mTimedOutMessageSeqno) {
if (aMsg->seqno() == mTimedOutMessageSeqno) {
// Drop the message, but allow future sync messages to be sent.
IPC_LOG("Received reply to timedout message; igoring; xid=%d",
mTimedOutMessageSeqno);
@ -1026,53 +1026,47 @@ void MessageChannel::OnMessageReceivedFromLink(Message&& aMsg) {
}
// Nested messages cannot be compressed.
MOZ_RELEASE_ASSERT(aMsg.compress_type() == IPC::Message::COMPRESSION_NONE ||
aMsg.nested_level() == IPC::Message::NOT_NESTED);
MOZ_RELEASE_ASSERT(aMsg->compress_type() == IPC::Message::COMPRESSION_NONE ||
aMsg->nested_level() == IPC::Message::NOT_NESTED);
bool reuseTask = false;
if (aMsg.compress_type() == IPC::Message::COMPRESSION_ENABLED) {
if (aMsg->compress_type() == IPC::Message::COMPRESSION_ENABLED) {
bool compress =
(!mPending.isEmpty() &&
mPending.getLast()->Msg().type() == aMsg.type() &&
mPending.getLast()->Msg().routing_id() == aMsg.routing_id());
mPending.getLast()->Msg()->type() == aMsg->type() &&
mPending.getLast()->Msg()->routing_id() == aMsg->routing_id());
if (compress) {
// This message type has compression enabled, and the back of the
// queue was the same message type and routed to the same destination.
// Replace it with the newer message.
MOZ_RELEASE_ASSERT(mPending.getLast()->Msg().compress_type() ==
MOZ_RELEASE_ASSERT(mPending.getLast()->Msg()->compress_type() ==
IPC::Message::COMPRESSION_ENABLED);
mPending.getLast()->Msg() = std::move(aMsg);
reuseTask = true;
return;
}
} else if (aMsg.compress_type() == IPC::Message::COMPRESSION_ALL &&
} else if (aMsg->compress_type() == IPC::Message::COMPRESSION_ALL &&
!mPending.isEmpty()) {
for (MessageTask* p = mPending.getLast(); p; p = p->getPrevious()) {
if (p->Msg().type() == aMsg.type() &&
p->Msg().routing_id() == aMsg.routing_id()) {
if (p->Msg()->type() == aMsg->type() &&
p->Msg()->routing_id() == aMsg->routing_id()) {
// This message type has compression enabled, and the queue
// holds a message with the same message type and routed to the
// same destination. Erase it. Note that, since we always
// compress these redundancies, There Can Be Only One.
MOZ_RELEASE_ASSERT(p->Msg().compress_type() ==
MOZ_RELEASE_ASSERT(p->Msg()->compress_type() ==
IPC::Message::COMPRESSION_ALL);
MOZ_RELEASE_ASSERT(IsAlwaysDeferred(p->Msg()));
MOZ_RELEASE_ASSERT(IsAlwaysDeferred(*p->Msg()));
p->remove();
break;
}
}
}
bool alwaysDeferred = IsAlwaysDeferred(aMsg);
bool alwaysDeferred = IsAlwaysDeferred(*aMsg);
bool shouldWakeUp = AwaitingSyncReply() && !ShouldDeferMessage(aMsg);
bool shouldWakeUp = AwaitingSyncReply() && !ShouldDeferMessage(*aMsg);
IPC_LOG("Receive from link; seqno=%d, xid=%d, shouldWakeUp=%d", aMsg.seqno(),
aMsg.transaction_id(), shouldWakeUp);
if (reuseTask) {
return;
}
IPC_LOG("Receive from link; seqno=%d, xid=%d, shouldWakeUp=%d", aMsg->seqno(),
aMsg->transaction_id(), shouldWakeUp);
// There are two cases we're concerned about, relating to the state of the
// worker thread:
@ -1117,7 +1111,7 @@ void MessageChannel::PeekMessages(
MonitorAutoLock lock(*mMonitor);
for (MessageTask* it : mPending) {
const Message& msg = it->Msg();
const Message& msg = *it->Msg();
if (!aInvoke(msg)) {
break;
}
@ -1147,23 +1141,23 @@ void MessageChannel::ProcessPendingRequests(
return;
}
mozilla::Vector<Message> toProcess;
Vector<UniquePtr<Message>> toProcess;
for (MessageTask* p = mPending.getFirst(); p;) {
Message& msg = p->Msg();
UniquePtr<Message>& msg = p->Msg();
MOZ_RELEASE_ASSERT(!aTransaction.IsCanceled(),
"Calling ShouldDeferMessage when cancelled");
bool defer = ShouldDeferMessage(msg);
bool defer = ShouldDeferMessage(*msg);
// Only log the interesting messages.
if (msg.is_sync() ||
msg.nested_level() == IPC::Message::NESTED_INSIDE_CPOW) {
IPC_LOG("ShouldDeferMessage(seqno=%d) = %d", msg.seqno(), defer);
if (msg->is_sync() ||
msg->nested_level() == IPC::Message::NESTED_INSIDE_CPOW) {
IPC_LOG("ShouldDeferMessage(seqno=%d) = %d", msg->seqno(), defer);
}
if (!defer) {
MOZ_ASSERT(!IsAlwaysDeferred(msg));
MOZ_ASSERT(!IsAlwaysDeferred(*msg));
if (!toProcess.append(std::move(msg))) MOZ_CRASH();
@ -1182,15 +1176,15 @@ void MessageChannel::ProcessPendingRequests(
// Processing these messages could result in more messages, so we
// loop around to check for more afterwards.
for (auto it = toProcess.begin(); it != toProcess.end(); it++) {
ProcessPendingRequest(aProxy, std::move(*it));
for (auto& msg : toProcess) {
ProcessPendingRequest(aProxy, std::move(msg));
}
}
AssertMaybeDeferredCountCorrect();
}
bool MessageChannel::Send(UniquePtr<Message> aMsg, Message* aReply) {
bool MessageChannel::Send(UniquePtr<Message> aMsg, UniquePtr<Message>* aReply) {
mozilla::TimeStamp start = TimeStamp::Now();
if (aMsg->size() >= kMinTelemetryMessageSize) {
Telemetry::Accumulate(Telemetry::IPC_MESSAGE_SIZE2, aMsg->size());
@ -1393,12 +1387,13 @@ bool MessageChannel::Send(UniquePtr<Message> aMsg, Message* aReply) {
AddProfilerMarker(*reply, MessageDirection::eReceiving);
*aReply = std::move(*reply);
if (aReply->size() >= kMinTelemetryMessageSize) {
if (reply->size() >= kMinTelemetryMessageSize) {
Telemetry::Accumulate(Telemetry::IPC_REPLY_SIZE,
nsDependentCString(msgName), aReply->size());
nsDependentCString(msgName), reply->size());
}
*aReply = std::move(reply);
// NOTE: Only collect IPC_SYNC_MAIN_LATENCY_MS on the main thread (bug
// 1343729)
if (NS_IsMainThread() && latencyMs >= kMinTelemetrySyncIPCLatencyMs) {
@ -1415,15 +1410,15 @@ bool MessageChannel::HasPendingEvents() {
}
bool MessageChannel::ProcessPendingRequest(ActorLifecycleProxy* aProxy,
Message&& aUrgent) {
UniquePtr<Message> aUrgent) {
AssertWorkerThread();
mMonitor->AssertCurrentThreadOwns();
IPC_LOG("Process pending: seqno=%d, xid=%d", aUrgent.seqno(),
aUrgent.transaction_id());
IPC_LOG("Process pending: seqno=%d, xid=%d", aUrgent->seqno(),
aUrgent->transaction_id());
// keep the error relevant information
msgid_t msgType = aUrgent.type();
msgid_t msgType = aUrgent->type();
DispatchMessage(aProxy, std::move(aUrgent));
if (!Connected()) {
@ -1469,10 +1464,10 @@ void MessageChannel::RunMessage(ActorLifecycleProxy* aProxy,
AssertWorkerThread();
mMonitor->AssertCurrentThreadOwns();
Message& msg = aTask.Msg();
UniquePtr<Message>& msg = aTask.Msg();
if (!Connected()) {
ReportConnectionError("RunMessage", msg.type());
ReportConnectionError("RunMessage", msg->type());
return;
}
@ -1484,21 +1479,21 @@ void MessageChannel::RunMessage(ActorLifecycleProxy* aProxy,
break;
}
MOZ_ASSERT(!ShouldRunMessage(task->Msg()) ||
aTask.Msg().priority() != task->Msg().priority());
MOZ_ASSERT(!ShouldRunMessage(*task->Msg()) ||
aTask.Msg()->priority() != task->Msg()->priority());
}
# endif
#endif
if (!ShouldRunMessage(msg)) {
if (!ShouldRunMessage(*msg)) {
return;
}
MOZ_RELEASE_ASSERT(aTask.isInList());
aTask.remove();
if (!IsAlwaysDeferred(msg)) {
if (!IsAlwaysDeferred(*msg)) {
mMaybeDeferredPendingCount--;
}
@ -1509,8 +1504,8 @@ NS_IMPL_ISUPPORTS_INHERITED(MessageChannel::MessageTask, CancelableRunnable,
nsIRunnablePriority, nsIRunnableIPCMessageType)
MessageChannel::MessageTask::MessageTask(MessageChannel* aChannel,
Message&& aMessage)
: CancelableRunnable(aMessage.name()),
UniquePtr<Message> aMessage)
: CancelableRunnable(aMessage->name()),
mMonitor(aChannel->mMonitor),
mChannel(aChannel),
mMessage(std::move(aMessage)),
@ -1520,9 +1515,9 @@ MessageChannel::MessageTask::MessageTask(MessageChannel* aChannel,
mFuzzStopped(false)
#endif
{
MOZ_DIAGNOSTIC_ASSERT(mMessage, "message may not be null");
#ifdef FUZZING_SNAPSHOT
if (mMessage.IsFuzzMsg()) {
if (mMessage->IsFuzzMsg()) {
MOZ_FUZZING_IPC_MT_CTOR();
}
#endif
@ -1533,9 +1528,9 @@ MessageChannel::MessageTask::~MessageTask() {
// We track fuzzing messages until their run is complete. To make sure
// that we don't miss messages that are for some reason destroyed without
// being run (e.g. canceled), we catch this condition in the destructor.
if (mMessage.IsFuzzMsg() && !mFuzzStopped) {
if (mMessage->IsFuzzMsg() && !mFuzzStopped) {
MOZ_FUZZING_IPC_MT_STOP();
} else if (!mMessage.IsFuzzMsg() && !fuzzing::Nyx::instance().started()) {
} else if (!mMessage->IsFuzzMsg() && !fuzzing::Nyx::instance().started()) {
MOZ_FUZZING_IPC_PRE_FUZZ_MT_STOP();
}
#endif
@ -1543,7 +1538,7 @@ MessageChannel::MessageTask::~MessageTask() {
nsresult MessageChannel::MessageTask::Run() {
#ifdef FUZZING_SNAPSHOT
if (!mMessage.IsFuzzMsg()) {
if (!mMessage->IsFuzzMsg()) {
if (fuzzing::Nyx::instance().started()) {
// Once we started fuzzing, prevent non-fuzzing tasks from being
// run and potentially blocking worker threads.
@ -1583,7 +1578,7 @@ nsresult MessageChannel::MessageTask::Run() {
Channel()->RunMessage(proxy, *this);
#ifdef FUZZING_SNAPSHOT
if (mMessage.IsFuzzMsg() && !mFuzzStopped) {
if (mMessage->IsFuzzMsg() && !mFuzzStopped) {
MOZ_FUZZING_IPC_MT_STOP();
mFuzzStopped = true;
}
@ -1603,14 +1598,14 @@ nsresult MessageChannel::MessageTask::Cancel() {
Channel()->AssertWorkerThread();
mMonitor->AssertSameMonitor(*Channel()->mMonitor);
if (!IsAlwaysDeferred(Msg())) {
if (!IsAlwaysDeferred(*Msg())) {
Channel()->mMaybeDeferredPendingCount--;
}
remove();
#ifdef FUZZING_SNAPSHOT
if (mMessage.IsFuzzMsg() && !mFuzzStopped) {
if (mMessage->IsFuzzMsg() && !mFuzzStopped) {
MOZ_FUZZING_IPC_MT_STOP();
mFuzzStopped = true;
}
@ -1632,7 +1627,11 @@ void MessageChannel::MessageTask::Post() {
NS_IMETHODIMP
MessageChannel::MessageTask::GetPriority(uint32_t* aPriority) {
switch (mMessage.priority()) {
if (!mMessage) {
return NS_ERROR_FAILURE;
}
switch (mMessage->priority()) {
case Message::NORMAL_PRIORITY:
*aPriority = PRIORITY_NORMAL;
break;
@ -1657,18 +1656,18 @@ MessageChannel::MessageTask::GetPriority(uint32_t* aPriority) {
NS_IMETHODIMP
MessageChannel::MessageTask::GetType(uint32_t* aType) {
if (!Msg().is_valid()) {
if (!mMessage) {
// If mMessage has been moved already elsewhere, we can't know what the type
// has been.
return NS_ERROR_FAILURE;
}
*aType = Msg().type();
*aType = mMessage->type();
return NS_OK;
}
void MessageChannel::DispatchMessage(ActorLifecycleProxy* aProxy,
Message&& aMsg) {
UniquePtr<Message> aMsg) {
AssertWorkerThread();
mMonitor->AssertCurrentThreadOwns();
@ -1679,15 +1678,15 @@ void MessageChannel::DispatchMessage(ActorLifecycleProxy* aProxy,
UniquePtr<Message> reply;
IPC_LOG("DispatchMessage: seqno=%d, xid=%d", aMsg.seqno(),
aMsg.transaction_id());
AddProfilerMarker(aMsg, MessageDirection::eReceiving);
IPC_LOG("DispatchMessage: seqno=%d, xid=%d", aMsg->seqno(),
aMsg->transaction_id());
AddProfilerMarker(*aMsg, MessageDirection::eReceiving);
{
AutoEnterTransaction transaction(this, aMsg);
AutoEnterTransaction transaction(this, *aMsg);
int id = aMsg.transaction_id();
MOZ_RELEASE_ASSERT(!aMsg.is_sync() || id == transaction.TransactionID());
int id = aMsg->transaction_id();
MOZ_RELEASE_ASSERT(!aMsg->is_sync() || id == transaction.TransactionID());
{
MonitorAutoUnlock unlock(*mMonitor);
@ -1695,10 +1694,10 @@ void MessageChannel::DispatchMessage(ActorLifecycleProxy* aProxy,
mListener->ArtificialSleep();
if (aMsg.is_sync()) {
DispatchSyncMessage(aProxy, aMsg, *getter_Transfers(reply));
if (aMsg->is_sync()) {
DispatchSyncMessage(aProxy, *aMsg, reply);
} else {
DispatchAsyncMessage(aProxy, aMsg);
DispatchAsyncMessage(aProxy, *aMsg);
}
mListener->ArtificialSleep();
@ -1707,14 +1706,14 @@ void MessageChannel::DispatchMessage(ActorLifecycleProxy* aProxy,
if (reply && transaction.IsCanceled()) {
// The transaction has been canceled. Don't send a reply.
IPC_LOG("Nulling out reply due to cancellation, seqno=%d, xid=%d",
aMsg.seqno(), id);
aMsg->seqno(), id);
reply = nullptr;
}
}
if (reply && ChannelConnected == mChannelState) {
IPC_LOG("Sending reply seqno=%d, xid=%d", aMsg.seqno(),
aMsg.transaction_id());
IPC_LOG("Sending reply seqno=%d, xid=%d", aMsg->seqno(),
aMsg->transaction_id());
AddProfilerMarker(*reply, MessageDirection::eSending);
mLink->SendMessage(std::move(reply));
@ -1723,7 +1722,7 @@ void MessageChannel::DispatchMessage(ActorLifecycleProxy* aProxy,
void MessageChannel::DispatchSyncMessage(ActorLifecycleProxy* aProxy,
const Message& aMsg,
Message*& aReply) {
UniquePtr<Message>& aReply) {
AssertWorkerThread();
mozilla::TimeStamp start = TimeStamp::Now();
@ -2196,8 +2195,8 @@ void MessageChannel::DebugAbort(const char* file, int line, const char* cond,
MessageQueue pending = std::move(mPending);
while (!pending.isEmpty()) {
printf_stderr(" [ %s%s ]\n",
pending.getFirst()->Msg().is_sync() ? "sync" : "async",
pending.getFirst()->Msg().is_reply() ? "reply" : "");
pending.getFirst()->Msg()->is_sync() ? "sync" : "async",
pending.getFirst()->Msg()->is_reply() ? "reply" : "");
pending.popFirst();
}
@ -2324,20 +2323,20 @@ void MessageChannel::CancelTransaction(int transaction) {
bool foundSync = false;
for (MessageTask* p = mPending.getFirst(); p;) {
Message& msg = p->Msg();
UniquePtr<Message>& msg = p->Msg();
// If there was a race between the parent and the child, then we may
// have a queued sync message. We want to drop this message from the
// queue since if will get cancelled along with the transaction being
// cancelled. This happens if the message in the queue is
// NESTED_INSIDE_SYNC.
if (msg.is_sync() && msg.nested_level() != IPC::Message::NOT_NESTED) {
if (msg->is_sync() && msg->nested_level() != IPC::Message::NOT_NESTED) {
MOZ_RELEASE_ASSERT(!foundSync);
MOZ_RELEASE_ASSERT(msg.transaction_id() != transaction);
IPC_LOG("Removing msg from queue seqno=%d xid=%d", msg.seqno(),
msg.transaction_id());
MOZ_RELEASE_ASSERT(msg->transaction_id() != transaction);
IPC_LOG("Removing msg from queue seqno=%d xid=%d", msg->seqno(),
msg->transaction_id());
foundSync = true;
if (!IsAlwaysDeferred(msg)) {
if (!IsAlwaysDeferred(*msg)) {
mMaybeDeferredPendingCount--;
}
p = p->removeAndGetNext();

Просмотреть файл

@ -265,8 +265,9 @@ class MessageChannel : HasResultCodes {
return mBuildIDsConfirmedMatch;
}
// Synchronously send |msg| (i.e., wait for |reply|)
bool Send(UniquePtr<Message> aMsg, Message* aReply) EXCLUDES(*mMonitor);
// Synchronously send |aMsg| (i.e., wait for |aReply|)
bool Send(UniquePtr<Message> aMsg, UniquePtr<Message>* aReply)
EXCLUDES(*mMonitor);
bool CanSend() const EXCLUDES(*mMonitor);
@ -396,19 +397,19 @@ class MessageChannel : HasResultCodes {
void ProcessPendingRequests(ActorLifecycleProxy* aProxy,
AutoEnterTransaction& aTransaction)
REQUIRES(*mMonitor);
bool ProcessPendingRequest(ActorLifecycleProxy* aProxy, Message&& aUrgent)
REQUIRES(*mMonitor);
bool ProcessPendingRequest(ActorLifecycleProxy* aProxy,
UniquePtr<Message> aUrgent) REQUIRES(*mMonitor);
void EnqueuePendingMessages() REQUIRES(*mMonitor);
// Dispatches an incoming message to its appropriate handler.
void DispatchMessage(ActorLifecycleProxy* aProxy, Message&& aMsg)
void DispatchMessage(ActorLifecycleProxy* aProxy, UniquePtr<Message> aMsg)
REQUIRES(*mMonitor);
// DispatchMessage will route to one of these functions depending on the
// protocol type of the message.
void DispatchSyncMessage(ActorLifecycleProxy* aProxy, const Message& aMsg,
Message*& aReply) EXCLUDES(*mMonitor);
UniquePtr<Message>& aReply) EXCLUDES(*mMonitor);
void DispatchAsyncMessage(ActorLifecycleProxy* aProxy, const Message& aMsg)
EXCLUDES(*mMonitor);
@ -480,7 +481,7 @@ class MessageChannel : HasResultCodes {
bool WasTransactionCanceled(int transaction);
bool ShouldDeferMessage(const Message& aMsg) REQUIRES(*mMonitor);
void OnMessageReceivedFromLink(Message&& aMsg) REQUIRES(*mMonitor);
void OnMessageReceivedFromLink(UniquePtr<Message> aMsg) REQUIRES(*mMonitor);
void OnChannelErrorFromLink() REQUIRES(*mMonitor);
private:
@ -509,7 +510,7 @@ class MessageChannel : HasResultCodes {
public nsIRunnablePriority,
public nsIRunnableIPCMessageType {
public:
explicit MessageTask(MessageChannel* aChannel, Message&& aMessage);
explicit MessageTask(MessageChannel* aChannel, UniquePtr<Message> aMessage);
MessageTask() = delete;
MessageTask(const MessageTask&) = delete;
@ -526,8 +527,14 @@ class MessageChannel : HasResultCodes {
return mScheduled;
}
Message& Msg() { return mMessage; }
const Message& Msg() const { return mMessage; }
UniquePtr<Message>& Msg() {
MOZ_DIAGNOSTIC_ASSERT(mMessage, "message was moved");
return mMessage;
}
const UniquePtr<Message>& Msg() const {
MOZ_DIAGNOSTIC_ASSERT(mMessage, "message was moved");
return mMessage;
}
void AssertMonitorHeld(const RefCountedMonitor& aMonitor) REQUIRES(aMonitor)
ASSERT_CAPABILITY(*mMonitor) {
@ -549,7 +556,7 @@ class MessageChannel : HasResultCodes {
// The channel which this MessageTask is associated with. Only valid while
// `mMonitor` is held, and this MessageTask `isInList()`.
MessageChannel* const mChannel;
Message mMessage;
UniquePtr<Message> mMessage;
bool mScheduled : 1 GUARDED_BY(*mMonitor);
#ifdef FUZZING_SNAPSHOT
bool mFuzzStopped;

Просмотреть файл

@ -186,7 +186,7 @@ void PortLink::OnPortStatusChanged() {
return;
}
mChan->OnMessageReceivedFromLink(std::move(*message));
mChan->OnMessageReceivedFromLink(std::move(message));
}
}

Просмотреть файл

@ -206,7 +206,7 @@ bool MiniTransceiver::RecvData(char* aDataBuf, size_t aBufSize,
return true;
}
bool MiniTransceiver::Recv(IPC::Message& aMsg) {
bool MiniTransceiver::Recv(UniquePtr<IPC::Message>& aMsg) {
#ifdef DEBUG
if (mState == STATE_RECEIVING) {
MOZ_CRASH(
@ -244,7 +244,7 @@ bool MiniTransceiver::Recv(IPC::Message& aMsg) {
"The number of file descriptors in the header is different from"
" the number actually received");
aMsg = std::move(*msg);
aMsg = std::move(msg);
return true;
}

Просмотреть файл

@ -50,8 +50,9 @@ class MiniTransceiver {
* \param aMsg will hold the content of the received message.
* \return false if the fd is closed or with an error.
*/
bool Recv(IPC::Message& aMsg);
inline bool RecvInfallible(IPC::Message& aMsg, const char* aCrashMessage) {
bool Recv(UniquePtr<IPC::Message>& aMsg);
inline bool RecvInfallible(UniquePtr<IPC::Message>& aMsg,
const char* aCrashMessage) {
bool Ok = Recv(aMsg);
if (!Ok) {
MOZ_CRASH_UNSAFE(aCrashMessage);

Просмотреть файл

@ -89,7 +89,7 @@ void NodeChannel::Start(bool aCallConnect) {
mExistingListener = mChannel->set_listener(this);
std::queue<IPC::Message> pending;
std::queue<UniquePtr<IPC::Message>> pending;
if (mExistingListener) {
mExistingListener->GetQueuedMessages(pending);
}
@ -230,31 +230,17 @@ void NodeChannel::DoSendMessage(UniquePtr<IPC::Message> aMessage) {
}
}
void NodeChannel::OnMessageReceived(IPC::Message&& aMessage) {
void NodeChannel::OnMessageReceived(UniquePtr<IPC::Message> aMessage) {
AssertIOThread();
if (!aMessage.is_valid()) {
NS_WARNING("Received an invalid message");
#ifdef FUZZING_SNAPSHOT
if (aMessage.IsFuzzMsg()) {
MOZ_FUZZING_NYX_PRINT("ERROR: Received invalid fuzzing IPC message?!\n");
MOZ_REALLY_CRASH(__LINE__);
}
#endif
OnChannelError();
return;
}
#ifdef FUZZING_SNAPSHOT
if (mBlockSendRecv && !aMessage.IsFuzzMsg()) {
if (mBlockSendRecv && !aMessage->IsFuzzMsg()) {
return;
}
#endif
IPC::MessageReader reader(aMessage);
switch (aMessage.type()) {
IPC::MessageReader reader(*aMessage);
switch (aMessage->type()) {
case REQUEST_INTRODUCTION_MESSAGE_TYPE: {
NodeName name;
if (IPC::ReadParam(&reader, &name)) {
@ -272,8 +258,7 @@ void NodeChannel::OnMessageReceived(IPC::Message&& aMessage) {
break;
}
case BROADCAST_MESSAGE_TYPE: {
mListener->OnBroadcast(mName,
MakeUnique<IPC::Message>(std::move(aMessage)));
mListener->OnBroadcast(mName, std::move(aMessage));
return;
}
case ACCEPT_INVITE_MESSAGE_TYPE: {
@ -294,14 +279,13 @@ void NodeChannel::OnMessageReceived(IPC::Message&& aMessage) {
case EVENT_MESSAGE_TYPE:
default: {
#ifdef FUZZING_SNAPSHOT
if (!fuzzing::IPCFuzzController::instance().ObserveIPCMessage(this,
aMessage)) {
if (!fuzzing::IPCFuzzController::instance().ObserveIPCMessage(
this, *aMessage)) {
return;
}
#endif
mListener->OnEventMessage(mName,
MakeUnique<IPC::Message>(std::move(aMessage)));
mListener->OnEventMessage(mName, std::move(aMessage));
return;
}
}

Просмотреть файл

@ -130,7 +130,7 @@ class NodeChannel final : public IPC::Channel::Listener {
void DoSendMessage(UniquePtr<IPC::Message> aMessage);
// IPC::Channel::Listener implementation
void OnMessageReceived(IPC::Message&& aMessage) override;
void OnMessageReceived(UniquePtr<IPC::Message> aMessage) override;
void OnChannelConnected(base::ProcessId aPeerPid) override;
void OnChannelError() override;

Просмотреть файл

@ -479,27 +479,26 @@ void IProtocol::SetManagerAndRegister(IProtocol* aManager, int32_t aId) {
aManager->RegisterID(this, aId);
}
bool IProtocol::ChannelSend(IPC::Message* aMsg) {
UniquePtr<IPC::Message> msg(aMsg);
bool IProtocol::ChannelSend(UniquePtr<IPC::Message> aMsg) {
if (CanSend()) {
// NOTE: This send call failing can only occur during toplevel channel
// teardown. As this is an async call, this isn't reasonable to predict or
// respond to, so just drop the message on the floor silently.
GetIPCChannel()->Send(std::move(msg));
GetIPCChannel()->Send(std::move(aMsg));
return true;
}
WarnMessageDiscarded(msg.get());
WarnMessageDiscarded(aMsg.get());
return false;
}
bool IProtocol::ChannelSend(IPC::Message* aMsg, IPC::Message* aReply) {
UniquePtr<IPC::Message> msg(aMsg);
bool IProtocol::ChannelSend(UniquePtr<IPC::Message> aMsg,
UniquePtr<IPC::Message>* aReply) {
if (CanSend()) {
return GetIPCChannel()->Send(std::move(msg), aReply);
return GetIPCChannel()->Send(std::move(aMsg), aReply);
}
WarnMessageDiscarded(msg.get());
WarnMessageDiscarded(aMsg.get());
return false;
}
@ -798,7 +797,7 @@ void IPDLResolverInner::ResolveOrReject(
WriteIPDLParam(&writer, actor, aResolve);
aWrite(reply.get(), actor);
actor->ChannelSend(reply.release());
actor->ChannelSend(std::move(reply));
}
void IPDLResolverInner::Destroy() {

Просмотреть файл

@ -240,8 +240,9 @@ class IProtocol : public HasResultCodes {
virtual Result OnMessageReceived(const Message& aMessage) = 0;
virtual Result OnMessageReceived(const Message& aMessage,
Message*& aReply) = 0;
virtual Result OnCallReceived(const Message& aMessage, Message*& aReply) = 0;
UniquePtr<Message>& aReply) = 0;
virtual Result OnCallReceived(const Message& aMessage,
UniquePtr<Message>& aReply) = 0;
bool AllocShmem(size_t aSize, Shmem::SharedMemory::SharedMemoryType aType,
Shmem* aOutMem);
bool AllocUnsafeShmem(size_t aSize,
@ -273,17 +274,18 @@ class IProtocol : public HasResultCodes {
void SetManagerAndRegister(IProtocol* aManager, int32_t aId);
// Helpers for calling `Send` on our underlying IPC channel.
bool ChannelSend(IPC::Message* aMsg);
bool ChannelSend(IPC::Message* aMsg, IPC::Message* aReply);
bool ChannelSend(UniquePtr<IPC::Message> aMsg);
bool ChannelSend(UniquePtr<IPC::Message> aMsg,
UniquePtr<IPC::Message>* aReply);
template <typename Value>
void ChannelSend(IPC::Message* aMsg, ResolveCallback<Value>&& aResolve,
void ChannelSend(UniquePtr<IPC::Message> aMsg,
ResolveCallback<Value>&& aResolve,
RejectCallback&& aReject) {
UniquePtr<IPC::Message> msg(aMsg);
if (CanSend()) {
GetIPCChannel()->Send(std::move(msg), this, std::move(aResolve),
GetIPCChannel()->Send(std::move(aMsg), this, std::move(aResolve),
std::move(aReject));
} else {
WarnMessageDiscarded(msg.get());
WarnMessageDiscarded(aMsg.get());
aReject(ResponseRejectReason::SendError);
}
}

Просмотреть файл

@ -1811,7 +1811,7 @@ def _generateMessageConstructor(md, segmentSize, protocol, forReply=False):
FunctionDecl(
clsname,
params=[Decl(Type("int32_t"), routingId.name)],
ret=Type("IPC::Message", ptr=True),
ret=Type("mozilla::UniquePtr<IPC::Message>"),
)
)
@ -1878,29 +1878,21 @@ def _generateMessageConstructor(md, segmentSize, protocol, forReply=False):
)
segmentSize = int(segmentSize)
if segmentSize:
func.addstmt(
StmtReturn(
ExprNew(
Type("IPC::Message"),
args=[
routingId,
ExprVar(msgid),
ExprLiteral.Int(int(segmentSize)),
flags,
],
)
)
)
else:
func.addstmt(
StmtReturn(
ExprCall(
ExprVar("IPC::Message::IPDLMessage"),
args=[routingId, ExprVar(msgid), flags],
)
if not segmentSize:
segmentSize = 0
func.addstmt(
StmtReturn(
ExprCall(
ExprVar("IPC::Message::IPDLMessage"),
args=[
routingId,
ExprVar(msgid),
ExprLiteral.Int(int(segmentSize)),
flags,
],
)
)
)
return func
@ -4120,7 +4112,7 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
def makeHandlerMethod(name, switch, hasReply, dispatches=False):
params = [Decl(Type("Message", const=True, ref=True), msgvar.name)]
if hasReply:
params.append(Decl(Type("Message", ref=True, ptr=True), replyvar.name))
params.append(Decl(Type("UniquePtr<Message>", ref=True), replyvar.name))
method = MethodDefn(
MethodDecl(
@ -4711,7 +4703,7 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
sendok, sendstmts = self.sendBlocking(md, msgvar, replyvar)
replystmts = self.deserializeReply(
md,
ExprAddrOf(replyvar),
replyvar,
self.side,
errfnSendCtor,
errfnSentinel(ExprLiteral.NULL),
@ -4727,7 +4719,7 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
// Synchronously send the constructor message to the other side. If
// the send fails, e.g. due to the remote side shutting down, the
// actor will be destroyed and potentially freed.
Message ${replyvar};
UniquePtr<Message> ${replyvar};
$*{sendstmts}
if (!(${sendok})) {
@ -4838,12 +4830,12 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
sendok, sendstmts = self.sendBlocking(md, msgvar, replyvar, actorvar)
method.addstmts(
stmts
+ [Whitespace.NL, StmtDecl(Decl(Type("Message"), replyvar.name))]
+ [Whitespace.NL, StmtDecl(Decl(Type("UniquePtr<Message>"), replyvar.name))]
+ sendstmts
)
destmts = self.deserializeReply(
md, ExprAddrOf(replyvar), self.side, errfnSend, errfnSentinel(), actorvar
md, replyvar, self.side, errfnSend, errfnSentinel(), actorvar
)
ifsendok = StmtIf(ExprLiteral.FALSE)
ifsendok.addifstmts(destmts)
@ -4976,12 +4968,12 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
failif.addifstmt(StmtReturn.FALSE)
desstmts = self.deserializeReply(
md, ExprAddrOf(replyvar), self.side, errfnSend, errfnSentinel()
md, replyvar, self.side, errfnSend, errfnSentinel()
)
method.addstmts(
serstmts
+ [Whitespace.NL, StmtDecl(Decl(Type("Message"), replyvar.name))]
+ [Whitespace.NL, StmtDecl(Decl(Type("UniquePtr<Message>"), replyvar.name))]
+ sendstmts
+ [failif]
+ desstmts
@ -5088,7 +5080,7 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
stmts = (
[
StmtDecl(
Decl(Type("IPC::Message", ptr=True), msgvar.name),
Decl(Type("UniquePtr<IPC::Message>"), msgvar.name),
init=ExprCall(ExprVar(md.pqMsgCtorFunc()), args=[routingId]),
),
StmtDecl(
@ -5417,7 +5409,7 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
Whitespace.NL,
StmtDecl(
Decl(Type("IPC::MessageReader"), readervar.name),
initargs=[self.replyvar, ExprVar.THIS],
initargs=[ExprDeref(self.replyvar), ExprVar.THIS],
),
]
+ declstmts
@ -5459,7 +5451,12 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
stmts.append(
StmtExpr(
ExprCall(
send, args=[msgexpr, ExprMove(resolvefn), ExprMove(rejectfn)]
send,
args=[
ExprMove(msgexpr),
ExprMove(resolvefn),
ExprMove(rejectfn),
],
)
)
)
@ -5467,7 +5464,8 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
else:
stmts.append(
StmtDecl(
Decl(Type.BOOL, sendok.name), init=ExprCall(send, args=[msgexpr])
Decl(Type.BOOL, sendok.name),
init=ExprCall(send, args=[ExprMove(msgexpr)]),
)
)
retvar = sendok
@ -5514,7 +5512,8 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
ExprAssn(
sendok,
ExprCall(
send, args=[msgexpr, ExprAddrOf(replyexpr)]
send,
args=[ExprMove(msgexpr), ExprAddrOf(replyexpr)],
),
)
),

Просмотреть файл

@ -714,11 +714,13 @@ NS_IMETHODIMP IPCFuzzController::IPCFuzzLoop::Run() {
std::move(msg));
#else
// For asynchronous injection, we have to post to the I/O thread instead.
XRE_GetIOMessageLoop()->PostTask(
NewRunnableMethod<StoreCopyPassByRRef<IPC::Message&&> >(
"NodeChannel::OnMessageReceived",
IPCFuzzController::instance().nodeChannel,
&NodeChannel::OnMessageReceived, std::move(*msg)));
XRE_GetIOMessageLoop()->PostTask(NS_NewRunnableFunction(
"NodeChannel::OnMessageReceived",
[msg = std::move(msg),
nodeChannel =
RefPtr{IPCFuzzController::instance().nodeChannel}]() mutable {
nodeChannel->OnMessageReceived(std::move(msg));
}));
#endif
#ifdef MOZ_FUZZ_IPC_SYNC_AFTER_EACH_MSG

Просмотреть файл

@ -65,12 +65,12 @@ class IPCFuzzController {
friend class IPCFuzzController;
public:
NS_DECL_NSIRUNNABLE
IPCFuzzLoop();
NS_INLINE_DECL_REFCOUNTING_INHERITED(IPCFuzzLoop, Runnable)
private:
NS_DECL_NSIRUNNABLE
~IPCFuzzLoop() = default;
};
public:

Просмотреть файл

@ -94,7 +94,7 @@ void FuzzProtocol(T* aProtocol, const uint8_t* aData, size_t aSize,
if (m.is_sync()) {
UniquePtr<IPC::Message> reply;
aProtocol->OnMessageReceived(m, *getter_Transfers(reply));
aProtocol->OnMessageReceived(m, reply);
} else {
aProtocol->OnMessageReceived(m);
}