Raft fixes: Rollback safety despite NACKs, and only ACK up to incoming AE (#6016)

This commit is contained in:
Eddy Ashton 2024-02-15 17:43:46 +00:00 коммит произвёл GitHub
Родитель 74326effa7
Коммит 443309e10d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
10 изменённых файлов: 499 добавлений и 66 удалений

Просмотреть файл

@ -2,4 +2,4 @@
(- -) (= =) | Y & +--?
( V ) / . \ | +---=---'
/--x-m- /--n-n---xXx--/--yY------>>>----<<<>>]]{{}}---||-/\---..
2024
2024!

Просмотреть файл

@ -1 +1 @@
.............
..............

Просмотреть файл

@ -5,6 +5,18 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
## Unreleased
### Fixed
- Improvements to the Raft implementation, to retain commit safety and liveness despite message loss (#6016).
### Added
- Added 2 new log lines which may be helpful diagnostics in production deployments, both including a new `[rollback]` tag.
- `[rollback] ... Dropping conflicting branch` may be emitted after network partitions, and indicates that some `Pending` (non-committed) transactions have been lost. This is expected, but worth investigating if it occurs regularly - it is a sign of elections impacting service availability.
- `[rollback] ... Ignoring conflicting AppendEntries` could also be emitted after a network partition, but **should be reported to the CCF development team**. It is a sign of an unexpected execution path, which could lead to loss of liveness (inability to advance commit).
## [5.0.0-dev12]
[5.0.0-dev12]: https://github.com/microsoft/CCF/releases/tag/ccf-5.0.0-dev12

Просмотреть файл

@ -63,6 +63,8 @@
#define RAFT_TRACE_JSON_OUT(json_object) \
CCF_LOG_OUT(DEBUG, "raft_trace") << json_object
#define LOG_ROLLBACK_INFO_FMT CCF_LOG_FMT(INFO, "rollback")
namespace aft
{
using Configuration = kv::Configuration;
@ -1061,7 +1063,7 @@ namespace aft
from,
state->current_view,
r.term);
send_append_entries_response(from, AppendEntriesResponseType::FAIL);
send_append_entries_response_nack(from);
return;
}
@ -1082,7 +1084,7 @@ namespace aft
state->node_id,
from,
r.prev_idx);
send_append_entries_response(from, AppendEntriesResponseType::FAIL);
send_append_entries_response_nack(from);
}
else
{
@ -1095,8 +1097,7 @@ namespace aft
prev_term,
r.prev_term);
const ccf::TxID rejected_tx{r.prev_term, r.prev_idx};
send_append_entries_response(
from, AppendEntriesResponseType::FAIL, rejected_tx);
send_append_entries_response_nack(from, rejected_tx);
}
return;
}
@ -1109,7 +1110,7 @@ namespace aft
assert(state->retirement_committable_idx.has_value());
if (r.idx > state->retirement_committable_idx)
{
send_append_entries_response(from, AppendEntriesResponseType::FAIL);
send_append_entries_response_nack(from);
return;
}
}
@ -1155,26 +1156,6 @@ namespace aft
r.idx,
r.prev_idx);
if (is_new_follower)
{
if (state->last_idx > r.prev_idx)
{
RAFT_DEBUG_FMT(
"New follower received first append entries with mismatch - "
"rolling back from {} to {}",
state->last_idx,
r.prev_idx);
auto rollback_level = r.prev_idx;
rollback(rollback_level);
}
else
{
RAFT_DEBUG_FMT(
"New follower has no conflict with prev_idx {}", r.prev_idx);
}
is_new_follower = false;
}
std::vector<
std::tuple<std::unique_ptr<kv::AbstractExecutionWrapper>, kv::Version>>
append_entries;
@ -1183,10 +1164,62 @@ namespace aft
{
if (i <= state->last_idx)
{
// If the current entry has already been deserialised, skip the
// payload for that entry
ledger->skip_entry(data, size);
continue;
// NB: This is only safe as long as AppendEntries only contain a
// single term. If they cover multiple terms, then we would need to at
// least partially deserialise each entry to establish what term it is
// in (or report the terms in the header)
static_assert(
max_terms_per_append_entries == 1,
"AppendEntries rollback logic assumes single term");
const auto incoming_term = r.term_of_idx;
const auto local_term = state->view_history.view_at(i);
if (incoming_term != local_term)
{
if (is_new_follower)
{
auto rollback_level = i - 1;
RAFT_DEBUG_FMT(
"New follower received AppendEntries with conflict. Incoming "
"entry {}.{} conflicts with local {}.{}. Rolling back to {}.",
incoming_term,
i,
local_term,
i,
rollback_level);
LOG_ROLLBACK_INFO_FMT(
"Dropping conflicting branch. Rolling back {} entries, "
"beginning with {}.{}.",
state->last_idx - rollback_level,
local_term,
i);
rollback(rollback_level);
is_new_follower = false;
// Then continue to process this AE as normal
}
else
{
// We have a node retaining a conflicting suffix, and refusing to
// roll it back. It will remain divergent (not contributing to
// commit) this term, and can only be brought in-sync in a future
// term.
// This log is emitted as a canary, for what we hope is an
// unreachable branch. If it is ever seen we should revisit this.
LOG_ROLLBACK_INFO_FMT(
"Ignoring conflicting AppendEntries. Retaining {} entries, "
"beginning with {}.{}.",
state->last_idx - (i - 1),
local_term,
i);
return;
}
}
else
{
// If the current entry has already been deserialised, skip the
// payload for that entry
ledger->skip_entry(data, size);
continue;
}
}
std::vector<uint8_t> entry;
@ -1202,7 +1235,7 @@ namespace aft
state->node_id,
from,
e.what());
send_append_entries_response(from, AppendEntriesResponseType::FAIL);
send_append_entries_response_nack(from);
return;
}
@ -1215,9 +1248,10 @@ namespace aft
"deserialised",
state->node_id,
from);
send_append_entries_response(from, AppendEntriesResponseType::FAIL);
send_append_entries_response_nack(from);
return;
}
append_entries.push_back(std::make_tuple(std::move(ds), i));
}
@ -1251,7 +1285,7 @@ namespace aft
if (apply_success == kv::ApplyResult::FAIL)
{
ledger->truncate(i - 1);
send_append_entries_response(from, AppendEntriesResponseType::FAIL);
send_append_entries_response_nack(from);
return;
}
state->last_idx = i;
@ -1280,7 +1314,7 @@ namespace aft
RAFT_FAIL_FMT("Follower failed to apply log entry: {}", i);
state->last_idx--;
ledger->truncate(state->last_idx);
send_append_entries_response(from, AppendEntriesResponseType::FAIL);
send_append_entries_response_nack(from);
break;
}
@ -1315,6 +1349,7 @@ namespace aft
"term");
state->view_history.update(r.prev_idx + 1, ds->get_term());
}
commit_if_possible(r.leader_commit_idx);
}
break;
@ -1369,29 +1404,51 @@ namespace aft
}
}
send_append_entries_response(from, AppendEntriesResponseType::OK);
send_append_entries_response_ack(from, r);
}
void send_append_entries_response_ack(
ccf::NodeId to, const AppendEntries& ae)
{
// If we get to here, we have applied up to r.idx in this AppendEntries.
// We must only ACK this far, as we know nothing about the agreement of a
// suffix we may still hold _after_ r.idx with the leader's log
const auto response_idx = ae.idx;
send_append_entries_response(
to, AppendEntriesResponseType::OK, state->current_view, response_idx);
}
void send_append_entries_response_nack(
ccf::NodeId to, const ccf::TxID& rejected)
{
const auto response_idx = find_highest_possible_match(rejected);
const auto response_term = get_term_internal(response_idx);
send_append_entries_response(
to, AppendEntriesResponseType::FAIL, response_term, response_idx);
}
void send_append_entries_response_nack(ccf::NodeId to)
{
send_append_entries_response(
to,
AppendEntriesResponseType::FAIL,
state->current_view,
state->last_idx);
}
void send_append_entries_response(
ccf::NodeId to,
AppendEntriesResponseType answer,
const std::optional<ccf::TxID>& rejected = std::nullopt)
aft::Term response_term,
aft::Index response_idx)
{
aft::Index response_idx = state->last_idx;
aft::Term response_term = state->current_view;
if (answer == AppendEntriesResponseType::FAIL && rejected.has_value())
{
response_idx = find_highest_possible_match(rejected.value());
response_term = get_term_internal(response_idx);
}
RAFT_DEBUG_FMT(
"Send append entries response from {} to {} for index {}: {}",
state->node_id,
to,
response_idx,
answer);
(answer == AppendEntriesResponseType::OK ? "ACK" : "NACK"));
AppendEntriesResponse response{
.term = response_term,
@ -1516,20 +1573,10 @@ namespace aft
}
else
{
// Potentially unnecessary safety check - use min with last_idx, to
// prevent matches past this node's local knowledge
const auto proposed_match = std::min(r.last_log_idx, state->last_idx);
if (proposed_match < node->second.match_idx)
{
RAFT_FAIL_FMT(
"Append entries response to {} from {} attempting to move "
"match_idx backwards ({} -> {})",
state->node_id,
from,
node->second.match_idx,
proposed_match);
}
node->second.match_idx = proposed_match;
// max(...) because why would we ever want to go backwards on a success
// response?!
node->second.match_idx =
std::max(node->second.match_idx, r.last_log_idx);
}
RAFT_DEBUG_FMT(

Просмотреть файл

@ -191,6 +191,10 @@ int main(int argc, char** argv)
skip_invariants = true;
driver->summarise_logs_all();
break;
case shash("summarise_messages"):
assert(items.size() == 3);
driver->summarise_messages(items[1], items[2]);
break;
case shash("shuffle_one"):
assert(items.size() == 2);
driver->shuffle_messages_one(items[1]);

Просмотреть файл

@ -572,6 +572,75 @@ public:
}
}
std::string get_message_summary(const std::vector<uint8_t>& contents)
{
const uint8_t* data = contents.data();
size_t size = contents.size();
const auto msg_type = serialized::peek<aft::RaftMsgType>(data, size);
switch (msg_type)
{
case (aft::RaftMsgType::raft_request_vote):
{
return "RV";
}
case (aft::RaftMsgType::raft_request_vote_response):
{
return "RVR";
}
case (aft::RaftMsgType::raft_append_entries):
{
auto ae = *(aft::AppendEntries*)data;
return fmt::format(
"AE(t{}, ({}.{}, {}.{}])",
ae.term,
ae.prev_term,
ae.prev_idx,
ae.term_of_idx,
ae.idx);
}
case (aft::RaftMsgType::raft_append_entries_response):
{
auto aer = *(aft::AppendEntriesResponse*)data;
return fmt::format(
"AER({}, t{}, i{})",
aer.success == aft::AppendEntriesResponseType::OK ? "ACK" : "NACK",
aer.term,
aer.last_log_idx);
}
case (aft::RaftMsgType::raft_propose_request_vote):
{
return "PRV";
}
default:
{
throw std::runtime_error(
fmt::format("Unhandled RaftMsgType: {}", msg_type));
}
}
}
void summarise_messages(ccf::NodeId src, ccf::NodeId dst)
{
auto raft = _nodes.at(src).raft;
auto& messages = channel_stub_proxy(*raft)->messages;
std::vector<std::string> message_reps;
for (const auto& [target, raw_msg] : messages)
{
if (target == dst)
{
message_reps.push_back(get_message_summary(raw_msg));
}
}
RAFT_DRIVER_PRINT(
"Note right of {}: {} message(s) to {} = [{}]",
src,
message_reps.size(),
dst,
fmt::join(message_reps, ", "));
}
void state_one(ccf::NodeId node_id)
{
auto raft = _nodes.at(node_id).raft;

Просмотреть файл

@ -0,0 +1,128 @@
# Start a 5-node network
start_node,0
assert_detail,0,leadership_state,Leader
emit_signature,2
assert_detail,0,leadership_state,Leader
assert_commit_idx,0,2
trust_nodes,2,1,2,3,4
emit_signature,2
dispatch_all
periodic_all,10
dispatch_all
assert_commit_idx,0,4
periodic_all,10
dispatch_all
assert_state_sync
state_all
# Node 0 produces an unshared suffix
emit_signature,2
emit_signature,2
emit_signature,2
# Node 1 calls and wins an election
periodic_one,1,100
assert_detail,1,leadership_state,Candidate
dispatch_single,1,2
dispatch_single,2,1
dispatch_single,1,3
dispatch_single,3,1
assert_detail,1,leadership_state,Leader
# Node 1 produces a conflicting suffix
emit_signature,3
emit_signature,3
emit_signature,3
# Node 1 tries to share this
periodic_one,1,10
drop_pending,1 # First forget prior messages
periodic_one,1,10 # This is an empty heartbeat
dispatch_single,1,0
# The 0->1 channel now contains a NACK, which we take great care to retain for a while!
summarise_messages,0,1
# Node 1 shares their suffix successfully with node 2
dispatch_single,1,2
dispatch_single,2,1
periodic_one,1,10
dispatch_single,1,2
# Node 2 calls and wins an election
periodic_one,2,100
assert_detail,2,leadership_state,Candidate
dispatch_single,2,3
dispatch_single,3,2
dispatch_single,2,4
dispatch_single,4,2
assert_detail,1,leadership_state,Leader
# Node 2 updates node 0 with the suffix produced by node 1
drop_pending,2
periodic_one,2,10
dispatch_one,2
dispatch_single,0,2
periodic_one,2,10
dispatch_one,2
dispatch_single,0,2
periodic_one,2,10
dispatch_one,2
dispatch_single,0,2
# Node 2 extends this suffix, shares it with node 0
emit_signature,4
emit_signature,4
periodic_one,2,10
dispatch_single,2,0
# Node 1 calls and wins an election
drop_pending,1
periodic_one,1,100
assert_detail,1,leadership_state,Candidate
dispatch_single,1,3
dispatch_single,3,1
dispatch_single,1,4
dispatch_single,4,1
assert_detail,1,leadership_state,Leader
drop_pending,1
# Node 1 creates an alternative branch from 2's recent suffix
emit_signature,5
emit_signature,5
# Node 1 shares this with 3, so that it is only needs 1 more ACK to advance commit
periodic_one,1,10
dispatch_single,1,3
dispatch_single,3,1
periodic_one,1,10
dispatch_single,1,3
dispatch_single,1,3
dispatch_single,3,1
dispatch_single,3,1
# Node 1 finally receives that stale NACK from 0
drop_pending,1
state_all
summarise_messages,0,1
dispatch_single,0,1
summarise_messages,0,1
# Node 1 now has an underestimate of its match with 0, so will produce some redundant (matching) AEs
periodic_one,1,10
# The lead AE in the 1->0 should now be a NOP, containing entirely entries that 0 already has
dispatch_single,1,0
# What did 0 produce as the response, and how does node 1 handle that?
dispatch_single,0,1
summarise_logs_all
state_all
assert_commit_safety,1

Просмотреть файл

@ -0,0 +1,50 @@
# Start a 3-node network
start_node,0
assert_detail,0,leadership_state,Leader
emit_signature,2
assert_detail,0,leadership_state,Leader
assert_commit_idx,0,2
trust_nodes,2,1,2
emit_signature,2
dispatch_all
periodic_all,10
dispatch_all
assert_commit_idx,0,4
periodic_all,10
dispatch_all
assert_state_sync
state_all
# Original primary node 0 produces a small suffix, doesn't get to share it
emit_signature,2
emit_signature,2
# Node 1 calls and wins an election
periodic_one,1,100
assert_detail,1,leadership_state,Candidate
dispatch_one,1
dispatch_one,2
assert_detail,1,leadership_state,Leader
# Node 1 produces multiple heartbeat AEs before emitting a signature!
periodic_one,1,10
periodic_one,1,10
emit_signature,3
# With a few iterations node 1 should be able to bring all nodes in-sync
periodic_all,10
dispatch_all
periodic_all,10
dispatch_all
periodic_all,10
dispatch_all
state_all
assert_state_sync

Просмотреть файл

@ -0,0 +1,122 @@
# Set up 5 node network
start_node,0
assert_detail,0,leadership_state,Leader
emit_signature,2
assert_detail,0,leadership_state,Leader
assert_commit_idx,0,2
trust_nodes,2,1,2,3,4
emit_signature,2
dispatch_all
periodic_all,10
dispatch_all
periodic_all,10
dispatch_all
assert_commit_idx,0,4
assert_state_sync
# All nodes in agreement
# Nodes 3 and 4 are partitioned, so the remaining 3 need unanimity for commit
disconnect_node,3
disconnect_node,4
# Node 0 produces a suffix
replicate,2,hello
emit_signature,2
replicate,2,world
emit_signature,2
# Node 0 shares this suffix with 1
periodic_one,0,10
drop_pending_to,0,2
dispatch_one,0
# Node 1 calls and wins an election
periodic_one,1,100
assert_detail,1,leadership_state,Candidate
dispatch_one,1
dispatch_one,0
dispatch_one,2
assert_detail,1,leadership_state,Leader
# Node 1 sends a late heartbeat, producing a NACK
drop_pending_to,1,0
drop_pending_to,2,1 # Ensure nothing else is in the message queue
dispatch_one,1
# NB: The 2->1 channel now contains a NACK at 2.4.
# We need to keep this around, and don't touch this channel for a while
# Node 0 calls and wins an election
periodic_one,0,100
assert_detail,0,leadership_state,Candidate
dispatch_one,0
dispatch_one,1
dispatch_single,2,0
assert_detail,0,leadership_state,Leader
# Node 0 emits a fresh signature, so that they can advance commit
emit_signature,4
# Node 0 sends heartbeats
periodic_one,0,10
dispatch_one,0
# Node 0 receives responses, setting initial next_index
dispatch_one,1
dispatch_single,2,0 # NB: dispatch_single, to avoid touching the NACK for 1
dispatch_single,2,0
# Node 0 sends useful AEs
periodic_one,0,10
dispatch_one,0
# Node 0 receives response, advancing commit
dispatch_one,1
dispatch_single,2,0
dispatch_single,2,0
assert_commit_idx,0,9
# Note that the peers have not yet advanced commit
assert_commit_idx,1,4
assert_commit_idx,2,4
assert_commit_safety,0
# Node 1 calls and wins an election
# To win, it now needs a vote from one of the previous dead nodes (we can't touch the 2->1 channel!)
reconnect_node,3
periodic_one,1,100
assert_detail,1,leadership_state,Candidate
dispatch_one,1
dispatch_one,0
dispatch_one,3
assert_detail,1,leadership_state,Leader
# Don't need this heartbeat AE, drop it
drop_pending_to,1,2
# Now we deliver a stale NACK
dispatch_single,2,1
# 1 may now believe 2's log is too short!
# Node 1 produces AEs
periodic_one,1,10
state_all
assert_commit_safety,0 # Sanity check, we're fine here
# This AE reaches 2
dispatch_single,1,2
# 2 had better not rollback and break commit safety!
state_all
assert_commit_safety,0

Просмотреть файл

@ -981,7 +981,6 @@ AppendEntriesAlreadyDone(i, j, index, m) ==
IF membershipState = RetirementSigned /\ commitIndex' > RetirementIndex(i)
THEN RetirementCompleted
ELSE @]
/\ isNewFollower' = [isNewFollower EXCEPT ![i] = FALSE]
/\ Reply([type |-> AppendEntriesResponse,
term |-> currentTerm[i],
success |-> TRUE,
@ -989,12 +988,15 @@ AppendEntriesAlreadyDone(i, j, index, m) ==
source |-> i,
dest |-> j],
m)
/\ UNCHANGED <<removedFromConfiguration, currentTerm, leadershipState, votedFor, log, candidateVars, leaderVars>>
/\ UNCHANGED <<removedFromConfiguration, currentTerm, leadershipState, votedFor, isNewFollower, log, candidateVars, leaderVars>>
\* Follower i receives an AppendEntries request m where it needs to roll back first
\* This action rolls back the log and leaves m in messages for further processing
ConflictAppendEntriesRequest(i, index, m) ==
/\ Len(log[i]) >= index
/\ m.entries /= << >>
/\ \E idx \in 1..Len(m.entries) :
/\ (index + (idx - 1)) \in DOMAIN log[i]
/\ log[i][index + (idx - 1)].term # m.entries[idx].term
/\ isNewFollower[i] = TRUE
/\ LET new_log == [index2 \in 1..m.prevLogIndex |-> log[i][index2]] \* Truncate log
IN /\ log' = [log EXCEPT ![i] = new_log]
@ -1038,7 +1040,6 @@ NoConflictAppendEntriesRequest(i, j, m) ==
ELSE UNCHANGED leadershipState
\* Recalculate membership state based on log' and commitIndex'
/\ membershipState' = [membershipState EXCEPT ![i] = CalcMembershipState(log'[i], commitIndex'[i], i)]
/\ isNewFollower' = [isNewFollower EXCEPT ![i] = FALSE]
/\ Reply([type |-> AppendEntriesResponse,
term |-> currentTerm[i],
success |-> TRUE,
@ -1046,7 +1047,7 @@ NoConflictAppendEntriesRequest(i, j, m) ==
source |-> i,
dest |-> j],
m)
/\ UNCHANGED <<removedFromConfiguration, currentTerm, votedFor, candidateVars, leaderVars>>
/\ UNCHANGED <<removedFromConfiguration, currentTerm, votedFor, isNewFollower, candidateVars, leaderVars>>
AcceptAppendEntriesRequest(i, j, logOk, m) ==
\* accept request