From 932901ca58721e6dbccf23b7979cb9adc9bf0f8b Mon Sep 17 00:00:00 2001
From: Eddy Ashton
Date: Wed, 14 Sep 2022 16:59:49 +0100
Subject: [PATCH] Fix for occasional failures in `recovery_test_cft` (#4225)

---
 tests/infra/network.py |   6 ++-
 tests/recovery.py      | 102 +++++++++++++++++++++++------------------
 2 files changed, 62 insertions(+), 46 deletions(-)

diff --git a/tests/infra/network.py b/tests/infra/network.py
index 9a2b32049..48815e995 100644
--- a/tests/infra/network.py
+++ b/tests/infra/network.py
@@ -891,14 +891,18 @@ class Network:
 
     def _wait_for_app_open(self, node, timeout=3):
         end_time = time.time() + timeout
+        logs = []
         while time.time() < end_time:
             # As an operator, query a well-known /app endpoint to find out
             # if the app has been opened to users
             with node.client() as c:
-                r = c.get("/app/commit")
+                logs = []
+                r = c.get("/app/commit", log_capture=logs)
                 if not (r.status_code == http.HTTPStatus.NOT_FOUND.value):
+                    flush_info(logs, None)
                     return
             time.sleep(0.1)
+        flush_info(logs, None)
         raise TimeoutError(f"Application frontend was not open after {timeout}s")
 
     def _get_node_by_service_id(self, node_id):
diff --git a/tests/recovery.py b/tests/recovery.py
index e77d3b890..1de0de580 100644
--- a/tests/recovery.py
+++ b/tests/recovery.py
@@ -405,13 +405,7 @@ def test_share_resilience(network, args, from_snapshot=False):
 
 @reqs.description("Recover a service from malformed ledger")
 @reqs.recover(number_txs=2)
-def test_recover_service_truncated_ledger(
-    network,
-    args,
-    corrupt_first_tx=False,
-    corrupt_last_tx=False,
-    corrupt_first_sig=False,
-):
+def test_recover_service_truncated_ledger(network, args, get_truncation_point):
     network.save_service_identity(args)
     old_primary, _ = network.find_primary()
 
@@ -423,51 +417,39 @@
     )
     current_ledger_path = old_primary.remote.ledger_paths()[0]
     while True:
+        # NB: This is used as an app agnostic write, nothing to do with the large
+        # size, or trying to force a chunk
         network.consortium.create_and_withdraw_large_proposal(
             old_primary, wait_for_commit=True
         )
         # A signature will have been emitted by now (wait_for_commit)
-        network.consortium.create_and_withdraw_large_proposal(old_primary)
+        # Wait a little longer so it should have been persisted to disk, but
+        # retry if that has produced a committed chunk
+        time.sleep(0.2)
         if not all(
             f.endswith(ccf.ledger.COMMITTED_FILE_SUFFIX)
             for f in os.listdir(current_ledger_path)
         ):
+            LOG.warning(
+                f"Decided to stop network after looking at ledger dir {current_ledger_path}: {os.listdir(current_ledger_path)}"
+            )
             break
 
     network.stop_all_nodes()
     current_ledger_dir, committed_ledger_dirs = old_primary.get_ledger()
+    LOG.warning(
+        f"Ledger dir after stopping node is {current_ledger_dir}: {os.listdir(current_ledger_dir)}"
+    )
 
     # Corrupt _uncommitted_ ledger before starting new service
     ledger = ccf.ledger.Ledger([current_ledger_dir], committed_only=False)
 
-    def get_middle_tx_offset(tx):
-        offset, next_offset = tx.get_offsets()
-        return offset + (next_offset - offset) // 2
+    chunk_filename, truncate_offset = get_truncation_point(ledger)
 
-    for chunk in ledger:
-        chunk_filename = chunk.filename()
-        first_tx_offset = None
-        last_tx_offset = None
-        first_sig_offset = None
-        for tx in chunk:
-            tables = tx.get_public_domain().get_tables()
-            if (
-                first_sig_offset is None
-                and ccf.ledger.SIGNATURE_TX_TABLE_NAME in tables
-            ):
-                first_sig_offset = get_middle_tx_offset(tx)
-            last_tx_offset = get_middle_tx_offset(tx)
-            if first_tx_offset is None:
-                first_tx_offset = get_middle_tx_offset(tx)
+    assert truncate_offset is not None, "Should always truncate within tx"
 
     truncated_ledger_file_path = os.path.join(current_ledger_dir, chunk_filename)
