Bug 1706374 - Part 1: Import the mojo/core/ports directory from Chromium, r=handyman

This initial import does not build, and will be made to build in following
parts.

Differential Revision: https://phabricator.services.mozilla.com/D112765
This commit is contained in:
Nika Layzell 2021-06-22 18:17:17 +00:00
Родитель eaa7724180
Коммит 12a9501583
22 изменённых файлов: 5681 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,61 @@
# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import("//build/config/compiler/compiler.gni")
import("//testing/test.gni")
component("ports") {
output_name = "mojo_core_ports"
sources = [
"event.cc",
"event.h",
"message_filter.h",
"message_queue.cc",
"message_queue.h",
"name.cc",
"name.h",
"node.cc",
"node.h",
"node_delegate.h",
"port.cc",
"port.h",
"port_locker.cc",
"port_locker.h",
"port_ref.cc",
"port_ref.h",
"user_data.h",
"user_message.cc",
"user_message.h",
]
defines = [ "IS_MOJO_CORE_PORTS_IMPL" ]
public_deps = [ "//base" ]
if (!is_nacl) {
deps = [ "//crypto" ]
}
if (!is_debug && !optimize_for_size) {
configs -= [ "//build/config/compiler:default_optimization" ]
configs += [ "//build/config/compiler:optimize_max" ]
}
}
source_set("tests") {
testonly = true
sources = [
"name_unittest.cc",
"ports_unittest.cc",
]
deps = [
":ports",
"//base",
"//base/test:test_support",
"//testing/gtest",
]
}

Просмотреть файл

