зеркало из https://github.com/microsoft/CCF.git
Structured verbose logging in `raft.h` (#4933)
This commit is contained in:
Родитель
d98332b71e
Коммит
51e4209fee
|
@ -27,6 +27,42 @@
|
|||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#ifdef VERBOSE_RAFT_LOGGING
|
||||
# define RAFT_TRACE_FMT(s, ...) \
|
||||
CCF_LOG_FMT(TRACE, "raft") \
|
||||
("{} | {} | {} | " s, \
|
||||
state->my_node_id, \
|
||||
leadership_state_string(), \
|
||||
membership_state, \
|
||||
##__VA_ARGS__)
|
||||
# define RAFT_DEBUG_FMT(s, ...) \
|
||||
CCF_LOG_FMT(DEBUG, "raft") \
|
||||
("{} | {} | {} | " s, \
|
||||
state->my_node_id, \
|
||||
leadership_state_string(), \
|
||||
membership_state, \
|
||||
##__VA_ARGS__)
|
||||
# define RAFT_INFO_FMT(s, ...) \
|
||||
CCF_LOG_FMT(INFO, "raft") \
|
||||
("{} | {} | {} | " s, \
|
||||
state->my_node_id, \
|
||||
leadership_state_string(), \
|
||||
membership_state, \
|
||||
##__VA_ARGS__)
|
||||
# define RAFT_FAIL_FMT(s, ...) \
|
||||
CCF_LOG_FMT(FAIL, "raft") \
|
||||
("{} | {} | {} | " s, \
|
||||
state->my_node_id, \
|
||||
leadership_state_string(), \
|
||||
membership_state, \
|
||||
##__VA_ARGS__)
|
||||
#else
|
||||
# define RAFT_TRACE_FMT LOG_TRACE_FMT
|
||||
# define RAFT_DEBUG_FMT LOG_DEBUG_FMT
|
||||
# define RAFT_INFO_FMT LOG_INFO_FMT
|
||||
# define RAFT_FAIL_FMT LOG_FAIL_FMT
|
||||
#endif
|
||||
|
||||
namespace aft
|
||||
{
|
||||
using Configuration = kv::Configuration;
|
||||
|
@ -214,7 +250,7 @@ namespace aft
|
|||
channels(channels_)
|
||||
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"reconfiguration type: {}",
|
||||
reconfiguration_type == ONE_TRANSACTION ? "1tx" : "2tx");
|
||||
|
||||
|
@ -248,7 +284,7 @@ namespace aft
|
|||
std::unique_lock<ccf::pal::Mutex> guard(state->lock);
|
||||
if (consensus_type == ConsensusType::BFT)
|
||||
{
|
||||
LOG_FAIL_FMT("Unsupported");
|
||||
RAFT_FAIL_FMT("Unsupported");
|
||||
return false;
|
||||
}
|
||||
else
|
||||
|
@ -458,7 +494,7 @@ namespace aft
|
|||
const std::unordered_set<ccf::NodeId>& new_learner_nodes = {},
|
||||
const std::unordered_set<ccf::NodeId>& new_retired_nodes = {}) override
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Configurations: add new configuration at {}: {{{}}}", idx, conf);
|
||||
|
||||
if (reconfiguration_type == ReconfigurationType::ONE_TRANSACTION)
|
||||
|
@ -494,7 +530,7 @@ namespace aft
|
|||
|
||||
if (!new_learner_nodes.empty())
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Configurations: new learners: {{{}}}",
|
||||
fmt::join(new_learner_nodes, ", "));
|
||||
for (auto& id : new_learner_nodes)
|
||||
|
@ -508,7 +544,7 @@ namespace aft
|
|||
|
||||
if (!new_retired_nodes.empty())
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Configurations: newly retired nodes: {{{}}}",
|
||||
fmt::join(new_retired_nodes, ", "));
|
||||
for (auto& id : new_retired_nodes)
|
||||
|
@ -534,7 +570,7 @@ namespace aft
|
|||
}
|
||||
if (is_learner() && nid == state->my_node_id)
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Configurations: observing own promotion, becoming an active "
|
||||
"follower");
|
||||
leadership_state = kv::LeadershipState::Follower;
|
||||
|
@ -575,7 +611,7 @@ namespace aft
|
|||
ticking = true;
|
||||
using namespace std::chrono_literals;
|
||||
timeout_elapsed = 0ms;
|
||||
LOG_INFO_FMT("Election timer has become active");
|
||||
RAFT_INFO_FMT("Election timer has become active");
|
||||
}
|
||||
|
||||
void add_resharing_result(
|
||||
|
@ -626,7 +662,7 @@ namespace aft
|
|||
{
|
||||
std::lock_guard<ccf::pal::Mutex> guard(state->lock);
|
||||
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Configurations: ORC for configuration #{} from {}", rid, node_id);
|
||||
|
||||
const auto oit = orc_sets.find(rid);
|
||||
|
@ -643,13 +679,14 @@ namespace aft
|
|||
const auto& ncnodes = conf.nodes;
|
||||
if (ncnodes.find(node_id) == ncnodes.end())
|
||||
{
|
||||
LOG_DEBUG_FMT("Node not in the configuration {}: {}", rid, node_id);
|
||||
RAFT_DEBUG_FMT(
|
||||
"Node not in the configuration {}: {}", rid, node_id);
|
||||
return std::nullopt;
|
||||
}
|
||||
else
|
||||
{
|
||||
oit->second.insert(node_id);
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Configurations: have {} ORCs out of {} for configuration #{}",
|
||||
oit->second.size(),
|
||||
ncnodes.size(),
|
||||
|
@ -728,7 +765,7 @@ namespace aft
|
|||
|
||||
if (leadership_state != kv::LeadershipState::Leader)
|
||||
{
|
||||
LOG_FAIL_FMT(
|
||||
RAFT_FAIL_FMT(
|
||||
"Failed to replicate {} items: not leader", entries.size());
|
||||
rollback(state->last_idx);
|
||||
return false;
|
||||
|
@ -736,7 +773,7 @@ namespace aft
|
|||
|
||||
if (term != state->current_view)
|
||||
{
|
||||
LOG_FAIL_FMT(
|
||||
RAFT_FAIL_FMT(
|
||||
"Failed to replicate {} items at term {}, current term is {}",
|
||||
entries.size(),
|
||||
term,
|
||||
|
@ -744,7 +781,7 @@ namespace aft
|
|||
return false;
|
||||
}
|
||||
|
||||
LOG_DEBUG_FMT("Replicating {} entries", entries.size());
|
||||
RAFT_DEBUG_FMT("Replicating {} entries", entries.size());
|
||||
|
||||
for (auto& [index, data, is_globally_committable, hooks] : entries)
|
||||
{
|
||||
|
@ -763,7 +800,7 @@ namespace aft
|
|||
return false;
|
||||
}
|
||||
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Replicated on leader {}: {}{} ({} hooks)",
|
||||
state->my_node_id,
|
||||
index,
|
||||
|
@ -777,7 +814,7 @@ namespace aft
|
|||
|
||||
if (globally_committable)
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"membership: {} leadership: {}",
|
||||
membership_state,
|
||||
leadership_state_string());
|
||||
|
@ -809,7 +846,7 @@ namespace aft
|
|||
entry_size_not_limited = 0;
|
||||
for (const auto& it : all_other_nodes)
|
||||
{
|
||||
LOG_DEBUG_FMT("Sending updates to follower {}", it.first);
|
||||
RAFT_DEBUG_FMT("Sending updates to follower {}", it.first);
|
||||
send_append_entries(it.first, it.second.sent_idx + 1);
|
||||
}
|
||||
}
|
||||
|
@ -870,13 +907,13 @@ namespace aft
|
|||
|
||||
default:
|
||||
{
|
||||
LOG_FAIL_FMT("Unhandled AFT message type: {}", type);
|
||||
RAFT_FAIL_FMT("Unhandled AFT message type: {}", type);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (const ccf::NodeToNode::DroppedMessageException& e)
|
||||
{
|
||||
LOG_INFO_FMT("Dropped invalid message from {}", e.from);
|
||||
RAFT_INFO_FMT("Dropped invalid message from {}", e.from);
|
||||
return;
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
|
@ -925,7 +962,7 @@ namespace aft
|
|||
}
|
||||
if (search->second.last_ack_timeout >= election_timeout)
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"No ack received from {} in last {}",
|
||||
node.first,
|
||||
election_timeout);
|
||||
|
@ -948,7 +985,7 @@ namespace aft
|
|||
// active configuration in which it has heard back from a majority of
|
||||
// backups within an election timeout.
|
||||
// Also see CheckQuorum action in tla/ccfraft.tla.
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Stepping down as leader {}: No ack received from a majority of "
|
||||
"backups in last {}",
|
||||
state->my_node_id,
|
||||
|
@ -992,7 +1029,7 @@ namespace aft
|
|||
term_of_probe = state->view_history.view_at(probe_index);
|
||||
}
|
||||
|
||||
LOG_TRACE_FMT(
|
||||
RAFT_TRACE_FMT(
|
||||
"Looking for match with {}.{}, from {}.{}, best answer is {}",
|
||||
tx_id.view,
|
||||
tx_id.seqno,
|
||||
|
@ -1040,14 +1077,14 @@ namespace aft
|
|||
}
|
||||
else
|
||||
{
|
||||
LOG_FAIL_FMT("Unsupported consensus type");
|
||||
RAFT_FAIL_FMT("Unsupported consensus type");
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
void send_append_entries(const ccf::NodeId& to, Index start_idx)
|
||||
{
|
||||
LOG_TRACE_FMT(
|
||||
RAFT_TRACE_FMT(
|
||||
"Sending append entries to node {} in batches of {}, covering the "
|
||||
"range {} -> {}",
|
||||
to,
|
||||
|
@ -1077,7 +1114,7 @@ namespace aft
|
|||
do
|
||||
{
|
||||
end_idx = calculate_end_index(start_idx);
|
||||
LOG_TRACE_FMT("Sending sub range {} -> {}", start_idx, end_idx);
|
||||
RAFT_TRACE_FMT("Sending sub range {} -> {}", start_idx, end_idx);
|
||||
send_append_entries_range(to, start_idx, end_idx);
|
||||
start_idx = std::min(end_idx + 1, state->last_idx);
|
||||
} while (end_idx != state->last_idx);
|
||||
|
@ -1101,7 +1138,7 @@ namespace aft
|
|||
const bool contains_new_view =
|
||||
(state->new_view_idx > prev_idx) && (state->new_view_idx <= end_idx);
|
||||
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Send append entries from {} to {}: ({}.{}, {}.{}] ({})",
|
||||
state->my_node_id,
|
||||
to,
|
||||
|
@ -1142,7 +1179,7 @@ namespace aft
|
|||
{
|
||||
std::unique_lock<ccf::pal::Mutex> guard(state->lock);
|
||||
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Received append entries: {}.{} to {}.{} (from {} in term {})",
|
||||
r.prev_term,
|
||||
r.prev_idx,
|
||||
|
@ -1170,7 +1207,7 @@ namespace aft
|
|||
else if (state->current_view > r.term)
|
||||
{
|
||||
// Reply false, since our term is later than the received term.
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Recv append entries to {} from {} but our term is later ({} > {})",
|
||||
state->my_node_id,
|
||||
from,
|
||||
|
@ -1184,14 +1221,14 @@ namespace aft
|
|||
const auto prev_term = get_term_internal(r.prev_idx);
|
||||
if (prev_term != r.prev_term)
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Previous term for {} should be {}", r.prev_idx, prev_term);
|
||||
|
||||
// Reply false if the log doesn't contain an entry at r.prev_idx
|
||||
// whose term is r.prev_term.
|
||||
if (prev_term == 0)
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Recv append entries to {} from {} but our log does not yet "
|
||||
"contain index {}",
|
||||
state->my_node_id,
|
||||
|
@ -1201,7 +1238,7 @@ namespace aft
|
|||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Recv append entries to {} from {} but our log at {} has the wrong "
|
||||
"previous term (ours: {}, theirs: {})",
|
||||
state->my_node_id,
|
||||
|
@ -1233,7 +1270,7 @@ namespace aft
|
|||
if (!leader_id.has_value() || leader_id.value() != from)
|
||||
{
|
||||
leader_id = from;
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Node {} thinks leader is {}", state->my_node_id, leader_id.value());
|
||||
}
|
||||
|
||||
|
@ -1242,7 +1279,7 @@ namespace aft
|
|||
if (
|
||||
consensus_type == ConsensusType::CFT && r.prev_idx < state->commit_idx)
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Recv append entries to {} from {} but prev_idx ({}) < commit_idx "
|
||||
"({})",
|
||||
state->my_node_id,
|
||||
|
@ -1253,7 +1290,7 @@ namespace aft
|
|||
}
|
||||
else if (r.prev_idx > state->last_idx)
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Recv append entries to {} from {} but prev_idx ({}) > last_idx ({})",
|
||||
state->my_node_id,
|
||||
from,
|
||||
|
@ -1262,7 +1299,7 @@ namespace aft
|
|||
return;
|
||||
}
|
||||
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Recv append entries to {} from {} for index {} and previous index {}",
|
||||
state->my_node_id,
|
||||
from,
|
||||
|
@ -1273,7 +1310,7 @@ namespace aft
|
|||
{
|
||||
if (state->last_idx > r.prev_idx)
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"New follower received first append entries with mismatch - "
|
||||
"rolling back from {} to {}",
|
||||
state->last_idx,
|
||||
|
@ -1283,7 +1320,7 @@ namespace aft
|
|||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"New follower has no conflict with prev_idx {}", r.prev_idx);
|
||||
}
|
||||
is_new_follower = false;
|
||||
|
@ -1311,7 +1348,7 @@ namespace aft
|
|||
catch (const std::logic_error& e)
|
||||
{
|
||||
// This should only fail if there is malformed data.
|
||||
LOG_FAIL_FMT(
|
||||
RAFT_FAIL_FMT(
|
||||
"Recv append entries to {} from {} but the data is malformed: {}",
|
||||
state->my_node_id,
|
||||
from,
|
||||
|
@ -1324,7 +1361,7 @@ namespace aft
|
|||
auto ds = store->apply(entry, consensus_type, public_only, expected);
|
||||
if (ds == nullptr)
|
||||
{
|
||||
LOG_FAIL_FMT(
|
||||
RAFT_FAIL_FMT(
|
||||
"Recv append entries to {} from {} but the entry could not be "
|
||||
"deserialised",
|
||||
state->my_node_id,
|
||||
|
@ -1349,7 +1386,7 @@ namespace aft
|
|||
for (auto& ae : append_entries)
|
||||
{
|
||||
auto& [ds, i] = ae;
|
||||
LOG_DEBUG_FMT("Replicating on follower {}: {}", state->my_node_id, i);
|
||||
RAFT_DEBUG_FMT("Replicating on follower {}: {}", state->my_node_id, i);
|
||||
|
||||
bool track_deletes_on_missing_keys = false;
|
||||
kv::ApplyResult apply_success =
|
||||
|
@ -1383,7 +1420,7 @@ namespace aft
|
|||
{
|
||||
case kv::ApplyResult::FAIL:
|
||||
{
|
||||
LOG_FAIL_FMT("Follower failed to apply log entry: {}", i);
|
||||
RAFT_FAIL_FMT("Follower failed to apply log entry: {}", i);
|
||||
state->last_idx--;
|
||||
ledger->truncate(state->last_idx);
|
||||
send_append_entries_response(from, AppendEntriesResponseType::FAIL);
|
||||
|
@ -1392,7 +1429,7 @@ namespace aft
|
|||
|
||||
case kv::ApplyResult::PASS_SIGNATURE:
|
||||
{
|
||||
LOG_DEBUG_FMT("Deserialising signature at {}", i);
|
||||
RAFT_DEBUG_FMT("Deserialising signature at {}", i);
|
||||
if (
|
||||
membership_state == kv::MembershipState::Retired &&
|
||||
retirement_phase == kv::RetirementPhase::Ordered)
|
||||
|
@ -1499,7 +1536,7 @@ namespace aft
|
|||
}
|
||||
}
|
||||
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Send append entries response from {} to {} for index {}: {}",
|
||||
state->my_node_id,
|
||||
to,
|
||||
|
@ -1521,7 +1558,7 @@ namespace aft
|
|||
|
||||
if (leadership_state != kv::LeadershipState::Leader)
|
||||
{
|
||||
LOG_FAIL_FMT(
|
||||
RAFT_FAIL_FMT(
|
||||
"Recv append entries response to {} from {}: no longer leader",
|
||||
state->my_node_id,
|
||||
from);
|
||||
|
@ -1532,7 +1569,7 @@ namespace aft
|
|||
if (node == all_other_nodes.end())
|
||||
{
|
||||
// Ignore if we don't recognise the node.
|
||||
LOG_FAIL_FMT(
|
||||
RAFT_FAIL_FMT(
|
||||
"Recv append entries response to {} from {}: unknown node",
|
||||
state->my_node_id,
|
||||
from);
|
||||
|
@ -1545,7 +1582,7 @@ namespace aft
|
|||
if (state->current_view < r.term)
|
||||
{
|
||||
// We are behind, update our state.
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Recv append entries response to {} from {}: more recent term ({} "
|
||||
"> {})",
|
||||
state->my_node_id,
|
||||
|
@ -1564,7 +1601,7 @@ namespace aft
|
|||
// be older in this case.
|
||||
if (r.success == AppendEntriesResponseType::OK)
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Recv append entries response to {} from {}: stale term ({} != {})",
|
||||
state->my_node_id,
|
||||
from,
|
||||
|
@ -1582,7 +1619,7 @@ namespace aft
|
|||
// after an election.
|
||||
if (r.success == AppendEntriesResponseType::OK)
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Recv append entries response to {} from {}: stale idx",
|
||||
state->my_node_id,
|
||||
from);
|
||||
|
@ -1613,7 +1650,7 @@ namespace aft
|
|||
if (r.success != AppendEntriesResponseType::OK)
|
||||
{
|
||||
// Failed due to log inconsistency. Reset sent_idx, and try again soon.
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Recv append entries response to {} from {}: failed",
|
||||
state->my_node_id,
|
||||
from);
|
||||
|
@ -1621,7 +1658,7 @@ namespace aft
|
|||
return;
|
||||
}
|
||||
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Recv append entries response to {} from {} for index {}: success",
|
||||
state->my_node_id,
|
||||
from,
|
||||
|
@ -1632,7 +1669,7 @@ namespace aft
|
|||
void send_request_vote(const ccf::NodeId& to)
|
||||
{
|
||||
auto last_committable_idx = last_committable_index();
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Send request vote from {} to {} at {}",
|
||||
state->my_node_id,
|
||||
to,
|
||||
|
@ -1662,7 +1699,7 @@ namespace aft
|
|||
if (state->current_view > r.term)
|
||||
{
|
||||
// Reply false, since our term is later than the received term.
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Recv request vote to {} from {}: our term is later ({} > {})",
|
||||
state->my_node_id,
|
||||
from,
|
||||
|
@ -1673,7 +1710,7 @@ namespace aft
|
|||
}
|
||||
else if (state->current_view < r.term)
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Recv request vote to {} from {}: their term is later ({} < {})",
|
||||
state->my_node_id,
|
||||
from,
|
||||
|
@ -1685,7 +1722,7 @@ namespace aft
|
|||
if (leader_id.has_value())
|
||||
{
|
||||
// Reply false, since we already know the leader in the current term.
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Recv request vote to {} from {}: leader {} already known in term {}",
|
||||
state->my_node_id,
|
||||
from,
|
||||
|
@ -1698,7 +1735,7 @@ namespace aft
|
|||
if ((voted_for.has_value()) && (voted_for.value() != from))
|
||||
{
|
||||
// Reply false, since we already voted for someone else.
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Recv request vote to {} from {}: already voted for {}",
|
||||
state->my_node_id,
|
||||
from,
|
||||
|
@ -1729,7 +1766,7 @@ namespace aft
|
|||
}
|
||||
else
|
||||
{
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Voting against candidate at {}.{} because local state is at {}.{}",
|
||||
r.term_of_last_committable_idx,
|
||||
r.last_committable_idx,
|
||||
|
@ -1742,7 +1779,7 @@ namespace aft
|
|||
|
||||
void send_request_vote_response(const ccf::NodeId& to, bool answer)
|
||||
{
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Send request vote response from {} to {}: {}",
|
||||
state->my_node_id,
|
||||
to,
|
||||
|
@ -1762,7 +1799,7 @@ namespace aft
|
|||
|
||||
if (leadership_state != kv::LeadershipState::Candidate)
|
||||
{
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Recv request vote response to {} from: {}: we aren't a candidate",
|
||||
state->my_node_id,
|
||||
from);
|
||||
|
@ -1773,7 +1810,7 @@ namespace aft
|
|||
auto node = all_other_nodes.find(from);
|
||||
if (node == all_other_nodes.end())
|
||||
{
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Recv request vote response to {} from {}: unknown node",
|
||||
state->my_node_id,
|
||||
from);
|
||||
|
@ -1782,7 +1819,7 @@ namespace aft
|
|||
|
||||
if (state->current_view < r.term)
|
||||
{
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Recv request vote response to {} from {}: their term is more recent "
|
||||
"({} < {})",
|
||||
state->my_node_id,
|
||||
|
@ -1795,7 +1832,7 @@ namespace aft
|
|||
else if (state->current_view != r.term)
|
||||
{
|
||||
// Ignore as it is stale.
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Recv request vote response to {} from {}: stale ({} != {})",
|
||||
state->my_node_id,
|
||||
from,
|
||||
|
@ -1806,14 +1843,14 @@ namespace aft
|
|||
else if (!r.vote_granted)
|
||||
{
|
||||
// Do nothing.
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Recv request vote response to {} from {}: they voted no",
|
||||
state->my_node_id,
|
||||
from);
|
||||
return;
|
||||
}
|
||||
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Recv request vote response to {} from {}: they voted yes",
|
||||
state->my_node_id,
|
||||
from);
|
||||
|
@ -1852,7 +1889,7 @@ namespace aft
|
|||
restart_election_timeout();
|
||||
reset_last_ack_timeouts();
|
||||
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Becoming candidate {}: {}", state->my_node_id, state->current_view);
|
||||
|
||||
add_vote_for_me(state->my_node_id);
|
||||
|
@ -1886,7 +1923,7 @@ namespace aft
|
|||
// their own signature, which _will_ be considered committable.
|
||||
committable_indices.clear();
|
||||
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Election index is {} in term {}", election_index, state->current_view);
|
||||
// Discard any un-committable updates we may hold,
|
||||
// since we have no signature for them. Except at startup,
|
||||
|
@ -1910,7 +1947,7 @@ namespace aft
|
|||
|
||||
reset_last_ack_timeouts();
|
||||
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Becoming leader {}: {}", state->my_node_id, state->current_view);
|
||||
|
||||
// Immediately commit if there are no other nodes.
|
||||
|
@ -1962,7 +1999,7 @@ namespace aft
|
|||
membership_state != kv::MembershipState::RetirementInitiated)
|
||||
{
|
||||
leadership_state = kv::LeadershipState::Follower;
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Becoming follower {}: {}.{}",
|
||||
state->my_node_id,
|
||||
state->current_view,
|
||||
|
@ -1975,7 +2012,7 @@ namespace aft
|
|||
// becomes a follower in the new term.
|
||||
void become_aware_of_new_term(Term term)
|
||||
{
|
||||
LOG_DEBUG_FMT("Becoming aware of new term {}", term);
|
||||
RAFT_DEBUG_FMT("Becoming aware of new term {}", term);
|
||||
|
||||
state->current_view = term;
|
||||
voted_for.reset();
|
||||
|
@ -1984,7 +2021,7 @@ namespace aft
|
|||
is_new_follower = true;
|
||||
}
|
||||
|
||||
std::string leadership_state_string()
|
||||
std::string leadership_state_string() const
|
||||
{
|
||||
if (leadership_state.has_value())
|
||||
return fmt::format("{}", leadership_state.value());
|
||||
|
@ -1996,7 +2033,7 @@ namespace aft
|
|||
void become_retiring()
|
||||
{
|
||||
membership_state = kv::MembershipState::RetirementInitiated;
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Becoming retiring {} (while {}): {}",
|
||||
state->my_node_id,
|
||||
leadership_state_string(),
|
||||
|
@ -2005,7 +2042,7 @@ namespace aft
|
|||
|
||||
void become_retired(Index idx, kv::RetirementPhase phase)
|
||||
{
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Becoming retired, phase {} (leadership {}): {}: {} at {}",
|
||||
phase,
|
||||
leadership_state_string(),
|
||||
|
@ -2025,7 +2062,7 @@ namespace aft
|
|||
"retirement_idx already set to {}",
|
||||
retirement_idx.value());
|
||||
retirement_idx = idx;
|
||||
LOG_INFO_FMT("Node retiring at {}", idx);
|
||||
RAFT_INFO_FMT("Node retiring at {}", idx);
|
||||
}
|
||||
else if (phase == kv::RetirementPhase::Signed)
|
||||
{
|
||||
|
@ -2036,7 +2073,7 @@ namespace aft
|
|||
idx,
|
||||
retirement_idx.value());
|
||||
retirement_committable_idx = idx;
|
||||
LOG_INFO_FMT("Node retirement committable at {}", idx);
|
||||
RAFT_INFO_FMT("Node retirement committable at {}", idx);
|
||||
}
|
||||
else if (phase == kv::RetirementPhase::Completed)
|
||||
{
|
||||
|
@ -2061,7 +2098,7 @@ namespace aft
|
|||
}
|
||||
|
||||
votes_for_me[conf.idx].votes.insert(from);
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Node {} voted for {} in configuration {} with quorum {}",
|
||||
from,
|
||||
state->my_node_id,
|
||||
|
@ -2124,7 +2161,7 @@ namespace aft
|
|||
new_commit_cft_idx = confirmed;
|
||||
}
|
||||
}
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"In update_commit, new_commit_cft_idx: {}, "
|
||||
"last_idx: {}",
|
||||
new_commit_cft_idx,
|
||||
|
@ -2146,7 +2183,7 @@ namespace aft
|
|||
|
||||
void commit_if_possible(Index idx)
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Commit if possible {} (ci: {}) (ti {})",
|
||||
idx,
|
||||
state->commit_idx,
|
||||
|
@ -2173,7 +2210,7 @@ namespace aft
|
|||
}
|
||||
else
|
||||
{
|
||||
LOG_FAIL_FMT("Unsupported consensus type");
|
||||
RAFT_FAIL_FMT("Unsupported consensus type");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2206,7 +2243,7 @@ namespace aft
|
|||
to.nodes.find(id) == to.nodes.end() && rit != retired_nodes.end() &&
|
||||
rit->second <= state->commit_idx)
|
||||
{
|
||||
LOG_DEBUG_FMT("Configurations: is retired: {}", id);
|
||||
RAFT_DEBUG_FMT("Configurations: is retired: {}", id);
|
||||
r++;
|
||||
}
|
||||
}
|
||||
|
@ -2221,7 +2258,7 @@ namespace aft
|
|||
{
|
||||
if (to.nodes.find(nid) == to.nodes.end())
|
||||
{
|
||||
LOG_DEBUG_FMT("Configurations: required retirement: {}", nid);
|
||||
RAFT_DEBUG_FMT("Configurations: required retirement: {}", nid);
|
||||
r++;
|
||||
}
|
||||
}
|
||||
|
@ -2249,7 +2286,7 @@ namespace aft
|
|||
"Tried to commit {} but last_idx is {}", idx, state->last_idx));
|
||||
}
|
||||
|
||||
LOG_DEBUG_FMT("Starting commit");
|
||||
RAFT_DEBUG_FMT("Starting commit");
|
||||
|
||||
// This could happen if a follower becomes the leader when it
|
||||
// has committed fewer log entries, although it has them available.
|
||||
|
@ -2265,11 +2302,11 @@ namespace aft
|
|||
become_retired(idx, kv::RetirementPhase::Completed);
|
||||
}
|
||||
|
||||
LOG_DEBUG_FMT("Compacting...");
|
||||
RAFT_DEBUG_FMT("Compacting...");
|
||||
store->compact(idx);
|
||||
ledger->commit(idx);
|
||||
|
||||
LOG_DEBUG_FMT("Commit on {}: {}", state->my_node_id, idx);
|
||||
RAFT_DEBUG_FMT("Commit on {}: {}", state->my_node_id, idx);
|
||||
|
||||
// Examine each configuration that is followed by a globally committed
|
||||
// configuration.
|
||||
|
@ -2302,7 +2339,7 @@ namespace aft
|
|||
!rr.has_value() ||
|
||||
!resharing_tracker->have_resharing_result_for(rr.value(), idx))
|
||||
{
|
||||
LOG_TRACE_FMT(
|
||||
RAFT_TRACE_FMT(
|
||||
"Configurations: not switching to next configuration, resharing "
|
||||
"not completed yet.");
|
||||
break;
|
||||
|
@ -2311,7 +2348,7 @@ namespace aft
|
|||
|
||||
if (reconfiguration_type == ReconfigurationType::ONE_TRANSACTION)
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Configurations: discard committed configuration at {}", conf->idx);
|
||||
configurations.pop_front();
|
||||
changed = true;
|
||||
|
@ -2346,7 +2383,7 @@ namespace aft
|
|||
num_trusted_nodes == next->nodes.size() &&
|
||||
num_retired_nodes == num_required_retired_nodes)
|
||||
{
|
||||
LOG_TRACE_FMT(
|
||||
RAFT_TRACE_FMT(
|
||||
"Configurations: all nodes trusted ({}) or retired ({}), "
|
||||
"switching to configuration #{}",
|
||||
num_trusted_nodes,
|
||||
|
@ -2357,7 +2394,7 @@ namespace aft
|
|||
is_learner() &&
|
||||
next->nodes.find(state->my_node_id) != next->nodes.end())
|
||||
{
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Becoming follower {}: {}",
|
||||
state->my_node_id,
|
||||
state->current_view);
|
||||
|
@ -2387,7 +2424,7 @@ namespace aft
|
|||
}
|
||||
else
|
||||
{
|
||||
LOG_TRACE_FMT(
|
||||
RAFT_TRACE_FMT(
|
||||
"Configurations: not enough trusted or retired nodes for "
|
||||
"configuration #{} ({}/{} trusted, {}/{} retired)",
|
||||
next->rid,
|
||||
|
@ -2449,7 +2486,7 @@ namespace aft
|
|||
{
|
||||
if (consensus_type == ConsensusType::CFT && idx < state->commit_idx)
|
||||
{
|
||||
LOG_FAIL_FMT(
|
||||
RAFT_FAIL_FMT(
|
||||
"Asked to rollback to idx:{} but committed to commit_idx:{} - "
|
||||
"ignoring rollback request",
|
||||
idx,
|
||||
|
@ -2459,10 +2496,10 @@ namespace aft
|
|||
|
||||
store->rollback({get_term_internal(idx), idx}, state->current_view);
|
||||
|
||||
LOG_DEBUG_FMT("Setting term in store to: {}", state->current_view);
|
||||
RAFT_DEBUG_FMT("Setting term in store to: {}", state->current_view);
|
||||
ledger->truncate(idx);
|
||||
state->last_idx = idx;
|
||||
LOG_DEBUG_FMT("Rolled back at {}", idx);
|
||||
RAFT_DEBUG_FMT("Rolled back at {}", idx);
|
||||
|
||||
state->view_history.rollback(idx);
|
||||
|
||||
|
@ -2495,7 +2532,7 @@ namespace aft
|
|||
membership_state = reconfiguration_type == ONE_TRANSACTION ?
|
||||
kv::MembershipState::Active :
|
||||
kv::MembershipState::RetirementInitiated;
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Becoming {} after rollback",
|
||||
reconfiguration_type == ONE_TRANSACTION ? "Active" :
|
||||
"RetirementInitiated");
|
||||
|
@ -2507,7 +2544,7 @@ namespace aft
|
|||
|
||||
while (!configurations.empty() && (configurations.back().idx > idx))
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Configurations: rollback configuration at {}",
|
||||
configurations.back().idx);
|
||||
configurations.pop_back();
|
||||
|
@ -2576,7 +2613,7 @@ namespace aft
|
|||
for (auto node_id : to_remove)
|
||||
{
|
||||
all_other_nodes.erase(node_id);
|
||||
LOG_INFO_FMT("Removed raft node {}", node_id);
|
||||
RAFT_INFO_FMT("Removed raft node {}", node_id);
|
||||
}
|
||||
|
||||
// Add all active nodes that are not already present in the node state.
|
||||
|
@ -2591,7 +2628,7 @@ namespace aft
|
|||
{
|
||||
if (!channels->have_channel(node_info.first))
|
||||
{
|
||||
LOG_DEBUG_FMT(
|
||||
RAFT_DEBUG_FMT(
|
||||
"Configurations: create node channel with {}", node_info.first);
|
||||
|
||||
channels->associate_node_address(
|
||||
|
@ -2611,7 +2648,7 @@ namespace aft
|
|||
send_append_entries(node_info.first, index);
|
||||
}
|
||||
|
||||
LOG_INFO_FMT(
|
||||
RAFT_INFO_FMT(
|
||||
"Added raft node {} ({}:{})",
|
||||
node_info.first,
|
||||
node_info.second.hostname,
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the Apache 2.0 License.
|
||||
|
||||
#define VERBOSE_RAFT_LOGGING
|
||||
|
||||
#include "driver.h"
|
||||
|
||||
#include "ccf/ds/hash.h"
|
||||
|
@ -41,6 +44,8 @@ int main(int argc, char** argv)
|
|||
// cmake with ".. -DVERBOSE_LOGGING=DEBUG"
|
||||
logger::config::level() = logger::DEBUG;
|
||||
|
||||
threading::ThreadMessaging::init(1);
|
||||
|
||||
const std::string filename = argv[1];
|
||||
|
||||
std::ifstream fstream;
|
||||
|
|
|
@ -799,4 +799,40 @@ struct formatter<kv::Configuration::Nodes>
|
|||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct formatter<kv::MembershipState>
|
||||
{
|
||||
template <typename ParseContext>
|
||||
auto parse(ParseContext& ctx)
|
||||
{
|
||||
return ctx.begin();
|
||||
}
|
||||
|
||||
template <typename FormatContext>
|
||||
auto format(const kv::MembershipState& state, FormatContext& ctx) const
|
||||
-> decltype(ctx.out())
|
||||
{
|
||||
const auto s = nlohmann::json(state).get<std::string>();
|
||||
return format_to(ctx.out(), "{}", s);
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct formatter<kv::LeadershipState>
|
||||
{
|
||||
template <typename ParseContext>
|
||||
auto parse(ParseContext& ctx)
|
||||
{
|
||||
return ctx.begin();
|
||||
}
|
||||
|
||||
template <typename FormatContext>
|
||||
auto format(const kv::LeadershipState& state, FormatContext& ctx) const
|
||||
-> decltype(ctx.out())
|
||||
{
|
||||
const auto s = nlohmann::json(state).get<std::string>();
|
||||
return format_to(ctx.out(), "{}", s);
|
||||
}
|
||||
};
|
||||
|
||||
FMT_END_NAMESPACE
|
Загрузка…
Ссылка в новой задаче