Mirror of https://github.com/microsoft/CCF.git
[release/3.x] Cherry pick: Session Consistency: Turn on enforcement of session consistency across elections (#4611) (#4616)
This commit is contained in:
Parent
e1d05bb263
Commit
5aeb4e1ca6
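
What this enforces, in brief: after an election, state that a live session has already observed may be rolled back, so the session can no longer be answered consistently. With this change such sessions receive a single SessionConsistencyLost error and are then closed, rather than silently continuing. A minimal Python model of the rule being turned on (illustrative names only, not CCF's implementation; it assumes a session is pinned to the view in which it starts and is invalidated once its node observes a later view):

class SessionConsistencyLost(Exception):
    pass


class Node:
    def __init__(self):
        self.current_view = 1  # Bumped by elections

    def handle(self, payload):
        return f"ok (view {self.current_view})"


class Session:
    def __init__(self, node):
        self.node = node
        self.view = node.current_view  # Pin the view at session start

    def request(self, payload):
        # A later view means an election happened: earlier responses on this
        # session may describe state that has since been rolled back, so the
        # session must fail loudly rather than answer inconsistently
        if self.node.current_view > self.view:
            raise SessionConsistencyLost(
                f"pinned to view {self.view}, node now at {self.node.current_view}"
            )
        return self.node.handle(payload)
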
@@ -156,10 +156,13 @@ namespace ccf
         create_timeout_error_task(to, client_session_id, timeout), timeout);
       }
 
-      // NB: This version of the code understands the *_v3 messages, but still
-      // _emits_ only *_v2 messages
-      ForwardedHeader_v2 header = {
-        {ForwardedMsg::forwarded_cmd_v2, rpc_ctx->frame_format()}, command_id};
+      const auto view_opt = session_ctx->active_view;
+      if (!view_opt.has_value())
+      {
+        throw std::logic_error(
+          "Expected active_view to be set before forwarding");
+      }
+      ForwardedCommandHeader_v3 header(command_id, view_opt.value());
 
       return n2n_channels->send_encrypted(
         to, NodeMsgType::forwarded_msg, plain, header);
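
The hunk above is the sending half of the change: forwarded commands now carry a ForwardedCommandHeader_v3, which includes the session's active view, where the v2 header carried only the command id. A rough sketch of why the receiving node wants that extra field (Python, with illustrative names; this is not CCF's actual API):

from dataclasses import dataclass


@dataclass
class ForwardedCommandHeaderV3:
    command_id: int
    active_view: int  # View the forwarding session is pinned to


def handle_forwarded(header, current_view, execute):
    # A node that has advanced past the session's view can no longer
    # guarantee a consistent continuation of what that session already
    # observed, so it reports the inconsistency instead of executing
    if header.active_view != current_view:
        return {"error": {"code": "SessionConsistencyLost"}}
    return execute(header.command_id)
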
@@ -506,6 +506,198 @@ def test_forwarding_timeout(network, args):
     network.wait_for_primary_unanimity()
 
 
+@reqs.description(
+    "Session consistency is provided, and inconsistencies after elections are replaced by errors"
+)
+@reqs.supports_methods("/app/log/private")
+@reqs.no_http2()
+def test_session_consistency(network, args):
+    # Ensure we have 5 nodes
+    original_size = network.resize(5, args)
+
+    primary, backups = network.find_nodes()
+    backup = backups[0]
+
+    with contextlib.ExitStack() as stack:
+        client_primary_A = stack.enter_context(
+            primary.client(
+                "user0",
+                description_suffix="A",
+                impl_type=infra.clients.RawSocketClient,
+            )
+        )
+        client_primary_B = stack.enter_context(
+            primary.client(
+                "user0",
+                description_suffix="B",
+                impl_type=infra.clients.RawSocketClient,
+            )
+        )
+        client_backup_C = stack.enter_context(
+            backup.client(
+                "user0",
+                description_suffix="C",
+                impl_type=infra.clients.RawSocketClient,
+            )
+        )
+        client_backup_D = stack.enter_context(
+            backup.client(
+                "user0",
+                description_suffix="D",
+                impl_type=infra.clients.RawSocketClient,
+            )
+        )
+
+        # Create some new state
+        msg_id = 42
+        msg_a = "First write, to primary"
+        r = client_primary_A.post(
+            "/app/log/private",
+            {
+                "id": msg_id,
+                "msg": msg_a,
+            },
+        )
+        assert r.status_code == http.HTTPStatus.OK, r
+
+        # Read this state on a second session
+        r = client_primary_B.get(f"/app/log/private?id={msg_id}")
+        assert r.status_code == http.HTTPStatus.OK, r
+        assert r.body.json()["msg"] == msg_a, r
+
+        # Wait for that to be committed on all backups
+        network.wait_for_all_nodes_to_commit(primary)
+
+        # Write on backup, resulting in a forwarded request.
+        # Confirm that this session can read that write, since it remains forwarded.
+        # Meanwhile a separate session to the same backup node may not see it.
+        # NB: The latter property is not possible to test systematically, as it
+        # relies on a race - does the read on the second session happen before consensus
+        # updates the backup's state? Solution is to try in a loop, with a high probability
+        # that we observe the desired ordering after just a few iterations.
+        n_attempts = 20
+        for i in range(n_attempts):
+            last_message = f"Second write, via backup ({i})"
+            r = client_backup_C.post(
+                "/app/log/private",
+                {
+                    "id": msg_id,
+                    "msg": last_message,
+                },
+            )
+            assert r.status_code == http.HTTPStatus.OK, r
+
+            r = client_backup_D.get(f"/app/log/private?id={msg_id}")
+            assert r.status_code == http.HTTPStatus.OK, r
+            if r.body.json()["msg"] != last_message:
+                LOG.info(
+                    f"Successfully saw a different value on second session after {i} attempts"
+                )
+                break
+        else:
+            raise RuntimeError(
+                f"Failed to observe evidence of session forwarding after {n_attempts} attempts"
+            )
+
+        def check_sessions_alive(sessions):
+            for client in sessions:
+                try:
+                    r = client.get(f"/app/log/private?id={msg_id}")
+                    assert r.status_code == http.HTTPStatus.OK, r
+                except ConnectionResetError as e:
+                    raise AssertionError(
+                        f"Session {client.description} was killed unexpectedly: {e}"
+                    ) from e
+
+        def check_sessions_dead(
+            sessions,
+        ):
+            for client in sessions:
+                try:
+                    r = client.get(f"/app/log/private?id={msg_id}")
+                    assert r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR, r
+                    assert r.body.json()["error"]["code"] == "SessionConsistencyLost", r
+                except ConnectionResetError as e:
+                    raise AssertionError(
+                        f"Session {client.description} was killed without first returning an error: {e}"
+                    ) from e
+
+                # After returning an error, the session should be terminated, so all subsequent requests should fail
+                try:
+                    client.get("/node/commit")
+                    raise AssertionError(
+                        f"Session {client.description} survived unexpectedly"
+                    )
+                except ConnectionResetError:
+                    LOG.info(f"Session {client.description} was terminated as expected")
+
+        def wait_for_new_view(node, original_view, timeout_multiplier):
+            election_s = args.election_timeout_ms / 1000
+            timeout = election_s * timeout_multiplier
+            end_time = time.time() + timeout
+            while time.time() < end_time:
+                with node.client() as c:
+                    r = c.get("/node/network")
+                    assert r.status_code == http.HTTPStatus.OK, r
+                    if r.body.json()["current_view"] > original_view:
+                        return
+                time.sleep(0.1)
+            raise TimeoutError(
+                f"Node failed to reach view higher than {original_view} after waiting {timeout}s"
+            )
+
+        # Partition primary and forwarding backup from other backups
+        with network.partitioner.partition([primary, backup]):
+            # Write on partitioned primary
+            msg0 = "Hello world"
+            r0 = client_primary_A.post(
+                "/app/log/private",
+                {
+                    "id": msg_id,
+                    "msg": msg0,
+                },
+            )
+            assert r0.status_code == http.HTTPStatus.OK
+
+            # Read from partitioned backup, over forwarded session to primary
+            r1 = client_backup_C.get(f"/app/log/private?id={msg_id}")
+            assert r1.status_code == http.HTTPStatus.OK
+            assert r1.body.json()["msg"] == msg0, r1
+
+            # Despite the partition, these sessions remain live
+            check_sessions_alive((client_primary_A, client_backup_C))
+
+            # Once CheckQuorum takes effect and the primary stands down, all sessions
+            # on the primary report a risk of inconsistency
+            wait_for_new_view(primary, r0.view, 4)
+            check_sessions_dead(
+                (
+                    # This session wrote state which is now at risk of being lost
+                    client_primary_A,
+                    # This session only read old state which is still valid
+                    client_primary_B,
+                    # This is also immediately true for forwarded sessions on the backup
+                    client_backup_C,
+                )
+            )
+
+            # The backup may not have received any view increment yet, so a non-forwarded
+            # session on the backup may still be valid. This is a temporary, racy situation,
+            # and safe (the backup has not rolled back, and is still reporting state in the
+            # old session).
+            # Test that once the view has advanced, that backup session is also terminated.
+            wait_for_new_view(backup, r0.view, 1)
+            check_sessions_dead((client_backup_D,))
+
+    # Wait for network stability after healing the partition
+    network.wait_for_primary_unanimity(min_view=r0.view)
+
+    # Restore original network size
+    network.resize(original_size, args)
+
+    return network
+
+
 def run_2tx_reconfig_tests(args):
     if not args.include_2tx_reconfig:
         return
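
From the client's side, the contract this test pins down is: a session whose observed state may have been lost to an election receives exactly one SessionConsistencyLost error, after which the node closes the connection. Recovery therefore means opening a fresh session and re-reading. A hedged sketch of that handling (open_session is a hypothetical helper standing in for the client construction used in the test; it is not part of CCF's client library):

import http


def get_with_session_recovery(url, open_session):
    # open_session() is assumed to return a client with the same .get()
    # semantics as the RawSocketClient sessions used in the test above
    client = open_session()
    r = client.get(url)
    if (
        r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
        and r.body.json()["error"]["code"] == "SessionConsistencyLost"
    ):
        # Anything read or written earlier on that session may have been
        # rolled back; start over on a new session and re-establish state
        client = open_session()
        r = client.get(url)
    return r
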
@@ -550,6 +742,7 @@ def run(args):
             test_isolate_and_reconnect_primary(network, args, iteration=n)
         test_election_reconfiguration(network, args)
         test_forwarding_timeout(network, args)
+        test_session_consistency(network, args)
 
 
 if __name__ == "__main__":