RaftDriver: Log whenever messages are dropped, to simplify trace validation (#6021)

This commit is contained in:
Eddy Ashton 2024-02-19 09:27:15 +00:00 коммит произвёл GitHub
Родитель eab71a3bc4
Коммит 2092338dd6
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
5 изменённых файлов: 72 добавлений и 42 удалений

Просмотреть файл

@ -1473,16 +1473,6 @@ namespace aft
const ccf::NodeId& from, AppendEntriesResponse r)
{
std::lock_guard<ccf::pal::Mutex> guard(state->lock);
// Ignore if we're not the leader.
if (state->leadership_state != kv::LeadershipState::Leader)
{
RAFT_FAIL_FMT(
"Recv append entries response to {} from {}: no longer leader",
state->node_id,
from);
return;
}
auto node = all_other_nodes.find(from);
if (node == all_other_nodes.end())
@ -1506,6 +1496,16 @@ namespace aft
RAFT_TRACE_JSON_OUT(j);
#endif
// Ignore if we're not the leader.
if (state->leadership_state != kv::LeadershipState::Leader)
{
RAFT_FAIL_FMT(
"Recv append entries response to {} from {}: no longer leader",
state->node_id,
from);
return;
}
using namespace std::chrono_literals;
node->second.last_ack_timeout = 0ms;
@ -2432,6 +2432,11 @@ namespace aft
}
}
// Returns a JSON representation of the object that `state` points to.
// The value is produced by converting `*state` to nlohmann::json on return
// (presumably through a to_json overload for the state type - TODO confirm).
// This is a const member: it does not modify the consensus object.
// NOTE(review): no lock is acquired here, whereas the message handler in
// this class takes `state->lock` before reading state - confirm callers do
// not race with state updates.
nlohmann::json get_state_representation() const
{
return *state;
}
private:
void create_and_remove_node_state()
{

Просмотреть файл

@ -478,36 +478,43 @@ public:
const uint8_t* data = contents.data();
size_t size = contents.size();
nlohmann::json packet;
const auto msg_type = serialized::peek<aft::RaftMsgType>(data, size);
switch (msg_type)
{
case (aft::RaftMsgType::raft_request_vote):
{
auto rv = *(aft::RequestVote*)data;
packet = rv;
log_msg_details(node_id, tgt_node_id, rv, dropped);
break;
}
case (aft::RaftMsgType::raft_request_vote_response):
{
auto rvr = *(aft::RequestVoteResponse*)data;
packet = rvr;
log_msg_details(node_id, tgt_node_id, rvr, dropped);
break;
}
case (aft::RaftMsgType::raft_append_entries):
{
auto ae = *(aft::AppendEntries*)data;
packet = ae;
log_msg_details(node_id, tgt_node_id, ae, dropped);
break;
}
case (aft::RaftMsgType::raft_append_entries_response):
{
auto aer = *(aft::AppendEntriesResponse*)data;
packet = aer;
log_msg_details(node_id, tgt_node_id, aer, dropped);
break;
}
case (aft::RaftMsgType::raft_propose_request_vote):
{
auto prv = *(aft::ProposeRequestVote*)data;
packet = prv;
log_msg_details(node_id, tgt_node_id, prv, dropped);
break;
}
@ -517,6 +524,23 @@ public:
fmt::format("Unhandled RaftMsgType: {}", msg_type));
}
}
#ifdef CCF_RAFT_TRACING
if (dropped)
{
nlohmann::json j = {};
j["function"] = "drop_pending_to";
j["from_node_id"] = node_id;
j["to_node_id"] = tgt_node_id;
// state is used by raft_scenarios_runner.py to identify which node a log
// line occurred on. Here we assign all dropped messages to the
// sender.
// Populate additional fields for trace_viz.py
j["state"] = _nodes.at(node_id).raft->get_state_representation();
j["packet"] = packet;
RAFT_TRACE_JSON_OUT(j);
}
#endif
}
void connect(ccf::NodeId first, ccf::NodeId second)
@ -739,6 +763,9 @@ public:
}
}
// Log that this message was dropped
log_msg_details(src, dst, contents, true);
return false;
}

Просмотреть файл

@ -33,6 +33,7 @@ FUNCTIONS = {
"become_follower": "BFol",
"commit": "Cmt",
"bootstrap": "Boot",
"drop_pending_to": "Drop",
None: "",
}

Просмотреть файл

@ -112,6 +112,10 @@ LOCAL OrderDropConsecutiveMessages(server) ==
\E s \in SubSeqs(messages[server]):
messages' = [ messages EXCEPT ![server] = s ]
\* Drops exactly one message from messages[server]. An index i is chosen
\* non-deterministically among the positions whose message satisfies Test,
\* and messages[server] is replaced by RemoveAt(messages[server], i)
\* (presumably the same sequence without its i-th element - confirm against
\* the definition of RemoveAt). All other entries of messages are unchanged.
\* The action is not enabled when no message in messages[server] satisfies
\* Test, because the existential then ranges over the empty set.
LOCAL OrderDropMessage(server, Test(_)) ==
\E i \in { idx \in 1..Len(messages[server]) : Test(messages[server][idx]) }:
messages' = [ messages EXCEPT ![server] = RemoveAt(messages[server], i) ]
----------------------------------------------------------------------------------
\* Point-to-Point Ordering and no duplication of messages:
@ -191,4 +195,10 @@ DropMessages(server) ==
[] Guarantee = ReorderedNoDup -> ReorderNoDupDropMessages
[] Guarantee = Reordered -> ReorderDupDropMessages
\* Drops a single message that satisfies Test, dispatching on Guarantee.
\* Only the Ordered guarantee is supported, where this delegates to
\* OrderDropMessage. For OrderedNoDup, ReorderedNoDup and Reordered the
\* action is FALSE, i.e. never enabled.
\* NOTE(review): the first parameter is named sender, but it is used as the
\* index into messages by OrderDropMessage, and the trace spec's
\* IsDropPendingTo passes logline.msg.to_node_id for it - confirm whether
\* messages is keyed by receiver and the parameter name should change.
DropMessage(sender, Test(_)) ==
CASE Guarantee = OrderedNoDup -> FALSE
[] Guarantee = Ordered -> OrderDropMessage(sender, Test)
[] Guarantee = ReorderedNoDup -> FALSE
[] Guarantee = Reordered -> FALSE
==================================================================================