@ -0,0 +1,468 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/core/ports/event.h"
#include <stdint.h>
#include <string.h>
#include "base/logging.h"
#include "base/numerics/safe_math.h"
#include "mojo/core/ports/user_message.h"
namespace mojo {
namespace core {
namespace ports {
namespace {
const size_t kPortsMessageAlignment = 8;
#pragma pack(push, 1)
struct SerializedHeader {
Event::Type type;
uint32_t padding;
PortName port_name;
};
struct UserMessageEventData {
uint64_t sequence_num;
uint32_t num_ports;
uint32_t padding;
};
struct ObserveProxyEventData {
NodeName proxy_node_name;
PortName proxy_port_name;
NodeName proxy_target_node_name;
PortName proxy_target_port_name;
};
struct ObserveProxyAckEventData {
uint64_t last_sequence_num;
};
struct ObserveClosureEventData {
uint64_t last_sequence_num;
};
struct MergePortEventData {
PortName new_port_name;
Event::PortDescriptor new_port_descriptor;
};
struct UserMessageReadAckRequestEventData {
uint64_t sequence_num_to_acknowledge;
};
struct UserMessageReadAckEventData {
uint64_t sequence_num_acknowledged;
};
#pragma pack(pop)
static_assert(sizeof(Event::PortDescriptor) % kPortsMessageAlignment == 0,
"Invalid PortDescriptor size.");
static_assert(sizeof(SerializedHeader) % kPortsMessageAlignment == 0,
"Invalid SerializedHeader size.");
static_assert(sizeof(UserMessageEventData) % kPortsMessageAlignment == 0,
"Invalid UserEventData size.");
static_assert(sizeof(ObserveProxyEventData) % kPortsMessageAlignment == 0,
"Invalid ObserveProxyEventData size.");
static_assert(sizeof(ObserveProxyAckEventData) % kPortsMessageAlignment == 0,
"Invalid ObserveProxyAckEventData size.");
static_assert(sizeof(ObserveClosureEventData) % kPortsMessageAlignment == 0,
"Invalid ObserveClosureEventData size.");
static_assert(sizeof(MergePortEventData) % kPortsMessageAlignment == 0,
"Invalid MergePortEventData size.");
static_assert(sizeof(UserMessageReadAckRequestEventData) %
kPortsMessageAlignment ==
0,
"Invalid UserMessageReadAckRequestEventData size.");
static_assert(sizeof(UserMessageReadAckEventData) % kPortsMessageAlignment == 0,
"Invalid UserMessageReadAckEventData size.");
} // namespace
Event::PortDescriptor::PortDescriptor() {
memset(padding, 0, sizeof(padding));
}
Event::~Event() = default;
// static
ScopedEvent Event::Deserialize(const void* buffer, size_t num_bytes) {
if (num_bytes < sizeof(SerializedHeader))
return nullptr;
const auto* header = static_cast<const SerializedHeader*>(buffer);
const PortName& port_name = header->port_name;
const size_t data_size = num_bytes - sizeof(*header);
switch (header->type) {
case Type::kUserMessage:
return UserMessageEvent::Deserialize(port_name, header + 1, data_size);
case Type::kPortAccepted:
return PortAcceptedEvent::Deserialize(port_name, header + 1, data_size);
case Type::kObserveProxy:
return ObserveProxyEvent::Deserialize(port_name, header + 1, data_size);
case Type::kObserveProxyAck:
return ObserveProxyAckEvent::Deserialize(port_name, header + 1,
data_size);
case Type::kObserveClosure:
return ObserveClosureEvent::Deserialize(port_name, header + 1, data_size);
case Type::kMergePort:
return MergePortEvent::Deserialize(port_name, header + 1, data_size);
case Type::kUserMessageReadAckRequest:
return UserMessageReadAckRequestEvent::Deserialize(port_name, header + 1,
data_size);
case Type::kUserMessageReadAck:
return UserMessageReadAckEvent::Deserialize(port_name, header + 1,
data_size);
default:
DVLOG(2) << "Ingoring unknown port event type: "
<< static_cast<uint32_t>(header->type);
return nullptr;
}
}
Event::Event(Type type, const PortName& port_name)
: type_(type), port_name_(port_name) {}
size_t Event::GetSerializedSize() const {
return sizeof(SerializedHeader) + GetSerializedDataSize();
}
void Event::Serialize(void* buffer) const {
auto* header = static_cast<SerializedHeader*>(buffer);
header->type = type_;
header->padding = 0;
header->port_name = port_name_;
SerializeData(header + 1);
}
ScopedEvent Event::Clone() const {
return nullptr;
}
UserMessageEvent::~UserMessageEvent() = default;
UserMessageEvent::UserMessageEvent(size_t num_ports)
: Event(Type::kUserMessage, kInvalidPortName) {
ReservePorts(num_ports);
}
void UserMessageEvent::AttachMessage(std::unique_ptr<UserMessage> message) {
DCHECK(!message_);
message_ = std::move(message);
}
void UserMessageEvent::ReservePorts(size_t num_ports) {
port_descriptors_.resize(num_ports);
ports_.resize(num_ports);
}
bool UserMessageEvent::NotifyWillBeRoutedExternally() {
DCHECK(message_);
return message_->WillBeRoutedExternally();
}
// static
ScopedEvent UserMessageEvent::Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes) {
if (num_bytes < sizeof(UserMessageEventData))
return nullptr;
const auto* data = static_cast<const UserMessageEventData*>(buffer);
base::CheckedNumeric<size_t> port_data_size = data->num_ports;
port_data_size *= sizeof(PortDescriptor) + sizeof(PortName);
if (!port_data_size.IsValid())
return nullptr;
base::CheckedNumeric<size_t> total_size = port_data_size.ValueOrDie();
total_size += sizeof(UserMessageEventData);
if (!total_size.IsValid() || num_bytes < total_size.ValueOrDie())
return nullptr;
auto event =
base::WrapUnique(new UserMessageEvent(port_name, data->sequence_num));
event->ReservePorts(data->num_ports);
const auto* in_descriptors =
reinterpret_cast<const PortDescriptor*>(data + 1);
std::copy(in_descriptors, in_descriptors + data->num_ports,
event->port_descriptors());
const auto* in_names =
reinterpret_cast<const PortName*>(in_descriptors + data->num_ports);
std::copy(in_names, in_names + data->num_ports, event->ports());
return std::move(event);
}
UserMessageEvent::UserMessageEvent(const PortName& port_name,
uint64_t sequence_num)
: Event(Type::kUserMessage, port_name), sequence_num_(sequence_num) {}
size_t UserMessageEvent::GetSizeIfSerialized() const {
if (!message_)
return 0;
return message_->GetSizeIfSerialized();
}
size_t UserMessageEvent::GetSerializedDataSize() const {
DCHECK_EQ(ports_.size(), port_descriptors_.size());
base::CheckedNumeric<size_t> size = sizeof(UserMessageEventData);
base::CheckedNumeric<size_t> ports_size =
sizeof(PortDescriptor) + sizeof(PortName);
ports_size *= ports_.size();
return (size + ports_size.ValueOrDie()).ValueOrDie();
}
void UserMessageEvent::SerializeData(void* buffer) const {
DCHECK_EQ(ports_.size(), port_descriptors_.size());
auto* data = static_cast<UserMessageEventData*>(buffer);
data->sequence_num = sequence_num_;
DCHECK(base::IsValueInRangeForNumericType<uint32_t>(ports_.size()));
data->num_ports = static_cast<uint32_t>(ports_.size());
data->padding = 0;
auto* ports_data = reinterpret_cast<PortDescriptor*>(data + 1);
std::copy(port_descriptors_.begin(), port_descriptors_.end(), ports_data);
auto* port_names_data =
reinterpret_cast<PortName*>(ports_data + ports_.size());
std::copy(ports_.begin(), ports_.end(), port_names_data);
}
PortAcceptedEvent::PortAcceptedEvent(const PortName& port_name)
: Event(Type::kPortAccepted, port_name) {}
PortAcceptedEvent::~PortAcceptedEvent() = default;
// static
ScopedEvent PortAcceptedEvent::Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes) {
return std::make_unique<PortAcceptedEvent>(port_name);
}
size_t PortAcceptedEvent::GetSerializedDataSize() const {
return 0;
}
void PortAcceptedEvent::SerializeData(void* buffer) const {}
ObserveProxyEvent::ObserveProxyEvent(const PortName& port_name,
const NodeName& proxy_node_name,
const PortName& proxy_port_name,
const NodeName& proxy_target_node_name,
const PortName& proxy_target_port_name)
: Event(Type::kObserveProxy, port_name),
proxy_node_name_(proxy_node_name),
proxy_port_name_(proxy_port_name),
proxy_target_node_name_(proxy_target_node_name),
proxy_target_port_name_(proxy_target_port_name) {}
ObserveProxyEvent::~ObserveProxyEvent() = default;
// static
ScopedEvent ObserveProxyEvent::Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes) {
if (num_bytes < sizeof(ObserveProxyEventData))
return nullptr;
const auto* data = static_cast<const ObserveProxyEventData*>(buffer);
return std::make_unique<ObserveProxyEvent>(
port_name, data->proxy_node_name, data->proxy_port_name,
data->proxy_target_node_name, data->proxy_target_port_name);
}
size_t ObserveProxyEvent::GetSerializedDataSize() const {
return sizeof(ObserveProxyEventData);
}
void ObserveProxyEvent::SerializeData(void* buffer) const {
auto* data = static_cast<ObserveProxyEventData*>(buffer);
data->proxy_node_name = proxy_node_name_;
data->proxy_port_name = proxy_port_name_;
data->proxy_target_node_name = proxy_target_node_name_;
data->proxy_target_port_name = proxy_target_port_name_;
}
ScopedEvent ObserveProxyEvent::Clone() const {
return std::make_unique<ObserveProxyEvent>(
port_name(), proxy_node_name_, proxy_port_name_, proxy_target_node_name_,
proxy_target_port_name_);
}
ObserveProxyAckEvent::ObserveProxyAckEvent(const PortName& port_name,
uint64_t last_sequence_num)
: Event(Type::kObserveProxyAck, port_name),
last_sequence_num_(last_sequence_num) {}
ObserveProxyAckEvent::~ObserveProxyAckEvent() = default;
// static
ScopedEvent ObserveProxyAckEvent::Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes) {
if (num_bytes < sizeof(ObserveProxyAckEventData))
return nullptr;
const auto* data = static_cast<const ObserveProxyAckEventData*>(buffer);
return std::make_unique<ObserveProxyAckEvent>(port_name,
data->last_sequence_num);
}
size_t ObserveProxyAckEvent::GetSerializedDataSize() const {
return sizeof(ObserveProxyAckEventData);
}
void ObserveProxyAckEvent::SerializeData(void* buffer) const {
auto* data = static_cast<ObserveProxyAckEventData*>(buffer);
data->last_sequence_num = last_sequence_num_;
}
ScopedEvent ObserveProxyAckEvent::Clone() const {
return std::make_unique<ObserveProxyAckEvent>(port_name(),
last_sequence_num_);
}
ObserveClosureEvent::ObserveClosureEvent(const PortName& port_name,
uint64_t last_sequence_num)
: Event(Type::kObserveClosure, port_name),
last_sequence_num_(last_sequence_num) {}
ObserveClosureEvent::~ObserveClosureEvent() = default;
// static
ScopedEvent ObserveClosureEvent::Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes) {
if (num_bytes < sizeof(ObserveClosureEventData))
return nullptr;
const auto* data = static_cast<const ObserveClosureEventData*>(buffer);
return std::make_unique<ObserveClosureEvent>(port_name,
data->last_sequence_num);
}
size_t ObserveClosureEvent::GetSerializedDataSize() const {
return sizeof(ObserveClosureEventData);
}
void ObserveClosureEvent::SerializeData(void* buffer) const {
auto* data = static_cast<ObserveClosureEventData*>(buffer);
data->last_sequence_num = last_sequence_num_;
}
ScopedEvent ObserveClosureEvent::Clone() const {
return std::make_unique<ObserveClosureEvent>(port_name(), last_sequence_num_);
}
MergePortEvent::MergePortEvent(const PortName& port_name,
const PortName& new_port_name,
const PortDescriptor& new_port_descriptor)
: Event(Type::kMergePort, port_name),
new_port_name_(new_port_name),
new_port_descriptor_(new_port_descriptor) {}
MergePortEvent::~MergePortEvent() = default;
// static
ScopedEvent MergePortEvent::Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes) {
if (num_bytes < sizeof(MergePortEventData))
return nullptr;
const auto* data = static_cast<const MergePortEventData*>(buffer);
return std::make_unique<MergePortEvent>(port_name, data->new_port_name,
data->new_port_descriptor);
}
size_t MergePortEvent::GetSerializedDataSize() const {
return sizeof(MergePortEventData);
}
void MergePortEvent::SerializeData(void* buffer) const {
auto* data = static_cast<MergePortEventData*>(buffer);
data->new_port_name = new_port_name_;
data->new_port_descriptor = new_port_descriptor_;
}
UserMessageReadAckRequestEvent::UserMessageReadAckRequestEvent(
const PortName& port_name,
uint64_t sequence_num_to_acknowledge)
: Event(Type::kUserMessageReadAckRequest, port_name),
sequence_num_to_acknowledge_(sequence_num_to_acknowledge) {
}
UserMessageReadAckRequestEvent::~UserMessageReadAckRequestEvent() = default;
// static
ScopedEvent UserMessageReadAckRequestEvent::Deserialize(
const PortName& port_name,
const void* buffer,
size_t num_bytes) {
if (num_bytes < sizeof(UserMessageReadAckRequestEventData))
return nullptr;
const auto* data =
static_cast<const UserMessageReadAckRequestEventData*>(buffer);
return std::make_unique<UserMessageReadAckRequestEvent>(
port_name, data->sequence_num_to_acknowledge);
}
size_t UserMessageReadAckRequestEvent::GetSerializedDataSize() const {
return sizeof(UserMessageReadAckRequestEventData);
}
void UserMessageReadAckRequestEvent::SerializeData(void* buffer) const {
auto* data = static_cast<UserMessageReadAckRequestEventData*>(buffer);
data->sequence_num_to_acknowledge = sequence_num_to_acknowledge_;
}
UserMessageReadAckEvent::UserMessageReadAckEvent(
const PortName& port_name,
uint64_t sequence_num_acknowledged)
: Event(Type::kUserMessageReadAck, port_name),
sequence_num_acknowledged_(sequence_num_acknowledged) {
}
UserMessageReadAckEvent::~UserMessageReadAckEvent() = default;
// static
ScopedEvent UserMessageReadAckEvent::Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes) {
if (num_bytes < sizeof(UserMessageReadAckEventData))
return nullptr;
const auto* data = static_cast<const UserMessageReadAckEventData*>(buffer);
return std::make_unique<UserMessageReadAckEvent>(
port_name, data->sequence_num_acknowledged);
}
size_t UserMessageReadAckEvent::GetSerializedDataSize() const {
return sizeof(UserMessageReadAckEventData);
}
void UserMessageReadAckEvent::SerializeData(void* buffer) const {
auto* data = static_cast<UserMessageReadAckEventData*>(buffer);
data->sequence_num_acknowledged = sequence_num_acknowledged_;
}
} // namespace ports
} // namespace core
} // namespace mojo

