Atomic replacement of shutdown node (#5514)

This commit is contained in:
Amaury Chamayou 2023-08-08 18:33:59 +01:00 коммит произвёл GitHub
Родитель 7e8c240d93
Коммит 9c4d247c55
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 108 добавлений и 27 удалений

Просмотреть файл

@ -1460,7 +1460,7 @@ if(BUILD_TESTS)
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/basicperf.py
CLIENT_BIN ./submit
ADDITIONAL_ARGS --package "samples/apps/basic/libbasic" --client-def
"${WORKER_THREADS},write,100000,any"
"${WORKER_THREADS},write,100000,primary"
)
endif()

Просмотреть файл

@ -51,6 +51,7 @@ namespace basicapp
};
make_endpoint(
"/records/{key}", HTTP_PUT, put, {ccf::user_cert_auth_policy})
.set_forwarding_required(ccf::endpoints::ForwardingRequired::Never)
.install();
auto get = [this](ccf::endpoints::ReadOnlyEndpointContext& ctx) {
@ -85,6 +86,7 @@ namespace basicapp
};
make_read_only_endpoint(
"/records/{key}", HTTP_GET, get, {ccf::user_cert_auth_policy})
.set_forwarding_required(ccf::endpoints::ForwardingRequired::Never)
.install();
auto post = [this](ccf::endpoints::EndpointContext& ctx) {

Просмотреть файл

@ -4,7 +4,7 @@
"get": {
"js_module": "basic.js",
"js_function": "get_record",
"forwarding_required": "sometimes",
"forwarding_required": "never",
"authn_policies": ["user_cert"],
"mode": "readonly",
"openapi": {}
@ -12,7 +12,7 @@
"put": {
"js_module": "basic.js",
"js_function": "put_record",
"forwarding_required": "always",
"forwarding_required": "never",
"authn_policies": ["user_cert"],
"mode": "readwrite",
"openapi": {}

Просмотреть файл

@ -520,7 +520,7 @@ namespace aft
if (state->leadership_state != kv::LeadershipState::Leader)
{
RAFT_FAIL_FMT(
RAFT_DEBUG_FMT(
"Failed to replicate {} items: not leader", entries.size());
rollback(state->last_idx);
return false;
@ -528,7 +528,7 @@ namespace aft
if (term != state->current_view)
{
RAFT_FAIL_FMT(
RAFT_DEBUG_FMT(
"Failed to replicate {} items at term {}, current term is {}",
entries.size(),
term,

Просмотреть файл

@ -297,7 +297,7 @@ namespace asynchost
const auto address_it = node_addresses.find(to);
if (address_it == node_addresses.end())
{
LOG_INFO_FMT("Ignoring node_outbound to unknown node {}", to);
LOG_TRACE_FMT("Ignoring node_outbound to unknown node {}", to);
return;
}

Просмотреть файл

@ -179,20 +179,8 @@ def create_and_fill_key_space(size: int, primary: infra.node.Node) -> List[str]:
return space
def create_and_add_node(
network, host, old_primary, new_primary, snapshots_dir, statistics
):
LOG.info(f"Retiring old primary {old_primary.local_node_id}")
statistics[
"initial_primary_retirement_start_time"
] = datetime.datetime.now().isoformat()
network.retire_node(new_primary, old_primary)
statistics[
"initial_primary_retirement_complete_time"
] = datetime.datetime.now().isoformat()
LOG.info("Old primary is retired")
LOG.info(f"Adding new node: {host}")
def create_and_add_node(network, host, old_primary, snapshots_dir, statistics):
LOG.info(f"Add new node: {host}")
node = network.create_node(host)
statistics["new_node_join_start_time"] = datetime.datetime.now().isoformat()
network.join_node(
@ -203,9 +191,9 @@ def create_and_add_node(
copy_ledger=False,
snapshots_dir=snapshots_dir,
)
network.trust_node(node, args)
statistics["new_node_join_complete_time"] = datetime.datetime.now().isoformat()
LOG.info(f"Done adding new node: {host}")
LOG.info(f"Replace node {old_primary.local_node_id} with {node.local_node_id}")
network.replace_stopped_node(old_primary, node, args, statistics=statistics)
LOG.info(f"Done replacing node: {host}")
def run(args):
@ -368,14 +356,13 @@ def run(args):
old_primary = primary
primary, _ = network.wait_for_new_primary(primary)
statistics[
"new_primary_election_time"
"new_primary_detected_time"
] = datetime.datetime.now().isoformat()
if args.add_new_node_after_primary_stops:
create_and_add_node(
network,
args.add_new_node_after_primary_stops,
old_primary,
primary,
latest_snapshot_dir,
statistics,
)
@ -433,8 +420,8 @@ def run(args):
# to maintain consistency, and 504 when we try to write to the future primary
# before their election. Since these requests effectively do nothing, they
# should not count towards latency statistics.
if args.stop_primary_after_s:
overall = overall.filter(pl.col("responseStatus") < 500)
# if args.stop_primary_after_s:
# overall = overall.filter(pl.col("responseStatus") < 500)
overall = overall.with_columns(
pl.col("receiveTime").alias("latency") - pl.col("sendTime")

Просмотреть файл

@ -361,6 +361,33 @@ class Consortium:
**kwargs,
)
def replace_node(
self,
remote_node,
node_to_retire,
node_to_add,
valid_from,
validity_period_days=None,
**kwargs,
):
proposal_body = {"actions": []}
trust_args = {"node_id": node_to_add.node_id, "valid_from": str(valid_from)}
if validity_period_days is not None:
trust_args["validity_period_days"] = validity_period_days
proposal_body["actions"].append(
{"name": "transition_node_to_trusted", "args": trust_args}
)
proposal_body["actions"].append(
{"name": "remove_node", "args": {"node_id": node_to_retire.node_id}}
)
proposal = self.get_any_active_member().propose(remote_node, proposal_body)
self.vote_using_majority(
remote_node,
proposal,
{"ballot": "export function vote (proposal, proposer_id) { return true }"},
**kwargs,
)
def trust_node(self, remote_node, node_id, *args, **kwargs):
return self.trust_nodes(remote_node, [node_id], *args, **kwargs)

Просмотреть файл

@ -955,6 +955,71 @@ class Network:
self.nodes.remove(node_to_retire)
def replace_stopped_node(
self,
node_to_retire,
node_to_add,
args,
valid_from=None,
validity_period_days=None,
timeout=5,
statistics=None,
):
primary, _ = self.find_primary()
try:
if self.status is ServiceStatus.OPEN:
valid_from = valid_from or datetime.utcnow()
# Note: Timeout is function of the ledger size here since
# the commit of the trust_node proposal may rely on the new node
# catching up (e.g. adding 1 node to a 1-node network).
if statistics is not None:
statistics[
"node_replacement_governance_start"
] = datetime.now().isoformat()
self.consortium.replace_node(
primary,
node_to_retire,
node_to_add,
valid_from=valid_from,
validity_period_days=validity_period_days,
timeout=args.ledger_recovery_timeout,
)
if statistics is not None:
statistics[
"node_replacement_governance_committed"
] = datetime.now().isoformat()
except (ValueError, TimeoutError):
LOG.error(
f"NFailed to replace {node_to_retire.node_id} with {node_to_add.node_id}"
)
node_to_add.stop()
raise
node_to_add.network_state = infra.node.NodeNetworkState.joined
end_time = time.time() + timeout
r = None
while time.time() < end_time:
try:
with primary.client() as c:
r = c.get("/node/network/removable_nodes").body.json()
if node_to_retire.node_id in {n["node_id"] for n in r["nodes"]}:
check_commit = infra.checker.Checker(c)
r = c.delete(f"/node/network/nodes/{node_to_retire.node_id}")
check_commit(r)
break
else:
r = c.get(
f"/node/network/nodes/{node_to_retire.node_id}"
).body.json()
except ConnectionRefusedError:
pass
time.sleep(0.1)
else:
raise TimeoutError(f"Timed out waiting for node to become removed: {r}")
if statistics is not None:
statistics["old_node_removal_committed"] = datetime.now().isoformat()
self.nodes.remove(node_to_retire)
def create_user(self, local_user_id, curve, record=True):
infra.proc.ccall(
self.key_generator,