Просмотреть файл

@ -112,34 +112,6 @@ TraceInit ==
logline ==
TraceLog[l]
\* ccfraft assumes ordered point-to-point communication. In other words, we should
\* only receive the first pending message from j at i. However, it is possible
\* that a prefix of the messages sent by j to i have been lost -- the network is
\* unreliable. Thus, we allow to receive any message but drop the prefix up to
\* that message.
\* We could add a Traceccfraft!DropMessage action that non-deterministically drops
\* messages at every step of the system. However, that would lead to massive state
\* space explosion. OTOH, it would have the advantage that we could assert that
\* messages is all empty at the end of the trace. Right now, messages will contain
\* all messages, even those that have been lost by the real system. Another trade
\* off is that the length of the TLA+ trace will be longer than the system trace
\* (due to the extra DropMessage actions).
\* Instead, we could compose a DropMessage action with all receiver actions such
\* as HandleAppendEntriesResponse that allows the receiver's inbox to equal any
\* SubSeq of the receiver's current inbox (where inbox is messages[receiver]). That
\* way, we can leave the other servers' inboxes unchanged (resulting in less work for
\* TLC). A trade off of this variant is that we have to non-deterministically pick
\* the next message from the inbox instead of via Network!MessagesTo (which always
\* picks the first message in a server's inbox).
\*
\* Lastly, we can weaken Traceccfraft trace validation and simply ignore lost messages
\* accepting that lost messages remain in messages.
\* Drops messages for the node that emitted the current log line
\* (logline.msg.state.node_id) by delegating to Network!DropMessages.
\* Enabled only while l still indexes a line of TraceLog. The step leaves
\* the reconfiguration, server, candidate, leader and log variables
\* unchanged and does not advance the trace position l or the timestamp ts.
DropMessages ==
/\ l \in 1..Len(TraceLog)
/\ UNCHANGED <<reconfigurationVars, serverVars, candidateVars, leaderVars, logVars>>
/\ UNCHANGED <<l, ts>>
/\ Network!DropMessages(logline.msg.state.node_id)
\* Beware to only prime e.g. inbox in inbox'[rcv] and *not* also rcv, i.e.,
\* inbox[rcv]'. rcv is defined in terms of TLCGet("level") that correctly
\* handles priming, which causes for rcv' to equal rcv of the next log line.
@ -150,6 +122,22 @@ IsEvent(e) ==
/\ l' = l + 1
/\ ts' = logline.h_ts
\* Message loss is known in controlled environments, such as raft (driver) scenarios. However, this assumption
\* does not hold for traces collected from production workloads. In those cases, message loss must be
\* modeled non-deterministically, for example by composing message loss with the next-state relation:
\* Network!DropMessages(logline.msg.state.node_id) \cdot TraceNext
\* and
\* Network!DropMessages(logline.msg.state.node_id) \cdot CCF!Next
\* Matches a "drop_pending_to" trace event and removes one matching message
\* from the network through Network!DropMessage, called with the logged
\* to_node_id as its first argument. A message matches when all three hold:
\*   - its type equals RaftMsgType[logline.msg.packet.msg],
\*   - its dest equals the logged to_node_id,
\*   - its source equals the logged from_node_id.
\* The reconfiguration, server, candidate, leader and log variables are
\* left unchanged by this step.
IsDropPendingTo ==
/\ IsEvent("drop_pending_to")
/\ Network!DropMessage(logline.msg.to_node_id,
LAMBDA msg:
/\ msg.type = RaftMsgType[logline.msg.packet.msg]
/\ msg.dest = logline.msg.to_node_id
/\ msg.source = logline.msg.from_node_id
)
/\ UNCHANGED <<reconfigurationVars, serverVars, candidateVars, leaderVars, logVars>>
IsTimeout ==
/\ IsEvent("become_candidate")
/\ logline.msg.state.leadership_state = "Candidate"
@ -451,11 +439,10 @@ TraceNext ==
\/ IsRcvProposeVoteRequest
DropAndNext ==
IF ENABLED TraceNext THEN TraceNext ELSE DropMessages \cdot TraceNext
\/ IsDropPendingTo
TraceSpec ==
TraceInit /\ [][DropAndNext]_<<l, ts, vars>>
TraceInit /\ [][TraceNext]_<<l, ts, vars>>
-------------------------------------------------------------------------------------
@ -620,6 +607,6 @@ ComposedNext ==
CCF == INSTANCE ccfraft
CCFSpec == CCF!Init /\ [][DropMessages \cdot (CCF!Next \/ ComposedNext)]_CCF!vars
CCFSpec == CCF!Init /\ [][CCF!Next \/ ComposedNext \/ IsDropPendingTo]_CCF!vars
==================================================================================