Просмотреть файл

@ -0,0 +1,334 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_CORE_PORTS_EVENT_H_
#define MOJO_CORE_PORTS_EVENT_H_
#include <stdint.h>
#include <vector>
#include "base/component_export.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "mojo/core/ports/name.h"
#include "mojo/core/ports/user_message.h"
namespace mojo {
namespace core {
namespace ports {
class Event;
using ScopedEvent = std::unique_ptr<Event>;
// A Event is the fundamental unit of operation and communication within and
// between Nodes.
class COMPONENT_EXPORT(MOJO_CORE_PORTS) Event {
public:
enum Type : uint32_t {
// A user message event contains arbitrary user-specified payload data
// which may include any number of ports and/or system handles (e.g. FDs).
kUserMessage,
// When a Node receives a user message with one or more ports attached, it
// sends back an instance of this event for every attached port to indicate
// that the port has been accepted by its destination node.
kPortAccepted,
// This event begins circulation any time a port enters a proxying state. It
// may be re-circulated in certain edge cases, but the ultimate purpose of
// the event is to ensure that every port along a route is (if necessary)
// aware that the proxying port is indeed proxying (and to where) so that it
// can begin to be bypassed along the route.
kObserveProxy,
// An event used to acknowledge to a proxy that all concerned nodes and
// ports are aware of its proxying state and that no more user messages will
// be routed to it beyond a given final sequence number.
kObserveProxyAck,
// Indicates that a port has been closed. This event fully circulates a
// route to ensure that all ports are aware of closure.
kObserveClosure,
// Used to request the merging of two routes via two sacrificial receiving
// ports, one from each route.
kMergePort,
// Used to request that the conjugate port acknowledges read messages by
// sending back a UserMessageReadAck.
kUserMessageReadAckRequest,
// Used to acknowledge read messages to the conjugate.
kUserMessageReadAck,
};
#pragma pack(push, 1)
struct PortDescriptor {
PortDescriptor();
NodeName peer_node_name;
PortName peer_port_name;
NodeName referring_node_name;
PortName referring_port_name;
uint64_t next_sequence_num_to_send;
uint64_t next_sequence_num_to_receive;
uint64_t last_sequence_num_to_receive;
bool peer_closed;
char padding[7];
};
#pragma pack(pop)
virtual ~Event();
static ScopedEvent Deserialize(const void* buffer, size_t num_bytes);
template <typename T>
static std::unique_ptr<T> Cast(ScopedEvent* event) {
return base::WrapUnique(static_cast<T*>(event->release()));
}
Type type() const { return type_; }
const PortName& port_name() const { return port_name_; }
void set_port_name(const PortName& port_name) { port_name_ = port_name; }
size_t GetSerializedSize() const;
void Serialize(void* buffer) const;
virtual ScopedEvent Clone() const;
protected:
Event(Type type, const PortName& port_name);
virtual size_t GetSerializedDataSize() const = 0;
virtual void SerializeData(void* buffer) const = 0;
private:
const Type type_;
PortName port_name_;
DISALLOW_COPY_AND_ASSIGN(Event);
};
class COMPONENT_EXPORT(MOJO_CORE_PORTS) UserMessageEvent : public Event {
public:
explicit UserMessageEvent(size_t num_ports);
~UserMessageEvent() override;
bool HasMessage() const { return !!message_; }
void AttachMessage(std::unique_ptr<UserMessage> message);
template <typename T>
T* GetMessage() {
DCHECK(HasMessage());
DCHECK_EQ(&T::kUserMessageTypeInfo, message_->type_info());
return static_cast<T*>(message_.get());
}
template <typename T>
const T* GetMessage() const {
DCHECK(HasMessage());
DCHECK_EQ(&T::kUserMessageTypeInfo, message_->type_info());
return static_cast<const T*>(message_.get());
}
void ReservePorts(size_t num_ports);
bool NotifyWillBeRoutedExternally();
uint64_t sequence_num() const { return sequence_num_; }
void set_sequence_num(uint64_t sequence_num) { sequence_num_ = sequence_num; }
size_t num_ports() const { return ports_.size(); }
PortDescriptor* port_descriptors() { return port_descriptors_.data(); }
PortName* ports() { return ports_.data(); }
static ScopedEvent Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes);
size_t GetSizeIfSerialized() const;
private:
UserMessageEvent(const PortName& port_name, uint64_t sequence_num);
size_t GetSerializedDataSize() const override;
void SerializeData(void* buffer) const override;
uint64_t sequence_num_ = 0;
std::vector<PortDescriptor> port_descriptors_;
std::vector<PortName> ports_;
std::unique_ptr<UserMessage> message_;
DISALLOW_COPY_AND_ASSIGN(UserMessageEvent);
};
class COMPONENT_EXPORT(MOJO_CORE_PORTS) PortAcceptedEvent : public Event {
public:
explicit PortAcceptedEvent(const PortName& port_name);
~PortAcceptedEvent() override;
static ScopedEvent Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes);
private:
size_t GetSerializedDataSize() const override;
void SerializeData(void* buffer) const override;
DISALLOW_COPY_AND_ASSIGN(PortAcceptedEvent);
};
class COMPONENT_EXPORT(MOJO_CORE_PORTS) ObserveProxyEvent : public Event {
public:
ObserveProxyEvent(const PortName& port_name,
const NodeName& proxy_node_name,
const PortName& proxy_port_name,
const NodeName& proxy_target_node_name,
const PortName& proxy_target_port_name);
~ObserveProxyEvent() override;
const NodeName& proxy_node_name() const { return proxy_node_name_; }
const PortName& proxy_port_name() const { return proxy_port_name_; }
const NodeName& proxy_target_node_name() const {
return proxy_target_node_name_;
}
const PortName& proxy_target_port_name() const {
return proxy_target_port_name_;
}
static ScopedEvent Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes);
private:
size_t GetSerializedDataSize() const override;
void SerializeData(void* buffer) const override;
ScopedEvent Clone() const override;
const NodeName proxy_node_name_;
const PortName proxy_port_name_;
const NodeName proxy_target_node_name_;
const PortName proxy_target_port_name_;
DISALLOW_COPY_AND_ASSIGN(ObserveProxyEvent);
};
class COMPONENT_EXPORT(MOJO_CORE_PORTS) ObserveProxyAckEvent : public Event {
public:
ObserveProxyAckEvent(const PortName& port_name, uint64_t last_sequence_num);
~ObserveProxyAckEvent() override;
uint64_t last_sequence_num() const { return last_sequence_num_; }
static ScopedEvent Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes);
private:
size_t GetSerializedDataSize() const override;
void SerializeData(void* buffer) const override;
ScopedEvent Clone() const override;
const uint64_t last_sequence_num_;
DISALLOW_COPY_AND_ASSIGN(ObserveProxyAckEvent);
};
class COMPONENT_EXPORT(MOJO_CORE_PORTS) ObserveClosureEvent : public Event {
public:
ObserveClosureEvent(const PortName& port_name, uint64_t last_sequence_num);
~ObserveClosureEvent() override;
uint64_t last_sequence_num() const { return last_sequence_num_; }
void set_last_sequence_num(uint64_t last_sequence_num) {
last_sequence_num_ = last_sequence_num;
}
static ScopedEvent Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes);
private:
size_t GetSerializedDataSize() const override;
void SerializeData(void* buffer) const override;
ScopedEvent Clone() const override;
uint64_t last_sequence_num_;
DISALLOW_COPY_AND_ASSIGN(ObserveClosureEvent);
};
class COMPONENT_EXPORT(MOJO_CORE_PORTS) MergePortEvent : public Event {
public:
MergePortEvent(const PortName& port_name,
const PortName& new_port_name,
const PortDescriptor& new_port_descriptor);
~MergePortEvent() override;
const PortName& new_port_name() const { return new_port_name_; }
const PortDescriptor& new_port_descriptor() const {
return new_port_descriptor_;
}
static ScopedEvent Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes);
private:
size_t GetSerializedDataSize() const override;
void SerializeData(void* buffer) const override;
const PortName new_port_name_;
const PortDescriptor new_port_descriptor_;
DISALLOW_COPY_AND_ASSIGN(MergePortEvent);
};
class COMPONENT_EXPORT(MOJO_CORE_PORTS) UserMessageReadAckRequestEvent
: public Event {
public:
UserMessageReadAckRequestEvent(const PortName& port_name,
uint64_t sequence_num_to_acknowledge);
~UserMessageReadAckRequestEvent() override;
uint64_t sequence_num_to_acknowledge() const {
return sequence_num_to_acknowledge_;
}
static ScopedEvent Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes);
private:
size_t GetSerializedDataSize() const override;
void SerializeData(void* buffer) const override;
uint64_t sequence_num_to_acknowledge_;
};
class COMPONENT_EXPORT(MOJO_CORE_PORTS) UserMessageReadAckEvent : public Event {
public:
UserMessageReadAckEvent(const PortName& port_name,
uint64_t sequence_num_acknowledged);
~UserMessageReadAckEvent() override;
uint64_t sequence_num_acknowledged() const {
return sequence_num_acknowledged_;
}
static ScopedEvent Deserialize(const PortName& port_name,
const void* buffer,
size_t num_bytes);
private:
size_t GetSerializedDataSize() const override;
void SerializeData(void* buffer) const override;
uint64_t sequence_num_acknowledged_;
};
} // namespace ports
} // namespace core
} // namespace mojo
#endif // MOJO_CORE_PORTS_EVENT_H_

