Mirror of https://github.com/microsoft/CCF.git

Fix for occasional failures in `recovery_test_cft` (#4225)

Parent: 371534afa1
Commit: 932901ca58
@@ -891,14 +891,18 @@ class Network:

     def _wait_for_app_open(self, node, timeout=3):
         end_time = time.time() + timeout
+        logs = []
         while time.time() < end_time:
             # As an operator, query a well-known /app endpoint to find out
             # if the app has been opened to users
             with node.client() as c:
-                r = c.get("/app/commit")
+                logs = []
+                r = c.get("/app/commit", log_capture=logs)
                 if not (r.status_code == http.HTTPStatus.NOT_FOUND.value):
+                    flush_info(logs, None)
                     return
                 time.sleep(0.1)
+        flush_info(logs, None)
         raise TimeoutError(f"Application frontend was not open after {timeout}s")

     def _get_node_by_service_id(self, node_id):
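The change above defers the test client's request logging: each attempt is captured into `logs` via `log_capture` rather than printed, and `flush_info` emits the captured lines only when they are informative, on the first non-404 response or after the timeout expires. A minimal sketch of that capture-then-flush pattern, using hypothetical stand-ins for the infra helpers (this `flush_info` and `get` are illustrative, not CCF's implementations):

def flush_info(logs, _source):
    # Hypothetical stand-in: emit the captured lines all at once.
    for line in logs:
        print(line)

def get(path, log_capture):
    # Hypothetical stand-in for the client call: capture instead of printing.
    log_capture.append(f"GET {path} -> 404")
    return 404

logs = []
status = get("/app/commit", log_capture=logs)
if status != 404:
    flush_info(logs, None)  # success: the captured lines are worth showing
# otherwise stay quiet, retry, and flush only once we give up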
@@ -405,13 +405,7 @@ def test_share_resilience(network, args, from_snapshot=False):

 @reqs.description("Recover a service from malformed ledger")
 @reqs.recover(number_txs=2)
-def test_recover_service_truncated_ledger(
-    network,
-    args,
-    corrupt_first_tx=False,
-    corrupt_last_tx=False,
-    corrupt_first_sig=False,
-):
+def test_recover_service_truncated_ledger(network, args, get_truncation_point):
     network.save_service_identity(args)
     old_primary, _ = network.find_primary()
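The three boolean flags are replaced by a strategy parameter: `get_truncation_point` is any callable that takes a ledger and returns a `(chunk_filename, truncate_offset)` pair. A minimal sketch of that contract (hypothetical names and data, not the test code):

def pick_first(ledger):
    # One possible strategy: truncate at the first entry.
    for chunk_filename, offset in ledger:
        return chunk_filename, offset
    return None, None  # empty ledger: nothing to truncate

def run_truncation_test(ledger, get_truncation_point):
    chunk_filename, truncate_offset = get_truncation_point(ledger)
    assert truncate_offset is not None, "Should always truncate within tx"
    print(f"Would truncate {chunk_filename} at offset {truncate_offset}")

run_truncation_test([("ledger_1-10", 42)], pick_first)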
@@ -423,51 +417,39 @@ def test_recover_service_truncated_ledger(
     )
     current_ledger_path = old_primary.remote.ledger_paths()[0]
     while True:
         # NB: This is used as an app agnostic write, nothing to do with the large
         # size, or trying to force a chunk
+        network.consortium.create_and_withdraw_large_proposal(
+            old_primary, wait_for_commit=True
+        )
+        # A signature will have been emitted by now (wait_for_commit)
         network.consortium.create_and_withdraw_large_proposal(old_primary)
         # Wait a little longer so it should have been persisted to disk, but
         # retry if that has produced a committed chunk
         time.sleep(0.2)
         if not all(
             f.endswith(ccf.ledger.COMMITTED_FILE_SUFFIX)
             for f in os.listdir(current_ledger_path)
         ):
+            LOG.warning(
+                f"Decided to stop network after looking at ledger dir {current_ledger_path}: {os.listdir(current_ledger_path)}"
+            )
             break

     network.stop_all_nodes()

     current_ledger_dir, committed_ledger_dirs = old_primary.get_ledger()
+    LOG.warning(
+        f"Ledger dir after stopping node is {current_ledger_dir}: {os.listdir(current_ledger_dir)}"
+    )

     # Corrupt _uncommitted_ ledger before starting new service
     ledger = ccf.ledger.Ledger([current_ledger_dir], committed_only=False)

-    def get_middle_tx_offset(tx):
-        offset, next_offset = tx.get_offsets()
-        return offset + (next_offset - offset) // 2
-
-    for chunk in ledger:
-        chunk_filename = chunk.filename()
-        first_tx_offset = None
-        last_tx_offset = None
-        first_sig_offset = None
-        for tx in chunk:
-            tables = tx.get_public_domain().get_tables()
-            if (
-                first_sig_offset is None
-                and ccf.ledger.SIGNATURE_TX_TABLE_NAME in tables
-            ):
-                first_sig_offset = get_middle_tx_offset(tx)
-            last_tx_offset = get_middle_tx_offset(tx)
-            if first_tx_offset is None:
-                first_tx_offset = get_middle_tx_offset(tx)
+    chunk_filename, truncate_offset = get_truncation_point(ledger)
+    assert truncate_offset is not None, "Should always truncate within tx"

     truncated_ledger_file_path = os.path.join(current_ledger_dir, chunk_filename)
-    if corrupt_first_tx:
-        truncate_offset = first_tx_offset
-    elif corrupt_last_tx:
-        truncate_offset = last_tx_offset
-    elif corrupt_first_sig:
-        truncate_offset = first_sig_offset

     with open(truncated_ledger_file_path, "r+", encoding="utf-8") as f:
         f.truncate(truncate_offset)
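The test tears a transaction in half: `get_middle_tx_offset` picks the midpoint between a transaction's start and end offsets, and `f.truncate` cuts the chunk file there, so recovery must cope with a partially written entry. A self-contained sketch of the effect on a toy record file (the format here is hypothetical, not CCF's ledger encoding):

import os
import tempfile

# Build a toy "ledger" of three fixed-width records.
fd, path = tempfile.mkstemp()
with os.fdopen(fd, "wb") as f:
    f.write(b"record-1|record-2|record-3|")

# Truncate in the middle of record-2, as get_middle_tx_offset would.
start, end = len(b"record-1|"), len(b"record-1|record-2|")
with open(path, "r+b") as f:
    f.truncate(start + (end - start) // 2)

with open(path, "rb") as f:
    print(f.read())  # b'record-1|reco': record-2 is torn, record-3 is gone

os.remove(path)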
@@ -499,14 +481,46 @@ def run_corrupted_ledger(args):
         txs=txs,
     ) as network:
         network.start_and_open(args)

+        def get_middle_tx_offset(tx):
+            offset, next_offset = tx.get_offsets()
+            return offset + (next_offset - offset) // 2
+
+        def all_txs(ledger, verbose):
+            for chunk in ledger:
+                if verbose:
+                    LOG.info(f"Considering chunk {chunk.filename()}")
+                for tx in chunk:
+                    if verbose:
+                        LOG.info(f"Considering tx {tx.get_tx_digest()}")
+                    yield chunk, tx
+
+        def corrupt_first_tx(ledger, verbose=False):
+            LOG.info("Finding first tx to corrupt")
+            for chunk, tx in all_txs(ledger, verbose):
+                return chunk.filename(), get_middle_tx_offset(tx)
+            return None, None
+
+        def corrupt_last_tx(ledger, verbose=False):
+            LOG.info("Finding last tx to corrupt")
+            chunk_filename, truncate_offset = None, None
+            for chunk, tx in all_txs(ledger, verbose):
+                chunk_filename = chunk.filename()
+                truncate_offset = get_middle_tx_offset(tx)
+            return chunk_filename, truncate_offset
+
+        def corrupt_first_sig(ledger, verbose=False):
+            LOG.info("Finding first sig to corrupt")
+            for chunk, tx in all_txs(ledger, verbose):
+                tables = tx.get_public_domain().get_tables()
+                if ccf.ledger.SIGNATURE_TX_TABLE_NAME in tables:
+                    return chunk.filename(), get_middle_tx_offset(tx)
+            return None, None
+
+        network = test_recover_service_truncated_ledger(network, args, corrupt_first_tx)
+        network = test_recover_service_truncated_ledger(network, args, corrupt_last_tx)
         network = test_recover_service_truncated_ledger(
-            network, args, corrupt_first_tx=True
-        )
-        network = test_recover_service_truncated_ledger(
-            network, args, corrupt_last_tx=True
-        )
-        network = test_recover_service_truncated_ledger(
-            network, args, corrupt_first_sig=True
+            network, args, corrupt_first_sig
         )

         network.stop_all_nodes()
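All three corruption helpers share the `all_txs` generator; they differ only in how they consume it. The `first_*` variants return inside the loop on the first match, while `corrupt_last_tx` drains the generator and keeps whatever pair it saw last. A reduced sketch of that selection pattern over dummy data (hypothetical values):

def all_items():
    # Stands in for all_txs(ledger, verbose).
    yield from [("chunk_1", 10), ("chunk_1", 20), ("chunk_2", 30)]

def first_item():
    for name, offset in all_items():
        return name, offset  # stop at the first element
    return None, None

def last_item():
    name, offset = None, None
    for name, offset in all_items():
        pass  # keep overwriting until the generator is exhausted
    return name, offset

assert first_item() == ("chunk_1", 10)
assert last_item() == ("chunk_2", 30)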
@@ -683,17 +697,15 @@ checked. Note that the key for each logging message is unique (per table).
             default=False,
         )

-    args = infra.e2e_args.cli_args(add)
-
     cr = ConcurrentRunner(add)

     cr.add(
         "recovery",
         run,
         package="samples/apps/logging/liblogging",
-        nodes=infra.e2e_args.min_nodes(args, f=1),
+        nodes=infra.e2e_args.min_nodes(cr.args, f=1),
         ledger_chunk_bytes="50KB",
-        snasphot_tx_interval=30,
+        snapshot_tx_interval=30,
     )

     # Note: `run_corrupted_ledger` runs with a very specific node configuration
@@ -705,10 +717,10 @@ checked. Note that the key for each logging message is unique (per table).
         "recovery_corrupt_ledger",
         run_corrupted_ledger,
         package="samples/apps/logging/liblogging",
-        nodes=infra.e2e_args.min_nodes(args, f=0),  # 1 node suffices for recovery
+        nodes=infra.e2e_args.min_nodes(cr.args, f=0),  # 1 node suffices for recovery
         sig_ms_interval=1000,
         ledger_chunk_bytes="1GB",
-        snasphot_tx_interval=1000000,
+        snapshot_tx_interval=1000000,
     )

     cr.run()
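Besides switching to `cr.args`, the namespace parsed by `ConcurrentRunner` itself now that the separate `cli_args` call is gone, the two hunks above fix the misspelled `snasphot_tx_interval` keyword. Assuming `cr.add` applies keyword overrides onto the args namespace by attribute name, roughly as sketched below (a hypothetical model, not the ConcurrentRunner implementation), a misspelled key creates a dead attribute and the intended snapshot interval is silently never applied:

from types import SimpleNamespace

def apply_overrides(args, **kwargs):
    # Hypothetical model of kwarg handling: no spelling check,
    # so a typo just creates an attribute nothing ever reads.
    for key, value in kwargs.items():
        setattr(args, key, value)
    return args

args = SimpleNamespace(snapshot_tx_interval=10)
apply_overrides(args, snasphot_tx_interval=30)  # misspelled: override lost
assert args.snapshot_tx_interval == 10
apply_overrides(args, snapshot_tx_interval=30)  # correct spelling takes effect
assert args.snapshot_tx_interval == 30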