Mirror of https://github.com/microsoft/CCF.git
Pending nodes should not be added to consensus (#404)
This commit is contained in:
Parent: e0a031c0c0
Commit: 11543c69a7
@@ -297,8 +297,8 @@ if(BUILD_TESTS)
   )

   add_e2e_test(
-    NAME add_node_test
-    PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/addnode.py
+    NAME reconfiguration_test
+    PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/reconfiguration.py
   )

   add_e2e_test(

@@ -279,6 +279,11 @@ namespace ccf
        }
        case StartType::Join:
        {
+          // Generate fresh key to encrypt/decrypt historical network secrets sent
+          // by the primary via the kv store
+          raw_fresh_key = tls::create_entropy()->random(crypto::GCM_SIZE_KEY);
+
+          sm.advance(State::pending);
          return Success<CreateNew::Out>({node_cert, quote});
        }

@@ -392,10 +397,6 @@ namespace ccf
        return true;
      });

-      // Generate fresh key to encrypt/decrypt historical network secrets sent
-      // by the primary via the kv store
-      raw_fresh_key = tls::create_entropy()->random(crypto::GCM_SIZE_KEY);
-
      // Send RPC request to remote node to join the network.
      jsonrpc::ProcedureCall<JoinNetworkNodeToNode::In> join_rpc;
      join_rpc.id = join_seq_no++;

@@ -1177,15 +1178,17 @@ namespace ccf
      auto configure = false;
      std::unordered_set<NodeId> configuration;

-      // TODO(#important,#TR): Only TRUSTED nodes should be sent append
-      // entries, counted in election votes and allowed to establish
-      // node-to-node channels (section III-F).
      for (auto& [node_id, ni] : w)
      {
        switch (ni.value.status)
        {
+          case NodeStatus::PENDING:
+          {
+            // Pending nodes are not added to consensus until they are
+            // trusted
+            break;
+          }
          case NodeStatus::TRUSTED:
          {
            add_node(node_id, ni.value.host, ni.value.nodeport);
            configure = true;

@@ -1204,7 +1207,7 @@ namespace ccf
      if (configure)
      {
        s.foreach([&](NodeId node_id, const Nodes::VersionV& v) {
-          if (v.value.status != NodeStatus::RETIRED)
+          if (v.value.status == NodeStatus::TRUSTED)
            configuration.insert(node_id);
          return true;
        });

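Taken together, the two hunks above implement the commit's thesis: nodes recorded as PENDING stay visible in the node table but are skipped when the consensus configuration is rebuilt, and only TRUSTED nodes are added. A minimal Python sketch of this gating logic (illustrative only — the names mirror the NodeStatus enum added to the test infrastructure further down, not the C++ implementation):

    from enum import Enum

    class NodeStatus(Enum):
        PENDING = 0
        TRUSTED = 1
        RETIRED = 2

    def build_configuration(nodes):
        # Only TRUSTED nodes enter the consensus configuration; PENDING nodes
        # wait for a members' vote, RETIRED nodes stay excluded.
        return {node_id for node_id, status in nodes.items()
                if status is NodeStatus.TRUSTED}

    # A pending node is stored but does not affect the configuration.
    assert build_configuration(
        {0: NodeStatus.TRUSTED, 1: NodeStatus.TRUSTED, 2: NodeStatus.PENDING}
    ) == {0, 1}
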
@@ -18,7 +18,7 @@ namespace ccf
    std::unique_ptr<ChannelManager> channels;
    std::unique_ptr<ringbuffer::AbstractWriter> to_host;

-    void established_channel(NodeId to)
+    void establish_channel(NodeId to)
    {
      // If the channel is not yet established, replace all sent messages with
      // a key exchange message. In the case of raft, this is acceptable since

@@ -56,7 +56,7 @@ namespace ccf
      auto& n2n_channel = channels->get(to);
      if (n2n_channel.get_status() != ChannelStatus::ESTABLISHED)
      {
-        established_channel(to);
+        establish_channel(to);
        return;
      }

@@ -87,7 +87,7 @@ namespace ccf
      auto& n2n_channel = channels->get(to);
      if (n2n_channel.get_status() != ChannelStatus::ESTABLISHED)
      {
-        established_channel(to);
+        establish_channel(to);
        return false;
      }

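The rename from established_channel to establish_channel matches what the function does: as its comment explains, when a message is about to be sent over a channel that is not yet ESTABLISHED, the message is replaced by a key exchange and the send is abandoned, which Raft tolerates because entries are retransmitted. A hypothetical Python sketch of that send path (the class and function names here are illustrative, not CCF's API):

    from enum import Enum

    class ChannelStatus(Enum):
        INITIATED = 0
        ESTABLISHED = 1

    def send_authenticated(channel, msg, establish_channel):
        if channel.status != ChannelStatus.ESTABLISHED:
            # Mirrors the hunks above: replace the message with a key
            # exchange and report failure; consensus will retransmit later.
            establish_channel()
            return False
        # ... encrypt and send msg over the established channel ...
        return True
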
@@ -64,6 +64,7 @@ namespace ccf
            throw std::logic_error(fmt::format("Node {} does not exist", id));
          info->status = NodeStatus::TRUSTED;
          nodes->put(id, *info);
+          LOG_INFO_FMT("Node {} is now {}", id, info->status);
          return true;
        }},
        // retire a node

@@ -97,6 +98,7 @@ namespace ccf
        // this function is called.
        {"accept_recovery",
         [this](Store::Tx& tx, const nlohmann::json& args) {
+           // TODO: Check type of args here
           if (node.is_part_of_public_network())
             return node.finish_recovery(tx, args);
           else

@@ -27,7 +27,9 @@ def run(args):

        for i in range(1, 4):
            LOG.info(f"Adding node {i}")
-            assert network.create_and_trust_node(args.package, "localhost", args, False)
+            assert network.create_and_trust_node(
+                args.package, "localhost", args, should_wait=False
+            )

        network.add_users(primary, network.initial_users)
        LOG.info("Initial set of users added")

@@ -76,7 +76,7 @@ def run(args):

        LOG.debug("Waiting for transaction to be committed by all nodes")
        wait_for_index_globally_committed(
-            commit_index, current_term, network.get_running_nodes()
+            commit_index, current_term, network.get_joined_nodes()
        )

        LOG.debug("Stopping primary")

@@ -26,6 +26,12 @@ class NodeNetworkState(Enum):
    joined = 2


+class NodeStatus(Enum):
+    PENDING = 0
+    TRUSTED = 1
+    RETIRED = 2
+
+
class ServiceStatus(Enum):
    OPENING = 1
    OPEN = 2

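This enum mirrors the node statuses kept in the C++ node table. Because the KV reads in the tests return the status as bytes, the comparisons below decode it and match against the enum member's name, e.g. (pattern taken from the later hunks, with a hypothetical stored value):

    status_from_store = b"PENDING"  # hypothetical value read from ccf.nodes
    assert status_from_store.decode() == NodeStatus.PENDING.name
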
@@ -175,7 +181,6 @@ class Network:
                    label=args.label,
                    **forwarded_args,
                )
-                node.network_state = NodeNetworkState.joined
            else:
                self._add_node(node, args.package, args)
        except Exception:

@@ -210,6 +215,7 @@ class Network:
        primary = self._start_all_nodes(args)

        if not open_network:
            LOG.warning("Network still needs to be opened")
            return primary, self.nodes[1:]

+        self.wait_for_all_nodes_to_catch_up(primary)

@@ -259,7 +265,7 @@ class Network:
        )

        # If the network is opening, nodes are trusted without consortium approval
-        if self.status == ServiceStatus.OPENING and not should_wait:
+        if self.status == ServiceStatus.OPENING and should_wait:
            try:
                node.wait_for_node_to_join()
            except TimeoutError:

@@ -267,37 +273,73 @@ class Network:
                raise
        node.network_state = NodeNetworkState.joined

-    def _wait_for_node_to_exist_in_store(self, remote_node, node_id, timeout=3):
+    def _wait_for_node_to_exist_in_store(
+        self, remote_node, node_id, node_status=None, timeout=3
+    ):
        exists = False
        for _ in range(timeout):
-            if self.check_node_exists(remote_node, node_id):
+            if self._check_node_exists(remote_node, node_id, node_status):
                exists = True
                break
            time.sleep(1)
        if not exists:
-            raise TimeoutError(f"Node {node_id} has not yet been recorded in the store")
+            raise TimeoutError(
+                f"Node {node_id} has not yet been recorded in the store"
+                + getattr(node_status, f" with status {node_status.name}", "")
+            )
+
+    def create_and_add_pending_node(self, lib_name, host, args, should_wait=True):
+        """
+        Create a new node and add it to the network. Note that the new node
+        still needs to be trusted by members to complete the join protocol.
+        """
+        new_node = self.create_node(host)
+        self._add_node(new_node, lib_name, args, should_wait)
+        primary, _ = self.find_primary()
+        try:
+            self._wait_for_node_to_exist_in_store(
+                primary,
+                new_node.node_id,
+                (
+                    NodeStatus.PENDING
+                    if self.status == ServiceStatus.OPEN
+                    else NodeStatus.TRUSTED
+                ),
+            )
+        except TimeoutError:
+            # The node can be safely discarded since it has not been
+            # attributed a unique node_id by CCF
+            LOG.error(f"New pending node {new_node.node_id} failed to join the network")
+            new_node.stop()
+            self.nodes.remove(new_node)
+            return None
+
+        return new_node

    # TODO: should_wait should disappear once nodes can join a network and catch up in PBFT
    def create_and_trust_node(self, lib_name, host, args, should_wait=True):
-        new_node = self.create_node(host)
+        """
+        Create a new node, add it to the network and let members vote to trust
+        it so that it becomes part of the consensus protocol.
+        """
+        new_node = self.create_and_add_pending_node(lib_name, host, args, should_wait)
+        if new_node is None:
+            return None
+
+        primary, _ = self.find_primary()
        try:
-            self._add_node(new_node, lib_name, args)
-            primary, _ = self.find_primary()
-            self._wait_for_node_to_exist_in_store(primary, new_node.node_id)
            if self.status is ServiceStatus.OPEN:
                self.trust_node(primary, new_node.node_id)
            if should_wait:
                new_node.wait_for_node_to_join()
        except (ValueError, TimeoutError):
-            LOG.error(f"New node {new_node.node_id} failed to join the network")
+            LOG.error(f"New trusted node {new_node.node_id} failed to join the network")
            new_node.stop()
            self.nodes.remove(new_node)
            return None

-        new_node.network_state = NodeNetworkState.joined
        if should_wait:
            self.wait_for_all_nodes_to_catch_up(primary)
        LOG.success(f"New node {new_node.node_id} joined the network")
+        new_node.network_state = NodeNetworkState.joined

        return new_node

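The join flow is now split in two steps: create_and_add_pending_node starts a node and only waits for it to be recorded in the store (as PENDING once the service is open), while create_and_trust_node additionally drives the members' vote that promotes it to TRUSTED. A short usage sketch, assuming a running network as set up in the tests below:

    # Recorded in ccf.nodes as PENDING, but not part of consensus yet
    pending_node = network.create_and_add_pending_node(args.package, "localhost", args)
    assert pending_node is not None

    # Proposed, voted on by members, then waited on until it has joined
    trusted_node = network.create_and_trust_node(args.package, "localhost", args)
    assert trusted_node is not None
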
@@ -414,41 +456,33 @@ class Network:

        with remote_node.member_client() as c:
            id = c.request("read", {"table": "ccf.nodes", "key": node_id})
-            assert (
-                c.response(id).result["status"].decode()
-                == infra.remote.NodeStatus.RETIRED.name
-            )
+            assert c.response(id).result["status"].decode() == NodeStatus.RETIRED.name

    def propose_trust_node(self, member_id, remote_node, node_id):
        return self.propose(
            member_id, remote_node, "trust_node", f"--node-id={node_id}"
        )

-    def check_node_exists(self, remote_node, node_id, expected_node_status=None):
+    def _check_node_exists(self, remote_node, node_id, node_status=None):
        with remote_node.member_client() as c:
            rep = c.do("read", {"table": "ccf.nodes", "key": node_id})

            if rep.error is not None or (
-                expected_node_status
-                and rep.result["status"].decode() != expected_node_status.name
+                node_status and rep.result["status"].decode() != node_status.name
            ):
                return False

        return True

    def trust_node(self, remote_node, node_id):
-        if not self.check_node_exists(
-            remote_node, node_id, infra.remote.NodeStatus.PENDING
-        ):
+        if not self._check_node_exists(remote_node, node_id, NodeStatus.PENDING):
            raise ValueError(f"Node {node_id} does not exist in state PENDING")

        member_id = 1
        result = self.propose_trust_node(member_id, remote_node, node_id)
        result = self.vote_using_majority(remote_node, result[1]["id"])

-        if not self.check_node_exists(
-            remote_node, node_id, infra.remote.NodeStatus.TRUSTED
-        ):
+        if not self._check_node_exists(remote_node, node_id, NodeStatus.TRUSTED):
            raise ValueError(f"Node {node_id} does not exist in state TRUSTED")

    def propose_add_member(self, member_id, remote_node, new_member_cert):

@@ -525,8 +559,8 @@ class Network:
    def get_members(self):
        return self.members

-    def get_running_nodes(self):
-        return [node for node in self.nodes if node.is_stopped() is not True]
+    def get_joined_nodes(self):
+        return [node for node in self.nodes if node.is_joined()]

    def get_node_by_id(self, node_id):
        return next((node for node in self.nodes if node.node_id == node_id), None)

@@ -540,7 +574,7 @@ class Network:
        term = None

        for _ in range(timeout):
-            for node in self.get_running_nodes():
+            for node in self.get_joined_nodes():
                with node.node_client() as c:
                    id = c.request("getPrimaryInfo", {})
                    res = c.response(id)

@@ -573,7 +607,7 @@ class Network:

        for _ in range(timeout):
            joined_nodes = 0
-            for node in self.get_running_nodes():
+            for node in self.get_joined_nodes():
                with node.node_client() as c:
                    id = c.request("getCommit", {})
                    resp = c.response(id)

@@ -585,12 +619,12 @@ class Network:
                    and resp.result["term"] == term_leader
                ):
                    joined_nodes += 1
-            if joined_nodes == len(self.get_running_nodes()):
+            if joined_nodes == len(self.get_joined_nodes()):
                break
            time.sleep(1)
        assert joined_nodes == len(
-            self.get_running_nodes()
-        ), f"Only {joined_nodes} (out of {len(self.get_running_nodes())}) nodes have joined the network"
+            self.get_joined_nodes()
+        ), f"Only {joined_nodes} (out of {len(self.get_joined_nodes())}) nodes have joined the network"

    def wait_for_node_commit_sync(self, timeout=3):
        """

@@ -599,7 +633,7 @@ class Network:
        """
        for _ in range(timeout):
            commits = []
-            for node in (node for node in self.nodes if node.is_joined()):
+            for node in self.get_joined_nodes():
                with node.node_client() as c:
                    id = c.request("getCommit", {})
                    commits.append(c.response(id).commit)

@@ -731,6 +765,7 @@ class Node:
            members_certs,
            **kwargs,
        )
+        self.network_state = NodeNetworkState.joined

    def join(
        self, lib_name, enclave_type, workspace, label, target_rpc_address, **kwargs

@@ -754,6 +789,7 @@ class Node:
            label,
            **kwargs,
        )
+        self.network_state = NodeNetworkState.joined

    def _start(
        self,

@@ -670,12 +670,6 @@ def ccf_remote(
    remote.stop()


-class NodeStatus(Enum):
-    PENDING = 0
-    TRUSTED = 1
-    RETIRED = 2
-
-
class StartType(Enum):
    new = 0
    join = 1

@@ -12,6 +12,13 @@ import time
from loguru import logger as LOG


+def check_can_progress(node):
+    with node.node_client() as mc:
+        check_commit = infra.ccf.Checker(mc)
+        with node.user_client() as c:
+            check_commit(c.rpc("LOG_record", {"id": 42, "msg": "Hello"}), result=True)
+
+
def run(args):
    hosts = ["localhost", "localhost"]

@@ -20,20 +27,24 @@ def run(args):
    ) as network:
        primary, others = network.start_and_join(args)

-        LOG.debug("Add a valid node")
+        # Adding as many pending nodes as initial (trusted) nodes should not
+        # change the raft consensus rules (i.e. majority)
+        number_new_nodes = len(hosts)
+        LOG.info(
+            f"Adding {number_new_nodes} pending nodes - consensus rules should not change"
+        )
+
+        for _ in range(number_new_nodes):
+            network.create_and_add_pending_node(args.package, "localhost", args)
+        check_can_progress(primary)
+
+        LOG.info("Add a valid node")
        new_node = network.create_and_trust_node(args.package, "localhost", args, True)
        assert new_node
-
-        with primary.node_client() as mc:
-            check_commit = infra.ccf.Checker(mc)
-
-            with new_node.user_client(format="json") as c:
-                check_commit(
-                    c.rpc("LOG_record", {"id": 42, "msg": "Hello world"}), result=True
-                )
+        check_can_progress(primary)

        if args.enclave_type == "debug":
-            LOG.debug("Add an invalid node (unknown code id)")
+            LOG.info("Add an invalid node (unknown code id)")
            assert (
                network.create_and_trust_node(
                    "libluagenericenc", "localhost", args, True

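The comment about consensus rules is worth making concrete: with two initial trusted nodes, a Raft majority is two; if the two pending nodes were wrongly counted as members, the majority would grow to three and could include nodes that never acknowledge entries. A quick arithmetic check of that claim:

    trusted_nodes = 2
    pending_nodes = 2

    majority = trusted_nodes // 2 + 1  # pending nodes excluded -> 2
    wrong_majority = (trusted_nodes + pending_nodes) // 2 + 1  # if counted -> 3

    assert (majority, wrong_majority) == (2, 3)
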
@@ -43,10 +54,10 @@ def run(args):
        else:
            LOG.warning("Skipping unknown code id test with virtual enclave")

-        LOG.debug("Retire node")
+        LOG.info("Retire node")
        network.retire_node(primary, 0)

-        LOG.debug("Add a valid node")
+        LOG.info("Add a valid node")
        new_node = network.create_and_trust_node(args.package, "localhost", args)
        assert new_node

@@ -113,16 +113,18 @@ def run(args):
            args.perf_nodes,
            node_offset=(recovery_idx + 1) * len(hosts),
            pdb=args.pdb,
-        ) as network:
-            primary, backups = network.start_in_recovery(args, ledger, sealed_secrets)
+        ) as recovered_network:
+            primary, backups = recovered_network.start_in_recovery(
+                args, ledger, sealed_secrets
+            )

            with primary.node_client() as mc:
                check_commit = infra.ccf.Checker(mc)
                check = infra.ccf.Checker()

-                for node in network.nodes:
+                for node in recovered_network.nodes:
                    wait_for_state(node, b"partOfPublicNetwork")
-                network.wait_for_node_commit_sync()
+                recovered_network.wait_for_node_commit_sync()
                LOG.success("Public CFTR started")

                LOG.debug(

@@ -135,24 +137,24 @@ def run(args):
                    new_node_ids_offsets, new_node_ids_offsets + len(hosts)
                ):
                    id = c.request(
-                        "read", {"table": "nodes", "key": new_node_id}
+                        "read", {"table": "ccf.nodes", "key": new_node_id}
                    )
                    assert (
-                        infra.remote.NodeStatus(c.response(id).result["status"])
-                        == infra.remote.NodeStatus.TRUSTED
+                        c.response(id).result["status"].decode()
+                        == infra.ccf.NodeStatus.TRUSTED.name
                    )

                LOG.debug("2/3 members vote to complete the recovery")
-                rc, result = network.propose(
+                rc, result = recovered_network.propose(
                    1, primary, "accept_recovery", f"--sealed-secrets={sealed_secrets}"
                )
                assert rc and not result["completed"]
                proposal_id = result["id"]

-                rc, result = network.vote(2, primary, proposal_id, True)
+                rc, result = recovered_network.vote(2, primary, proposal_id, True)
                assert rc and result

-                for node in network.nodes:
+                for node in recovered_network.nodes:
                    wait_for_state(node, b"partOfNetwork")
                LOG.success("All nodes part of network")

@@ -175,17 +177,17 @@ def run(args):
        old_txs = Txs(args.msgs_per_recovery, recovery_idx)

        for recovery_cnt in range(args.recovery):
-            check_nodes_have_msgs(network.nodes, old_txs)
+            check_nodes_have_msgs(recovered_network.nodes, old_txs)
            LOG.success(
                "Recovery #{} complete on all nodes".format(recovery_idx + 1)
            )
-            network.check_for_service(primary)
+            recovered_network.check_for_service(primary)

            new_txs = Txs(args.msgs_per_recovery, recovery_idx + 1)

            rs = log_msgs(primary, new_txs)
            check_responses(rs, True, check, check_commit)
-            network.wait_for_node_commit_sync()
+            recovered_network.wait_for_node_commit_sync()
            check_nodes_have_msgs(backups, new_txs)

            ledger = primary.remote.get_ledger()