Просмотреть файл

@ -0,0 +1,29 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_CORE_PORTS_MESSAGE_FILTER_H_
#define MOJO_CORE_PORTS_MESSAGE_FILTER_H_
namespace mojo {
namespace core {
namespace ports {
class UserMessageEvent;
// An interface which can be implemented to user message events according to
// arbitrary policy.
class MessageFilter {
public:
virtual ~MessageFilter() = default;
// Returns true if |message| should be accepted by whomever is applying this
// filter. See MessageQueue::GetNextMessage(), for example.
virtual bool Match(const UserMessageEvent& message) = 0;
};
} // namespace ports
} // namespace core
} // namespace mojo
#endif // MOJO_CORE_PORTS_MESSAGE_FILTER_H_

Просмотреть файл

@ -0,0 +1,93 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/core/ports/message_queue.h"
#include <algorithm>
#include "base/compiler_specific.h"
#include "base/logging.h"
#include "mojo/core/ports/message_filter.h"
namespace mojo {
namespace core {
namespace ports {
// Used by std::{push,pop}_heap functions
inline bool operator<(const std::unique_ptr<UserMessageEvent>& a,
const std::unique_ptr<UserMessageEvent>& b) {
return a->sequence_num() > b->sequence_num();
}
MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {}
MessageQueue::MessageQueue(uint64_t next_sequence_num)
: next_sequence_num_(next_sequence_num) {
// The message queue is blocked waiting for a message with sequence number
// equal to |next_sequence_num|.
}
MessageQueue::~MessageQueue() {
#if DCHECK_IS_ON()
size_t num_leaked_ports = 0;
for (const auto& message : heap_)
num_leaked_ports += message->num_ports();
DVLOG_IF(1, num_leaked_ports > 0)
<< "Leaking " << num_leaked_ports << " ports in unreceived messages";
#endif
}
bool MessageQueue::HasNextMessage() const {
return !heap_.empty() && heap_[0]->sequence_num() == next_sequence_num_;
}
void MessageQueue::GetNextMessage(std::unique_ptr<UserMessageEvent>* message,
MessageFilter* filter) {
if (!HasNextMessage() || (filter && !filter->Match(*heap_[0]))) {
message->reset();
return;
}
std::pop_heap(heap_.begin(), heap_.end());
*message = std::move(heap_.back());
total_queued_bytes_ -= (*message)->GetSizeIfSerialized();
heap_.pop_back();
// We keep the capacity of |heap_| in check so that a large batch of incoming
// messages doesn't permanently wreck available memory. The choice of interval
// here is somewhat arbitrary.
constexpr size_t kHeapMinimumShrinkSize = 16;
constexpr size_t kHeapShrinkInterval = 512;
if (UNLIKELY(heap_.size() > kHeapMinimumShrinkSize &&
heap_.size() % kHeapShrinkInterval == 0)) {
heap_.shrink_to_fit();
}
next_sequence_num_++;
}
void MessageQueue::AcceptMessage(std::unique_ptr<UserMessageEvent> message,
bool* has_next_message) {
// TODO: Handle sequence number roll-over.
total_queued_bytes_ += message->GetSizeIfSerialized();
heap_.emplace_back(std::move(message));
std::push_heap(heap_.begin(), heap_.end());
if (!signalable_) {
*has_next_message = false;
} else {
*has_next_message = (heap_[0]->sequence_num() == next_sequence_num_);
}
}
void MessageQueue::TakeAllMessages(
std::vector<std::unique_ptr<UserMessageEvent>>* messages) {
*messages = std::move(heap_);
total_queued_bytes_ = 0;
}
} // namespace ports
} // namespace core
} // namespace mojo

Просмотреть файл

@ -0,0 +1,86 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_CORE_PORTS_MESSAGE_QUEUE_H_
#define MOJO_CORE_PORTS_MESSAGE_QUEUE_H_
#include <stdint.h>
#include <limits>
#include <memory>
#include <vector>
#include "base/component_export.h"
#include "base/macros.h"
#include "mojo/core/ports/event.h"
namespace mojo {
namespace core {
namespace ports {
constexpr uint64_t kInitialSequenceNum = 1;
constexpr uint64_t kInvalidSequenceNum = std::numeric_limits<uint64_t>::max();
class MessageFilter;
// An incoming message queue for a port. MessageQueue keeps track of the highest
// known sequence number and can indicate whether the next sequential message is
// available. Thus the queue enforces message ordering for the consumer without
// enforcing it for the producer (see AcceptMessage() below.)
class COMPONENT_EXPORT(MOJO_CORE_PORTS) MessageQueue {
public:
explicit MessageQueue();
explicit MessageQueue(uint64_t next_sequence_num);
~MessageQueue();
void set_signalable(bool value) { signalable_ = value; }
uint64_t next_sequence_num() const { return next_sequence_num_; }
bool HasNextMessage() const;
// Gives ownership of the message. If |filter| is non-null, the next message
// will only be retrieved if the filter successfully matches it.
void GetNextMessage(std::unique_ptr<UserMessageEvent>* message,
MessageFilter* filter);
// Takes ownership of the message. Note: Messages are ordered, so while we
// have added a message to the queue, we may still be waiting on a message
// ahead of this one before we can let any of the messages be returned by
// GetNextMessage.
//
// Furthermore, once has_next_message is set to true, it will remain false
// until GetNextMessage is called enough times to return a null message.
// In other words, has_next_message acts like an edge trigger.
//
void AcceptMessage(std::unique_ptr<UserMessageEvent> message,
bool* has_next_message);
// Takes all messages from this queue. Used to safely destroy queued messages
// without holding any Port lock.
void TakeAllMessages(
std::vector<std::unique_ptr<UserMessageEvent>>* messages);
// The number of messages queued here, regardless of whether the next expected
// message has arrived yet.
size_t queued_message_count() const { return heap_.size(); }
// The aggregate memory size in bytes of all messages queued here, regardless
// of whether the next expected message has arrived yet.
size_t queued_num_bytes() const { return total_queued_bytes_; }
private:
std::vector<std::unique_ptr<UserMessageEvent>> heap_;
uint64_t next_sequence_num_;
bool signalable_ = true;
size_t total_queued_bytes_ = 0;
DISALLOW_COPY_AND_ASSIGN(MessageQueue);
};
} // namespace ports
} // namespace core
} // namespace mojo
#endif // MOJO_CORE_PORTS_MESSAGE_QUEUE_H_

Просмотреть файл

@ -0,0 +1,26 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/core/ports/name.h"
namespace mojo {
namespace core {
namespace ports {
const PortName kInvalidPortName = {0, 0};
const NodeName kInvalidNodeName = {0, 0};
std::ostream& operator<<(std::ostream& stream, const Name& name) {
std::ios::fmtflags flags(stream.flags());
stream << std::hex << std::uppercase << name.v1;
if (name.v2 != 0)
stream << '.' << name.v2;
stream.flags(flags);
return stream;
}
} // namespace ports
} // namespace core
} // namespace mojo

Просмотреть файл

