Support automatic port assignment (#1173)

This commit is contained in:
Eddy Ashton 2020-05-13 13:01:40 +01:00 коммит произвёл GitHub
Родитель d3e038b275
Коммит 0c9b34e0db
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 231 добавлений и 69 удалений

Просмотреть файл

@ -26,14 +26,11 @@ namespace cli
auto addr = results[0];
auto found = addr.find_last_of(":");
if (found == std::string::npos)
{
throw CLI::ValidationError(
option_name, "Address is not in format host:port");
}
auto hostname = addr.substr(0, found);
auto port = addr.substr(found + 1);
// If no port is specified, use port 0 (auto-assign a free port)
const auto port =
found == std::string::npos ? "0" : addr.substr(found + 1);
// Check if port is in valid range
int port_int;
@ -45,10 +42,10 @@ namespace cli
{
throw CLI::ValidationError(option_name, "Port is not a number");
}
if (port_int <= 0 || port_int > 65535)
if (port_int < 0 || port_int > 65535)
{
throw CLI::ValidationError(
option_name, "Port number is not in range 1-65535");
option_name, "Port number is not in range 0-65535");
}
parsed.hostname = hostname;

Просмотреть файл

@ -87,6 +87,13 @@ int main(int argc, char** argv)
"Address on which to listen for TLS commands coming from other nodes")
->required();
std::string node_address_file = {};
app.add_option(
"--node-address-file",
node_address_file,
"Path to which the node's node-to-node address (including potentially "
"auto-assigned port) will be written. If empty (default), write nothing");
cli::ParsedAddress rpc_address;
cli::add_address_option(
app,
@ -95,6 +102,13 @@ int main(int argc, char** argv)
"Address on which to listen for TLS commands coming from clients")
->required();
std::string rpc_address_file = {};
app.add_option(
"--rpc-address-file",
rpc_address_file,
"Path to which the node's RPC address (including potentially "
"auto-assigned port) will be written. If empty (default), write nothing");
cli::ParsedAddress public_rpc_address;
auto public_rpc_address_option = cli::add_address_option(
app,
@ -488,6 +502,38 @@ int main(int argc, char** argv)
// graceful shutdown on sigterm
asynchost::Sigterm sigterm(writer_factory);
// write to a ledger
asynchost::Ledger ledger(ledger_file, writer_factory);
ledger.register_message_handlers(bp.get_dispatcher());
// Begin listening for node-to-node and RPC messages.
// This includes DNS resolution and potentially dynamic port assignment (if
// requesting port 0). The hostname and port arguments may be modified: after
// each call returns, they hold the final resolved/assigned values.
asynchost::NodeConnectionsTickingReconnect node(
20, //< Flush reconnections every 20ms
bp.get_dispatcher(),
ledger,
writer_factory,
node_address.hostname,
node_address.port);
if (!node_address_file.empty())
{
files::dump(
fmt::format("{}\n{}", node_address.hostname, node_address.port),
node_address_file);
}
asynchost::RPCConnections rpc(writer_factory);
rpc.register_message_handlers(bp.get_dispatcher());
rpc.listen(0, rpc_address.hostname, rpc_address.port);
if (!rpc_address_file.empty())
{
files::dump(
fmt::format("{}\n{}", rpc_address.hostname, rpc_address.port),
rpc_address_file);
}
// Initialise the enclave and create a CCF node in it
const size_t certificate_size = 4096;
const size_t pubk_size = 1024;
@ -566,27 +612,11 @@ int main(int argc, char** argv)
LOG_INFO_FMT("Created new node");
// ledger
asynchost::Ledger ledger(ledger_file, writer_factory);
ledger.register_message_handlers(bp.get_dispatcher());
asynchost::NodeConnectionsTickingReconnect node(
20, //< Flush reconnections every 20ms
bp.get_dispatcher(),
ledger,
writer_factory,
node_address.hostname,
node_address.port);
asynchost::NotifyConnections report(
bp.get_dispatcher(),
notifications_address.hostname,
notifications_address.port);
asynchost::RPCConnections rpc(writer_factory);
rpc.register_message_handlers(bp.get_dispatcher());
rpc.listen(0, rpc_address.hostname, rpc_address.port);
// Write the node and network certs to disk.
files::dump(node_cert, node_cert_file);
if (*start || *recover)

Просмотреть файл

@ -148,14 +148,20 @@ namespace asynchost
}
};
class ServerBehaviour : public TCPBehaviour
class NodeServerBehaviour : public TCPServerBehaviour
{
public:
NodeConnections& parent;
ServerBehaviour(NodeConnections& parent) : parent(parent) {}
NodeServerBehaviour(NodeConnections& parent) : parent(parent) {}
void on_accept(TCP& peer)
void on_listening(
const std::string& host, const std::string& service) override
{
LOG_INFO_FMT("Listening for node-to-node on {}:{}", host, service);
}
void on_accept(TCP& peer) override
{
auto id = parent.get_next_id();
peer->set_behaviour(std::make_unique<IncomingBehaviour>(parent, id));
@ -179,13 +185,15 @@ namespace asynchost
messaging::Dispatcher<ringbuffer::Message>& disp,
Ledger& ledger,
ringbuffer::AbstractWriterFactory& writer_factory,
const std::string& host,
const std::string& service) :
std::string& host,
std::string& service) :
ledger(ledger),
to_enclave(writer_factory.create_writer_to_inside())
{
listener->set_behaviour(std::make_unique<ServerBehaviour>(*this));
listener->set_behaviour(std::make_unique<NodeServerBehaviour>(*this));
listener->listen(host, service);
host = listener->get_host();
service = listener->get_service();
register_message_handlers(disp);
}

