CCF/tests/recovery.py

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
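#
# End-to-end disaster recovery tests: recovering a service from its ledger and
# snapshots, including recoveries with wrong or expired previous service
# identities, an aborted recovery, a primary killed while recovery shares are
# being submitted, and deliberately truncated ledger files.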
import infra.e2e_args
import infra.network
import infra.node
import infra.logging_app as app
import infra.checker
import suite.test_requirements as reqs
import ccf.ledger
import os
import json
from infra.runner import ConcurrentRunner
from distutils.dir_util import copy_tree
from infra.consortium import slurp_file
import infra.health_watcher
import time
from e2e_logging import verify_receipt
import infra.service_load
import ccf.tx_id
import tempfile
from loguru import logger as LOG


def get_and_verify_historical_receipt(network, ref_msg):
    primary, _ = network.find_primary()
    if not ref_msg:
        if not network.txs.priv:
            network.txs.issue(network, number_txs=1)
        idx, _ = network.txs.get_last_tx()
        ref_msg = network.txs.priv[idx][-1]
        ref_msg["idx"] = idx
    r = network.txs.get_receipt(
        primary,
        ref_msg["idx"],
        ref_msg["seqno"],
        ref_msg["view"],
    )
    verify_receipt(r.json()["receipt"], network.cert)
    return ref_msg
@reqs.description("Recover a service")
@reqs.recover(number_txs=2)
def test_recover_service(network, args, from_snapshot=True):
network.save_service_identity(args)
old_primary, _ = network.find_primary()
prev_ident = open(args.previous_service_identity_file, "r", encoding="utf-8").read()
# Strip trailing null byte
prev_ident = prev_ident.strip("\x00")
with old_primary.client() as c:
r = c.get("/node/service/previous_identity")
assert r.status_code in (200, 404), r.status_code
prev_view = c.get("/node/network").body.json()["current_view"]
snapshots_dir = None
if from_snapshot:
snapshots_dir = network.get_committed_snapshots(old_primary)
# Start health watcher and stop nodes one by one until a recovery has to be staged
watcher = infra.health_watcher.NetworkHealthWatcher(network, args, verbose=True)
watcher.start()
for node in network.get_joined_nodes():
time.sleep(args.election_timeout_ms / 1000)
node.stop()
watcher.wait_for_recovery()
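
    # Retrieve the stopped primary's ledger, to be passed to the recovery network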
    current_ledger_dir, committed_ledger_dirs = old_primary.get_ledger()

    with tempfile.NamedTemporaryFile(mode="w+") as node_data_tf:
        start_node_data = {"this is a": "recovery node"}
        json.dump(start_node_data, node_data_tf)
        node_data_tf.flush()
        recovered_network = infra.network.Network(
            args.nodes,
            args.binary_dir,
            args.debug_nodes,
            args.perf_nodes,
            existing_network=network,
            node_data_json_file=node_data_tf.name,
        )

        with tempfile.NamedTemporaryFile(mode="w+") as ntf:
            service_data = {"this is a": "recovery service"}
            json.dump(service_data, ntf)
            ntf.flush()
            recovered_network.start_in_recovery(
                args,
                ledger_dir=current_ledger_dir,
                committed_ledger_dirs=committed_ledger_dirs,
                snapshots_dir=snapshots_dir,
                service_data_json_file=ntf.name,
            )

    LOG.info("Check that service data has been set")
    primary, _ = recovered_network.find_primary()
    with primary.client() as c:
        r = c.get("/node/network").body.json()
        assert r["service_data"] == service_data

        LOG.info("Check that the node data has been set")
        r = c.get("/node/network/nodes").body.json()
        assert r["nodes"]
        did_check = False
        for node in r["nodes"]:
            if node["status"] == "Trusted":
                assert node["node_data"] == start_node_data
                did_check = True
        assert did_check

    recovered_network.verify_service_certificate_validity_period(
        args.initial_service_cert_validity_days
    )
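
    # Every node of the recovered network should expose the previous service identity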
    new_nodes = recovered_network.find_primary_and_any_backup()
    for n in new_nodes:
        with n.client() as c:
            r = c.get("/node/service/previous_identity")
            assert r.status_code == 200, r.status_code
            body = r.body.json()
            assert "previous_service_identity" in body, body
            received_prev_ident = body["previous_service_identity"]
            assert (
                received_prev_ident == prev_ident
            ), f"Response doesn't match previous identity: {received_prev_ident} != {prev_ident}"

    recovered_network.recover(args)

    LOG.info("Check that new service view is as expected")
    new_primary, _ = recovered_network.find_primary()
    with new_primary.client() as c:
        assert (
            ccf.tx_id.TxID.from_str(
                c.get("/node/network").body.json()["current_service_create_txid"]
            ).view
            == prev_view + 2
        )

    return recovered_network
@reqs.description("Recover a service with wrong service identity")
@reqs.recover(number_txs=2)
def test_recover_service_with_wrong_identity(network, args):
old_primary, _ = network.find_primary()
snapshots_dir = network.get_committed_snapshots(old_primary)
network.save_service_identity(args)
first_service_identity_file = args.previous_service_identity_file
network.stop_all_nodes()
current_ledger_dir, committed_ledger_dirs = old_primary.get_ledger()
# Attempt a recovery with the wrong previous service certificate
args.previous_service_identity_file = network.consortium.user_cert_path("user0")
broken_network = infra.network.Network(
args.nodes,
args.binary_dir,
args.debug_nodes,
args.perf_nodes,
existing_network=network,
)
exception = None
try:
broken_network.start_in_recovery(
args,
ledger_dir=current_ledger_dir,
committed_ledger_dirs=committed_ledger_dirs,
snapshots_dir=snapshots_dir,
)
except Exception as ex:
exception = ex
broken_network.ignoring_shutdown_errors = True
broken_network.stop_all_nodes(skip_verification=True)
if exception is None:
raise ValueError("Recovery should have failed")
if not broken_network.nodes[0].check_log_for_error_message(
"Error starting node: Previous service identity does not endorse the node identity that signed the snapshot"
):
raise ValueError("Node log does not contain the expected error message")
# Attempt a second recovery with the broken cert but no snapshot
# Now the mismatch is only noticed when the transition proposal is submitted
broken_network = infra.network.Network(
args.nodes,
args.binary_dir,
args.debug_nodes,
args.perf_nodes,
existing_network=network,
)
broken_network.start_in_recovery(
args,
ledger_dir=current_ledger_dir,
committed_ledger_dirs=committed_ledger_dirs,
)
exception = None
try:
broken_network.recover(args)
except Exception as ex:
exception = ex
broken_network.ignoring_shutdown_errors = True
broken_network.stop_all_nodes(skip_verification=True)
if exception is None:
raise ValueError("Recovery should have failed")
if not broken_network.nodes[0].check_log_for_error_message(
"Unable to open service: Previous service identity does not match."
):
raise ValueError("Node log does not contain the expected error message")
# Recover, now with the correct service identity
args.previous_service_identity_file = first_service_identity_file
recovered_network = infra.network.Network(
args.nodes,
args.binary_dir,
args.debug_nodes,
args.perf_nodes,
existing_network=network,
)
recovered_network.start_in_recovery(
args,
ledger_dir=current_ledger_dir,
committed_ledger_dirs=committed_ledger_dirs,
snapshots_dir=snapshots_dir,
)
recovered_network.recover(args)
return recovered_network
@reqs.description("Recover a service with expired service identity")
def test_recover_service_with_expired_cert(args):
expired_service_dir = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "expired_service"
)
new_common = infra.network.get_common_folder_name(args.workspace, args.label)
copy_tree(os.path.join(expired_service_dir, "common"), new_common)
network = infra.network.Network(args.nodes, args.binary_dir)
args.previous_service_identity_file = os.path.join(
expired_service_dir, "common", "service_cert.pem"
)
network.start_in_recovery(
args,
ledger_dir=os.path.join(expired_service_dir, "0.ledger"),
committed_ledger_dirs=[os.path.join(expired_service_dir, "0.ledger")],
snapshots_dir=os.path.join(expired_service_dir, "0.snapshots"),
common_dir=new_common,
)
network.recover(args)
primary, _ = network.find_primary()
infra.checker.check_can_progress(primary)
r = primary.get_receipt(2, 3)
verify_receipt(r.json(), network.cert)
@reqs.description("Attempt to recover a service but abort before recovery is complete")
def test_recover_service_aborted(network, args, from_snapshot=False):
network.save_service_identity(args)
old_primary, _ = network.find_primary()
snapshots_dir = None
if from_snapshot:
snapshots_dir = network.get_committed_snapshots(old_primary)
network.stop_all_nodes()
current_ledger_dir, committed_ledger_dirs = old_primary.get_ledger()
aborted_network = infra.network.Network(
args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, network
)
aborted_network.start_in_recovery(
args,
ledger_dir=current_ledger_dir,
committed_ledger_dirs=committed_ledger_dirs,
snapshots_dir=snapshots_dir,
)
LOG.info("Fill in ledger to trigger new chunks, which should be marked as recovery")
primary, _ = aborted_network.find_primary()
while (
len(
[
f
for f in os.listdir(primary.remote.ledger_paths()[0])
if f.endswith(
f"{ccf.ledger.COMMITTED_FILE_SUFFIX}{ccf.ledger.RECOVERY_FILE_SUFFIX}"
)
]
)
< 2
):
# Submit large proposal until at least two recovery ledger chunks are committed
aborted_network.consortium.create_and_withdraw_large_proposal(primary)
LOG.info(
"Do not complete service recovery on purpose and initiate new recovery from scratch"
)
snapshots_dir = None
if from_snapshot:
snapshots_dir = network.get_committed_snapshots(primary)
# Check that all nodes have the same (recovery) ledger files
aborted_network.stop_all_nodes(
skip_verification=True, read_recovery_ledger_files=True
)
current_ledger_dir, committed_ledger_dirs = primary.get_ledger()
recovered_network = infra.network.Network(
args.nodes,
args.binary_dir,
args.debug_nodes,
args.perf_nodes,
existing_network=aborted_network,
)
recovered_network.start_in_recovery(
args,
ledger_dir=current_ledger_dir,
committed_ledger_dirs=committed_ledger_dirs,
snapshots_dir=snapshots_dir,
)
recovered_network.recover(args)
return recovered_network
@reqs.description("Recovering a service, kill one node while submitting shares")
@reqs.recover(number_txs=2)
def test_share_resilience(network, args, from_snapshot=False):
network.save_service_identity(args)
old_primary, _ = network.find_primary()
snapshots_dir = None
if from_snapshot:
snapshots_dir = network.get_committed_snapshots(old_primary)
network.stop_all_nodes()
current_ledger_dir, committed_ledger_dirs = old_primary.get_ledger()
recovered_network = infra.network.Network(
args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, network
)
recovered_network.start_in_recovery(
args,
ledger_dir=current_ledger_dir,
committed_ledger_dirs=committed_ledger_dirs,
snapshots_dir=snapshots_dir,
)
primary, _ = recovered_network.find_primary()
recovered_network.consortium.transition_service_to_open(
primary,
previous_service_identity=slurp_file(args.previous_service_identity_file),
)
# Submit all required recovery shares minus one. Last recovery share is
# submitted after a new primary is found.
encrypted_submitted_shares_count = 0
for m in recovered_network.consortium.get_active_members():
with primary.client() as nc:
if (
encrypted_submitted_shares_count
>= recovered_network.consortium.recovery_threshold - 1
):
last_member_to_submit = m
break
check_commit = infra.checker.Checker(nc)
check_commit(m.get_and_submit_recovery_share(primary))
encrypted_submitted_shares_count += 1
LOG.info(
f"Shutting down node {primary.node_id} before submitting last recovery share"
)
primary.stop()
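
    # The final share is submitted to whichever node wins the ensuing election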
    new_primary, _ = recovered_network.wait_for_new_primary(primary)
    last_member_to_submit.get_and_submit_recovery_share(new_primary)

    for node in recovered_network.get_joined_nodes():
        recovered_network.wait_for_state(
            node,
            infra.node.State.PART_OF_NETWORK.value,
            timeout=args.ledger_recovery_timeout,
        )
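
    # All nodes have rejoined: count this recovery and check the service is open again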
    recovered_network.recovery_count += 1
    recovered_network.consortium.check_for_service(
        new_primary,
        infra.network.ServiceStatus.OPEN,
        recovery_count=recovered_network.recovery_count,
    )
    if recovered_network.service_load:
        recovered_network.service_load.set_network(recovered_network)
    return recovered_network
@reqs.description("Recover a service from malformed ledger")
@reqs.recover(number_txs=2)
def test_recover_service_truncated_ledger(network, args, get_truncation_point):
network.save_service_identity(args)
old_primary, _ = network.find_primary()
LOG.info("Force new ledger chunk for app txs to be in committed chunks")
network.consortium.force_ledger_chunk(old_primary)
LOG.info(
"Fill ledger with dummy entries until at least one ledger chunk is not committed, and contains a signature"
)
current_ledger_path = old_primary.remote.ledger_paths()[0]
while True:
# NB: This is used as an app agnostic write, nothing to do with the large
# size, or trying to force a chunk
network.consortium.create_and_withdraw_large_proposal(
old_primary, wait_for_commit=True
)
# A signature will have been emitted by now (wait_for_commit)
# Wait a little longer so it should have been persisted to disk, but
# retry if that has produced a committed chunk
time.sleep(0.2)
if not all(
f.endswith(ccf.ledger.COMMITTED_FILE_SUFFIX)
for f in os.listdir(current_ledger_path)
):
LOG.warning(
f"Decided to stop network after looking at ledger dir {current_ledger_path}: {os.listdir(current_ledger_path)}"
)
break
network.stop_all_nodes()
current_ledger_dir, committed_ledger_dirs = old_primary.get_ledger()
LOG.warning(
f"Ledger dir after stopping node is {current_ledger_dir}: {os.listdir(current_ledger_dir)}"
)
# Corrupt _uncommitted_ ledger before starting new service
ledger = ccf.ledger.Ledger([current_ledger_dir], committed_only=False)
chunk_filename, truncate_offset = get_truncation_point(ledger)
assert truncate_offset is not None, "Should always truncate within tx"
truncated_ledger_file_path = os.path.join(current_ledger_dir, chunk_filename)
with open(truncated_ledger_file_path, "r+", encoding="utf-8") as f:
f.truncate(truncate_offset)
LOG.warning(
f"Truncated ledger file {truncated_ledger_file_path} at {truncate_offset}"
)
recovered_network = infra.network.Network(
args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, network
)
recovered_network.start_in_recovery(
args,
ledger_dir=current_ledger_dir,
committed_ledger_dirs=committed_ledger_dirs,
)
recovered_network.recover(args)
return recovered_network


def run_corrupted_ledger(args):
    txs = app.LoggingTxs("user0")
    with infra.network.network(
        args.nodes,
        args.binary_dir,
        args.debug_nodes,
        args.perf_nodes,
        pdb=args.pdb,
        txs=txs,
    ) as network:
        network.start_and_open(args)
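
        # The helpers below choose truncation points; cutting a file at the
        # midpoint of a transaction guarantees the corruption lands inside it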
        def get_middle_tx_offset(tx):
            offset, next_offset = tx.get_offsets()
            return offset + (next_offset - offset) // 2

        def all_txs(ledger, verbose):
            for chunk in ledger:
                if verbose:
                    LOG.info(f"Considering chunk {chunk.filename()}")
                for tx in chunk:
                    if verbose:
                        LOG.info(f"Considering tx {tx.get_tx_digest()}")
                    yield chunk, tx

        def corrupt_first_tx(ledger, verbose=False):
            LOG.info("Finding first tx to corrupt")
            for chunk, tx in all_txs(ledger, verbose):
                return chunk.filename(), get_middle_tx_offset(tx)
            return None, None

        def corrupt_last_tx(ledger, verbose=False):
            LOG.info("Finding last tx to corrupt")
            chunk_filename, truncate_offset = None, None
            for chunk, tx in all_txs(ledger, verbose):
                chunk_filename = chunk.filename()
                truncate_offset = get_middle_tx_offset(tx)
            return chunk_filename, truncate_offset

        def corrupt_first_sig(ledger, verbose=False):
            LOG.info("Finding first sig to corrupt")
            for chunk, tx in all_txs(ledger, verbose):
                tables = tx.get_public_domain().get_tables()
                if ccf.ledger.SIGNATURE_TX_TABLE_NAME in tables:
                    return chunk.filename(), get_middle_tx_offset(tx)
            return None, None

        network = test_recover_service_truncated_ledger(
            network, args, corrupt_first_tx
        )
        network = test_recover_service_truncated_ledger(
            network, args, corrupt_last_tx
        )
        network = test_recover_service_truncated_ledger(
            network, args, corrupt_first_sig
        )

        network.stop_all_nodes()

        # Make sure ledger can be read once recovered (i.e. ledger corruption does not affect recovered ledger)
        for node in network.nodes:
            ledger = ccf.ledger.Ledger(node.remote.ledger_paths(), committed_only=False)
            _, last_seqno = ledger.get_latest_public_state()
            LOG.info(
                f"Successfully read ledger for node {node.local_node_id} up to seqno {last_seqno}"
            )


def find_recovery_tx_seqno(node):
    min_recovery_seqno = 0
    with node.client() as c:
        r = c.get("/node/state").body.json()
        if "last_recovered_seqno" not in r:
            return None
        min_recovery_seqno = r["last_recovered_seqno"]
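
    # Walk the ledger from the last recovered seqno until the service status becomes "Open"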
    ledger = ccf.ledger.Ledger(node.remote.ledger_paths(), committed_only=False)
    for chunk in ledger:
        _, chunk_end_seqno = chunk.get_seqnos()
        if chunk_end_seqno < min_recovery_seqno:
            continue
        for tx in chunk:
            tables = tx.get_public_domain().get_tables()
            seqno = tx.get_public_domain().get_seqno()
            if ccf.ledger.SERVICE_INFO_TABLE_NAME in tables:
                service_status = json.loads(
                    tables[ccf.ledger.SERVICE_INFO_TABLE_NAME][
                        ccf.ledger.WELL_KNOWN_SINGLETON_TABLE_KEY
                    ]
                )["status"]
                if service_status == "Open":
                    return seqno
    return None


def check_snapshots(args, network):
    primary, _ = network.find_primary()
    seqno = find_recovery_tx_seqno(primary)

    if seqno:
        # Check that the primary node has produced a snapshot. The wait timeout is
        # larger than the signature interval, so the snapshots should become
        # available within the timeout.
        assert args.sig_ms_interval < 3000
        if not network.get_committed_snapshots(
            primary, target_seqno=seqno, issue_txs=False
        ):
            raise ValueError(
                f"No snapshot found after seqno={seqno} on primary {primary.local_node_id}"
            )


def run(args):
    recoveries_count = 5

    txs = app.LoggingTxs("user0")
    with infra.network.network(
        args.nodes,
        args.binary_dir,
        args.debug_nodes,
        args.perf_nodes,
        pdb=args.pdb,
        txs=txs,
    ) as network:
        network.start_and_open(args)

        primary, _ = network.find_primary()
        LOG.info("Check for well-known genesis service TxID")
        with primary.client() as c:
            r = c.get("/node/network").body.json()
            assert ccf.tx_id.TxID.from_str(
                r["current_service_create_txid"]
            ) == ccf.tx_id.TxID(2, 1)

        if args.with_load:
            # See https://github.com/microsoft/CCF/issues/3788 for justification
            LOG.info("Loading service before recovery...")
            primary, _ = network.find_primary()
            with infra.service_load.load() as load:
                load.begin(network, rate=infra.service_load.DEFAULT_REQUEST_RATE_S * 10)
                while True:
                    with primary.client() as c:
                        r = c.get("/node/commit", log_capture=[]).body.json()
                        tx_id = ccf.tx_id.TxID.from_str(r["transaction_id"])
                        if tx_id.seqno > args.sig_tx_interval:
                            LOG.info(f"Loaded service successfully: tx_id, {tx_id}")
                            break
                    time.sleep(0.1)

        ref_msg = get_and_verify_historical_receipt(network, None)

        network = test_recover_service_with_wrong_identity(network, args)
# Issue transactions which will required historical ledger queries recovery
# when the network is shutdown
network.txs.issue(network, number_txs=1)
network.txs.issue(network, number_txs=1, repeat=True)
# Alternate between recovery with primary change and stable primary-ship,
# with and without snapshots
if i % recoveries_count == 0:
if args.consensus != "BFT":
network = test_share_resilience(network, args, from_snapshot=True)
elif i % recoveries_count == 1:
network = test_recover_service_aborted(
network, args, from_snapshot=False
)
else:
network = test_recover_service(network, args, from_snapshot=False)
for node in network.get_joined_nodes():
node.verify_certificate_validity_period()
check_snapshots(args, network)
ref_msg = get_and_verify_historical_receipt(network, ref_msg)
LOG.success("Recovery complete on all nodes")
        primary, _ = network.find_primary()
        network.stop_all_nodes()

        # Verify that a new ledger chunk was created at the start of each recovery
        ledger = ccf.ledger.Ledger(
            primary.remote.ledger_paths(),
            committed_only=False,
            validator=ccf.ledger.LedgerValidator(accept_deprecated_entry_types=False),
        )
        for chunk in ledger:
            chunk_start_seqno, _ = chunk.get_seqnos()
            for tx in chunk:
                tables = tx.get_public_domain().get_tables()
                seqno = tx.get_public_domain().get_seqno()
                if ccf.ledger.SERVICE_INFO_TABLE_NAME in tables:
                    service_status = json.loads(
                        tables[ccf.ledger.SERVICE_INFO_TABLE_NAME][
                            ccf.ledger.WELL_KNOWN_SINGLETON_TABLE_KEY
                        ]
                    )["status"]
                    if service_status in ("Opening", "Recovering"):
                        LOG.info(
                            f"New ledger chunk found for service {service_status.lower()} at {seqno}"
                        )
                        assert (
                            chunk_start_seqno == seqno
                        ), f"{service_status} service at seqno {seqno} did not start a new ledger chunk (started at {chunk_start_seqno})"

    test_recover_service_with_expired_cert(args)


if __name__ == "__main__":

    def add(parser):
        parser.description = """
This test executes multiple recoveries, with a fixed number of messages
applied between each network crash (as specified by the "--msgs-per-recovery"
arg). After the network is recovered and before applying new transactions,
all transactions previously applied are checked. Note that the key for each
logging message is unique (per table).
"""
        parser.add_argument(
            "--msgs-per-recovery",
            help="Number of public and private messages between two recoveries",
            type=int,
            default=5,
        )
        parser.add_argument(
            "--with-load",
            help="If set, the service is loaded before being recovered",
            action="store_true",
            default=False,
        )

    cr = ConcurrentRunner(add)

    cr.add(
        "recovery",
        run,
        package="samples/apps/logging/liblogging",
        nodes=infra.e2e_args.min_nodes(cr.args, f=1),
        ledger_chunk_bytes="50KB",
        snapshot_tx_interval=30,
    )

    # Note: `run_corrupted_ledger` runs with a very specific node configuration
    # so that the contents of recovered (and tampered-with) ledger chunks can be
    # dictated by the test. In particular, the signature interval is large
    # enough to create in-progress ledger files that do not end on a signature.
    # The test is also in control of the ledger chunking.
    cr.add(
        "recovery_corrupt_ledger",
        run_corrupted_ledger,
        package="samples/apps/logging/liblogging",
        nodes=infra.e2e_args.min_nodes(cr.args, f=0),  # 1 node suffices for recovery
        sig_ms_interval=1000,
        ledger_chunk_bytes="1GB",
        snapshot_tx_interval=1000000,
    )

    cr.run()