@ -0,0 +1,76 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_CORE_PORTS_NAME_H_
#define MOJO_CORE_PORTS_NAME_H_
#include <stdint.h>
#include <ostream>
#include <tuple>
#include "base/component_export.h"
#include "base/hash/hash.h"
namespace mojo {
namespace core {
namespace ports {
struct COMPONENT_EXPORT(MOJO_CORE_PORTS) Name {
Name(uint64_t v1, uint64_t v2) : v1(v1), v2(v2) {}
uint64_t v1, v2;
};
inline bool operator==(const Name& a, const Name& b) {
return a.v1 == b.v1 && a.v2 == b.v2;
}
inline bool operator!=(const Name& a, const Name& b) {
return !(a == b);
}
inline bool operator<(const Name& a, const Name& b) {
return std::tie(a.v1, a.v2) < std::tie(b.v1, b.v2);
}
COMPONENT_EXPORT(MOJO_CORE_PORTS)
std::ostream& operator<<(std::ostream& stream, const Name& name);
struct COMPONENT_EXPORT(MOJO_CORE_PORTS) PortName : Name {
PortName() : Name(0, 0) {}
PortName(uint64_t v1, uint64_t v2) : Name(v1, v2) {}
};
extern COMPONENT_EXPORT(MOJO_CORE_PORTS) const PortName kInvalidPortName;
struct COMPONENT_EXPORT(MOJO_CORE_PORTS) NodeName : Name {
NodeName() : Name(0, 0) {}
NodeName(uint64_t v1, uint64_t v2) : Name(v1, v2) {}
};
extern COMPONENT_EXPORT(MOJO_CORE_PORTS) const NodeName kInvalidNodeName;
} // namespace ports
} // namespace core
} // namespace mojo
namespace std {
template <>
struct COMPONENT_EXPORT(MOJO_CORE_PORTS) hash<mojo::core::ports::PortName> {
std::size_t operator()(const mojo::core::ports::PortName& name) const {
return base::HashInts64(name.v1, name.v2);
}
};
template <>
struct COMPONENT_EXPORT(MOJO_CORE_PORTS) hash<mojo::core::ports::NodeName> {
std::size_t operator()(const mojo::core::ports::NodeName& name) const {
return base::HashInts64(name.v1, name.v2);
}
};
} // namespace std
#endif // MOJO_CORE_PORTS_NAME_H_

Просмотреть файл

