зеркало из https://github.com/mozilla/gecko-dev.git
Bug 1706374 - Part 2: Format mojo/core/ports with clang-format, r=handyman
Differential Revision: https://phabricator.services.mozilla.com/D112766
This commit is contained in:
Родитель
8008933c20
Коммит
2a2037d8e9
|
@ -94,16 +94,13 @@ static_assert(sizeof(UserMessageReadAckEventData) % kPortsMessageAlignment == 0,
|
|||
|
||||
} // namespace
|
||||
|
||||
Event::PortDescriptor::PortDescriptor() {
|
||||
memset(padding, 0, sizeof(padding));
|
||||
}
|
||||
Event::PortDescriptor::PortDescriptor() { memset(padding, 0, sizeof(padding)); }
|
||||
|
||||
Event::~Event() = default;
|
||||
|
||||
// static
|
||||
ScopedEvent Event::Deserialize(const void* buffer, size_t num_bytes) {
|
||||
if (num_bytes < sizeof(SerializedHeader))
|
||||
return nullptr;
|
||||
if (num_bytes < sizeof(SerializedHeader)) return nullptr;
|
||||
|
||||
const auto* header = static_cast<const SerializedHeader*>(buffer);
|
||||
const PortName& port_name = header->port_name;
|
||||
|
@ -150,9 +147,7 @@ void Event::Serialize(void* buffer) const {
|
|||
SerializeData(header + 1);
|
||||
}
|
||||
|
||||
ScopedEvent Event::Clone() const {
|
||||
return nullptr;
|
||||
}
|
||||
ScopedEvent Event::Clone() const { return nullptr; }
|
||||
|
||||
UserMessageEvent::~UserMessageEvent() = default;
|
||||
|
||||
|
@ -180,14 +175,12 @@ bool UserMessageEvent::NotifyWillBeRoutedExternally() {
|
|||
ScopedEvent UserMessageEvent::Deserialize(const PortName& port_name,
|
||||
const void* buffer,
|
||||
size_t num_bytes) {
|
||||
if (num_bytes < sizeof(UserMessageEventData))
|
||||
return nullptr;
|
||||
if (num_bytes < sizeof(UserMessageEventData)) return nullptr;
|
||||
|
||||
const auto* data = static_cast<const UserMessageEventData*>(buffer);
|
||||
base::CheckedNumeric<size_t> port_data_size = data->num_ports;
|
||||
port_data_size *= sizeof(PortDescriptor) + sizeof(PortName);
|
||||
if (!port_data_size.IsValid())
|
||||
return nullptr;
|
||||
if (!port_data_size.IsValid()) return nullptr;
|
||||
|
||||
base::CheckedNumeric<size_t> total_size = port_data_size.ValueOrDie();
|
||||
total_size += sizeof(UserMessageEventData);
|
||||
|
@ -213,8 +206,7 @@ UserMessageEvent::UserMessageEvent(const PortName& port_name,
|
|||
: Event(Type::kUserMessage, port_name), sequence_num_(sequence_num) {}
|
||||
|
||||
size_t UserMessageEvent::GetSizeIfSerialized() const {
|
||||
if (!message_)
|
||||
return 0;
|
||||
if (!message_) return 0;
|
||||
return message_->GetSizeIfSerialized();
|
||||
}
|
||||
|
||||
|
@ -255,9 +247,7 @@ ScopedEvent PortAcceptedEvent::Deserialize(const PortName& port_name,
|
|||
return std::make_unique<PortAcceptedEvent>(port_name);
|
||||
}
|
||||
|
||||
size_t PortAcceptedEvent::GetSerializedDataSize() const {
|
||||
return 0;
|
||||
}
|
||||
size_t PortAcceptedEvent::GetSerializedDataSize() const { return 0; }
|
||||
|
||||
void PortAcceptedEvent::SerializeData(void* buffer) const {}
|
||||
|
||||
|
@ -278,8 +268,7 @@ ObserveProxyEvent::~ObserveProxyEvent() = default;
|
|||
ScopedEvent ObserveProxyEvent::Deserialize(const PortName& port_name,
|
||||
const void* buffer,
|
||||
size_t num_bytes) {
|
||||
if (num_bytes < sizeof(ObserveProxyEventData))
|
||||
return nullptr;
|
||||
if (num_bytes < sizeof(ObserveProxyEventData)) return nullptr;
|
||||
|
||||
const auto* data = static_cast<const ObserveProxyEventData*>(buffer);
|
||||
return std::make_unique<ObserveProxyEvent>(
|
||||
|
@ -316,8 +305,7 @@ ObserveProxyAckEvent::~ObserveProxyAckEvent() = default;
|
|||
ScopedEvent ObserveProxyAckEvent::Deserialize(const PortName& port_name,
|
||||
const void* buffer,
|
||||
size_t num_bytes) {
|
||||
if (num_bytes < sizeof(ObserveProxyAckEventData))
|
||||
return nullptr;
|
||||
if (num_bytes < sizeof(ObserveProxyAckEventData)) return nullptr;
|
||||
|
||||
const auto* data = static_cast<const ObserveProxyAckEventData*>(buffer);
|
||||
return std::make_unique<ObserveProxyAckEvent>(port_name,
|
||||
|
@ -349,8 +337,7 @@ ObserveClosureEvent::~ObserveClosureEvent() = default;
|
|||
ScopedEvent ObserveClosureEvent::Deserialize(const PortName& port_name,
|
||||
const void* buffer,
|
||||
size_t num_bytes) {
|
||||
if (num_bytes < sizeof(ObserveClosureEventData))
|
||||
return nullptr;
|
||||
if (num_bytes < sizeof(ObserveClosureEventData)) return nullptr;
|
||||
|
||||
const auto* data = static_cast<const ObserveClosureEventData*>(buffer);
|
||||
return std::make_unique<ObserveClosureEvent>(port_name,
|
||||
|
@ -381,10 +368,8 @@ MergePortEvent::~MergePortEvent() = default;
|
|||
|
||||
// static
|
||||
ScopedEvent MergePortEvent::Deserialize(const PortName& port_name,
|
||||
const void* buffer,
|
||||
size_t num_bytes) {
|
||||
if (num_bytes < sizeof(MergePortEventData))
|
||||
return nullptr;
|
||||
const void* buffer, size_t num_bytes) {
|
||||
if (num_bytes < sizeof(MergePortEventData)) return nullptr;
|
||||
|
||||
const auto* data = static_cast<const MergePortEventData*>(buffer);
|
||||
return std::make_unique<MergePortEvent>(port_name, data->new_port_name,
|
||||
|
@ -402,21 +387,16 @@ void MergePortEvent::SerializeData(void* buffer) const {
|
|||
}
|
||||
|
||||
UserMessageReadAckRequestEvent::UserMessageReadAckRequestEvent(
|
||||
const PortName& port_name,
|
||||
uint64_t sequence_num_to_acknowledge)
|
||||
const PortName& port_name, uint64_t sequence_num_to_acknowledge)
|
||||
: Event(Type::kUserMessageReadAckRequest, port_name),
|
||||
sequence_num_to_acknowledge_(sequence_num_to_acknowledge) {
|
||||
}
|
||||
sequence_num_to_acknowledge_(sequence_num_to_acknowledge) {}
|
||||
|
||||
UserMessageReadAckRequestEvent::~UserMessageReadAckRequestEvent() = default;
|
||||
|
||||
// static
|
||||
ScopedEvent UserMessageReadAckRequestEvent::Deserialize(
|
||||
const PortName& port_name,
|
||||
const void* buffer,
|
||||
size_t num_bytes) {
|
||||
if (num_bytes < sizeof(UserMessageReadAckRequestEventData))
|
||||
return nullptr;
|
||||
const PortName& port_name, const void* buffer, size_t num_bytes) {
|
||||
if (num_bytes < sizeof(UserMessageReadAckRequestEventData)) return nullptr;
|
||||
|
||||
const auto* data =
|
||||
static_cast<const UserMessageReadAckRequestEventData*>(buffer);
|
||||
|
@ -434,11 +414,9 @@ void UserMessageReadAckRequestEvent::SerializeData(void* buffer) const {
|
|||
}
|
||||
|
||||
UserMessageReadAckEvent::UserMessageReadAckEvent(
|
||||
const PortName& port_name,
|
||||
uint64_t sequence_num_acknowledged)
|
||||
const PortName& port_name, uint64_t sequence_num_acknowledged)
|
||||
: Event(Type::kUserMessageReadAck, port_name),
|
||||
sequence_num_acknowledged_(sequence_num_acknowledged) {
|
||||
}
|
||||
sequence_num_acknowledged_(sequence_num_acknowledged) {}
|
||||
|
||||
UserMessageReadAckEvent::~UserMessageReadAckEvent() = default;
|
||||
|
||||
|
@ -446,8 +424,7 @@ UserMessageReadAckEvent::~UserMessageReadAckEvent() = default;
|
|||
ScopedEvent UserMessageReadAckEvent::Deserialize(const PortName& port_name,
|
||||
const void* buffer,
|
||||
size_t num_bytes) {
|
||||
if (num_bytes < sizeof(UserMessageReadAckEventData))
|
||||
return nullptr;
|
||||
if (num_bytes < sizeof(UserMessageReadAckEventData)) return nullptr;
|
||||
|
||||
const auto* data = static_cast<const UserMessageReadAckEventData*>(buffer);
|
||||
return std::make_unique<UserMessageReadAckEvent>(
|
||||
|
|
|
@ -142,8 +142,7 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) UserMessageEvent : public Event {
|
|||
PortDescriptor* port_descriptors() { return port_descriptors_.data(); }
|
||||
PortName* ports() { return ports_.data(); }
|
||||
|
||||
static ScopedEvent Deserialize(const PortName& port_name,
|
||||
const void* buffer,
|
||||
static ScopedEvent Deserialize(const PortName& port_name, const void* buffer,
|
||||
size_t num_bytes);
|
||||
|
||||
size_t GetSizeIfSerialized() const;
|
||||
|
@ -167,8 +166,7 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) PortAcceptedEvent : public Event {
|
|||
explicit PortAcceptedEvent(const PortName& port_name);
|
||||
~PortAcceptedEvent() override;
|
||||
|
||||
static ScopedEvent Deserialize(const PortName& port_name,
|
||||
const void* buffer,
|
||||
static ScopedEvent Deserialize(const PortName& port_name, const void* buffer,
|
||||
size_t num_bytes);
|
||||
|
||||
private:
|
||||
|
@ -180,8 +178,7 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) PortAcceptedEvent : public Event {
|
|||
|
||||
class COMPONENT_EXPORT(MOJO_CORE_PORTS) ObserveProxyEvent : public Event {
|
||||
public:
|
||||
ObserveProxyEvent(const PortName& port_name,
|
||||
const NodeName& proxy_node_name,
|
||||
ObserveProxyEvent(const PortName& port_name, const NodeName& proxy_node_name,
|
||||
const PortName& proxy_port_name,
|
||||
const NodeName& proxy_target_node_name,
|
||||
const PortName& proxy_target_port_name);
|
||||
|
@ -196,8 +193,7 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) ObserveProxyEvent : public Event {
|
|||
return proxy_target_port_name_;
|
||||
}
|
||||
|
||||
static ScopedEvent Deserialize(const PortName& port_name,
|
||||
const void* buffer,
|
||||
static ScopedEvent Deserialize(const PortName& port_name, const void* buffer,
|
||||
size_t num_bytes);
|
||||
|
||||
private:
|
||||
|
@ -220,8 +216,7 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) ObserveProxyAckEvent : public Event {
|
|||
|
||||
uint64_t last_sequence_num() const { return last_sequence_num_; }
|
||||
|
||||
static ScopedEvent Deserialize(const PortName& port_name,
|
||||
const void* buffer,
|
||||
static ScopedEvent Deserialize(const PortName& port_name, const void* buffer,
|
||||
size_t num_bytes);
|
||||
|
||||
private:
|
||||
|
@ -244,8 +239,7 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) ObserveClosureEvent : public Event {
|
|||
last_sequence_num_ = last_sequence_num;
|
||||
}
|
||||
|
||||
static ScopedEvent Deserialize(const PortName& port_name,
|
||||
const void* buffer,
|
||||
static ScopedEvent Deserialize(const PortName& port_name, const void* buffer,
|
||||
size_t num_bytes);
|
||||
|
||||
private:
|
||||
|
@ -260,8 +254,7 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) ObserveClosureEvent : public Event {
|
|||
|
||||
class COMPONENT_EXPORT(MOJO_CORE_PORTS) MergePortEvent : public Event {
|
||||
public:
|
||||
MergePortEvent(const PortName& port_name,
|
||||
const PortName& new_port_name,
|
||||
MergePortEvent(const PortName& port_name, const PortName& new_port_name,
|
||||
const PortDescriptor& new_port_descriptor);
|
||||
~MergePortEvent() override;
|
||||
|
||||
|
@ -270,8 +263,7 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) MergePortEvent : public Event {
|
|||
return new_port_descriptor_;
|
||||
}
|
||||
|
||||
static ScopedEvent Deserialize(const PortName& port_name,
|
||||
const void* buffer,
|
||||
static ScopedEvent Deserialize(const PortName& port_name, const void* buffer,
|
||||
size_t num_bytes);
|
||||
|
||||
private:
|
||||
|
@ -295,8 +287,7 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) UserMessageReadAckRequestEvent
|
|||
return sequence_num_to_acknowledge_;
|
||||
}
|
||||
|
||||
static ScopedEvent Deserialize(const PortName& port_name,
|
||||
const void* buffer,
|
||||
static ScopedEvent Deserialize(const PortName& port_name, const void* buffer,
|
||||
size_t num_bytes);
|
||||
|
||||
private:
|
||||
|
@ -316,8 +307,7 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) UserMessageReadAckEvent : public Event {
|
|||
return sequence_num_acknowledged_;
|
||||
}
|
||||
|
||||
static ScopedEvent Deserialize(const PortName& port_name,
|
||||
const void* buffer,
|
||||
static ScopedEvent Deserialize(const PortName& port_name, const void* buffer,
|
||||
size_t num_bytes);
|
||||
|
||||
private:
|
||||
|
|
|
@ -31,8 +31,7 @@ MessageQueue::MessageQueue(uint64_t next_sequence_num)
|
|||
MessageQueue::~MessageQueue() {
|
||||
#if DCHECK_IS_ON()
|
||||
size_t num_leaked_ports = 0;
|
||||
for (const auto& message : heap_)
|
||||
num_leaked_ports += message->num_ports();
|
||||
for (const auto& message : heap_) num_leaked_ports += message->num_ports();
|
||||
DVLOG_IF(1, num_leaked_ports > 0)
|
||||
<< "Leaking " << num_leaked_ports << " ports in unreceived messages";
|
||||
#endif
|
||||
|
|
|
@ -15,8 +15,7 @@ const NodeName kInvalidNodeName = {0, 0};
|
|||
std::ostream& operator<<(std::ostream& stream, const Name& name) {
|
||||
std::ios::fmtflags flags(stream.flags());
|
||||
stream << std::hex << std::uppercase << name.v1;
|
||||
if (name.v2 != 0)
|
||||
stream << '.' << name.v2;
|
||||
if (name.v2 != 0) stream << '.' << name.v2;
|
||||
stream.flags(flags);
|
||||
return stream;
|
||||
}
|
||||
|
|
|
@ -26,9 +26,7 @@ inline bool operator==(const Name& a, const Name& b) {
|
|||
return a.v1 == b.v1 && a.v2 == b.v2;
|
||||
}
|
||||
|
||||
inline bool operator!=(const Name& a, const Name& b) {
|
||||
return !(a == b);
|
||||
}
|
||||
inline bool operator!=(const Name& a, const Name& b) { return !(a == b); }
|
||||
|
||||
inline bool operator<(const Name& a, const Name& b) {
|
||||
return std::tie(a.v1, a.v2) < std::tie(b.v1, b.v2);
|
||||
|
|
|
@ -11,7 +11,8 @@ namespace core {
|
|||
namespace ports {
|
||||
namespace test {
|
||||
|
||||
TEST(NameTest, Defaults) {
|
||||
TEST(NameTest, Defaults)
|
||||
{
|
||||
PortName default_port_name;
|
||||
EXPECT_EQ(kInvalidPortName, default_port_name);
|
||||
|
||||
|
@ -19,7 +20,8 @@ TEST(NameTest, Defaults) {
|
|||
EXPECT_EQ(kInvalidNodeName, default_node_name);
|
||||
}
|
||||
|
||||
TEST(NameTest, PortNameChecks) {
|
||||
TEST(NameTest, PortNameChecks)
|
||||
{
|
||||
PortName port_name_a(50, 100);
|
||||
PortName port_name_b(50, 100);
|
||||
PortName port_name_c(100, 50);
|
||||
|
@ -44,7 +46,8 @@ TEST(NameTest, PortNameChecks) {
|
|||
EXPECT_NE(hash_b, hash_c);
|
||||
}
|
||||
|
||||
TEST(NameTest, NodeNameChecks) {
|
||||
TEST(NameTest, NodeNameChecks)
|
||||
{
|
||||
NodeName node_name_a(50, 100);
|
||||
NodeName node_name_b(50, 100);
|
||||
NodeName node_name_c(100, 50);
|
||||
|
|
|
@ -26,9 +26,9 @@
|
|||
#include "mojo/core/ports/port_locker.h"
|
||||
|
||||
#if !defined(OS_NACL)
|
||||
#include "crypto/random.h"
|
||||
# include "crypto/random.h"
|
||||
#else
|
||||
#include "base/rand_util.h"
|
||||
# include "base/rand_util.h"
|
||||
#endif
|
||||
|
||||
namespace mojo {
|
||||
|
@ -86,8 +86,7 @@ bool CanAcceptMoreMessages(const Port* port) {
|
|||
// Have we already doled out the last message (i.e., do we expect to NOT
|
||||
// receive further messages)?
|
||||
uint64_t next_sequence_num = port->message_queue.next_sequence_num();
|
||||
if (port->state == Port::kClosed)
|
||||
return false;
|
||||
if (port->state == Port::kClosed) return false;
|
||||
if (port->peer_closed || port->remove_proxy_on_last_message) {
|
||||
if (port->peer_lost_unexpectedly)
|
||||
return port->message_queue.HasNextMessage();
|
||||
|
@ -107,8 +106,7 @@ Node::Node(const NodeName& name, NodeDelegate* delegate)
|
|||
: name_(name), delegate_(this, delegate) {}
|
||||
|
||||
Node::~Node() {
|
||||
if (!ports_.empty())
|
||||
DLOG(WARNING) << "Unclean shutdown for node " << name_;
|
||||
if (!ports_.empty()) DLOG(WARNING) << "Unclean shutdown for node " << name_;
|
||||
}
|
||||
|
||||
bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
|
||||
|
@ -156,8 +154,7 @@ int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
|
|||
PortLocker::AssertNoPortsLockedOnCurrentThread();
|
||||
base::AutoLock lock(ports_lock_);
|
||||
auto iter = ports_.find(port_name);
|
||||
if (iter == ports_.end())
|
||||
return ERROR_PORT_UNKNOWN;
|
||||
if (iter == ports_.end()) return ERROR_PORT_UNKNOWN;
|
||||
|
||||
#if defined(OS_ANDROID) && defined(ARCH_CPU_ARM64)
|
||||
// Workaround for https://crbug.com/665869.
|
||||
|
@ -174,8 +171,7 @@ int Node::CreateUninitializedPort(PortRef* port_ref) {
|
|||
|
||||
scoped_refptr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
|
||||
int rv = AddPortWithName(port_name, port);
|
||||
if (rv != OK)
|
||||
return rv;
|
||||
if (rv != OK) return rv;
|
||||
|
||||
*port_ref = PortRef(port_name, std::move(port));
|
||||
return OK;
|
||||
|
@ -191,8 +187,7 @@ int Node::InitializePort(const PortRef& port_ref,
|
|||
|
||||
SinglePortLocker locker(&port_ref);
|
||||
auto* port = locker.port();
|
||||
if (port->state != Port::kUninitialized)
|
||||
return ERROR_PORT_STATE_UNEXPECTED;
|
||||
if (port->state != Port::kUninitialized) return ERROR_PORT_STATE_UNEXPECTED;
|
||||
|
||||
port->state = Port::kReceiving;
|
||||
UpdatePortPeerAddress(port_ref.name(), port, peer_node_name,
|
||||
|
@ -208,20 +203,16 @@ int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
|
|||
int rv;
|
||||
|
||||
rv = CreateUninitializedPort(port0_ref);
|
||||
if (rv != OK)
|
||||
return rv;
|
||||
if (rv != OK) return rv;
|
||||
|
||||
rv = CreateUninitializedPort(port1_ref);
|
||||
if (rv != OK)
|
||||
return rv;
|
||||
if (rv != OK) return rv;
|
||||
|
||||
rv = InitializePort(*port0_ref, name_, port1_ref->name());
|
||||
if (rv != OK)
|
||||
return rv;
|
||||
if (rv != OK) return rv;
|
||||
|
||||
rv = InitializePort(*port1_ref, name_, port0_ref->name());
|
||||
if (rv != OK)
|
||||
return rv;
|
||||
if (rv != OK) return rv;
|
||||
|
||||
return OK;
|
||||
}
|
||||
|
@ -230,8 +221,7 @@ int Node::SetUserData(const PortRef& port_ref,
|
|||
scoped_refptr<UserData> user_data) {
|
||||
SinglePortLocker locker(&port_ref);
|
||||
auto* port = locker.port();
|
||||
if (port->state == Port::kClosed)
|
||||
return ERROR_PORT_STATE_UNEXPECTED;
|
||||
if (port->state == Port::kClosed) return ERROR_PORT_STATE_UNEXPECTED;
|
||||
|
||||
port->user_data = std::move(user_data);
|
||||
|
||||
|
@ -242,8 +232,7 @@ int Node::GetUserData(const PortRef& port_ref,
|
|||
scoped_refptr<UserData>* user_data) {
|
||||
SinglePortLocker locker(&port_ref);
|
||||
auto* port = locker.port();
|
||||
if (port->state == Port::kClosed)
|
||||
return ERROR_PORT_STATE_UNEXPECTED;
|
||||
if (port->state == Port::kClosed) return ERROR_PORT_STATE_UNEXPECTED;
|
||||
|
||||
*user_data = port->user_data;
|
||||
|
||||
|
@ -296,8 +285,7 @@ int Node::ClosePort(const PortRef& port_ref) {
|
|||
for (const auto& message : undelivered_messages) {
|
||||
for (size_t i = 0; i < message->num_ports(); ++i) {
|
||||
PortRef ref;
|
||||
if (GetPort(message->ports()[i], &ref) == OK)
|
||||
ClosePort(ref);
|
||||
if (GetPort(message->ports()[i], &ref) == OK) ClosePort(ref);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -307,8 +295,7 @@ int Node::ClosePort(const PortRef& port_ref) {
|
|||
int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
|
||||
SinglePortLocker locker(&port_ref);
|
||||
auto* port = locker.port();
|
||||
if (port->state != Port::kReceiving)
|
||||
return ERROR_PORT_STATE_UNEXPECTED;
|
||||
if (port->state != Port::kReceiving) return ERROR_PORT_STATE_UNEXPECTED;
|
||||
|
||||
port_status->has_messages = port->message_queue.HasNextMessage();
|
||||
port_status->receiving_messages = CanAcceptMoreMessages(port);
|
||||
|
@ -339,13 +326,11 @@ int Node::GetMessage(const PortRef& port_ref,
|
|||
|
||||
// This could also be treated like the port being unknown since the
|
||||
// embedder should no longer be referring to a port that has been sent.
|
||||
if (port->state != Port::kReceiving)
|
||||
return ERROR_PORT_STATE_UNEXPECTED;
|
||||
if (port->state != Port::kReceiving) return ERROR_PORT_STATE_UNEXPECTED;
|
||||
|
||||
// Let the embedder get messages until there are no more before reporting
|
||||
// that the peer closed its end.
|
||||
if (!CanAcceptMoreMessages(port))
|
||||
return ERROR_PORT_PEER_CLOSED;
|
||||
if (!CanAcceptMoreMessages(port)) return ERROR_PORT_PEER_CLOSED;
|
||||
|
||||
port->message_queue.GetNextMessage(message, filter);
|
||||
if (*message &&
|
||||
|
@ -356,8 +341,7 @@ int Node::GetMessage(const PortRef& port_ref,
|
|||
}
|
||||
}
|
||||
|
||||
if (ack_event)
|
||||
delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
|
||||
if (ack_event) delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
|
||||
|
||||
// Allow referenced ports to trigger PortStatusChanged calls.
|
||||
if (*message) {
|
||||
|
@ -389,32 +373,27 @@ int Node::SendUserMessage(const PortRef& port_ref,
|
|||
// close the sending port itself if it happened to be one of the encoded
|
||||
// ports (an invalid but possible condition.)
|
||||
for (size_t i = 0; i < message->num_ports(); ++i) {
|
||||
if (message->ports()[i] == port_ref.name())
|
||||
continue;
|
||||
if (message->ports()[i] == port_ref.name()) continue;
|
||||
|
||||
PortRef port;
|
||||
if (GetPort(message->ports()[i], &port) == OK)
|
||||
ClosePort(port);
|
||||
if (GetPort(message->ports()[i], &port) == OK) ClosePort(port);
|
||||
}
|
||||
}
|
||||
return rv;
|
||||
}
|
||||
|
||||
int Node::SetAcknowledgeRequestInterval(
|
||||
const PortRef& port_ref,
|
||||
uint64_t sequence_num_acknowledge_interval) {
|
||||
const PortRef& port_ref, uint64_t sequence_num_acknowledge_interval) {
|
||||
NodeName peer_node_name;
|
||||
PortName peer_port_name;
|
||||
uint64_t sequence_num_to_request_ack = 0;
|
||||
{
|
||||
SinglePortLocker locker(&port_ref);
|
||||
auto* port = locker.port();
|
||||
if (port->state != Port::kReceiving)
|
||||
return ERROR_PORT_STATE_UNEXPECTED;
|
||||
if (port->state != Port::kReceiving) return ERROR_PORT_STATE_UNEXPECTED;
|
||||
|
||||
port->sequence_num_acknowledge_interval = sequence_num_acknowledge_interval;
|
||||
if (!sequence_num_acknowledge_interval)
|
||||
return OK;
|
||||
if (!sequence_num_acknowledge_interval) return OK;
|
||||
|
||||
peer_node_name = port->peer_node_name;
|
||||
peer_port_name = port->peer_port_name;
|
||||
|
@ -514,8 +493,7 @@ int Node::OnUserMessage(std::unique_ptr<UserMessageEvent> message) {
|
|||
#if DCHECK_IS_ON()
|
||||
std::ostringstream ports_buf;
|
||||
for (size_t i = 0; i < message->num_ports(); ++i) {
|
||||
if (i > 0)
|
||||
ports_buf << ",";
|
||||
if (i > 0) ports_buf << ",";
|
||||
ports_buf << message->ports()[i];
|
||||
}
|
||||
|
||||
|
@ -539,8 +517,7 @@ int Node::OnUserMessage(std::unique_ptr<UserMessageEvent> message) {
|
|||
return ERROR_PORT_UNKNOWN;
|
||||
} else {
|
||||
int rv = AcceptPort(message->ports()[i], descriptor);
|
||||
if (rv != OK)
|
||||
return rv;
|
||||
if (rv != OK) return rv;
|
||||
|
||||
// Ensure that the referring node is wiped out of this descriptor. This
|
||||
// allows the event to be forwarded across multiple local hops without
|
||||
|
@ -575,8 +552,7 @@ int Node::OnUserMessage(std::unique_ptr<UserMessageEvent> message) {
|
|||
|
||||
if (should_forward_messages) {
|
||||
int rv = ForwardUserMessagesFromProxy(port_ref);
|
||||
if (rv != OK)
|
||||
return rv;
|
||||
if (rv != OK) return rv;
|
||||
TryRemoveProxy(port_ref);
|
||||
}
|
||||
|
||||
|
@ -600,8 +576,7 @@ int Node::OnUserMessage(std::unique_ptr<UserMessageEvent> message) {
|
|||
|
||||
int Node::OnPortAccepted(std::unique_ptr<PortAcceptedEvent> event) {
|
||||
PortRef port_ref;
|
||||
if (GetPort(event->port_name(), &port_ref) != OK)
|
||||
return ERROR_PORT_UNKNOWN;
|
||||
if (GetPort(event->port_name(), &port_ref) != OK) return ERROR_PORT_UNKNOWN;
|
||||
|
||||
#if DCHECK_IS_ON()
|
||||
{
|
||||
|
@ -753,8 +728,7 @@ int Node::OnObserveProxyAck(std::unique_ptr<ObserveProxyAckEvent> event) {
|
|||
int Node::OnObserveClosure(std::unique_ptr<ObserveClosureEvent> event) {
|
||||
// OK if the port doesn't exist, as it may have been closed already.
|
||||
PortRef port_ref;
|
||||
if (GetPort(event->port_name(), &port_ref) != OK)
|
||||
return OK;
|
||||
if (GetPort(event->port_name(), &port_ref) != OK) return OK;
|
||||
|
||||
// This message tells the port that it should no longer expect more messages
|
||||
// beyond last_sequence_num. This message is forwarded along until we reach
|
||||
|
@ -803,8 +777,7 @@ int Node::OnObserveClosure(std::unique_ptr<ObserveClosureEvent> event) {
|
|||
// See about removing the port if it is a proxy as our peer won't be able
|
||||
// to participate in proxy removal.
|
||||
port->remove_proxy_on_last_message = true;
|
||||
if (port->state == Port::kProxying)
|
||||
try_remove_proxy = true;
|
||||
if (port->state == Port::kProxying) try_remove_proxy = true;
|
||||
}
|
||||
|
||||
DVLOG(2) << "Forwarding ObserveClosure from " << port_ref.name() << "@"
|
||||
|
@ -816,14 +789,12 @@ int Node::OnObserveClosure(std::unique_ptr<ObserveClosureEvent> event) {
|
|||
peer_port_name = port->peer_port_name;
|
||||
}
|
||||
|
||||
if (try_remove_proxy)
|
||||
TryRemoveProxy(port_ref);
|
||||
if (try_remove_proxy) TryRemoveProxy(port_ref);
|
||||
|
||||
event->set_port_name(peer_port_name);
|
||||
delegate_->ForwardEvent(peer_node_name, std::move(event));
|
||||
|
||||
if (notify_delegate)
|
||||
delegate_->PortStatusChanged(port_ref);
|
||||
if (notify_delegate) delegate_->PortStatusChanged(port_ref);
|
||||
|
||||
return OK;
|
||||
}
|
||||
|
@ -845,8 +816,7 @@ int Node::OnMergePort(std::unique_ptr<MergePortEvent> event) {
|
|||
// first as otherwise its peer receiving port could be left stranded
|
||||
// indefinitely.
|
||||
if (AcceptPort(event->new_port_name(), event->new_port_descriptor()) != OK) {
|
||||
if (port_ref.is_valid())
|
||||
ClosePort(port_ref);
|
||||
if (port_ref.is_valid()) ClosePort(port_ref);
|
||||
return ERROR_PORT_STATE_UNEXPECTED;
|
||||
}
|
||||
|
||||
|
@ -872,8 +842,7 @@ int Node::OnUserMessageReadAckRequest(
|
|||
DVLOG(1) << "AckRequest " << port_ref.name() << "@" << name_ << " sequence "
|
||||
<< event->sequence_num_to_acknowledge();
|
||||
|
||||
if (!port_ref.is_valid())
|
||||
return ERROR_PORT_UNKNOWN;
|
||||
if (!port_ref.is_valid()) return ERROR_PORT_UNKNOWN;
|
||||
|
||||
NodeName peer_node_name;
|
||||
std::unique_ptr<Event> event_to_send;
|
||||
|
@ -968,8 +937,7 @@ int Node::OnUserMessageReadAck(std::unique_ptr<UserMessageReadAckEvent> event) {
|
|||
if (ack_request_event)
|
||||
delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
|
||||
|
||||
if (port_ref.is_valid())
|
||||
delegate_->PortStatusChanged(port_ref);
|
||||
if (port_ref.is_valid()) delegate_->PortStatusChanged(port_ref);
|
||||
|
||||
return OK;
|
||||
}
|
||||
|
@ -994,8 +962,7 @@ void Node::ErasePort(const PortName& port_name) {
|
|||
{
|
||||
base::AutoLock lock(ports_lock_);
|
||||
auto it = ports_.find(port_name);
|
||||
if (it == ports_.end())
|
||||
return;
|
||||
if (it == ports_.end()) return;
|
||||
port = std::move(it->second);
|
||||
ports_.erase(it);
|
||||
|
||||
|
@ -1016,16 +983,14 @@ int Node::SendUserMessageInternal(const PortRef& port_ref,
|
|||
std::unique_ptr<UserMessageEvent>* message) {
|
||||
std::unique_ptr<UserMessageEvent>& m = *message;
|
||||
for (size_t i = 0; i < m->num_ports(); ++i) {
|
||||
if (m->ports()[i] == port_ref.name())
|
||||
return ERROR_PORT_CANNOT_SEND_SELF;
|
||||
if (m->ports()[i] == port_ref.name()) return ERROR_PORT_CANNOT_SEND_SELF;
|
||||
}
|
||||
|
||||
NodeName target_node;
|
||||
int rv = PrepareToForwardUserMessage(port_ref, Port::kReceiving,
|
||||
false /* ignore_closed_peer */, m.get(),
|
||||
&target_node);
|
||||
if (rv != OK)
|
||||
return rv;
|
||||
if (rv != OK) return rv;
|
||||
|
||||
// Beyond this point there's no sense in returning anything but OK. Even if
|
||||
// message forwarding or acceptance fails, there's nothing the embedder can
|
||||
|
@ -1047,8 +1012,7 @@ int Node::SendUserMessageInternal(const PortRef& port_ref,
|
|||
return OK;
|
||||
}
|
||||
|
||||
int Node::MergePortsInternal(const PortRef& port0_ref,
|
||||
const PortRef& port1_ref,
|
||||
int Node::MergePortsInternal(const PortRef& port0_ref, const PortRef& port1_ref,
|
||||
bool allow_close_on_bad_state) {
|
||||
const PortRef* port_refs[2] = {&port0_ref, &port1_ref};
|
||||
{
|
||||
|
@ -1086,10 +1050,8 @@ int Node::MergePortsInternal(const PortRef& port0_ref,
|
|||
port1->state == Port::kReceiving || allow_close_on_bad_state;
|
||||
locker.reset();
|
||||
ports_locker.Release();
|
||||
if (close_port0)
|
||||
ClosePort(port0_ref);
|
||||
if (close_port1)
|
||||
ClosePort(port1_ref);
|
||||
if (close_port0) ClosePort(port0_ref);
|
||||
if (close_port1) ClosePort(port1_ref);
|
||||
return ERROR_PORT_STATE_UNEXPECTED;
|
||||
}
|
||||
|
||||
|
@ -1097,10 +1059,8 @@ int Node::MergePortsInternal(const PortRef& port0_ref,
|
|||
SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
|
||||
port0->state = Port::kProxying;
|
||||
port1->state = Port::kProxying;
|
||||
if (port0->peer_closed)
|
||||
port0->remove_proxy_on_last_message = true;
|
||||
if (port1->peer_closed)
|
||||
port1->remove_proxy_on_last_message = true;
|
||||
if (port0->peer_closed) port0->remove_proxy_on_last_message = true;
|
||||
if (port1->peer_closed) port1->remove_proxy_on_last_message = true;
|
||||
}
|
||||
|
||||
// Flush any queued messages from the new proxies and, if successful, complete
|
||||
|
@ -1160,8 +1120,7 @@ int Node::MergePortsInternal(const PortRef& port0_ref,
|
|||
return ERROR_PORT_STATE_UNEXPECTED;
|
||||
}
|
||||
|
||||
void Node::ConvertToProxy(Port* port,
|
||||
const NodeName& to_node_name,
|
||||
void Node::ConvertToProxy(Port* port, const NodeName& to_node_name,
|
||||
PortName* port_name,
|
||||
Event::PortDescriptor* port_descriptor) {
|
||||
port->AssertLockAcquired();
|
||||
|
@ -1177,8 +1136,7 @@ void Node::ConvertToProxy(Port* port,
|
|||
|
||||
// If we already know our peer is closed, we already know this proxy can
|
||||
// be removed once it receives and forwards its last expected message.
|
||||
if (port->peer_closed)
|
||||
port->remove_proxy_on_last_message = true;
|
||||
if (port->peer_closed) port->remove_proxy_on_last_message = true;
|
||||
|
||||
*port_name = new_port_name;
|
||||
|
||||
|
@ -1220,8 +1178,7 @@ int Node::AcceptPort(const PortName& port_name,
|
|||
port->message_queue.set_signalable(false);
|
||||
|
||||
int rv = AddPortWithName(port_name, std::move(port));
|
||||
if (rv != OK)
|
||||
return rv;
|
||||
if (rv != OK) return rv;
|
||||
|
||||
// Allow referring port to forward messages.
|
||||
delegate_->ForwardEvent(
|
||||
|
@ -1297,8 +1254,7 @@ int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
|
|||
#if DCHECK_IS_ON()
|
||||
std::ostringstream ports_buf;
|
||||
for (size_t i = 0; i < message->num_ports(); ++i) {
|
||||
if (i > 0)
|
||||
ports_buf << ",";
|
||||
if (i > 0) ports_buf << ",";
|
||||
ports_buf << message->ports()[i];
|
||||
}
|
||||
#endif
|
||||
|
@ -1379,8 +1335,7 @@ int Node::BeginProxying(const PortRef& port_ref) {
|
|||
}
|
||||
|
||||
int rv = ForwardUserMessagesFromProxy(port_ref);
|
||||
if (rv != OK)
|
||||
return rv;
|
||||
if (rv != OK) return rv;
|
||||
|
||||
// Forward any pending acknowledge request.
|
||||
MaybeForwardAckRequest(port_ref);
|
||||
|
@ -1423,16 +1378,14 @@ int Node::ForwardUserMessagesFromProxy(const PortRef& port_ref) {
|
|||
{
|
||||
SinglePortLocker locker(&port_ref);
|
||||
locker.port()->message_queue.GetNextMessage(&message, nullptr);
|
||||
if (!message)
|
||||
break;
|
||||
if (!message) break;
|
||||
}
|
||||
|
||||
NodeName target_node;
|
||||
int rv = PrepareToForwardUserMessage(port_ref, Port::kProxying,
|
||||
true /* ignore_closed_peer */,
|
||||
message.get(), &target_node);
|
||||
if (rv != OK)
|
||||
return rv;
|
||||
if (rv != OK) return rv;
|
||||
|
||||
delegate_->ForwardEvent(target_node, std::move(message));
|
||||
}
|
||||
|
@ -1470,8 +1423,7 @@ void Node::TryRemoveProxy(const PortRef& port_ref) {
|
|||
DCHECK(port->state == Port::kProxying);
|
||||
|
||||
// Make sure we have seen ObserveProxyAck before removing the port.
|
||||
if (!port->remove_proxy_on_last_message)
|
||||
return;
|
||||
if (!port->remove_proxy_on_last_message) return;
|
||||
|
||||
if (!CanAcceptMoreMessages(port)) {
|
||||
should_erase = true;
|
||||
|
@ -1485,8 +1437,7 @@ void Node::TryRemoveProxy(const PortRef& port_ref) {
|
|||
}
|
||||
}
|
||||
|
||||
if (should_erase)
|
||||
ErasePort(port_ref.name());
|
||||
if (should_erase) ErasePort(port_ref.name());
|
||||
|
||||
if (removal_event)
|
||||
delegate_->ForwardEvent(removal_target_node, std::move(removal_event));
|
||||
|
@ -1507,8 +1458,7 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
|
|||
base::AutoLock ports_lock(ports_lock_);
|
||||
|
||||
auto node_peer_port_map_iter = peer_port_maps_.find(node_name);
|
||||
if (node_peer_port_map_iter == peer_port_maps_.end())
|
||||
return;
|
||||
if (node_peer_port_map_iter == peer_port_maps_.end()) return;
|
||||
|
||||
auto& node_peer_port_map = node_peer_port_map_iter->second;
|
||||
auto peer_ports_begin = node_peer_port_map.begin();
|
||||
|
@ -1517,8 +1467,7 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
|
|||
// If |port_name| is given, we limit the set of local ports to the ones
|
||||
// with that specific port as their peer.
|
||||
peer_ports_begin = node_peer_port_map.find(port_name);
|
||||
if (peer_ports_begin == node_peer_port_map.end())
|
||||
return;
|
||||
if (peer_ports_begin == node_peer_port_map.end()) return;
|
||||
|
||||
peer_ports_end = peer_ports_begin;
|
||||
++peer_ports_end;
|
||||
|
@ -1572,8 +1521,7 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
|
|||
}
|
||||
|
||||
// Wake up any receiving ports who have just observed simulated peer closure.
|
||||
for (const auto& port : ports_to_notify)
|
||||
delegate_->PortStatusChanged(port);
|
||||
for (const auto& port : ports_to_notify) delegate_->PortStatusChanged(port);
|
||||
|
||||
for (const auto& proxy_name : dead_proxies_to_broadcast) {
|
||||
// Broadcast an event signifying that this proxy is no longer functioning.
|
||||
|
@ -1592,8 +1540,7 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
|
|||
for (const auto& message : undelivered_messages) {
|
||||
for (size_t i = 0; i < message->num_ports(); ++i) {
|
||||
PortRef ref;
|
||||
if (GetPort(message->ports()[i], &ref) == OK)
|
||||
ClosePort(ref);
|
||||
if (GetPort(message->ports()[i], &ref) == OK) ClosePort(ref);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1617,30 +1564,23 @@ void Node::UpdatePortPeerAddress(const PortName& local_port_name,
|
|||
|
||||
void Node::RemoveFromPeerPortMap(const PortName& local_port_name,
|
||||
Port* local_port) {
|
||||
if (local_port->peer_port_name == kInvalidPortName)
|
||||
return;
|
||||
if (local_port->peer_port_name == kInvalidPortName) return;
|
||||
|
||||
auto node_iter = peer_port_maps_.find(local_port->peer_node_name);
|
||||
if (node_iter == peer_port_maps_.end())
|
||||
return;
|
||||
if (node_iter == peer_port_maps_.end()) return;
|
||||
|
||||
auto& node_peer_port_map = node_iter->second;
|
||||
auto ports_iter = node_peer_port_map.find(local_port->peer_port_name);
|
||||
if (ports_iter == node_peer_port_map.end())
|
||||
return;
|
||||
if (ports_iter == node_peer_port_map.end()) return;
|
||||
|
||||
auto& local_ports_with_this_peer = ports_iter->second;
|
||||
local_ports_with_this_peer.erase(local_port_name);
|
||||
if (local_ports_with_this_peer.empty())
|
||||
node_peer_port_map.erase(ports_iter);
|
||||
if (node_peer_port_map.empty())
|
||||
peer_port_maps_.erase(node_iter);
|
||||
if (local_ports_with_this_peer.empty()) node_peer_port_map.erase(ports_iter);
|
||||
if (node_peer_port_map.empty()) peer_port_maps_.erase(node_iter);
|
||||
}
|
||||
|
||||
void Node::SwapPortPeers(const PortName& port0_name,
|
||||
Port* port0,
|
||||
const PortName& port1_name,
|
||||
Port* port1) {
|
||||
void Node::SwapPortPeers(const PortName& port0_name, Port* port0,
|
||||
const PortName& port1_name, Port* port1) {
|
||||
ports_lock_.AssertAcquired();
|
||||
port0->AssertLockAcquired();
|
||||
port1->AssertLockAcquired();
|
||||
|
@ -1666,11 +1606,9 @@ void Node::MaybeResendAckRequest(const PortRef& port_ref) {
|
|||
{
|
||||
SinglePortLocker locker(&port_ref);
|
||||
auto* port = locker.port();
|
||||
if (port->state != Port::kReceiving)
|
||||
return;
|
||||
if (port->state != Port::kReceiving) return;
|
||||
|
||||
if (!port->sequence_num_acknowledge_interval)
|
||||
return;
|
||||
if (!port->sequence_num_acknowledge_interval) return;
|
||||
|
||||
peer_node_name = port->peer_node_name;
|
||||
ack_request_event = std::make_unique<UserMessageReadAckRequestEvent>(
|
||||
|
@ -1687,11 +1625,9 @@ void Node::MaybeForwardAckRequest(const PortRef& port_ref) {
|
|||
{
|
||||
SinglePortLocker locker(&port_ref);
|
||||
auto* port = locker.port();
|
||||
if (port->state != Port::kProxying)
|
||||
return;
|
||||
if (port->state != Port::kProxying) return;
|
||||
|
||||
if (!port->sequence_num_to_acknowledge)
|
||||
return;
|
||||
if (!port->sequence_num_to_acknowledge) return;
|
||||
|
||||
peer_node_name = port->peer_node_name;
|
||||
ack_request_event = std::make_unique<UserMessageReadAckRequestEvent>(
|
||||
|
@ -1709,13 +1645,11 @@ void Node::MaybeResendAck(const PortRef& port_ref) {
|
|||
{
|
||||
SinglePortLocker locker(&port_ref);
|
||||
auto* port = locker.port();
|
||||
if (port->state != Port::kReceiving)
|
||||
return;
|
||||
if (port->state != Port::kReceiving) return;
|
||||
|
||||
uint64_t last_sequence_num_read =
|
||||
port->message_queue.next_sequence_num() - 1;
|
||||
if (!port->sequence_num_to_acknowledge || !last_sequence_num_read)
|
||||
return;
|
||||
if (!port->sequence_num_to_acknowledge || !last_sequence_num_read) return;
|
||||
|
||||
peer_node_name = port->peer_node_name;
|
||||
ack_event = std::make_unique<UserMessageReadAckEvent>(
|
||||
|
|
|
@ -100,8 +100,7 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) Node {
|
|||
int CreateUninitializedPort(PortRef* port_ref);
|
||||
|
||||
// Initializes a newly created port.
|
||||
int InitializePort(const PortRef& port_ref,
|
||||
const NodeName& peer_node_name,
|
||||
int InitializePort(const PortRef& port_ref, const NodeName& peer_node_name,
|
||||
const PortName& peer_port_name);
|
||||
|
||||
// Generates a new connected pair of ports bound to this node. These ports
|
||||
|
@ -147,9 +146,8 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) Node {
|
|||
// in the |unacknowledged_message_count| field of PortStatus. This allows
|
||||
// bounding the number of unread and/or in-transit messages from this port
|
||||
// to its conjugate between zero and |unacknowledged_message_count|.
|
||||
int SetAcknowledgeRequestInterval(
|
||||
const PortRef& port_ref,
|
||||
uint64_t sequence_number_acknowledge_interval);
|
||||
int SetAcknowledgeRequestInterval(const PortRef& port_ref,
|
||||
uint64_t sequence_num_acknowledge_interval);
|
||||
|
||||
// Corresponding to NodeDelegate::ForwardEvent.
|
||||
int AcceptEvent(ScopedEvent event);
|
||||
|
@ -165,8 +163,7 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) Node {
|
|||
//
|
||||
// It is safe for the non-merged peers (A and D above) to be transferred,
|
||||
// closed, and/or written to before, during, or after the merge.
|
||||
int MergePorts(const PortRef& port_ref,
|
||||
const NodeName& destination_node_name,
|
||||
int MergePorts(const PortRef& port_ref, const NodeName& destination_node_name,
|
||||
const PortName& destination_port_name);
|
||||
|
||||
// Like above but merges two ports local to this node. Because both ports are
|
||||
|
@ -220,11 +217,9 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) Node {
|
|||
|
||||
int SendUserMessageInternal(const PortRef& port_ref,
|
||||
std::unique_ptr<UserMessageEvent>* message);
|
||||
int MergePortsInternal(const PortRef& port0_ref,
|
||||
const PortRef& port1_ref,
|
||||
int MergePortsInternal(const PortRef& port0_ref, const PortRef& port1_ref,
|
||||
bool allow_close_on_bad_state);
|
||||
void ConvertToProxy(Port* port,
|
||||
const NodeName& to_node_name,
|
||||
void ConvertToProxy(Port* port, const NodeName& to_node_name,
|
||||
PortName* port_name,
|
||||
Event::PortDescriptor* port_descriptor);
|
||||
int AcceptPort(const PortName& port_name,
|
||||
|
@ -246,8 +241,7 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) Node {
|
|||
// |ports_lock_| MUST be held through the extent of this method.
|
||||
// |local_port|'s lock must be held if and only if a reference to |local_port|
|
||||
// exist in |ports_|.
|
||||
void UpdatePortPeerAddress(const PortName& local_port_name,
|
||||
Port* local_port,
|
||||
void UpdatePortPeerAddress(const PortName& local_port_name, Port* local_port,
|
||||
const NodeName& new_peer_node,
|
||||
const PortName& new_peer_port);
|
||||
|
||||
|
@ -258,10 +252,8 @@ class COMPONENT_EXPORT(MOJO_CORE_PORTS) Node {
|
|||
// Swaps the peer information for two local ports. Used during port merges.
|
||||
// Note that |ports_lock_| must be held along with each of the two port's own
|
||||
// locks, through the extent of this method.
|
||||
void SwapPortPeers(const PortName& port0_name,
|
||||
Port* port0,
|
||||
const PortName& port1_name,
|
||||
Port* port1);
|
||||
void SwapPortPeers(const PortName& port0_name, Port* port0,
|
||||
const PortName& port1_name, Port* port1);
|
||||
|
||||
// Sends an acknowledge request to the peer if the port has a non-zero
|
||||
// |sequence_num_acknowledge_interval|. This needs to be done when the port's
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
#include "mojo/core/ports/port.h"
|
||||
|
||||
#if DCHECK_IS_ON()
|
||||
#include "base/threading/thread_local.h"
|
||||
# include "base/threading/thread_local.h"
|
||||
#endif
|
||||
|
||||
namespace mojo {
|
||||
|
|
|
@ -42,8 +42,7 @@ class PortLocker {
|
|||
// lock is held by this PortLocker.
|
||||
bool is_port_locked = false;
|
||||
for (size_t i = 0; i < num_ports_ && !is_port_locked; ++i)
|
||||
if (port_refs_[i]->port() == port_ref.port())
|
||||
is_port_locked = true;
|
||||
if (port_refs_[i]->port() == port_ref.port()) is_port_locked = true;
|
||||
DCHECK(is_port_locked);
|
||||
#endif
|
||||
return port_ref.port();
|
||||
|
|
|
@ -71,8 +71,7 @@ class MessageRouter {
|
|||
public:
|
||||
virtual ~MessageRouter() = default;
|
||||
|
||||
virtual void ForwardEvent(TestNode* from_node,
|
||||
const NodeName& node_name,
|
||||
virtual void ForwardEvent(TestNode* from_node, const NodeName& node_name,
|
||||
ScopedEvent event) = 0;
|
||||
virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0;
|
||||
};
|
||||
|
@ -142,22 +141,19 @@ class TestNode : public NodeDelegate {
|
|||
int SendMultipleMessages(const PortRef& port, size_t num_messages) {
|
||||
for (size_t i = 0; i < num_messages; ++i) {
|
||||
int result = SendStringMessage(port, "");
|
||||
if (result != OK)
|
||||
return result;
|
||||
if (result != OK) return result;
|
||||
}
|
||||
return OK;
|
||||
}
|
||||
|
||||
int SendStringMessageWithPort(const PortRef& port,
|
||||
const std::string& s,
|
||||
int SendStringMessageWithPort(const PortRef& port, const std::string& s,
|
||||
const PortName& sent_port_name) {
|
||||
auto event = NewUserMessageEvent(s, 1);
|
||||
event->ports()[0] = sent_port_name;
|
||||
return node_.SendUserMessage(port, std::move(event));
|
||||
}
|
||||
|
||||
int SendStringMessageWithPort(const PortRef& port,
|
||||
const std::string& s,
|
||||
int SendStringMessageWithPort(const PortRef& port, const std::string& s,
|
||||
const PortRef& sent_port) {
|
||||
return SendStringMessageWithPort(port, s, sent_port.name());
|
||||
}
|
||||
|
@ -179,8 +175,7 @@ class TestNode : public NodeDelegate {
|
|||
bool ReadMultipleMessages(const PortRef& port, size_t num_messages) {
|
||||
for (size_t i = 0; i < num_messages; ++i) {
|
||||
ScopedMessage message;
|
||||
if (!ReadMessage(port, &message))
|
||||
return false;
|
||||
if (!ReadMessage(port, &message)) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -231,15 +226,13 @@ class TestNode : public NodeDelegate {
|
|||
void PortStatusChanged(const PortRef& port) override {
|
||||
// The port may be closed, in which case we ignore the notification.
|
||||
base::AutoLock lock(lock_);
|
||||
if (!save_messages_)
|
||||
return;
|
||||
if (!save_messages_) return;
|
||||
|
||||
for (;;) {
|
||||
ScopedMessage message;
|
||||
{
|
||||
base::AutoUnlock unlock(lock_);
|
||||
if (!ReadMessage(port, &message))
|
||||
break;
|
||||
if (!ReadMessage(port, &message)) break;
|
||||
}
|
||||
|
||||
saved_messages_.emplace(std::move(message));
|
||||
|
@ -247,8 +240,7 @@ class TestNode : public NodeDelegate {
|
|||
}
|
||||
|
||||
void ClosePortsInEvent(Event* event) {
|
||||
if (event->type() != Event::Type::kUserMessage)
|
||||
return;
|
||||
if (event->type() != Event::Type::kUserMessage) return;
|
||||
|
||||
UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event);
|
||||
for (size_t i = 0; i < message_event->num_ports(); ++i) {
|
||||
|
@ -260,8 +252,7 @@ class TestNode : public NodeDelegate {
|
|||
|
||||
uint64_t GetUnacknowledgedMessageCount(const PortRef& port_ref) {
|
||||
PortStatus status;
|
||||
if (node_.GetStatus(port_ref, &status) != OK)
|
||||
return 0;
|
||||
if (node_.GetStatus(port_ref, &status) != OK) return 0;
|
||||
|
||||
return status.unacknowledged_message_count;
|
||||
}
|
||||
|
@ -272,8 +263,7 @@ class TestNode : public NodeDelegate {
|
|||
events_available_event_.Wait();
|
||||
base::AutoLock lock(lock_);
|
||||
|
||||
if (should_quit_)
|
||||
return;
|
||||
if (should_quit_) return;
|
||||
|
||||
dispatching_ = true;
|
||||
while (!incoming_events_.empty()) {
|
||||
|
@ -352,12 +342,10 @@ class PortsTest : public testing::Test, public MessageRouter {
|
|||
base::AutoLock global_lock(global_lock_);
|
||||
bool all_nodes_idle = true;
|
||||
for (const auto& entry : nodes_) {
|
||||
if (!entry.second->IsIdle())
|
||||
all_nodes_idle = false;
|
||||
if (!entry.second->IsIdle()) all_nodes_idle = false;
|
||||
entry.second->WakeUp();
|
||||
}
|
||||
if (all_nodes_idle)
|
||||
return;
|
||||
if (all_nodes_idle) return;
|
||||
|
||||
// Wait for any Node to signal that it's idle.
|
||||
base::AutoUnlock global_unlock(global_lock_);
|
||||
|
@ -368,9 +356,7 @@ class PortsTest : public testing::Test, public MessageRouter {
|
|||
}
|
||||
}
|
||||
|
||||
void CreatePortPair(TestNode* node0,
|
||||
PortRef* port0,
|
||||
TestNode* node1,
|
||||
void CreatePortPair(TestNode* node0, PortRef* port0, TestNode* node1,
|
||||
PortRef* port1) {
|
||||
if (node0 == node1) {
|
||||
EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
|
||||
|
@ -386,8 +372,7 @@ class PortsTest : public testing::Test, public MessageRouter {
|
|||
|
||||
private:
|
||||
// MessageRouter:
|
||||
void ForwardEvent(TestNode* from_node,
|
||||
const NodeName& node_name,
|
||||
void ForwardEvent(TestNode* from_node, const NodeName& node_name,
|
||||
ScopedEvent event) override {
|
||||
base::AutoLock global_lock(global_lock_);
|
||||
base::AutoLock lock(lock_);
|
||||
|
@ -431,14 +416,12 @@ class PortsTest : public testing::Test, public MessageRouter {
|
|||
base::AutoLock lock(lock_);
|
||||
|
||||
// Drop messages from nodes that have been removed.
|
||||
if (nodes_.find(from_node->name()) == nodes_.end())
|
||||
return;
|
||||
if (nodes_.find(from_node->name()) == nodes_.end()) return;
|
||||
|
||||
for (const auto& entry : nodes_) {
|
||||
TestNode* node = entry.second;
|
||||
// Broadcast doesn't deliver to the local node.
|
||||
if (node == from_node)
|
||||
continue;
|
||||
if (node == from_node) continue;
|
||||
node->EnqueueEvent(event->Clone());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,13 +12,9 @@ UserMessage::UserMessage(const TypeInfo* type_info) : type_info_(type_info) {}
|
|||
|
||||
UserMessage::~UserMessage() = default;
|
||||
|
||||
bool UserMessage::WillBeRoutedExternally() {
|
||||
return true;
|
||||
}
|
||||
bool UserMessage::WillBeRoutedExternally() { return true; }
|
||||
|
||||
size_t UserMessage::GetSizeIfSerialized() const {
|
||||
return 0;
|
||||
}
|
||||
size_t UserMessage::GetSizeIfSerialized() const { return 0; }
|
||||
|
||||
} // namespace ports
|
||||
} // namespace core
|
||||
|
|
Загрузка…
Ссылка в новой задаче