Просмотреть файл

@ -58,30 +58,24 @@ namespace asynchost
}
};
class ServerBehaviour : public TCPBehaviour
class RPCServerBehaviour : public TCPServerBehaviour
{
public:
RPCConnections& parent;
int64_t id;
ServerBehaviour(RPCConnections& parent, int64_t id) :
RPCServerBehaviour(RPCConnections& parent, int64_t id) :
parent(parent),
id(id)
{}
void on_resolve_failed()
void on_listening(
const std::string& host, const std::string& service) override
{
LOG_DEBUG_FMT("rpc resolve failed {}", id);
cleanup();
LOG_INFO_FMT("Listening for RPCs on {}:{}", host, service);
}
void on_listen_failed()
{
LOG_DEBUG_FMT("rpc connect failed {}", id);
cleanup();
}
void on_accept(TCP& peer)
void on_accept(TCP& peer) override
{
auto client_id = parent.get_next_id();
peer->set_behaviour(
@ -111,7 +105,7 @@ namespace asynchost
to_enclave(writer_factory.create_writer_to_inside())
{}
bool listen(int64_t id, const std::string& host, const std::string& service)
bool listen(int64_t id, std::string& host, std::string& service)
{
if (id == 0)
id = get_next_id();
@ -123,11 +117,14 @@ namespace asynchost
}
TCP s;
s->set_behaviour(std::make_unique<ServerBehaviour>(*this, id));
s->set_behaviour(std::make_unique<RPCServerBehaviour>(*this, id));
if (!s->listen(host, service))
return false;
host = s->get_host();
service = s->get_service();
sockets.emplace(id, s);
return true;
}

Просмотреть файл

@ -18,6 +18,11 @@ namespace asynchost
virtual void on_resolve_failed() {}
virtual void on_listen_failed() {}
/// Hook that receives the host and service a socket is listening on.
/// This default implementation only records them at info level.
virtual void on_listening(const std::string& host, const std::string& service)
{
  LOG_INFO_FMT("Listening on {}:{}", host, service);
}
virtual void on_accept(TCP& peer) {}
virtual void on_connect() {}
virtual void on_connect_failed() {}
@ -25,6 +30,20 @@ namespace asynchost
virtual void on_disconnect() {}
};
class TCPServerBehaviour : public TCPBehaviour
{
public:
virtual void on_resolve_failed() override
{
throw std::runtime_error("TCP server resolve failed");
}
virtual void on_listen_failed() override
{
throw std::runtime_error("TCP server listen failed");
}
};
class TCPImpl : public with_uv_handle<uv_tcp_t>
{
private:
@ -75,6 +94,26 @@ namespace asynchost
addrinfo* addr_base = nullptr;
addrinfo* addr_current = nullptr;
/// Reports whether a concrete service (port) is known.
/// @return false only when the stored service string is exactly "0".
bool service_assigned() const
{
  const bool is_wildcard_port = (service == "0");
  return !is_wildcard_port;
}
/// Builds a printable name for the current address.
///
/// The result is the host, followed by ":<service>" only when
/// service_assigned() is true. The host is wrapped in square brackets when
/// addr_current is non-null and refers to an AF_INET6 entry.
std::string get_address_name() const
{
  const bool is_ipv6 =
    (addr_current != nullptr) && (addr_current->ai_family == AF_INET6);

  std::string suffix;
  if (service_assigned())
  {
    suffix = fmt::format(":{}", service);
  }

  if (!is_ipv6)
  {
    return fmt::format("{}{}", host, suffix);
  }

  return fmt::format("[{}]{}", host, suffix);
}
TCPImpl() : status(FRESH)
{
if (!init())
@ -95,6 +134,16 @@ namespace asynchost
behaviour = std::move(b);
}
std::string get_host() const
{
return host;
}
std::string get_service() const
{
return service;
}
bool connect(const std::string& host, const std::string& service)
{
assert_status(FRESH, CONNECTING_RESOLVING);
@ -227,30 +276,83 @@ namespace asynchost
return true;
}
/// Refreshes the stored `host` and `service` strings from a socket address.
///
/// @param address_family AF_INET6 selects the IPv6 interpretation of `sa`;
///        any other value is treated as IPv4.
/// @param sa Socket address to read. It is reinterpreted as sockaddr_in6 or
///        sockaddr_in according to `address_family`.
///
/// If the address cannot be rendered as text the failure is logged; `host`
/// is still assigned from the conversion buffer and `service` is still
/// updated from the port field.
void update_resolved_address(int address_family, sockaddr* sa)
{
  // Sized for the longest textual IPv6 address, which also covers IPv4.
  // UV_IF_NAMESIZE is the limit for interface names and is too short for
  // many IPv6 addresses, which made uv_ip6_name fail and truncate the host.
  constexpr auto buf_len = INET6_ADDRSTRLEN;
  char buf[buf_len] = {};
  int rc;

  if (address_family == AF_INET6)
  {
    const auto in6 = reinterpret_cast<const sockaddr_in6*>(sa);
    if ((rc = uv_ip6_name(in6, buf, buf_len)) != 0)
    {
      LOG_FAIL_FMT("uv_ip6_name failed: {}", uv_strerror(rc));
    }

    host = buf;
    // Port is stored in network byte order
    service = fmt::format("{}", ntohs(in6->sin6_port));
  }
  else
  {
    const auto in4 = reinterpret_cast<const sockaddr_in*>(sa);
    if ((rc = uv_ip4_name(in4, buf, buf_len)) != 0)
    {
      LOG_FAIL_FMT("uv_ip4_name failed: {}", uv_strerror(rc));
    }

    host = buf;
    // Port is stored in network byte order
    service = fmt::format("{}", ntohs(in4->sin_port));
  }
}
void listen_resolved()
{
int rc;
while (addr_current != nullptr)
{
update_resolved_address(addr_current->ai_family, addr_current->ai_addr);
if ((rc = uv_tcp_bind(&uv_handle, addr_current->ai_addr, 0)) < 0)
{
addr_current = addr_current->ai_next;
LOG_FAIL_FMT(
"uv_tcp_bind failed on {}: {}",
get_address_name(),
uv_strerror(rc));
continue;
}
if ((rc = uv_listen((uv_stream_t*)&uv_handle, backlog, on_accept)) < 0)
{
LOG_FAIL_FMT(
"uv_listen failed on {}: {}", get_address_name(), uv_strerror(rc));
addr_current = addr_current->ai_next;
continue;
}
// If bound on port 0 (ie - asking the OS to assign a port), then we
// need to call uv_tcp_getsockname to retrieve the bound port
// (addr_current will not contain it)
if (!service_assigned())
{
sockaddr_storage sa_storage;
const auto sa = (sockaddr*)&sa_storage;
int sa_len = sizeof(sa_storage);
if ((rc = uv_tcp_getsockname(&uv_handle, sa, &sa_len)) != 0)
{
LOG_FAIL_FMT("uv_tcp_getsockname failed: {}", uv_strerror(rc));
}
update_resolved_address(addr_current->ai_family, sa);
}
assert_status(LISTENING_RESOLVING, LISTENING);
behaviour->on_listening(host, service);
return;
}
assert_status(LISTENING_RESOLVING, LISTENING_FAILED);
LOG_FAIL_FMT("uv_tcp_bind or uv_listen failed: {}", uv_strerror(rc));
behaviour->on_listen_failed();
}

Просмотреть файл

@ -36,14 +36,13 @@ class Node:
hosts, *port = host.split(":")
self.host, *self.pubhost = hosts.split(",")
self.rpc_port = port[0] if port else None
self.rpc_port = int(port[0]) if port else None
self.node_port = None
if self.host == "localhost":
self.host = infra.net.expand_localhost()
self._set_ports(infra.net.probably_free_local_port)
self.remote_impl = infra.remote.LocalRemote
else:
self._set_ports(infra.net.probably_free_remote_port)
self.remote_impl = infra.remote.SSHRemote
self.pubhost = self.pubhost[0] if self.pubhost else self.host
@ -54,14 +53,6 @@ class Node:
# Equality is decided solely by comparing the two objects' node_id attributes.
def __eq__(self, other):
return self.node_id == other.node_id
# Fill in whichever of node_port / rpc_port is still unset, using
# probably_free_function (always called for self.host) to pick candidates.
# NOTE(review): indentation was lost in this view; the if/else nesting below
# is inferred from the syntax - confirm against the real file.
def _set_ports(self, probably_free_function):
if self.rpc_port is None:
# No RPC port was supplied: obtain a (node_port, rpc_port) pair from
# infra.net.two_different - presumably two distinct ports; confirm.
self.node_port, self.rpc_port = infra.net.two_different(
probably_free_function, self.host
)
else:
# RPC port already known: only the node-to-node port is chosen here.
self.node_port = probably_free_function(self.host)
def start(
self,
lib_name,
@ -180,8 +171,36 @@ class Node:
self.remote.set_perf()
self.remote.start()
self.remote.get_startup_files(self.common_dir)
self._read_ports()
LOG.info("Remote {} started".format(self.node_id))
# Read the node-to-node and RPC address files found under self.common_dir,
# check them against the host/ports this object already holds, and store the
# ports that were read on self.node_port / self.rpc_port.
# Raises AssertionError if a file reports a different host, or a different
# port from one that was already set.
# NOTE(review): indentation was lost in this view; the nesting of the
# statements below is inferred - confirm against the real file.
def _read_ports(self):
node_address_path = os.path.join(self.common_dir, self.remote.node_address_path)
# The file must split into exactly two lines: host, then port.
with open(node_address_path, "r") as f:
node_host, node_port = f.read().splitlines()
node_port = int(node_port)
assert (
node_host == self.host
), f"Unexpected change in node address from {self.host} to {node_host}"
# Only compare ports when one was already set on this object.
if self.node_port is not None:
assert (
node_port == self.node_port
), f"Unexpected change in node port from {self.node_port} to {node_port}"
self.node_port = node_port
# Same procedure for the RPC address file.
rpc_address_path = os.path.join(self.common_dir, self.remote.rpc_address_path)
with open(rpc_address_path, "r") as f:
rpc_host, rpc_port = f.read().splitlines()
rpc_port = int(rpc_port)
assert (
rpc_host == self.host
), f"Unexpected change in RPC address from {self.host} to {rpc_host}"
if self.rpc_port is not None:
assert (
rpc_port == self.rpc_port
), f"Unexpected change in RPC port from {self.rpc_port} to {rpc_port}"
self.rpc_port = rpc_port
def stop(self):
if self.remote and self.network_state is not NodeNetworkState.stopped:
self.network_state = NodeNetworkState.stopped

Просмотреть файл

@ -16,6 +16,7 @@ from collections import deque
from loguru import logger as LOG
DBG = os.getenv("DBG", "cgdb")
FILE_TIMEOUT = 60
_libc = ctypes.CDLL("libc.so.6")
@ -183,7 +184,7 @@ class SSHRemote(CmdMixin):
session.put(src_path, tgt_path)
session.close()
def get(self, file_name, dst_path, timeout=60, target_name=None):
def get(self, file_name, dst_path, timeout=FILE_TIMEOUT, target_name=None):
"""
Get file called `file_name` under the root of the remote. If the
file is missing, wait for timeout, and raise an exception.
@ -215,7 +216,7 @@ class SSHRemote(CmdMixin):
else:
raise ValueError(file_name)
def list_files(self, timeout=60):
def list_files(self, timeout=FILE_TIMEOUT):
files = []
with sftp_session(self.hostname) as session:
end_time = time.time() + timeout
@ -407,7 +408,7 @@ class LocalRemote(CmdMixin):
src_path = os.path.join(self.common_dir, path)
assert self._rc("cp {} {}".format(src_path, dst_path)) == 0
def get(self, file_name, dst_path, timeout=60, target_name=None):
def get(self, file_name, dst_path, timeout=FILE_TIMEOUT, target_name=None):
path = os.path.join(self.root, file_name)
end_time = time.time() + timeout
while time.time() < end_time:
@ -424,7 +425,7 @@ class LocalRemote(CmdMixin):
def list_files(self):
return os.listdir(self.root)
def start(self, timeout=10):
def start(self):
"""
Start cmd. stdout and err are captured to file locally.
"""
@ -496,6 +497,12 @@ CCF_TO_OE_LOG_LEVEL = {
}
def make_address(host, port=None):
    """Return ``"host:port"``, or ``host`` unchanged when ``port`` is None."""
    return host if port is None else f"{host}:{port}"
class CCFRemote(object):
BIN = "cchost"
DEPS = []
@ -536,11 +543,9 @@ class CCFRemote(object):
"""
self.start_type = start_type
self.local_node_id = local_node_id
self.host = host
self.pubhost = pubhost
self.node_port = node_port
self.rpc_port = rpc_port
self.pem = "{}.pem".format(local_node_id)
self.pem = f"{local_node_id}.pem"
self.node_address_path = f"{local_node_id}.node_address"
self.rpc_address_path = f"{local_node_id}.rpc_address"
self.BIN = infra.path.build_bin_path(
self.BIN, enclave_type, binary_dir=binary_dir
)
@ -569,9 +574,11 @@ class CCFRemote(object):
bin_path,
f"--enclave-file={enclave_path}",
f"--enclave-type={enclave_type}",
f"--node-address={host}:{node_port}",
f"--rpc-address={host}:{rpc_port}",
f"--public-rpc-address={pubhost}:{rpc_port}",
f"--node-address={make_address(host, node_port)}",
f"--node-address-file={self.node_address_path}",
f"--rpc-address={make_address(host, rpc_port)}",
f"--rpc-address-file={self.rpc_address_path}",
f"--public-rpc-address={make_address(pubhost, rpc_port)}",
f"--ledger-file={self.ledger_file_name}",
f"--node-cert-file={self.pem}",
f"--host-log-level={host_log_level}",
@ -676,6 +683,8 @@ class CCFRemote(object):
def get_startup_files(self, dst_path):
self.remote.get(self.pem, dst_path)
self.remote.get(self.node_address_path, dst_path)
self.remote.get(self.rpc_address_path, dst_path)
if self.start_type in {StartType.new, StartType.recover}:
self.remote.get("networkcert.pem", dst_path)
self.remote.get("network_enc_pubk.pem", dst_path)