@ -0,0 +1,75 @@
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/core/ports/name.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace mojo {
namespace core {
namespace ports {
namespace test {
TEST(NameTest, Defaults) {
PortName default_port_name;
EXPECT_EQ(kInvalidPortName, default_port_name);
NodeName default_node_name;
EXPECT_EQ(kInvalidNodeName, default_node_name);
}
TEST(NameTest, PortNameChecks) {
PortName port_name_a(50, 100);
PortName port_name_b(50, 100);
PortName port_name_c(100, 50);
EXPECT_EQ(port_name_a, port_name_b);
EXPECT_NE(port_name_a, port_name_c);
EXPECT_NE(port_name_b, port_name_c);
EXPECT_LT(port_name_a, port_name_c);
EXPECT_LT(port_name_b, port_name_c);
EXPECT_FALSE(port_name_a < port_name_b);
EXPECT_FALSE(port_name_b < port_name_a);
std::hash<PortName> port_hash_fn;
size_t hash_a = port_hash_fn(port_name_a);
size_t hash_b = port_hash_fn(port_name_b);
size_t hash_c = port_hash_fn(port_name_c);
EXPECT_EQ(hash_a, hash_b);
EXPECT_NE(hash_a, hash_c);
EXPECT_NE(hash_b, hash_c);
}
TEST(NameTest, NodeNameChecks) {
NodeName node_name_a(50, 100);
NodeName node_name_b(50, 100);
NodeName node_name_c(100, 50);
EXPECT_EQ(node_name_a, node_name_b);
EXPECT_NE(node_name_a, node_name_c);
EXPECT_NE(node_name_b, node_name_c);
EXPECT_LT(node_name_a, node_name_c);
EXPECT_LT(node_name_b, node_name_c);
EXPECT_FALSE(node_name_a < node_name_b);
EXPECT_FALSE(node_name_b < node_name_a);
std::hash<NodeName> node_hash_fn;
size_t hash_a = node_hash_fn(node_name_a);
size_t hash_b = node_hash_fn(node_name_b);
size_t hash_c = node_hash_fn(node_name_c);
EXPECT_EQ(hash_a, hash_b);
EXPECT_NE(hash_a, hash_c);
EXPECT_NE(hash_b, hash_c);
}
} // namespace test
} // namespace ports
} // namespace core
} // namespace mojo

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -0,0 +1,321 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_CORE_PORTS_NODE_H_
#define MOJO_CORE_PORTS_NODE_H_
#include <stddef.h>
#include <stdint.h>
#include <queue>
#include <unordered_map>
#include "base/component_export.h"
#include "base/containers/flat_map.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "mojo/core/ports/event.h"
#include "mojo/core/ports/name.h"
#include "mojo/core/ports/port.h"
#include "mojo/core/ports/port_ref.h"
#include "mojo/core/ports/user_data.h"
namespace mojo {
namespace core {
namespace ports {
enum : int {
OK = 0,
ERROR_PORT_UNKNOWN = -10,
ERROR_PORT_EXISTS = -11,
ERROR_PORT_STATE_UNEXPECTED = -12,
ERROR_PORT_CANNOT_SEND_SELF = -13,
ERROR_PORT_PEER_CLOSED = -14,
ERROR_PORT_CANNOT_SEND_PEER = -15,
ERROR_NOT_IMPLEMENTED = -100,
};
struct PortStatus {
bool has_messages;
bool receiving_messages;
bool peer_closed;
bool peer_remote;
size_t queued_message_count;
size_t queued_num_bytes;
size_t unacknowledged_message_count;
};
class MessageFilter;
class NodeDelegate;
// A Node maintains a collection of Ports (see port.h) indexed by unique 128-bit
// addresses (names), performing routing and processing of events among the
// Ports within the Node and to or from other Nodes in the system. Typically
// (and practically, in all uses today) there is a single Node per system
// process. Thus a Node boundary effectively models a process boundary.
//
// New Ports can be created uninitialized using CreateUninitializedPort (and
// later initialized using InitializePort), or created in a fully initialized
// state using CreatePortPair(). Initialized ports have exactly one conjugate
// port which is the ultimate receiver of any user messages sent by that port.
// See SendUserMessage().
//
// In addition to routing user message events, various control events are used
// by Nodes to coordinate Port behavior and lifetime within and across Nodes.
// See Event documentation for description of different types of events used by
// a Node to coordinate behavior.
class COMPONENT_EXPORT(MOJO_CORE_PORTS) Node {
public:
enum class ShutdownPolicy {
DONT_ALLOW_LOCAL_PORTS,
ALLOW_LOCAL_PORTS,
};
// Does not take ownership of the delegate.
Node(const NodeName& name, NodeDelegate* delegate);
~Node();
// Returns true iff there are no open ports referring to another node or ports
// in the process of being transferred from this node to another. If this
// returns false, then to ensure clean shutdown, it is necessary to keep the
// node alive and continue routing messages to it via AcceptMessage. This
// method may be called again after AcceptMessage to check if the Node is now
// ready to be destroyed.
//
// If |policy| is set to |ShutdownPolicy::ALLOW_LOCAL_PORTS|, this will return
// |true| even if some ports remain alive, as long as none of them are proxies
// to another node.
bool CanShutdownCleanly(
ShutdownPolicy policy = ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS);
// Lookup the named port.
int GetPort(const PortName& port_name, PortRef* port_ref);
// Creates a port on this node. Before the port can be used, it must be
// initialized using InitializePort. This method is useful for bootstrapping
// a connection between two nodes. Generally, ports are created using
// CreatePortPair instead.
int CreateUninitializedPort(PortRef* port_ref);
// Initializes a newly created port.
int InitializePort(const PortRef& port_ref,
const NodeName& peer_node_name,
const PortName& peer_port_name);
// Generates a new connected pair of ports bound to this node. These ports
// are initialized and ready to go.
int CreatePortPair(PortRef* port0_ref, PortRef* port1_ref);
// User data associated with the port.
int SetUserData(const PortRef& port_ref, scoped_refptr<UserData> user_data);
int GetUserData(const PortRef& port_ref, scoped_refptr<UserData>* user_data);
// Prevents further messages from being sent from this port or delivered to
// this port. The port is removed, and the port's peer is notified of the
// closure after it has consumed all pending messages.
int ClosePort(const PortRef& port_ref);
// Returns the current status of the port.
int GetStatus(const PortRef& port_ref, PortStatus* port_status);
// Returns the next available message on the specified port or returns a null
// message if there are none available. Returns ERROR_PORT_PEER_CLOSED to
// indicate that this port's peer has closed. In such cases GetMessage may
// be called until it yields a null message, indicating that no more messages
// may be read from the port.
//
// If |filter| is non-null, the next available message is returned only if it
// is matched by the filter. If the provided filter does not match the next
// available message, GetMessage() behaves as if there is no message
// available. Ownership of |filter| is not taken, and it must outlive the
// extent of this call.
int GetMessage(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent>* message,
MessageFilter* filter);
// Sends a message from the specified port to its peer. Note that the message
// notification may arrive synchronously (via PortStatusChanged() on the
// delegate) if the peer is local to this Node.
int SendUserMessage(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent> message);
// Makes the port send acknowledge requests to its conjugate to acknowledge
// at least every |sequence_number_acknowledge_interval| messages as they're
// read from the conjugate. The number of unacknowledged messages is exposed
// in the |unacknowledged_message_count| field of PortStatus. This allows
// bounding the number of unread and/or in-transit messages from this port
// to its conjugate between zero and |unacknowledged_message_count|.
int SetAcknowledgeRequestInterval(
const PortRef& port_ref,
uint64_t sequence_number_acknowledge_interval);
// Corresponding to NodeDelegate::ForwardEvent.
int AcceptEvent(ScopedEvent event);
// Called to merge two ports with each other. If you have two independent
// port pairs A <=> B and C <=> D, the net result of merging B and C is a
// single connected port pair A <=> D.
//
// Note that the behavior of this operation is undefined if either port to be
// merged (B or C above) has ever been read from or written to directly, and
// this must ONLY be called on one side of the merge, though it doesn't matter
// which side.
//
// It is safe for the non-merged peers (A and D above) to be transferred,
// closed, and/or written to before, during, or after the merge.
int MergePorts(const PortRef& port_ref,
const NodeName& destination_node_name,
const PortName& destination_port_name);
// Like above but merges two ports local to this node. Because both ports are
// local this can also verify that neither port has been written to before the
// merge. If this fails for any reason, both ports are closed. Otherwise OK
// is returned and the ports' receiving peers are connected to each other.
int MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref);
// Called to inform this node that communication with another node is lost
// indefinitely. This triggers cleanup of ports bound to this node.
int LostConnectionToNode(const NodeName& node_name);
private:
// Helper to ensure that a Node always calls into its delegate safely, i.e.
// without holding any internal locks.
class DelegateHolder {
public:
DelegateHolder(Node* node, NodeDelegate* delegate);
~DelegateHolder();
NodeDelegate* operator->() const {
EnsureSafeDelegateAccess();
return delegate_;
}
private:
#if DCHECK_IS_ON()
void EnsureSafeDelegateAccess() const;
#else
void EnsureSafeDelegateAccess() const {}
#endif
Node* const node_;
NodeDelegate* const delegate_;
DISALLOW_COPY_AND_ASSIGN(DelegateHolder);
};
int OnUserMessage(std::unique_ptr<UserMessageEvent> message);
int OnPortAccepted(std::unique_ptr<PortAcceptedEvent> event);
int OnObserveProxy(std::unique_ptr<ObserveProxyEvent> event);
int OnObserveProxyAck(std::unique_ptr<ObserveProxyAckEvent> event);
int OnObserveClosure(std::unique_ptr<ObserveClosureEvent> event);
int OnMergePort(std::unique_ptr<MergePortEvent> event);
int OnUserMessageReadAckRequest(
std::unique_ptr<UserMessageReadAckRequestEvent> event);
int OnUserMessageReadAck(std::unique_ptr<UserMessageReadAckEvent> event);
int AddPortWithName(const PortName& port_name, scoped_refptr<Port> port);
void ErasePort(const PortName& port_name);
int SendUserMessageInternal(const PortRef& port_ref,
std::unique_ptr<UserMessageEvent>* message);
int MergePortsInternal(const PortRef& port0_ref,
const PortRef& port1_ref,
bool allow_close_on_bad_state);
void ConvertToProxy(Port* port,
const NodeName& to_node_name,
PortName* port_name,
Event::PortDescriptor* port_descriptor);
int AcceptPort(const PortName& port_name,
const Event::PortDescriptor& port_descriptor);
int PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
Port::State expected_port_state,
bool ignore_closed_peer,
UserMessageEvent* message,
NodeName* forward_to_node);
int BeginProxying(const PortRef& port_ref);
int ForwardUserMessagesFromProxy(const PortRef& port_ref);
void InitiateProxyRemoval(const PortRef& port_ref);
void TryRemoveProxy(const PortRef& port_ref);
void DestroyAllPortsWithPeer(const NodeName& node_name,
const PortName& port_name);
// Changes the peer node and port name referenced by |port|. Note that both
// |ports_lock_| MUST be held through the extent of this method.
// |local_port|'s lock must be held if and only if a reference to |local_port|
// exist in |ports_|.
void UpdatePortPeerAddress(const PortName& local_port_name,
Port* local_port,
const NodeName& new_peer_node,
const PortName& new_peer_port);
// Removes an entry from |peer_port_map_| corresponding to |local_port|'s peer
// address, if valid.
void RemoveFromPeerPortMap(const PortName& local_port_name, Port* local_port);
// Swaps the peer information for two local ports. Used during port merges.
// Note that |ports_lock_| must be held along with each of the two port's own
// locks, through the extent of this method.
void SwapPortPeers(const PortName& port0_name,
Port* port0,
const PortName& port1_name,
Port* port1);
// Sends an acknowledge request to the peer if the port has a non-zero
// |sequence_num_acknowledge_interval|. This needs to be done when the port's
// peer changes, as the previous peer proxy may not have forwarded any prior
// acknowledge request before deleting itself.
void MaybeResendAckRequest(const PortRef& port_ref);
// Forwards a stored acknowledge request to the peer if the proxy has a
// non-zero |sequence_num_acknowledge_interval|.
void MaybeForwardAckRequest(const PortRef& port_ref);
// Sends an acknowledge of the most recently read sequence number to the peer
// if any messages have been read, and the port has a non-zero
// |sequence_num_to_acknowledge|.
void MaybeResendAck(const PortRef& port_ref);
const NodeName name_;
const DelegateHolder delegate_;
// Just to clarify readability of the types below.
using LocalPortName = PortName;
using PeerPortName = PortName;
// Guards access to |ports_| and |peer_port_maps_| below.
//
// This must never be acquired while an individual port's lock is held on the
// same thread. Conversely, individual port locks may be acquired while this
// one is held.
//
// Because UserMessage events may execute arbitrary user code during
// destruction, it is also important to ensure that such events are never
// destroyed while this (or any individual Port) lock is held.
base::Lock ports_lock_;
std::unordered_map<LocalPortName, scoped_refptr<Port>> ports_;
// Maps a peer port name to a list of PortRefs for all local ports which have
// the port name key designated as their peer port. The set of local ports
// which have the same peer port is expected to always be relatively small and
// usually 1. Hence we just use a flat_map of local PortRefs keyed on each
// local port's name.
using PeerPortMap =
std::unordered_map<PeerPortName, base::flat_map<LocalPortName, PortRef>>;
// A reverse mapping which can be used to find all local ports that reference
// a given peer node or a local port that references a specific given peer
// port on a peer node. The key to this map is the corresponding peer node
// name.
std::unordered_map<NodeName, PeerPortMap> peer_port_maps_;
DISALLOW_COPY_AND_ASSIGN(Node);
};
} // namespace ports
} // namespace core
} // namespace mojo
#endif // MOJO_CORE_PORTS_NODE_H_

Просмотреть файл

@ -0,0 +1,38 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_CORE_PORTS_NODE_DELEGATE_H_
#define MOJO_CORE_PORTS_NODE_DELEGATE_H_
#include <stddef.h>
#include "mojo/core/ports/event.h"
#include "mojo/core/ports/name.h"
#include "mojo/core/ports/port_ref.h"
namespace mojo {
namespace core {
namespace ports {
class NodeDelegate {
public:
virtual ~NodeDelegate() = default;
// Forward an event (possibly asynchronously) to the specified node.
virtual void ForwardEvent(const NodeName& node, ScopedEvent event) = 0;
// Broadcast an event to all nodes.
virtual void BroadcastEvent(ScopedEvent event) = 0;
// Indicates that the port's status has changed recently. Use Node::GetStatus
// to query the latest status of the port. Note, this event could be spurious
// if another thread is simultaneously modifying the status of the port.
virtual void PortStatusChanged(const PortRef& port_ref) = 0;
};
} // namespace ports
} // namespace core
} // namespace mojo
#endif // MOJO_CORE_PORTS_NODE_DELEGATE_H_

