CCF/tests/partitions_test.py

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
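# End-to-end tests exercising CCF's behaviour under network partitions:
# election stability, primary isolation, in-flight reconfiguration, learner
# promotion, and node-to-node channels with expired certificates.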
import infra.network
import infra.net
import infra.node
import infra.crypto
import infra.interfaces
import infra.e2e_args
import infra.partitions
import infra.logging_app as app
import suite.test_requirements as reqs
from datetime import datetime, timedelta
from infra.checker import check_can_progress, check_does_not_progress
import pprint
from infra.tx_status import TxStatus
import time
import http
import contextlib
import ccf.ledger
from loguru import logger as LOG
from math import ceil
@reqs.description("Invalid partitions are not allowed")
def test_invalid_partitions(network, args):
nodes = network.get_joined_nodes()
try:
network.partitioner.partition(
[nodes[0], nodes[2]],
[nodes[1], nodes[2]],
)
assert False, "Node should not appear in two or more partitions"
except ValueError:
pass
try:
network.partitioner.partition()
assert False, "At least one partition should be specified"
except ValueError:
pass
try:
invalid_local_node_id = -1
new_node = infra.node.Node(invalid_local_node_id, "local://localhost")
network.partitioner.partition([new_node])
assert False, "All nodes should belong to network"
except ValueError:
pass
return network
@reqs.description("Partition primary + f nodes")
def test_partition_majority(network, args):
primary, backups = network.find_nodes()
# Create a partition with primary + half remaining nodes (i.e. majority)
partition = [primary]
partition.extend(backups[len(backups) // 2 :])
# Wait for all nodes to be have reached the same level of commit, so that
# nodes outside of partition can become primary after this one is dropped
network.wait_for_all_nodes_to_commit(primary=primary)
# The primary should remain stable while the partition is active
# Note: Context manager
initial_view = None
with network.partitioner.partition(partition):
try:
network.wait_for_new_primary(primary)
assert False, "No new primary should be elected when partitioning majority"
except TimeoutError:
LOG.info("No new primary, as expected")
with primary.client() as c:
res = c.get("/node/network") # Well-known read-only endpoint
body = res.body.json()
initial_view = body["current_view"]
# The partitioned nodes will have called elections, increasing their view.
# When the partition is lifted, the nodes must elect a new leader, in at least this
# increased term. The winning node could come from either partition, and could even
# be the original primary.
network.wait_for_primary_unanimity(min_view=initial_view)
return network
@reqs.description("Isolate primary from one backup")
@reqs.exactly_n_nodes(3)
def test_isolate_primary_from_one_backup(network, args):
p, backups = network.find_nodes()
b_0, b_1 = backups
# Issue one transaction, waiting for all nodes to be have reached
# the same level of commit, so that nodes outside of partition can
# become primary after this one is dropped
# Note: Because of https://github.com/microsoft/CCF/issues/2224, we need to
# issue a write transaction instead of just reading the TxID of the latest entry
initial_txid = network.txs.issue(network)
# Isolate first backup from primary so that first backup becomes candidate
# in a new term and wins the election
# Note: Managed manually
rules = network.partitioner.isolate_node(p, b_0)
LOG.info(
f"Check that primary {p.local_node_id} reports increasing last ack time for partitioned backup {b_0.local_node_id}"
)
last_ack = 0
while True:
with p.client() as c:
r = c.get("/node/consensus", log_capture=[]).body.json()["details"]
ack = r["acks"][b_0.node_id]["last_received_ms"]
if r["primary_id"] is not None:
assert (
ack >= last_ack
), f"Nodes {p.local_node_id} and {b_0.local_node_id} are no longer partitioned"
last_ack = ack
else:
LOG.debug(f"Node {p.local_node_id} is no longer primary")
break
time.sleep(0.1)
# Now wait for several elections to occur. We expect:
# - b_0 to call and win an election with b_1's help
# - b_0 to produce a new signature, and commit it with b_1's help
# - p to call its own election, and lose because it doesn't have this signature
# - In the resulting election race:
# - If p calls first, it loses and we're in the same situation
# - If b_0 calls first, it wins, but then p calls its election and we've returned to the same situation
# - If b_1 calls first, it can win and then bring _both_ nodes up-to-date, becoming a _stable_ primary
# So we repeat elections until b_1 is primary
new_primary = network.wait_for_primary_unanimity(
min_view=initial_txid.view, timeout_multiplier=30
)
assert new_primary == b_1
new_view = network.txs.issue(network).view
# The partition is now between 2 backups, but both can talk to the new primary
# Explicitly drop rules before continuing
rules.drop()
LOG.info(f"Check that new primary {new_primary.local_node_id} reports stable acks")
last_ack = 0
end_time = time.time() + 2 * network.args.election_timeout_ms // 1000
while time.time() < end_time:
with new_primary.client() as c:
acks = c.get("/node/consensus", log_capture=[]).body.json()["details"][
"acks"
]
delayed_acks = [
ack
for ack in acks.values()
if ack["last_received_ms"] > args.election_timeout_ms
]
if delayed_acks:
raise RuntimeError(f"New primary reported some delayed acks: {acks}")
time.sleep(0.1)
# Original primary should now, or very soon, report the new primary
new_primary_, new_view_ = network.wait_for_new_primary(p, nodes=[p])
assert (
new_primary == new_primary_
), f"New primary {new_primary_.local_node_id} after partition is dropped is different than before {new_primary.local_node_id}"
assert (
new_view == new_view_
), f"Consensus view {new_view} should not have changed after partition is dropped: now {new_view_}"
return network
@reqs.description("Isolate and reconnect primary")
def test_isolate_and_reconnect_primary(network, args, **kwargs):
primary, backups = network.find_nodes()
with network.partitioner.partition(backups):
lost_tx_resp = check_does_not_progress(primary)
new_primary, _ = network.wait_for_new_primary(
primary, nodes=backups, timeout_multiplier=6
)
new_tx_resp = check_can_progress(new_primary)
# Check reconnected former primary has caught up
with primary.client() as c:
try:
# There will be at least one full election cycle for nothing, where the
# re-joining node fails to get elected but causes others to rev up their
# term. After that, a successful election needs to take place, and we
# arbitrarily allow 3 time periods to avoid being too brittle when
# raft timeouts line up badly.
c.wait_for_commit(new_tx_resp, timeout=(network.election_duration * 4))
except TimeoutError:
details = c.get("/node/consensus").body.json()
assert (
False
), f"Stuck before {new_tx_resp.view}.{new_tx_resp.seqno}: {pprint.pformat(details)}"
# Check it has dropped anything submitted while partitioned
r = c.get(f"/node/tx?transaction_id={lost_tx_resp.view}.{lost_tx_resp.seqno}")
status = TxStatus(r.body.json()["status"])
assert status == TxStatus.Invalid, r
@reqs.description("New joiner helps liveness")
@reqs.exactly_n_nodes(3)
def test_new_joiner_helps_liveness(network, args):
primary, backups = network.find_nodes()
# Issue some transactions, so there is a ledger history that a new node must receive
network.txs.issue(network, number_txs=10)
# Remove a node, leaving the network frail
network.retire_node(primary, backups[-1])
backups[-1].stop()
primary, backups = network.find_nodes()
with contextlib.ExitStack() as stack:
# Add a new node, but partition them before trusting them
new_node = network.create_node("local://localhost")
network.join_node(new_node, args.package, args, from_snapshot=False)
new_joiner_partition = [new_node]
new_joiner_rules = stack.enter_context(
network.partitioner.partition([primary, *backups], new_joiner_partition)
)
# Trust the new node, and wait for commit of this (but don't ask the new node itself, which doesn't know this yet)
network.trust_node(new_node, args, no_wait=True)
check_can_progress(primary)
# Partition the primary, temporarily creating a minority service that cannot make progress
minority_partition = backups[len(backups) // 2 :] + new_joiner_partition
minority_rules = stack.enter_context(
network.partitioner.partition(minority_partition)
)
# This is an unusual situation, where we've actually produced a dead partitioned node.
# Initially any write requests will timeout (failed attempt at forwarding), and then
# the node transitions to a candidate with nobody to talk to. Rather than trying to
# catch the errors of these states quickly, we just sleep until the latter state is
# reached, and then confirm it was reached.
time.sleep(network.observed_election_duration)
with backups[0].client("user0") as c:
r = c.post("/app/log/private", {"id": 42, "msg": "Hello world"})
assert r.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE
# Restore the new node to the service
new_joiner_rules.drop()
# Confirm that the new node catches up, and progress can be made in this majority partition
network.wait_for_new_primary(primary, minority_partition)
check_can_progress(new_node)
# Explicitly drop rules before continuing
minority_rules.drop()
network.wait_for_primary_unanimity()
primary, _ = network.find_nodes()
network.wait_for_all_nodes_to_commit(primary=primary)
@reqs.description("Test node-to-node channel behaviour once certs have expired")
@reqs.exactly_n_nodes(3)
def test_expired_certs(network, args):
primary, (backup_a, backup_b) = network.find_nodes()
def set_certs(from_days_diff, validity_period_days, nodes):
valid_from = str(
infra.crypto.datetime_to_X509time(
datetime.utcnow() + timedelta(days=from_days_diff)
)
)
for node in nodes:
network.consortium.set_node_certificate_validity(
primary,
node,
valid_from=valid_from,
validity_period_days=validity_period_days,
)
node.set_certificate_validity_period(
valid_from,
validity_period_days,
)
# Wait for this node to receive this updated cert, and start advertising it
timeout = 2
end_time = time.time() + timeout
while True:
try:
node.verify_certificate_validity_period()
LOG.info("Successfully updated cert")
break
except ValueError as ve:
LOG.warning(f"Cert is still old value: {ve}")
assert (
time.time() < end_time
), f"Cert has not been updated after {timeout}s"
time.sleep(0.2)
# Expired cert is only an issue on channel creation.
# Force channel creation by partitioning to cause controlled election.
with contextlib.ExitStack() as stack:
# Partition backup_b from others
with network.partitioner.partition([backup_b]):
# Advance state, committed by presence on primary and backup_a
with primary.client("user0") as c:
r = c.post("/app/log/private", {"id": 42, "msg": "hello world"})
assert r.status_code == http.HTTPStatus.OK, r
c.wait_for_commit(r)
# Expire the certs of primary and backup_a - these are the only viable
# candidates due to the newly committed suffix
# NB: Once we start doing this, speaking to these nodes is tricky, because
# client auth will also fail => disable ca verification
primary.verify_ca_by_default = False
backup_a.verify_ca_by_default = False
set_certs(
from_days_diff=-30, validity_period_days=7, nodes=(primary, backup_a)
)
# Partition primary, so that backup_a is only viable candidate, and must try
# to create channels to backup_b
stack.enter_context(network.partitioner.partition([primary]))
# Restore connectivity between backups and wait for election
network.wait_for_primary_unanimity(nodes=[backup_a, backup_b], min_view=r.view)
# Should now be able to make progress
check_can_progress(backup_a)
# Restore connectivity with primary, an election may or may not happen
network.wait_for_primary_unanimity(min_view=r.view)
# Set valid node certs so that future clients can speak to these nodes
set_certs(from_days_diff=-1, validity_period_days=7, nodes=(primary, backup_a))
# Can now speak to these again
primary.verify_ca_by_default = True
backup_a.verify_ca_by_default = True
return network
@reqs.description("Test election while reconfiguration is in flight")
@reqs.at_least_n_nodes(3)
def test_election_reconfiguration(network, args):
# Test for issue described in https://github.com/microsoft/CCF/issues/3948
# Note: this test makes use of node-endorsed secondary RPC interface since
# new nodes never observe commit of their configuration and thus never
# open their service-endorsed primary RPC interface.
primary, backups = network.find_nodes()
LOG.info("Join new nodes without trusting them just yet")
new_nodes = []
# Start N+1 new nodes to make sure they cannot elect one of them as a primary
# without approval from the original configuration
for _ in range(len(network.nodes) + 1):
rpc_interfaces = {
infra.interfaces.PRIMARY_RPC_INTERFACE: infra.interfaces.RPCInterface(
host="localhost"
)
}
rpc_interfaces.update(infra.interfaces.make_secondary_interface())
new_node = network.create_node(infra.interfaces.HostSpec(rpc_interfaces))
network.join_node(new_node, args.package, args, from_snapshot=False)
new_nodes.append(new_node)
# Wait until all backups know about these joins, so they have an equal chance of
# becoming primary afterwards
network.wait_for_node_commit_sync()
LOG.info("Isolate original backups and issue reconfiguration of another quorum")
# Partition backups _from each other_
with network.partitioner.partitions([backup] for backup in backups):
LOG.info("Trust all new nodes in one single proposal")
# Note: Commit is stuck since a majority of backups in initial configuration
# are isolated
network.consortium.trust_nodes(
primary,
[n.node_id for n in new_nodes],
valid_from=datetime.utcnow(),
wait_for_global_commit=False,
)
for node in new_nodes:
node.wait_for_node_to_join(
interface_name=infra.interfaces.SECONDARY_RPC_INTERFACE
)
# Wait for configuration tx to be replicated to new node
network.wait_for_node_in_store(
node,
node.node_id,
ccf.ledger.NodeStatus.TRUSTED,
interface_name=infra.interfaces.SECONDARY_RPC_INTERFACE,
)
LOG.info(f"Stop primary node {primary.local_node_id} to trigger election")
primary.stop()
LOG.info(
"Make sure that new nodes cannot elect a primary node among themselves"
)
try:
network.wait_for_new_primary(
primary,
nodes=new_nodes,
interface_name=infra.interfaces.SECONDARY_RPC_INTERFACE,
timeout_multiplier=3,
)
except infra.network.PrimaryNotFound:
LOG.info(
"As expected, new primary could not be elected as old configuration could not make progress"
)
else:
assert False, "No new primary should be elected while partition is up"
LOG.info("Stop all new nodes")
for node in new_nodes:
node.stop()
LOG.info(
"As partition is lifted, check that isolated original backups elect primary"
)
network.wait_for_primary_unanimity(nodes=backups)
LOG.info("Retire former primary and add new node")
network.retire_node(backups[0], primary)
new_node = network.create_node("local://localhost")
network.join_node(new_node, args.package, args, from_snapshot=False)
network.trust_node(new_node, args)
return network
@reqs.description("Add a learner, partition nodes, check that there is no progress")
def test_learner_does_not_take_part(network, args):
primary, backups = network.find_nodes()
f_backups = backups[: network.get_f() + 1]
# Note: host is supplied explicitly to avoid having differently
# assigned IPs for the interfaces, something which the test infra doesn't
# support widely yet.
operator_rpc_interface = "operator_rpc_interface"
host = infra.net.expand_localhost()
new_node = network.create_node(
infra.interfaces.HostSpec(
rpc_interfaces={
infra.interfaces.PRIMARY_RPC_INTERFACE: infra.interfaces.RPCInterface(
host=host
),
operator_rpc_interface: infra.interfaces.RPCInterface(
host=host,
endorsement=infra.interfaces.Endorsement(
authority=infra.interfaces.EndorsementAuthority.Node
),
),
}
)
)
network.join_node(new_node, args.package, args, from_snapshot=False)
LOG.info("Wait for all nodes to have committed join of new pending node")
network.wait_for_all_nodes_to_commit(primary=primary)
# Here, we partition a majority of backups. This is very intentional so that
# the new learner node is not promoted to trusted while the partition is up.
# However, this means that the isolated majority of backups can (and will)
# elect one of them as new primary while the partition is up. When the partition
# is lifted, all the transactions executed of the primary node (including
# trusting the new node) will be rolled back. Because of this, we issue a new
# trust_node proposal to make sure the new node ends up being trusted and joins
# successfully.
with network.partitioner.partition(f_backups):
check_does_not_progress(primary, timeout=5)
try:
network.consortium.trust_node(
primary,
new_node.node_id,
timeout=ceil(args.join_timer_s * 2),
valid_from=datetime.now(),
)
except TimeoutError:
LOG.info("Trust node proposal did not commit as expected")
else:
raise Exception("Trust node proposal committed unexpectedly")
check_does_not_progress(primary, timeout=5)
LOG.info("Majority partition can make progress")
partition_primary, _ = network.wait_for_new_primary(primary, nodes=f_backups)
check_can_progress(partition_primary)
LOG.info("New joiner is not promoted to Trusted without f other backups")
with new_node.client(
interface_name=operator_rpc_interface, verify_ca=False
) as c:
r = c.get("/node/network/nodes/self")
assert r.body.json()["status"] == "Learner"
r = c.get("/node/consensus")
assert new_node.node_id in r.body.json()["details"]["learners"]
LOG.info("Partition is lifted, wait for primary unanimity on original nodes")
# Note: Because trusting the new node failed, the new node is not considered
# in the primary unanimity. Indeed, its transition to Trusted may have been rolled back.
primary = network.wait_for_primary_unanimity()
network.wait_for_all_nodes_to_commit(primary=primary)
LOG.info("Trust new joiner again")
network.trust_node(new_node, args)
check_can_progress(primary)
check_can_progress(new_node)
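# Runs the learner test under the two-transaction reconfiguration scheme;
# skipped unless --include-2tx-reconfig is passed.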
def run_2tx_reconfig_tests(args):
    if not args.include_2tx_reconfig:
        return

    local_args = args
    if args.reconfiguration_type != "TwoTransaction":
        local_args.reconfiguration_type = "TwoTransaction"

    with infra.network.network(
        local_args.nodes,
        local_args.binary_dir,
        local_args.debug_nodes,
        local_args.perf_nodes,
        pdb=local_args.pdb,
        init_partitioner=True,
    ) as network:
        network.start_and_open(local_args)

        test_learner_does_not_take_part(network, local_args)


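# Standard run: starts a network with the partitioner enabled and executes the
# partition tests against it.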
def run(args):
    txs = app.LoggingTxs("user0")
    with infra.network.network(
        args.nodes,
        args.binary_dir,
        args.debug_nodes,
        args.perf_nodes,
        pdb=args.pdb,
        txs=txs,
        init_partitioner=True,
    ) as network:
        network.start_and_open(args)

        test_invalid_partitions(network, args)
        test_partition_majority(network, args)
        test_isolate_primary_from_one_backup(network, args)
        test_new_joiner_helps_liveness(network, args)
        test_expired_certs(network, args)
        for n in range(5):
            test_isolate_and_reconnect_primary(network, args, iteration=n)
        test_election_reconfiguration(network, args)


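# CLI entry point: parses the standard e2e args (plus --include-2tx-reconfig)
# and runs both test suites.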
if __name__ == "__main__":

    def add(parser):
        parser.add_argument(
            "--include-2tx-reconfig",
            help="Include tests for the 2-transaction reconfiguration scheme",
            default=False,
            action="store_true",
        )

    args = infra.e2e_args.cli_args(add)
    args.package = "samples/apps/logging/liblogging"

    args.snapshot_tx_interval = (
        20  # Increase snapshot frequency for faster reconfigurations
    )

    args.nodes = infra.e2e_args.min_nodes(args, f=1)
    run(args)
    run_2tx_reconfig_tests(args)