Bug 1706374 - Part 5a: Apply suggested clang-tidy fixes to the ports code, r=handyman

Mechanical change applying clang-tidy fixes

Differential Revision: https://phabricator.services.mozilla.com/D112769
This commit is contained in:
Nika Layzell 2021-06-21 21:53:07 +00:00
Родитель 12db5b7e19
Коммит 050548d334
5 изменённых файлов: 281 добавлений и 108 удалений

Просмотреть файл

@ -101,7 +101,9 @@ Event::~Event() = default;
// static
ScopedEvent Event::Deserialize(const void* buffer, size_t num_bytes) {
if (num_bytes < sizeof(SerializedHeader)) return nullptr;
if (num_bytes < sizeof(SerializedHeader)) {
return nullptr;
}
const auto* header = static_cast<const SerializedHeader*>(buffer);
const PortName& port_name = header->port_name;
@ -176,16 +178,22 @@ bool UserMessageEvent::NotifyWillBeRoutedExternally() {
ScopedEvent UserMessageEvent::Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes) {
if (num_bytes < sizeof(UserMessageEventData)) return nullptr;
if (num_bytes < sizeof(UserMessageEventData)) {
return nullptr;
}
const auto* data = static_cast<const UserMessageEventData*>(buffer);
mozilla::CheckedInt<size_t> port_data_size = data->num_ports;
port_data_size *= sizeof(PortDescriptor) + sizeof(PortName);
if (!port_data_size.isValid()) return nullptr;
if (!port_data_size.isValid()) {
return nullptr;
}
mozilla::CheckedInt<size_t> total_size = port_data_size.value();
total_size += sizeof(UserMessageEventData);
if (!total_size.isValid() || num_bytes < total_size.value()) return nullptr;
if (!total_size.isValid() || num_bytes < total_size.value()) {
return nullptr;
}
auto event =
mozilla::WrapUnique(new UserMessageEvent(port_name, data->sequence_num));
@ -206,7 +214,9 @@ UserMessageEvent::UserMessageEvent(const PortName& port_name,
: Event(Type::kUserMessage, port_name), sequence_num_(sequence_num) {}
size_t UserMessageEvent::GetSizeIfSerialized() const {
if (!message_) return 0;
if (!message_) {
return 0;
}
return message_->GetSizeIfSerialized();
}
@ -271,7 +281,9 @@ ObserveProxyEvent::~ObserveProxyEvent() = default;
ScopedEvent ObserveProxyEvent::Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes) {
if (num_bytes < sizeof(ObserveProxyEventData)) return nullptr;
if (num_bytes < sizeof(ObserveProxyEventData)) {
return nullptr;
}
const auto* data = static_cast<const ObserveProxyEventData*>(buffer);
return mozilla::MakeUnique<ObserveProxyEvent>(
@ -308,7 +320,9 @@ ObserveProxyAckEvent::~ObserveProxyAckEvent() = default;
ScopedEvent ObserveProxyAckEvent::Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes) {
if (num_bytes < sizeof(ObserveProxyAckEventData)) return nullptr;
if (num_bytes < sizeof(ObserveProxyAckEventData)) {
return nullptr;
}
const auto* data = static_cast<const ObserveProxyAckEventData*>(buffer);
return mozilla::MakeUnique<ObserveProxyAckEvent>(port_name,
@ -340,7 +354,9 @@ ObserveClosureEvent::~ObserveClosureEvent() = default;
ScopedEvent ObserveClosureEvent::Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes) {
if (num_bytes < sizeof(ObserveClosureEventData)) return nullptr;
if (num_bytes < sizeof(ObserveClosureEventData)) {
return nullptr;
}
const auto* data = static_cast<const ObserveClosureEventData*>(buffer);
return mozilla::MakeUnique<ObserveClosureEvent>(port_name,
@ -373,7 +389,9 @@ MergePortEvent::~MergePortEvent() = default;
// static
ScopedEvent MergePortEvent::Deserialize(const PortName& port_name,
const void* buffer, size_t num_bytes) {
if (num_bytes < sizeof(MergePortEventData)) return nullptr;
if (num_bytes < sizeof(MergePortEventData)) {
return nullptr;
}
const auto* data = static_cast<const MergePortEventData*>(buffer);
return mozilla::MakeUnique<MergePortEvent>(port_name, data->new_port_name,
@ -400,7 +418,9 @@ UserMessageReadAckRequestEvent::~UserMessageReadAckRequestEvent() = default;
// static
ScopedEvent UserMessageReadAckRequestEvent::Deserialize(
const PortName& port_name, const void* buffer, size_t num_bytes) {
if (num_bytes < sizeof(UserMessageReadAckRequestEventData)) return nullptr;
if (num_bytes < sizeof(UserMessageReadAckRequestEventData)) {
return nullptr;
}
const auto* data =
static_cast<const UserMessageReadAckRequestEventData*>(buffer);
@ -428,7 +448,9 @@ UserMessageReadAckEvent::~UserMessageReadAckEvent() = default;
ScopedEvent UserMessageReadAckEvent::Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes) {
if (num_bytes < sizeof(UserMessageReadAckEventData)) return nullptr;
if (num_bytes < sizeof(UserMessageReadAckEventData)) {
return nullptr;
}
const auto* data = static_cast<const UserMessageReadAckEventData*>(buffer);
return mozilla::MakeUnique<UserMessageReadAckEvent>(

Просмотреть файл

@ -15,7 +15,9 @@ const NodeName kInvalidNodeName = {0, 0};
std::ostream& operator<<(std::ostream& stream, const Name& name) {
std::ios::fmtflags flags(stream.flags());
stream << std::hex << std::uppercase << name.v1;
if (name.v2 != 0) stream << '.' << name.v2;
if (name.v2 != 0) {
stream << '.' << name.v2;
}
stream.flags(flags);
return stream;
}

Просмотреть файл

@ -38,12 +38,16 @@ bool CanAcceptMoreMessages(const Port* port) {
// Have we already doled out the last message (i.e., do we expect to NOT
// receive further messages)?
uint64_t next_sequence_num = port->message_queue.next_sequence_num();
if (port->state == Port::kClosed) return false;
if (port->state == Port::kClosed) {
return false;
}
if (port->peer_closed || port->remove_proxy_on_last_message) {
if (port->peer_lost_unexpectedly)
if (port->peer_lost_unexpectedly) {
return port->message_queue.HasNextMessage();
if (port->last_sequence_num_to_receive == next_sequence_num - 1)
}
if (port->last_sequence_num_to_receive == next_sequence_num - 1) {
return false;
}
}
return true;
}
@ -61,7 +65,9 @@ Node::Node(const NodeName& name, NodeDelegate* delegate)
: name_(name), delegate_(this, delegate) {}
Node::~Node() {
if (!ports_.empty()) DLOG(WARNING) << "Unclean shutdown for node " << name_;
if (!ports_.empty()) {
DLOG(WARNING) << "Unclean shutdown for node " << name_;
}
}
bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
@ -109,7 +115,9 @@ int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
PortLocker::AssertNoPortsLockedOnCurrentThread();
mozilla::MutexAutoLock lock(ports_lock_);
auto iter = ports_.find(port_name);
if (iter == ports_.end()) return ERROR_PORT_UNKNOWN;
if (iter == ports_.end()) {
return ERROR_PORT_UNKNOWN;
}
#if defined(OS_ANDROID) && defined(ARCH_CPU_ARM64)
// Workaround for https://crbug.com/665869.
@ -126,7 +134,9 @@ int Node::CreateUninitializedPort(PortRef* port_ref) {
RefPtr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
int rv = AddPortWithName(port_name, port);
if (rv != OK) return rv;
if (rv != OK) {
return rv;
}
*port_ref = PortRef(port_name, std::move(port));
return OK;
@ -142,7 +152,9 @@ int Node::InitializePort(const PortRef& port_ref,
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kUninitialized) return ERROR_PORT_STATE_UNEXPECTED;
if (port->state != Port::kUninitialized) {
return ERROR_PORT_STATE_UNEXPECTED;
}
port->state = Port::kReceiving;
UpdatePortPeerAddress(port_ref.name(), port, peer_node_name,
@ -158,16 +170,24 @@ int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
int rv;
rv = CreateUninitializedPort(port0_ref);
if (rv != OK) return rv;
if (rv != OK) {
return rv;
}
rv = CreateUninitializedPort(port1_ref);
if (rv != OK) return rv;
if (rv != OK) {
return rv;
}
rv = InitializePort(*port0_ref, name_, port1_ref->name());
if (rv != OK) return rv;
if (rv != OK) {
return rv;
}
rv = InitializePort(*port1_ref, name_, port0_ref->name());
if (rv != OK) return rv;
if (rv != OK) {
return rv;
}
return OK;
}
@ -175,7 +195,9 @@ int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
int Node::SetUserData(const PortRef& port_ref, RefPtr<UserData> user_data) {
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state == Port::kClosed) return ERROR_PORT_STATE_UNEXPECTED;
if (port->state == Port::kClosed) {
return ERROR_PORT_STATE_UNEXPECTED;
}
port->user_data = std::move(user_data);
@ -185,7 +207,9 @@ int Node::SetUserData(const PortRef& port_ref, RefPtr<UserData> user_data) {
int Node::GetUserData(const PortRef& port_ref, RefPtr<UserData>* user_data) {
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state == Port::kClosed) return ERROR_PORT_STATE_UNEXPECTED;
if (port->state == Port::kClosed) {
return ERROR_PORT_STATE_UNEXPECTED;
}
*user_data = port->user_data;
@ -238,7 +262,9 @@ int Node::ClosePort(const PortRef& port_ref) {
for (const auto& message : undelivered_messages) {
for (size_t i = 0; i < message->num_ports(); ++i) {
PortRef ref;
if (GetPort(message->ports()[i], &ref) == OK) ClosePort(ref);
if (GetPort(message->ports()[i], &ref) == OK) {
ClosePort(ref);
}
}
}
}
@ -248,7 +274,9 @@ int Node::ClosePort(const PortRef& port_ref) {
int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kReceiving) return ERROR_PORT_STATE_UNEXPECTED;
if (port->state != Port::kReceiving) {
return ERROR_PORT_STATE_UNEXPECTED;
}
port_status->has_messages = port->message_queue.HasNextMessage();
port_status->receiving_messages = CanAcceptMoreMessages(port);
@ -279,11 +307,15 @@ int Node::GetMessage(const PortRef& port_ref,
// This could also be treated like the port being unknown since the
// embedder should no longer be referring to a port that has been sent.
if (port->state != Port::kReceiving) return ERROR_PORT_STATE_UNEXPECTED;
if (port->state != Port::kReceiving) {
return ERROR_PORT_STATE_UNEXPECTED;
}
// Let the embedder get messages until there are no more before reporting
// that the peer closed its end.
if (!CanAcceptMoreMessages(port)) return ERROR_PORT_PEER_CLOSED;
if (!CanAcceptMoreMessages(port)) {
return ERROR_PORT_PEER_CLOSED;
}
port->message_queue.GetNextMessage(message, filter);
if (*message &&
@ -294,7 +326,9 @@ int Node::GetMessage(const PortRef& port_ref,
}
}
if (ack_event) delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
if (ack_event) {
delegate_->ForwardEvent(peer_node_name, std::move(ack_event));
}
// Allow referenced ports to trigger PortStatusChanged calls.
if (*message) {
@ -326,10 +360,14 @@ int Node::SendUserMessage(const PortRef& port_ref,
// close the sending port itself if it happened to be one of the encoded
// ports (an invalid but possible condition.)
for (size_t i = 0; i < message->num_ports(); ++i) {
if (message->ports()[i] == port_ref.name()) continue;
if (message->ports()[i] == port_ref.name()) {
continue;
}
PortRef port;
if (GetPort(message->ports()[i], &port) == OK) ClosePort(port);
if (GetPort(message->ports()[i], &port) == OK) {
ClosePort(port);
}
}
}
return rv;
@ -343,10 +381,14 @@ int Node::SetAcknowledgeRequestInterval(
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kReceiving) return ERROR_PORT_STATE_UNEXPECTED;
if (port->state != Port::kReceiving) {
return ERROR_PORT_STATE_UNEXPECTED;
}
port->sequence_num_acknowledge_interval = sequence_num_acknowledge_interval;
if (!sequence_num_acknowledge_interval) return OK;
if (!sequence_num_acknowledge_interval) {
return OK;
}
peer_node_name = port->peer_node_name;
peer_port_name = port->peer_port_name;
@ -411,8 +453,9 @@ int Node::MergePorts(const PortRef& port_ref,
// Ensure that the locally retained peer of the new proxy gets a status
// update so it notices that its peer is now remote.
PortRef local_peer;
if (GetPort(new_port_descriptor.peer_port_name, &local_peer) == OK)
if (GetPort(new_port_descriptor.peer_port_name, &local_peer) == OK) {
delegate_->PortStatusChanged(local_peer);
}
}
delegate_->ForwardEvent(
@ -446,7 +489,9 @@ int Node::OnUserMessage(mozilla::UniquePtr<UserMessageEvent> message) {
#ifdef DEBUG
std::ostringstream ports_buf;
for (size_t i = 0; i < message->num_ports(); ++i) {
if (i > 0) ports_buf << ",";
if (i > 0) {
ports_buf << ",";
}
ports_buf << message->ports()[i];
}
@ -466,11 +511,14 @@ int Node::OnUserMessage(mozilla::UniquePtr<UserMessageEvent> message) {
// If the referring node name is invalid, this descriptor can be ignored
// and the port should already exist locally.
PortRef port_ref;
if (GetPort(message->ports()[i], &port_ref) != OK)
if (GetPort(message->ports()[i], &port_ref) != OK) {
return ERROR_PORT_UNKNOWN;
}
} else {
int rv = AcceptPort(message->ports()[i], descriptor);
if (rv != OK) return rv;
if (rv != OK) {
return rv;
}
// Ensure that the referring node is wiped out of this descriptor. This
// allows the event to be forwarded across multiple local hops without
@ -505,7 +553,9 @@ int Node::OnUserMessage(mozilla::UniquePtr<UserMessageEvent> message) {
if (should_forward_messages) {
int rv = ForwardUserMessagesFromProxy(port_ref);
if (rv != OK) return rv;
if (rv != OK) {
return rv;
}
TryRemoveProxy(port_ref);
}
@ -529,7 +579,9 @@ int Node::OnUserMessage(mozilla::UniquePtr<UserMessageEvent> message) {
int Node::OnPortAccepted(mozilla::UniquePtr<PortAcceptedEvent> event) {
PortRef port_ref;
if (GetPort(event->port_name(), &port_ref) != OK) return ERROR_PORT_UNKNOWN;
if (GetPort(event->port_name(), &port_ref) != OK) {
return ERROR_PORT_UNKNOWN;
}
#ifdef DEBUG
{
@ -627,8 +679,9 @@ int Node::OnObserveProxy(mozilla::UniquePtr<ObserveProxyEvent> event) {
}
}
if (event_to_forward)
if (event_to_forward) {
delegate_->ForwardEvent(event_target_node, std::move(event_to_forward));
}
if (peer_changed) {
// Re-send ack and/or ack requests, as the previous peer proxy may not have
@ -647,15 +700,17 @@ int Node::OnObserveProxyAck(mozilla::UniquePtr<ObserveProxyAckEvent> event) {
<< " (last_sequence_num=" << event->last_sequence_num() << ")";
PortRef port_ref;
if (GetPort(event->port_name(), &port_ref) != OK)
if (GetPort(event->port_name(), &port_ref) != OK) {
return ERROR_PORT_UNKNOWN; // The port may have observed closure first.
}
bool try_remove_proxy_immediately;
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kProxying)
if (port->state != Port::kProxying) {
return OOPS(ERROR_PORT_STATE_UNEXPECTED);
}
// If the last sequence number is invalid, this is a signal that we need to
// retransmit the ObserveProxy event for this port rather than flagging the
@ -670,10 +725,11 @@ int Node::OnObserveProxyAck(mozilla::UniquePtr<ObserveProxyAckEvent> event) {
}
}
if (try_remove_proxy_immediately)
if (try_remove_proxy_immediately) {
TryRemoveProxy(port_ref);
else
} else {
InitiateProxyRemoval(port_ref);
}
return OK;
}
@ -681,7 +737,9 @@ int Node::OnObserveProxyAck(mozilla::UniquePtr<ObserveProxyAckEvent> event) {
int Node::OnObserveClosure(mozilla::UniquePtr<ObserveClosureEvent> event) {
// OK if the port doesn't exist, as it may have been closed already.
PortRef port_ref;
if (GetPort(event->port_name(), &port_ref) != OK) return OK;
if (GetPort(event->port_name(), &port_ref) != OK) {
return OK;
}
// This message tells the port that it should no longer expect more messages
// beyond last_sequence_num. This message is forwarded along until we reach
@ -730,7 +788,9 @@ int Node::OnObserveClosure(mozilla::UniquePtr<ObserveClosureEvent> event) {
// See about removing the port if it is a proxy as our peer won't be able
// to participate in proxy removal.
port->remove_proxy_on_last_message = true;
if (port->state == Port::kProxying) try_remove_proxy = true;
if (port->state == Port::kProxying) {
try_remove_proxy = true;
}
}
DVLOG(2) << "Forwarding ObserveClosure from " << port_ref.name() << "@"
@ -742,12 +802,16 @@ int Node::OnObserveClosure(mozilla::UniquePtr<ObserveClosureEvent> event) {
peer_port_name = port->peer_port_name;
}
if (try_remove_proxy) TryRemoveProxy(port_ref);
if (try_remove_proxy) {
TryRemoveProxy(port_ref);
}
event->set_port_name(peer_port_name);
delegate_->ForwardEvent(peer_node_name, std::move(event));
if (notify_delegate) delegate_->PortStatusChanged(port_ref);
if (notify_delegate) {
delegate_->PortStatusChanged(port_ref);
}
return OK;
}
@ -769,7 +833,9 @@ int Node::OnMergePort(mozilla::UniquePtr<MergePortEvent> event) {
// first as otherwise its peer receiving port could be left stranded
// indefinitely.
if (AcceptPort(event->new_port_name(), event->new_port_descriptor()) != OK) {
if (port_ref.is_valid()) ClosePort(port_ref);
if (port_ref.is_valid()) {
ClosePort(port_ref);
}
return ERROR_PORT_STATE_UNEXPECTED;
}
@ -778,7 +844,8 @@ int Node::OnMergePort(mozilla::UniquePtr<MergePortEvent> event) {
if (!port_ref.is_valid() && new_port_ref.is_valid()) {
ClosePort(new_port_ref);
return ERROR_PORT_UNKNOWN;
} else if (port_ref.is_valid() && !new_port_ref.is_valid()) {
}
if (port_ref.is_valid() && !new_port_ref.is_valid()) {
ClosePort(port_ref);
return ERROR_PORT_UNKNOWN;
}
@ -795,7 +862,9 @@ int Node::OnUserMessageReadAckRequest(
DVLOG(1) << "AckRequest " << port_ref.name() << "@" << name_ << " sequence "
<< event->sequence_num_to_acknowledge();
if (!port_ref.is_valid()) return ERROR_PORT_UNKNOWN;
if (!port_ref.is_valid()) {
return ERROR_PORT_UNKNOWN;
}
NodeName peer_node_name;
mozilla::UniquePtr<Event> event_to_send;
@ -823,8 +892,9 @@ int Node::OnUserMessageReadAckRequest(
// requesting acknowledge for an already read message. There may already
// have been a request for future reads, so take care not to back up
// the requested acknowledge counter.
if (current_sequence_num > port->sequence_num_to_acknowledge)
if (current_sequence_num > port->sequence_num_to_acknowledge) {
port->sequence_num_to_acknowledge = current_sequence_num;
}
} else {
// This is request to ack a sequence number that hasn't been read yet.
// The state of the port can either be that it already has a
@ -844,8 +914,9 @@ int Node::OnUserMessageReadAckRequest(
}
}
if (event_to_send)
if (event_to_send) {
delegate_->ForwardEvent(peer_node_name, std::move(event_to_send));
}
return OK;
}
@ -888,10 +959,13 @@ int Node::OnUserMessageReadAck(
port->sequence_num_acknowledge_interval);
}
}
if (ack_request_event)
if (ack_request_event) {
delegate_->ForwardEvent(peer_node_name, std::move(ack_request_event));
}
if (port_ref.is_valid()) delegate_->PortStatusChanged(port_ref);
if (port_ref.is_valid()) {
delegate_->PortStatusChanged(port_ref);
}
return OK;
}
@ -904,8 +978,9 @@ int Node::AddPortWithName(const PortName& port_name, RefPtr<Port> port) {
peer_port_maps_[port->peer_node_name][port->peer_port_name].emplace(
port_name, PortRef(port_name, port));
}
if (!ports_.emplace(port_name, std::move(port)).second)
if (!ports_.emplace(port_name, std::move(port)).second) {
return OOPS(ERROR_PORT_EXISTS); // Suggests a bad UUID generator.
}
DVLOG(2) << "Created port " << port_name << "@" << name_;
return OK;
}
@ -916,7 +991,9 @@ void Node::ErasePort(const PortName& port_name) {
{
mozilla::MutexAutoLock lock(ports_lock_);
auto it = ports_.find(port_name);
if (it == ports_.end()) return;
if (it == ports_.end()) {
return;
}
port = std::move(it->second);
ports_.erase(it);
@ -937,14 +1014,18 @@ int Node::SendUserMessageInternal(
const PortRef& port_ref, mozilla::UniquePtr<UserMessageEvent>* message) {
mozilla::UniquePtr<UserMessageEvent>& m = *message;
for (size_t i = 0; i < m->num_ports(); ++i) {
if (m->ports()[i] == port_ref.name()) return ERROR_PORT_CANNOT_SEND_SELF;
if (m->ports()[i] == port_ref.name()) {
return ERROR_PORT_CANNOT_SEND_SELF;
}
}
NodeName target_node;
int rv = PrepareToForwardUserMessage(port_ref, Port::kReceiving,
false /* ignore_closed_peer */, m.get(),
&target_node);
if (rv != OK) return rv;
if (rv != OK) {
return rv;
}
// Beyond this point there's no sense in returning anything but OK. Even if
// message forwarding or acceptance fails, there's nothing the embedder can
@ -1005,8 +1086,12 @@ int Node::MergePortsInternal(const PortRef& port0_ref, const PortRef& port1_ref,
port1->state == Port::kReceiving || allow_close_on_bad_state;
locker.reset();
ports_locker.reset();
if (close_port0) ClosePort(port0_ref);
if (close_port1) ClosePort(port1_ref);
if (close_port0) {
ClosePort(port0_ref);
}
if (close_port1) {
ClosePort(port1_ref);
}
return ERROR_PORT_STATE_UNEXPECTED;
}
@ -1014,20 +1099,24 @@ int Node::MergePortsInternal(const PortRef& port0_ref, const PortRef& port1_ref,
SwapPortPeers(port0_ref.name(), port0, port1_ref.name(), port1);
port0->state = Port::kProxying;
port1->state = Port::kProxying;
if (port0->peer_closed) port0->remove_proxy_on_last_message = true;
if (port1->peer_closed) port1->remove_proxy_on_last_message = true;
if (port0->peer_closed) {
port0->remove_proxy_on_last_message = true;
}
if (port1->peer_closed) {
port1->remove_proxy_on_last_message = true;
}
}
// Flush any queued messages from the new proxies and, if successful, complete
// the merge by initiating proxy removals.
if (ForwardUserMessagesFromProxy(port0_ref) == OK &&
ForwardUserMessagesFromProxy(port1_ref) == OK) {
for (size_t i = 0; i < 2; ++i) {
for (auto& port_ref : port_refs) {
bool try_remove_proxy_immediately = false;
ScopedEvent closure_event;
NodeName closure_event_target_node;
{
SinglePortLocker locker(port_refs[i]);
SinglePortLocker locker(port_ref);
auto* port = locker.port();
DCHECK(port->state == Port::kProxying);
try_remove_proxy_immediately = port->remove_proxy_on_last_message;
@ -1039,10 +1128,11 @@ int Node::MergePortsInternal(const PortRef& port0_ref, const PortRef& port1_ref,
port->peer_port_name, port->last_sequence_num_to_receive);
}
}
if (try_remove_proxy_immediately)
TryRemoveProxy(*port_refs[i]);
else
InitiateProxyRemoval(*port_refs[i]);
if (try_remove_proxy_immediately) {
TryRemoveProxy(*port_ref);
} else {
InitiateProxyRemoval(*port_ref);
}
if (closure_event) {
delegate_->ForwardEvent(closure_event_target_node,
@ -1091,7 +1181,9 @@ void Node::ConvertToProxy(Port* port, const NodeName& to_node_name,
// If we already know our peer is closed, we already know this proxy can
// be removed once it receives and forwards its last expected message.
if (port->peer_closed) port->remove_proxy_on_last_message = true;
if (port->peer_closed) {
port->remove_proxy_on_last_message = true;
}
*port_name = new_port_name;
@ -1133,7 +1225,9 @@ int Node::AcceptPort(const PortName& port_name,
port->message_queue.set_signalable(false);
int rv = AddPortWithName(port_name, std::move(port));
if (rv != OK) return rv;
if (rv != OK) {
return rv;
}
// Allow referring port to forward messages.
delegate_->ForwardEvent(port_descriptor.referring_node_name,
@ -1198,19 +1292,24 @@ int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
}
target_is_remote = target_node_name != name_;
if (forwarding_port->state != expected_port_state)
if (forwarding_port->state != expected_port_state) {
return ERROR_PORT_STATE_UNEXPECTED;
if (forwarding_port->peer_closed && !ignore_closed_peer)
}
if (forwarding_port->peer_closed && !ignore_closed_peer) {
return ERROR_PORT_PEER_CLOSED;
}
// Messages may already have a sequence number if they're being forwarded by
// a proxy. Otherwise, use the next outgoing sequence number.
if (message->sequence_num() == 0)
if (message->sequence_num() == 0) {
message->set_sequence_num(forwarding_port->next_sequence_num_to_send++);
}
#ifdef DEBUG
std::ostringstream ports_buf;
for (size_t i = 0; i < message->num_ports(); ++i) {
if (i > 0) ports_buf << ",";
if (i > 0) {
ports_buf << ",";
}
ports_buf << message->ports()[i];
}
#endif
@ -1272,8 +1371,9 @@ int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
const Event::PortDescriptor& descriptor = message->port_descriptors()[i];
if (descriptor.peer_node_name == name_) {
PortRef local_peer;
if (GetPort(descriptor.peer_port_name, &local_peer) == OK)
if (GetPort(descriptor.peer_port_name, &local_peer) == OK) {
delegate_->PortStatusChanged(local_peer);
}
}
}
}
@ -1285,13 +1385,16 @@ int Node::BeginProxying(const PortRef& port_ref) {
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kBuffering)
if (port->state != Port::kBuffering) {
return OOPS(ERROR_PORT_STATE_UNEXPECTED);
}
port->state = Port::kProxying;
}
int rv = ForwardUserMessagesFromProxy(port_ref);
if (rv != OK) return rv;
if (rv != OK) {
return rv;
}
// Forward any pending acknowledge request.
MaybeForwardAckRequest(port_ref);
@ -1302,8 +1405,9 @@ int Node::BeginProxying(const PortRef& port_ref) {
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kProxying)
if (port->state != Port::kProxying) {
return OOPS(ERROR_PORT_STATE_UNEXPECTED);
}
try_remove_proxy_immediately = port->remove_proxy_on_last_message;
if (try_remove_proxy_immediately) {
@ -1334,14 +1438,18 @@ int Node::ForwardUserMessagesFromProxy(const PortRef& port_ref) {
{
SinglePortLocker locker(&port_ref);
locker.port()->message_queue.GetNextMessage(&message, nullptr);
if (!message) break;
if (!message) {
break;
}
}
NodeName target_node;
int rv = PrepareToForwardUserMessage(port_ref, Port::kProxying,
true /* ignore_closed_peer */,
message.get(), &target_node);
if (rv != OK) return rv;
if (rv != OK) {
return rv;
}
delegate_->ForwardEvent(target_node, std::move(message));
}
@ -1379,7 +1487,9 @@ void Node::TryRemoveProxy(const PortRef& port_ref) {
DCHECK(port->state == Port::kProxying);
// Make sure we have seen ObserveProxyAck before removing the port.
if (!port->remove_proxy_on_last_message) return;
if (!port->remove_proxy_on_last_message) {
return;
}
if (!CanAcceptMoreMessages(port)) {
should_erase = true;
@ -1393,10 +1503,13 @@ void Node::TryRemoveProxy(const PortRef& port_ref) {
}
}
if (should_erase) ErasePort(port_ref.name());
if (should_erase) {
ErasePort(port_ref.name());
}
if (removal_event)
if (removal_event) {
delegate_->ForwardEvent(removal_target_node, std::move(removal_event));
}
}
void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
@ -1414,7 +1527,9 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
mozilla::MutexAutoLock ports_lock(ports_lock_);
auto node_peer_port_map_iter = peer_port_maps_.find(node_name);
if (node_peer_port_map_iter == peer_port_maps_.end()) return;
if (node_peer_port_map_iter == peer_port_maps_.end()) {
return;
}
auto& node_peer_port_map = node_peer_port_map_iter->second;
auto peer_ports_begin = node_peer_port_map.begin();
@ -1423,7 +1538,9 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
// If |port_name| is given, we limit the set of local ports to the ones
// with that specific port as their peer.
peer_ports_begin = node_peer_port_map.find(port_name);
if (peer_ports_begin == node_peer_port_map.end()) return;
if (peer_ports_begin == node_peer_port_map.end()) {
return;
}
peer_ports_end = peer_ports_begin;
++peer_ports_end;
@ -1436,9 +1553,8 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
// relatively short-lived cases where more than one local port points to
// the same peer, and this only happens when extra ports are bypassed
// proxies waiting to be torn down.
for (auto local_port_iter = local_ports.begin();
local_port_iter != local_ports.end(); ++local_port_iter) {
auto& local_port_ref = local_port_iter->second;
for (auto& local_port : local_ports) {
auto& local_port_ref = local_port.second;
SinglePortLocker locker(&local_port_ref);
auto* port = locker.port();
@ -1450,8 +1566,9 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
port->peer_closed = true;
port->peer_lost_unexpectedly = true;
if (port->state == Port::kReceiving)
if (port->state == Port::kReceiving) {
ports_to_notify.push_back(local_port_ref);
}
}
// We don't expect to forward any further messages, and we don't
@ -1464,8 +1581,9 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
dead_proxies_to_broadcast.push_back(local_port_ref.name());
std::vector<mozilla::UniquePtr<UserMessageEvent>> messages;
port->message_queue.TakeAllMessages(&messages);
for (auto& message : messages)
for (auto& message : messages) {
undelivered_messages.emplace_back(std::move(message));
}
}
}
}
@ -1477,7 +1595,9 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
}
// Wake up any receiving ports who have just observed simulated peer closure.
for (const auto& port : ports_to_notify) delegate_->PortStatusChanged(port);
for (const auto& port : ports_to_notify) {
delegate_->PortStatusChanged(port);
}
for (const auto& proxy_name : dead_proxies_to_broadcast) {
// Broadcast an event signifying that this proxy is no longer functioning.
@ -1496,7 +1616,9 @@ void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
for (const auto& message : undelivered_messages) {
for (size_t i = 0; i < message->num_ports(); ++i) {
PortRef ref;
if (GetPort(message->ports()[i], &ref) == OK) ClosePort(ref);
if (GetPort(message->ports()[i], &ref) == OK) {
ClosePort(ref);
}
}
}
}
@ -1519,19 +1641,29 @@ void Node::UpdatePortPeerAddress(const PortName& local_port_name,
void Node::RemoveFromPeerPortMap(const PortName& local_port_name,
Port* local_port) {
if (local_port->peer_port_name == kInvalidPortName) return;
if (local_port->peer_port_name == kInvalidPortName) {
return;
}
auto node_iter = peer_port_maps_.find(local_port->peer_node_name);
if (node_iter == peer_port_maps_.end()) return;
if (node_iter == peer_port_maps_.end()) {
return;
}
auto& node_peer_port_map = node_iter->second;
auto ports_iter = node_peer_port_map.find(local_port->peer_port_name);
if (ports_iter == node_peer_port_map.end()) return;
if (ports_iter == node_peer_port_map.end()) {
return;
}
auto& local_ports_with_this_peer = ports_iter->second;
local_ports_with_this_peer.erase(local_port_name);
if (local_ports_with_this_peer.empty()) node_peer_port_map.erase(ports_iter);
if (node_peer_port_map.empty()) peer_port_maps_.erase(node_iter);
if (local_ports_with_this_peer.empty()) {
node_peer_port_map.erase(ports_iter);
}
if (node_peer_port_map.empty()) {
peer_port_maps_.erase(node_iter);
}
}
void Node::SwapPortPeers(const PortName& port0_name, Port* port0,
@ -1559,9 +1691,13 @@ void Node::MaybeResendAckRequest(const PortRef& port_ref) {
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kReceiving) return;
if (port->state != Port::kReceiving) {
return;
}
if (!port->sequence_num_acknowledge_interval) return;
if (!port->sequence_num_acknowledge_interval) {
return;
}
peer_node_name = port->peer_node_name;
ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
@ -1578,9 +1714,13 @@ void Node::MaybeForwardAckRequest(const PortRef& port_ref) {
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kProxying) return;
if (port->state != Port::kProxying) {
return;
}
if (!port->sequence_num_to_acknowledge) return;
if (!port->sequence_num_to_acknowledge) {
return;
}
peer_node_name = port->peer_node_name;
ack_request_event = mozilla::MakeUnique<UserMessageReadAckRequestEvent>(
@ -1598,11 +1738,15 @@ void Node::MaybeResendAck(const PortRef& port_ref) {
{
SinglePortLocker locker(&port_ref);
auto* port = locker.port();
if (port->state != Port::kReceiving) return;
if (port->state != Port::kReceiving) {
return;
}
uint64_t last_sequence_num_read =
port->message_queue.next_sequence_num() - 1;
if (!port->sequence_num_to_acknowledge || !last_sequence_num_read) return;
if (!port->sequence_num_to_acknowledge || !last_sequence_num_read) {
return;
}
peer_node_name = port->peer_node_name;
ack_event = mozilla::MakeUnique<UserMessageReadAckEvent>(

Просмотреть файл

@ -48,7 +48,9 @@ PortLocker::PortLocker(const PortRef** port_refs, size_t num_ports)
}
PortLocker::~PortLocker() {
for (size_t i = 0; i < num_ports_; ++i) port_refs_[i]->port()->lock_.Unlock();
for (size_t i = 0; i < num_ports_; ++i) {
port_refs_[i]->port()->lock_.Unlock();
}
#ifdef DEBUG
UpdateTLS(this, nullptr);

Просмотреть файл

@ -44,8 +44,11 @@ class PortLocker {
// Sanity check when DEBUG is on to ensure this is actually a port whose
// lock is held by this PortLocker.
bool is_port_locked = false;
for (size_t i = 0; i < num_ports_ && !is_port_locked; ++i)
if (port_refs_[i]->port() == port_ref.port()) is_port_locked = true;
for (size_t i = 0; i < num_ports_ && !is_port_locked; ++i) {
if (port_refs_[i]->port() == port_ref.port()) {
is_port_locked = true;
}
}
DCHECK(is_port_locked);
#endif
return port_ref.port();