Просмотреть файл

@ -0,0 +1,28 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/core/ports/port.h"
namespace mojo {
namespace core {
namespace ports {
Port::Port(uint64_t next_sequence_num_to_send,
uint64_t next_sequence_num_to_receive)
: state(kUninitialized),
next_sequence_num_to_send(next_sequence_num_to_send),
last_sequence_num_acknowledged(next_sequence_num_to_send - 1),
sequence_num_acknowledge_interval(0),
last_sequence_num_to_receive(0),
sequence_num_to_acknowledge(0),
message_queue(next_sequence_num_to_receive),
remove_proxy_on_last_message(false),
peer_closed(false),
peer_lost_unexpectedly(false) {}
Port::~Port() = default;
} // namespace ports
} // namespace core
} // namespace mojo

Просмотреть файл

@ -0,0 +1,200 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_CORE_PORTS_PORT_H_
#define MOJO_CORE_PORTS_PORT_H_
#include <memory>
#include <queue>
#include <utility>
#include <vector>
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/synchronization/lock.h"
#include "mojo/core/ports/event.h"
#include "mojo/core/ports/message_queue.h"
#include "mojo/core/ports/user_data.h"
namespace mojo {
namespace core {
namespace ports {
class PortLocker;
// A Port is essentially a node in a circular list of addresses. For the sake of
// this documentation such a list will henceforth be referred to as a "route."
// Routes are the fundamental medium upon which all Node event circulation takes
// place and are thus the backbone of all Mojo message passing.
//
// Each Port is identified by a 128-bit address within a Node (see node.h). A
// Port doesn't really *do* anything per se: it's a named collection of state,
// and its owning Node manages all event production, transmission, routing, and
// processing logic. See Node for more details on how Ports may be used to
// transmit arbitrary user messages as well as other Ports.
//
// Ports may be in any of a handful of states (see State below) which dictate
// how they react to system events targeting them. In the simplest and most
// common case, Ports are initially created as an entangled pair (i.e. a simple
// cycle consisting of two Ports) both in the |kReceiving| State. Consider Ports
// we'll label |A| and |B| here, which may be created using
// Node::CreatePortPair():
//
// +-----+ +-----+
// | |--------->| |
// | A | | B |
// | |<---------| |
// +-----+ +-----+
//
// |A| references |B| via |peer_node_name| and |peer_port_name|, while |B| in
// turn references |A|. Note that a Node is NEVER aware of who is sending events
// to a given Port; it is only aware of where it must route events FROM a given
// Port.
//
// For the sake of documentation, we refer to one receiving port in a route as
// the "conjugate" of the other. A receiving port's conjugate is also its peer
// upon initial creation, but because of proxying this may not be the case at a
// later time.
//
// ALL access to this data structure must be guarded by |lock_| acquisition,
// which is only possible using a PortLocker. PortLocker ensures that
// overlapping Port lock acquisitions on a single thread are always acquired in
// a globally consistent order.
class Port : public base::RefCountedThreadSafe<Port> {
public:
// The state of a given Port. A Port may only exist in one of these states at
// any given time.
enum State {
// The Port is not yet paired with a peer and is therefore unusable. See
// Node::CreateUninitializedPort and Node::InitializePort for motivation.
kUninitialized,
// The Port is publicly visible outside of its Node and may be used to send
// and receive user messages. There are always AT MOST two |kReceiving|
// Ports along any given route. A user message event sent from a receiving
// port is always circulated along the Port's route until it reaches either
// a dead-end -- in which case the route is broken -- or it reaches the
// other receiving Port in the route -- in which case it lands in that
// Port's incoming message queue which can by read by user code.
kReceiving,
// The Port has been taken out of the |kReceiving| state in preparation for
// proxying to a new destination. A Port enters this state immediately when
// it's attached to a user message and may only leave this state when
// transitioning to |kProxying|. See Node for more details.
kBuffering,
// The Port is forwarding all user messages (and most other events) to its
// peer without discretion. Ports in the |kProxying| state may never leave
// this state and only exist temporarily until their owning Node has
// established that no more events will target them. See Node for more
// details.
kProxying,
// The Port has been closed and is now permanently unusable. Only
// |kReceiving| ports can be closed.
kClosed
};
// The current State of the Port.
State state;
// The Node and Port address to which events should be routed FROM this Port.
// Note that this is NOT necessarily the address of the Port currently sending
// events TO this Port.
NodeName peer_node_name;
PortName peer_port_name;
// The next available sequence number to use for outgoing user message events
// originating from this port.
uint64_t next_sequence_num_to_send;
// The largest acknowledged user message event sequence number.
uint64_t last_sequence_num_acknowledged;
// The interval for which acknowledge requests will be sent. A value of N will
// cause an acknowledge request for |last_sequence_num_acknowledged| + N when
// initially set and on received acknowledge. This means that the lower bound
// for unread or in-transit messages is |next_sequence_num_to_send| -
// |last_sequence_num_acknowledged| + |sequence_number_acknowledge_interval|.
// If zero, no acknowledge requests are sent.
uint64_t sequence_num_acknowledge_interval;
// The sequence number of the last message this Port should ever expect to
// receive in its lifetime. May be used to determine that a proxying port is
// ready to be destroyed or that a receiving port's conjugate has been closed
// and we know the sequence number of the last message it sent.
uint64_t last_sequence_num_to_receive;
// The sequence number of the message for which this Port should send an
// acknowledge message. In the buffering state, holds the acknowledge request
// value that is forwarded to the peer on transition to proxying.
// This is zero in any port that's never received an acknowledge request, and
// in proxies that have forwarded a stored acknowledge.
uint64_t sequence_num_to_acknowledge;
// The queue of incoming user messages received by this Port. Only non-empty
// for buffering or receiving Ports. When a buffering port enters the proxying
// state, it flushes its queue and the proxy then bypasses the queue
// indefinitely.
//
// A receiving port's queue only has elements removed by user code reading
// messages from the port.
//
// Note that this is a priority queue which only exposes messages to consumers
// in strict sequential order.
MessageQueue message_queue;
// In some edge cases, a Node may need to remember to route a single special
// event upon destruction of this (proxying) Port. That event is stashed here
// in the interim.
std::unique_ptr<std::pair<NodeName, ScopedEvent>> send_on_proxy_removal;
// Arbitrary user data attached to the Port. In practice, Mojo uses this to
// stash an observer interface which can be notified about various Port state
// changes.
scoped_refptr<UserData> user_data;
// Indicates that this (proxying) Port has received acknowledgement that no
// new user messages will be routed to it. If |true|, the proxy will be
// removed once it has received and forwarded all sequenced messages up to and
// including the one numbered |last_sequence_num_to_receive|.
bool remove_proxy_on_last_message;
// Indicates that this Port is aware that its nearest (in terms of forward,
// non-zero cyclic routing distance) receiving Port has been closed.
bool peer_closed;
// Indicates that this Port lost its peer unexpectedly (e.g. via process death
// rather than receiving an ObserveClosure event). In this case
// |peer_closed| will be true but |last_sequence_num_to_receive| cannot be
// known. Such ports will continue to make message available until their
// message queue is empty.
bool peer_lost_unexpectedly;
Port(uint64_t next_sequence_num_to_send,
uint64_t next_sequence_num_to_receive);
void AssertLockAcquired() {
#if DCHECK_IS_ON()
lock_.AssertAcquired();
#endif
}
private:
friend class base::RefCountedThreadSafe<Port>;
friend class PortLocker;
~Port();
base::Lock lock_;
DISALLOW_COPY_AND_ASSIGN(Port);
};
} // namespace ports
} // namespace core
} // namespace mojo
#endif // MOJO_CORE_PORTS_PORT_H_

Просмотреть файл