-    if corrupt_first_tx:
-        truncate_offset = first_tx_offset
-    elif corrupt_last_tx:
-        truncate_offset = last_tx_offset
-    elif corrupt_first_sig:
-        truncate_offset = first_sig_offset
 
     with open(truncated_ledger_file_path, "r+", encoding="utf-8") as f:
         f.truncate(truncate_offset)
 
@@ -499,14 +481,46 @@ def run_corrupted_ledger(args):
         txs=txs,
     ) as network:
         network.start_and_open(args)
+
+        def get_middle_tx_offset(tx):
+            offset, next_offset = tx.get_offsets()
+            return offset + (next_offset - offset) // 2
+
+        def all_txs(ledger, verbose):
+            for chunk in ledger:
+                if verbose:
+                    LOG.info(f"Considering chunk {chunk.filename()}")
+                for tx in chunk:
+                    if verbose:
+                        LOG.info(f"Considering tx {tx.get_tx_digest()}")
+                    yield chunk, tx
+
+        def corrupt_first_tx(ledger, verbose=False):
+            LOG.info("Finding first tx to corrupt")
+            for chunk, tx in all_txs(ledger, verbose):
+                return chunk.filename(), get_middle_tx_offset(tx)
+            return None, None
+
+        def corrupt_last_tx(ledger, verbose=False):
+            LOG.info("Finding last tx to corrupt")
+            chunk_filename, truncate_offset = None, None
+            for chunk, tx in all_txs(ledger, verbose):
+                chunk_filename = chunk.filename()
+                truncate_offset = get_middle_tx_offset(tx)
+            return chunk_filename, truncate_offset
+
+        def corrupt_first_sig(ledger, verbose=False):
+            LOG.info("Finding first sig to corrupt")
+            for chunk, tx in all_txs(ledger, verbose):
+                tables = tx.get_public_domain().get_tables()
+                if ccf.ledger.SIGNATURE_TX_TABLE_NAME in tables:
+                    return chunk.filename(), get_middle_tx_offset(tx)
+            return None, None
+
+        network = test_recover_service_truncated_ledger(network, args, corrupt_first_tx)
+        network = test_recover_service_truncated_ledger(network, args, corrupt_last_tx)
         network = test_recover_service_truncated_ledger(
-            network, args, corrupt_first_tx=True
-        )
-        network = test_recover_service_truncated_ledger(
-            network, args, corrupt_last_tx=True
-        )
-        network = test_recover_service_truncated_ledger(
-            network, args, corrupt_first_sig=True
+            network, args, corrupt_first_sig
         )
 
     network.stop_all_nodes()
@@ -683,17 +697,15 @@ checked. Note that the key for each logging message is unique (per table).
         default=False,
     )
 
-    args = infra.e2e_args.cli_args(add)
-
     cr = ConcurrentRunner(add)
 
     cr.add(
        "recovery",
         run,
         package="samples/apps/logging/liblogging",
-        nodes=infra.e2e_args.min_nodes(args, f=1),
+        nodes=infra.e2e_args.min_nodes(cr.args, f=1),
         ledger_chunk_bytes="50KB",
-        snasphot_tx_interval=30,
+        snapshot_tx_interval=30,
     )
 
     # Note: `run_corrupted_ledger` runs with very a specific node configuration
@@ -705,10 +717,10 @@ checked. Note that the key for each logging message is unique (per table).
         "recovery_corrupt_ledger",
         run_corrupted_ledger,
         package="samples/apps/logging/liblogging",
-        nodes=infra.e2e_args.min_nodes(args, f=0),  # 1 node suffices for recovery
+        nodes=infra.e2e_args.min_nodes(cr.args, f=0),  # 1 node suffices for recovery
         sig_ms_interval=1000,
         ledger_chunk_bytes="1GB",
-        snasphot_tx_interval=1000000,
+        snapshot_tx_interval=1000000,
     )
 
     cr.run()