RaftDriver: Always loop until sync is achieved at the end of a scenario (#6009)

This commit is contained in:
Eddy Ashton 2024-02-20 09:09:42 +00:00 коммит произвёл GitHub
Родитель 5e8dfdab92
Коммит 2c739c6559
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
4 изменённых файлов: 167 добавлений и 54 удалений

Просмотреть файл

@ -78,7 +78,7 @@ int main(int argc, char** argv)
if (in.starts_with("===="))
{
// Terminate early if four or more '=' appear on a line.
return 0;
break;
}
#ifdef CCF_RAFT_TRACING
if (!line.empty())
@ -285,6 +285,10 @@ int main(int argc, char** argv)
assert(items.size() == 2);
driver->create_new_node(items[1]);
break;
case shash("loop_until_sync"):
assert(items.size() == 1);
driver->loop_until_sync(lineno);
break;
case shash(""):
// Ignore empty lines
skip_invariants = true;
@ -302,5 +306,8 @@ int main(int argc, char** argv)
++lineno;
}
// Confirm path to liveness from final state
driver->loop_until_sync(lineno);
return 0;
}

Просмотреть файл

@ -855,8 +855,7 @@ public:
}
}
std::optional<std::pair<aft::Term, ccf::NodeId>> find_primary_in_term(
const std::string& term_s, const size_t lineno)
std::vector<std::pair<aft::Term, ccf::NodeId>> find_primaries()
{
std::vector<std::pair<aft::Term, ccf::NodeId>> primaries;
for (const auto& [node_id, node_driver] : _nodes)
@ -866,6 +865,13 @@ public:
primaries.emplace_back(node_driver.raft->get_view(), node_id);
}
}
return primaries;
}
std::optional<std::pair<aft::Term, ccf::NodeId>> find_primary_in_term(
const std::string& term_s, const size_t lineno)
{
auto primaries = find_primaries();
if (term_s == "latest")
{
@ -989,41 +995,44 @@ public:
}
}
void assert_state_sync(const size_t lineno)
using Discrepancies = std::map<ccf::NodeId, std::vector<std::string>>;
Discrepancies check_state_sync(const std::map<ccf::NodeId, NodeDriver> nodes)
{
auto [target_id, nd] = *_nodes.begin();
Discrepancies discrepancies;
if (nodes.empty())
{
return discrepancies;
}
auto [target_id, nd] = *nodes.begin();
auto& target_raft = nd.raft;
const auto target_term = target_raft->get_view();
const auto target_last_idx = target_raft->get_last_idx();
const auto target_commit_idx = target_raft->get_committed_seqno();
bool all_match = true;
for (auto it = std::next(_nodes.begin()); it != _nodes.end(); ++it)
for (auto it = std::next(nodes.begin()); it != nodes.end(); ++it)
{
const auto& node_id = it->first;
auto& raft = it->second.raft;
if (raft->get_view() != target_term)
{
RAFT_DRIVER_PRINT(
"Note over {}: Term {} doesn't match term {} on {}",
node_id,
discrepancies[node_id].push_back(fmt::format(
"Term {} doesn't match term {} on {}",
raft->get_view(),
target_term,
target_id);
all_match = false;
target_id));
}
if (raft->get_last_idx() != target_last_idx)
{
RAFT_DRIVER_PRINT(
"Note over {}: Last index {} doesn't match "
"last index {} on {}",
node_id,
discrepancies[node_id].push_back(fmt::format(
"Last index {} doesn't match last index {} on {}",
raft->get_last_idx(),
target_last_idx,
target_id);
all_match = false;
target_id));
}
else
{
@ -1033,9 +1042,8 @@ public:
const auto target_entry = target_raft->ledger->get_entry_by_idx(idx);
if (!target_entry.has_value())
{
RAFT_DRIVER_PRINT(
"Note over {}: Missing ledger entry at {}", target_id, idx);
all_match = false;
discrepancies[node_id].push_back(
fmt::format("Missing ledger entry at {}", idx));
break;
}
else
@ -1043,22 +1051,18 @@ public:
const auto entry = raft->ledger->get_entry_by_idx(idx);
if (!entry.has_value())
{
RAFT_DRIVER_PRINT(
"Note over {}: Missing ledger entry at {}", node_id, idx);
all_match = false;
discrepancies[node_id].push_back(
fmt::format("Missing ledger entry at {}", idx));
break;
}
else if (entry != target_entry)
{
RAFT_DRIVER_PRINT(
"Note over {}: Entry at index {} "
"doesn't match entry on {}: {} != {}",
node_id,
discrepancies[node_id].push_back(fmt::format(
"Entry at index {} doesn't match entry on {}: {} != {}",
idx,
target_id,
stringify(entry.value()),
stringify(target_entry.value()));
all_match = false;
stringify(target_entry.value())));
break;
}
}
@ -1067,24 +1071,112 @@ public:
if (raft->get_committed_seqno() != target_commit_idx)
{
RAFT_DRIVER_PRINT(
"Note over {}: Commit index {} doesn't "
"match commit index {} on {}",
node_id,
discrepancies[node_id].push_back(fmt::format(
"Commit index {} doesn't match commit index {} on {}",
raft->get_committed_seqno(),
target_commit_idx,
target_id);
all_match = false;
target_id));
}
}
if (!all_match)
return discrepancies;
}
// Reports every recorded discrepancy through RAFT_DRIVER_PRINT, one
// "Note over <node>: <reason>" line per reason, visiting nodes in map order.
void print_discrepancies(const Discrepancies& discrepancies)
{
  for (const auto& node_and_reasons : discrepancies)
  {
    const auto& node_id = node_and_reasons.first;
    for (const auto& reason : node_and_reasons.second)
    {
      RAFT_DRIVER_PRINT("Note over {}: {}", node_id, reason);
    }
  }
}
// Asserts that check_state_sync reports no discrepancies across _nodes.
//
// lineno: scenario line number, used only in the error message.
// Throws std::runtime_error (after printing each discrepancy) if any node
// differs from the reference node.
void assert_state_sync(const size_t lineno)
{
  const auto discrepancies = check_state_sync(_nodes);
  if (discrepancies.empty())
  {
    return;
  }

  print_discrepancies(discrepancies);
  // Format lineno directly: the earlier (int) cast could truncate a size_t,
  // and fmt formats integral types without a std::to_string round-trip.
  throw std::runtime_error(
    fmt::format("States not in sync on line {}", lineno));
}
// Drives the active nodes until check_state_sync reports no discrepancies.
//
// Steps, in order:
//   1. pick the primary in the highest term (throws std::runtime_error if
//      find_primaries returns nothing),
//   2. collect the nodes whose raft instance is_active(),
//   3. emit a signature in that term and reconnect the collected nodes,
//   4. repeatedly run periodic_one on the primary and dispatch_all, checking
//      for sync after each round.
//
// lineno: scenario line number, forwarded to emit_signature.
// Throws std::logic_error (after printing the remaining discrepancies) if
// sync is not reached within max_iterations rounds.
void loop_until_sync(const size_t lineno)
{
  // Find primary in highest term
  const auto primaries = find_primaries();
  if (primaries.empty())
  {
    // If no primary exists, try to create one? No such scenario, so far
    throw std::runtime_error("Can't currently loop until sync, no primary");
  }
  const auto [term, primary] =
    *std::max_element(primaries.begin(), primaries.end());

  // Don't try to confirm sync with retired nodes
  decltype(_nodes) nodes;
  for (auto& [node_id, node_driver] : _nodes)
  {
    if (!node_driver.raft->is_active())
    {
      RAFT_DRIVER_PRINT(
        "Note over {}: Ignoring from sync check, due to retirement", node_id);
      continue;
    }
    nodes[node_id] = node_driver;
  }

  // Emit a fresh signature on that primary (so that they can advance commit)
  emit_signature(std::to_string(term), lineno);

  // Reconnect all nodes
  for (const auto& active : nodes)
  {
    reconnect_node(active.first);
  }

  // Loop, doing periodic and dispatch, until sync.
  // Can make iterations higher if any scenario actually needs more
  static constexpr auto max_iterations = 20;
  for (auto iterations = 1;; ++iterations)
  {
    periodic_one(primary, ms(10));
    dispatch_all();

    const auto discrepancies = check_state_sync(nodes);
    if (discrepancies.empty())
    {
      return;
    }

    if (iterations >= max_iterations)
    {
      print_discrepancies(discrepancies);
      throw std::logic_error(fmt::format(
        "Failed to reach state sync after {} loop iterations", iterations));
    }
  }
}
void assert_commit_safety(ccf::NodeId node_id, const size_t lineno)
{
// Confirm that the index this node considers committed, is present on a

Просмотреть файл

@ -114,7 +114,7 @@ LOCAL OrderDropConsecutiveMessages(server) ==
LOCAL OrderDropMessage(server, Test(_)) ==
\E i \in { idx \in 1..Len(messages[server]) : Test(messages[server][idx]) }:
messages' = [ messages EXCEPT ![server] = RemoveAt(messages[server], i) ]
messages' = [ messages EXCEPT ![server] = RemoveAt(@, i) ]
----------------------------------------------------------------------------------
\* Point-to-Point Ordering and no duplication of messages:

Просмотреть файл

@ -18,6 +18,13 @@ ToMembershipState ==
"Active" :> {Active} @@
"Retired" :> {RetirementOrdered, RetirementSigned, RetirementCompleted}
\* Header check shared by the per-message predicates: holds when msg has the
\* expected type, that type agrees with the one the logline's packet maps to
\* via RaftMsgType, msg is addressed from src to dst, and its term equals the
\* packet's term.
IsHeader(msg, dst, src, logline, type) ==
/\ msg.type = type
/\ msg.type = RaftMsgType[logline.msg.packet.msg]
/\ msg.dest = dst
/\ msg.source = src
/\ msg.term = logline.msg.packet.term
IsAppendEntriesRequest(msg, dst, src, logline) ==
(*
| ccfraft.tla | json | raft.h |
@ -31,11 +38,7 @@ IsAppendEntriesRequest(msg, dst, src, logline) ==
| | .term_of_idx | term_of_idx |
| | .contains_new_view | contains_new_view |
*)
/\ msg.type = AppendEntriesRequest
/\ msg.type = RaftMsgType[logline.msg.packet.msg]
/\ msg.dest = dst
/\ msg.source = src
/\ msg.term = logline.msg.packet.term
/\ IsHeader(msg, dst, src, logline, AppendEntriesRequest)
/\ msg.commitIndex = logline.msg.packet.leader_commit_idx
/\ msg.prevLogTerm = logline.msg.packet.prev_term
/\ Len(msg.entries) = logline.msg.packet.idx - logline.msg.packet.prev_idx
@ -43,15 +46,30 @@ IsAppendEntriesRequest(msg, dst, src, logline) ==
/\ msg.prevLogIndex = logline.msg.packet.prev_idx
IsAppendEntriesResponse(msg, dst, src, logline) ==
/\ msg.type = AppendEntriesResponse
/\ msg.type = RaftMsgType[logline.msg.packet.msg]
/\ msg.dest = dst
/\ msg.source = src
/\ msg.term = logline.msg.packet.term
/\ IsHeader(msg, dst, src, logline, AppendEntriesResponse)
\* raft_types.h enum AppendEntriesResponseType
/\ msg.success = (logline.msg.packet.success = "OK")
/\ msg.lastLogIndex = logline.msg.packet.last_log_idx
\* Holds when msg is a RequestVoteRequest whose header matches the logline
\* and whose last committable index/term equal the packet's
\* last_committable_idx / term_of_last_committable_idx fields.
IsRequestVoteRequest(msg, dst, src, logline) ==
/\ IsHeader(msg, dst, src, logline, RequestVoteRequest)
/\ msg.lastCommittableIndex = logline.msg.packet.last_committable_idx
/\ msg.lastCommittableTerm = logline.msg.packet.term_of_last_committable_idx
\* Holds when msg is a RequestVoteResponse whose header matches the logline
\* and whose voteGranted flag equals the packet's vote_granted field.
IsRequestVoteResponse(msg, dst, src, logline) ==
/\ IsHeader(msg, dst, src, logline, RequestVoteResponse)
/\ msg.voteGranted = logline.msg.packet.vote_granted
\* Holds when msg is a ProposeVoteRequest whose header matches the logline;
\* no fields beyond the header are compared.
IsProposeVoteRequest(msg, dst, src, logline) ==
/\ IsHeader(msg, dst, src, logline, ProposeVoteRequest)
\* Dispatches on msg.type to the matching per-type predicate, so that a
\* message is compared against the logline using the fields of its own type.
\* NOTE(review): the CASE has no OTHER arm, so a msg.type outside these five
\* is not handled here -- confirm the set of message types is closed.
IsMessage(msg, dst, src, logline) ==
CASE msg.type = AppendEntriesResponse -> IsAppendEntriesResponse(msg, dst, src, logline)
[] msg.type = AppendEntriesRequest -> IsAppendEntriesRequest(msg, dst, src, logline)
[] msg.type = RequestVoteRequest -> IsRequestVoteRequest(msg, dst, src, logline)
[] msg.type = RequestVoteResponse -> IsRequestVoteResponse(msg, dst, src, logline)
[] msg.type = ProposeVoteRequest -> IsProposeVoteRequest(msg, dst, src, logline)
-------------------------------------------------------------------------------------
\* Trace validation has been designed for TLC running in default model-checking
@ -59,7 +77,7 @@ IsAppendEntriesResponse(msg, dst, src, logline) ==
ASSUME TLCGet("config").mode = "bfs"
JsonFile ==
IF "JSON" \in DOMAIN IOEnv THEN IOEnv.JSON ELSE "../../build/startup.ndjson"
IF "JSON" \in DOMAIN IOEnv THEN IOEnv.JSON ELSE "../../tests/raft_scenarios/bad_network.ndjson"
JsonLog ==
\* Deserialize the System log as a sequence of records from the log file.
@ -131,11 +149,7 @@ IsEvent(e) ==
IsDropPendingTo ==
/\ IsEvent("drop_pending_to")
/\ Network!DropMessage(logline.msg.to_node_id,
LAMBDA msg:
/\ msg.type = RaftMsgType[logline.msg.packet.msg]
/\ msg.dest = logline.msg.to_node_id
/\ msg.source = logline.msg.from_node_id
)
LAMBDA msg: IsMessage(msg, logline.msg.to_node_id, logline.msg.from_node_id, logline))
/\ UNCHANGED <<reconfigurationVars, serverVars, candidateVars, leaderVars, logVars>>
IsTimeout ==