@ -0,0 +1,74 @@
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/core/ports/port_locker.h"
#include <algorithm>
#include "mojo/core/ports/port.h"
#if DCHECK_IS_ON()
#include "base/threading/thread_local.h"
#endif
namespace mojo {
namespace core {
namespace ports {
namespace {
#if DCHECK_IS_ON()
void UpdateTLS(PortLocker* old_locker, PortLocker* new_locker) {
// Sanity check when DCHECK is on to make sure there is only ever one
// PortLocker extant on the current thread.
static auto* tls = new base::ThreadLocalPointer<PortLocker>();
DCHECK_EQ(old_locker, tls->Get());
tls->Set(new_locker);
}
#endif
} // namespace
PortLocker::PortLocker(const PortRef** port_refs, size_t num_ports)
: port_refs_(port_refs), num_ports_(num_ports) {
#if DCHECK_IS_ON()
UpdateTLS(nullptr, this);
#endif
// Sort the ports by address to lock them in a globally consistent order.
std::sort(
port_refs_, port_refs_ + num_ports_,
[](const PortRef* a, const PortRef* b) { return a->port() < b->port(); });
for (size_t i = 0; i < num_ports_; ++i) {
// TODO(crbug.com/725605): Remove this CHECK.
CHECK(port_refs_[i]->port());
port_refs_[i]->port()->lock_.Acquire();
}
}
PortLocker::~PortLocker() {
for (size_t i = 0; i < num_ports_; ++i)
port_refs_[i]->port()->lock_.Release();
#if DCHECK_IS_ON()
UpdateTLS(this, nullptr);
#endif
}
#if DCHECK_IS_ON()
// static
void PortLocker::AssertNoPortsLockedOnCurrentThread() {
// Forces a DCHECK if the TLS PortLocker is anything other than null.
UpdateTLS(nullptr, nullptr);
}
#endif
SinglePortLocker::SinglePortLocker(const PortRef* port_ref)
: port_ref_(port_ref), locker_(&port_ref_, 1) {}
SinglePortLocker::~SinglePortLocker() = default;
} // namespace ports
} // namespace core
} // namespace mojo

Просмотреть файл

@ -0,0 +1,86 @@
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_CORE_PORTS_PORT_LOCKER_H_
#define MOJO_CORE_PORTS_PORT_LOCKER_H_
#include <stdint.h>
#include "base/macros.h"
#include "mojo/core/ports/port_ref.h"
namespace mojo {
namespace core {
namespace ports {
class Port;
class PortRef;
// A helper which must be used to acquire individual Port locks. Any given
// thread may have at most one of these alive at any time. This ensures that
// when multiple ports are locked, they're locked in globally consistent order.
//
// Port locks are acquired upon construction of this object and released upon
// destruction.
class PortLocker {
public:
// Constructs a PortLocker over a sequence of |num_ports| contiguous
// |PortRef*|s. The sequence may be reordered by this constructor, and upon
// return, all referenced ports' locks are held.
PortLocker(const PortRef** port_refs, size_t num_ports);
~PortLocker();
// Provides safe access to a PortRef's Port. Note that in release builds this
// doesn't do anything other than pass through to the private accessor on
// |port_ref|, but it does force callers to go through a PortLocker to get to
// the state, thus minimizing the likelihood that they'll go and do something
// stupid.
Port* GetPort(const PortRef& port_ref) const {
#if DCHECK_IS_ON()
// Sanity check when DCHECK is on to ensure this is actually a port whose
// lock is held by this PortLocker.
bool is_port_locked = false;
for (size_t i = 0; i < num_ports_ && !is_port_locked; ++i)
if (port_refs_[i]->port() == port_ref.port())
is_port_locked = true;
DCHECK(is_port_locked);
#endif
return port_ref.port();
}
// A helper which can be used to verify that no Port locks are held on the
// current thread. In non-DCHECK builds this is a no-op.
#if DCHECK_IS_ON()
static void AssertNoPortsLockedOnCurrentThread();
#else
static void AssertNoPortsLockedOnCurrentThread() {}
#endif
private:
const PortRef** const port_refs_;
const size_t num_ports_;
DISALLOW_COPY_AND_ASSIGN(PortLocker);
};
// Convenience wrapper for a PortLocker that locks a single port.
class SinglePortLocker {
public:
explicit SinglePortLocker(const PortRef* port_ref);
~SinglePortLocker();
Port* port() const { return locker_.GetPort(*port_ref_); }
private:
const PortRef* port_ref_;
PortLocker locker_;
DISALLOW_COPY_AND_ASSIGN(SinglePortLocker);
};
} // namespace ports
} // namespace core
} // namespace mojo
#endif // MOJO_CORE_PORTS_PORT_LOCKER_H_

Просмотреть файл

@ -0,0 +1,30 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/core/ports/port_ref.h"
#include "mojo/core/ports/port.h"
namespace mojo {
namespace core {
namespace ports {
PortRef::~PortRef() = default;
PortRef::PortRef() = default;
PortRef::PortRef(const PortName& name, scoped_refptr<Port> port)
: name_(name), port_(std::move(port)) {}
PortRef::PortRef(const PortRef& other) = default;
PortRef::PortRef(PortRef&& other) = default;
PortRef& PortRef::operator=(const PortRef& other) = default;
PortRef& PortRef::operator=(PortRef&& other) = default;
} // namespace ports
} // namespace core
} // namespace mojo

Просмотреть файл

@ -0,0 +1,48 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_CORE_PORTS_PORT_REF_H_
#define MOJO_CORE_PORTS_PORT_REF_H_
#include "base/component_export.h"
#include "base/memory/ref_counted.h"
#include "mojo/core/ports/name.h"
namespace mojo {
namespace core {
namespace ports {
class Port;
class PortLocker;
class COMPONENT_EXPORT(MOJO_CORE_PORTS) PortRef {
public:
~PortRef();
PortRef();
PortRef(const PortName& name, scoped_refptr<Port> port);
PortRef(const PortRef& other);
PortRef(PortRef&& other);
PortRef& operator=(const PortRef& other);
PortRef& operator=(PortRef&& other);
const PortName& name() const { return name_; }
bool is_valid() const { return !!port_; }
private:
friend class PortLocker;
Port* port() const { return port_.get(); }
PortName name_;
scoped_refptr<Port> port_;
};
} // namespace ports
} // namespace core
} // namespace mojo
#endif // MOJO_CORE_PORTS_PORT_REF_H_

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -0,0 +1,25 @@
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_CORE_PORTS_USER_DATA_H_
#define MOJO_CORE_PORTS_USER_DATA_H_
#include "base/memory/ref_counted.h"
namespace mojo {
namespace core {
namespace ports {
class UserData : public base::RefCountedThreadSafe<UserData> {
protected:
friend class base::RefCountedThreadSafe<UserData>;
virtual ~UserData() = default;
};
} // namespace ports
} // namespace core
} // namespace mojo
#endif // MOJO_CORE_PORTS_USER_DATA_H_

Просмотреть файл

@ -0,0 +1,25 @@
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/core/ports/user_message.h"
namespace mojo {
namespace core {
namespace ports {
UserMessage::UserMessage(const TypeInfo* type_info) : type_info_(type_info) {}
UserMessage::~UserMessage() = default;
bool UserMessage::WillBeRoutedExternally() {
return true;
}
size_t UserMessage::GetSizeIfSerialized() const {
return 0;
}
} // namespace ports
} // namespace core
} // namespace mojo

Просмотреть файл

@ -0,0 +1,58 @@
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef MOJO_CORE_PORTS_USER_MESSAGE_H_
#define MOJO_CORE_PORTS_USER_MESSAGE_H_
#include <stddef.h>
#include "base/component_export.h"
#include "base/macros.h"
namespace mojo {
namespace core {
namespace ports {
// Base type to use for any embedder-defined user message implementation. This
// class is intentionally empty.
//
// Provides a bit of type-safety help to subclasses since by design downcasting
// from this type is a common operation in embedders.
//
// Each subclass should define a static const instance of TypeInfo named
// |kUserMessageTypeInfo| and pass its address down to the UserMessage
// constructor. The type of a UserMessage can then be dynamically inspected by
// comparing |type_info()| to any subclass's |&kUserMessageTypeInfo|.
class COMPONENT_EXPORT(MOJO_CORE_PORTS) UserMessage {
public:
struct TypeInfo {};
explicit UserMessage(const TypeInfo* type_info);
virtual ~UserMessage();
const TypeInfo* type_info() const { return type_info_; }
// Invoked immediately before the system asks the embedder to forward this
// message to an external node.
//
// Returns |true| if the message is OK to route externally, or |false|
// otherwise. Returning |false| implies an unrecoverable condition, and the
// message event will be destroyed without further routing.
virtual bool WillBeRoutedExternally();
// Returns the size in bytes of this message iff it's serialized. Zero
// otherwise.
virtual size_t GetSizeIfSerialized() const;
private:
const TypeInfo* const type_info_;
DISALLOW_COPY_AND_ASSIGN(UserMessage);
};
} // namespace ports
} // namespace core
} // namespace mojo
#endif // MOJO_CORE_PORTS_USER_MESSAGE_H_