Eddy Ashton 2021-07-23 11:21:15 +01:00 committed by GitHub
Parent 840fcaabeb
Commit a156bbf51d
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 765 additions and 151 deletions

View file

@@ -436,7 +436,7 @@ if(BUILD_TESTS)
NAME raft_scenario_test
COMMAND
${PYTHON} ${CMAKE_SOURCE_DIR}/tests/raft_scenarios_runner.py
./raft_driver --gen-scenarios ${CMAKE_SOURCE_DIR}/tests/raft_scenarios/
--gen-scenarios ./raft_driver ${CMAKE_SOURCE_DIR}/tests/raft_scenarios/
)
set_property(TEST raft_scenario_test PROPERTY LABELS raft_scenario)
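
(Aside: raft_scenarios_runner.py, whose argument parsing is updated in the final hunk of this commit, takes the driver binary as a positional argument and --gen-scenarios as an optional flag, so the corrected line above corresponds to an invocation of the form:)

    ${PYTHON} ${CMAKE_SOURCE_DIR}/tests/raft_scenarios_runner.py \
      --gen-scenarios ./raft_driver ${CMAKE_SOURCE_DIR}/tests/raft_scenarios/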

View file

@@ -2123,13 +2123,6 @@ namespace aft
void send_append_entries_response(
ccf::NodeId to, AppendEntriesResponseType answer)
{
LOG_DEBUG_FMT(
"Send append entries response from {} to {} for index {}: {}",
state->my_node_id.trim(),
to.trim(),
state->last_idx,
answer);
if (answer == AppendEntriesResponseType::REQUIRE_EVIDENCE)
{
state->requested_evidence_from = to;
@@ -2148,6 +2141,13 @@ namespace aft
state->commit_idx :
state->last_idx;
LOG_DEBUG_FMT(
"Send append entries response from {} to {} for index {}: {}",
state->my_node_id.trim(),
to.trim(),
matching_idx,
answer);
AppendEntriesResponse response = {{raft_append_entries_response},
state->current_view,
matching_idx,

View file

@@ -33,8 +33,16 @@ int main(int argc, char** argv)
"Too few arguments - first must be path to scenario");
}
const std::string filename = argv[1];
std::ifstream fstream;
fstream.open(argv[1]);
fstream.open(filename);
if (!fstream.is_open())
{
throw std::runtime_error(
fmt::format("File {} does not exist or could not be opened", filename));
}
string line;
while (getline(fstream, line))
@@ -86,6 +94,10 @@ int main(int argc, char** argv)
assert(items.size() == 1);
driver->shuffle_messages_all();
break;
case shash("dispatch_one"):
assert(items.size() == 2);
driver->dispatch_one(items[1]);
break;
case shash("dispatch_all"):
assert(items.size() == 1);
driver->dispatch_all();
@@ -116,6 +128,14 @@ int main(int argc, char** argv)
assert(items.size() == 2);
driver->reconnect_node(items[1]);
break;
case shash("drop_pending"):
assert(items.size() == 2);
driver->drop_pending(items[1]);
break;
case shash("drop_pending_to"):
assert(items.size() == 3);
driver->drop_pending_to(items[1], items[2]);
break;
case shash("assert_state_sync"):
assert(items.size() == 1);
driver->assert_state_sync();
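
(Aside, not part of this commit: the case labels above require shash to be usable in a constant expression. A minimal constexpr string hash along these lines would satisfy that; the repository's actual shash may differ in algorithm and signature.)

    #include <cstdint>

    // Illustrative sketch: FNV-1a as a constexpr hash, so that string
    // commands can appear as switch case labels, as in the runner above.
    constexpr uint64_t shash(const char* s)
    {
      uint64_t h = 0xcbf29ce484222325ull; // FNV offset basis
      while (*s != '\0')
      {
        h ^= static_cast<uint8_t>(*s++);
        h *= 0x100000001b3ull; // FNV prime
      }
      return h;
    }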

View file

@@ -23,7 +23,17 @@ std::string stringify(const std::vector<uint8_t>& v, size_t max_size = 15ul)
"[{} bytes] {}", v.size(), std::string(v.begin(), v.begin() + size));
}
struct LedgerStubProxy_WithLogging : public aft::LedgerStubProxy
std::string stringify(const std::optional<std::vector<uint8_t>>& o)
{
if (o.has_value())
{
return stringify(*o);
}
return "MISSING";
}
struct LedgerStubProxy_Mermaid : public aft::LedgerStubProxy
{
using LedgerStubProxy::LedgerStubProxy;
@@ -32,23 +42,62 @@ struct LedgerStubProxy_WithLogging : public aft::LedgerStubProxy
bool globally_committable,
bool force_chunk) override
{
RAFT_DRIVER_OUT << " " << _id << "->>" << _id
<< ": ledger put s: " << stringify(data) << std::endl;
RAFT_DRIVER_OUT << fmt::format(
" {}->>{}: [ledger] appending: {}",
_id,
_id,
stringify(data))
<< std::endl;
aft::LedgerStubProxy::put_entry(data, globally_committable, force_chunk);
}
void truncate(aft::Index idx) override
{
RAFT_DRIVER_OUT << " " << _id << "->>" << _id << ": truncate i: " << idx
RAFT_DRIVER_OUT << fmt::format(
" {}->>{}: [ledger] truncating to {}", _id, _id, idx)
<< std::endl;
aft::LedgerStubProxy::truncate(idx);
}
};
struct LoggingStubStoreSig_Mermaid : public aft::LoggingStubStoreSig
{
using LoggingStubStoreSig::LoggingStubStoreSig;
void compact(aft::Index idx) override
{
RAFT_DRIVER_OUT << fmt::format(
" {}->>{}: [KV] compacting to {}", _id, _id, idx)
<< std::endl;
aft::LoggingStubStoreSig::compact(idx);
}
void rollback(const kv::TxID& tx_id, aft::Term t) override
{
RAFT_DRIVER_OUT << fmt::format(
" {}->>{}: [KV] rolling back to {}.{}, in term {}",
_id,
_id,
tx_id.term,
tx_id.version,
t)
<< std::endl;
aft::LoggingStubStoreSig::rollback(tx_id, t);
}
void initialise_term(aft::Term t) override
{
RAFT_DRIVER_OUT << fmt::format(
" {}->>{}: [KV] initialising in term {}", _id, _id, t)
<< std::endl;
aft::LoggingStubStoreSig::initialise_term(t);
}
};
using ms = std::chrono::milliseconds;
using TRaft = aft::
Aft<LedgerStubProxy_WithLogging, aft::ChannelStubProxy, aft::StubSnapshotter>;
using Store = aft::LoggingStubStoreSig;
Aft<LedgerStubProxy_Mermaid, aft::ChannelStubProxy, aft::StubSnapshotter>;
using Store = LoggingStubStoreSig_Mermaid;
using Adaptor = aft::Adaptor<Store>;
std::vector<uint8_t> cert;
@@ -67,7 +116,7 @@ private:
std::shared_ptr<TRaft> raft;
};
std::unordered_map<ccf::NodeId, NodeDriver> _nodes;
std::map<ccf::NodeId, NodeDriver> _nodes;
std::set<std::pair<ccf::NodeId, ccf::NodeId>> _connections;
public:
@@ -83,7 +132,7 @@ public:
auto raft = std::make_shared<TRaft>(
ConsensusType::CFT,
std::make_unique<Adaptor>(kv),
std::make_unique<LedgerStubProxy_WithLogging>(node_id),
std::make_unique<LedgerStubProxy_Mermaid>(node_id),
std::make_shared<aft::ChannelStubProxy>(),
std::make_shared<aft::StubSnapshotter>(),
nullptr,
@@ -107,41 +156,58 @@ public:
}
}
void log(ccf::NodeId first, ccf::NodeId second, const std::string& message)
void log(
ccf::NodeId first,
ccf::NodeId second,
const std::string& message,
bool dropped = false)
{
RAFT_DRIVER_OUT << " " << first << "->>" << second << ": " << message
<< std::endl;
RAFT_DRIVER_OUT << " " << first << "-" << (dropped ? "X" : ">>") << second
<< ": " << message << std::endl;
}
void rlog(ccf::NodeId first, ccf::NodeId second, const std::string& message)
void rlog(
ccf::NodeId first,
ccf::NodeId second,
const std::string& message,
bool dropped = false)
{
RAFT_DRIVER_OUT << " " << first << "-->>" << second << ": " << message
<< std::endl;
RAFT_DRIVER_OUT << " " << first << "--" << (dropped ? "X" : ">>") << second
<< ": " << message << std::endl;
}
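// Aside (illustrative, not part of this diff): log and rlog emit Mermaid
// sequence-diagram arrows - "->>" is a solid request line, "-->>" a dashed
// response line, and the "X" variants mark dropped messages. Sample emitted
// lines (values invented):
//   0->>1: request_vote for term 2, at tx 1.3
//   1-->>0: request_vote_response for term 2 = Y
//   0-X2: append_entries (3.0, 3.4] (term 2, commit 3)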
void log_msg_details(
ccf::NodeId node_id, ccf::NodeId tgt_node_id, aft::RequestVote rv)
ccf::NodeId node_id,
ccf::NodeId tgt_node_id,
aft::RequestVote rv,
bool dropped)
{
const auto s = fmt::format(
"request_vote for term {}, at tx {}.{}",
rv.term,
rv.term_of_last_committable_idx,
rv.last_committable_idx);
log(node_id, tgt_node_id, s);
log(node_id, tgt_node_id, s, dropped);
}
void log_msg_details(
ccf::NodeId node_id, ccf::NodeId tgt_node_id, aft::RequestVoteResponse rv)
ccf::NodeId node_id,
ccf::NodeId tgt_node_id,
aft::RequestVoteResponse rv,
bool dropped)
{
const auto s = fmt::format(
"request_vote_response for term {} = {}",
rv.term,
(rv.vote_granted ? "Y" : "N"));
rlog(node_id, tgt_node_id, s);
rlog(node_id, tgt_node_id, s, dropped);
}
void log_msg_details(
ccf::NodeId node_id, ccf::NodeId tgt_node_id, aft::AppendEntries ae)
ccf::NodeId node_id,
ccf::NodeId tgt_node_id,
aft::AppendEntries ae,
bool dropped)
{
const auto s = fmt::format(
"append_entries ({}.{}, {}.{}] (term {}, commit {})",
@@ -151,13 +217,14 @@ public:
ae.idx,
ae.term,
ae.leader_commit_idx);
log(node_id, tgt_node_id, s);
log(node_id, tgt_node_id, s, dropped);
}
void log_msg_details(
ccf::NodeId node_id,
ccf::NodeId tgt_node_id,
aft::AppendEntriesResponse aer)
aft::AppendEntriesResponse aer,
bool dropped)
{
char const* success = "UNHANDLED";
switch (aer.success)
@@ -183,13 +250,14 @@ public:
success,
aer.term,
aer.last_log_idx);
rlog(node_id, tgt_node_id, s);
rlog(node_id, tgt_node_id, s, dropped);
}
void log_msg_details(
ccf::NodeId node_id,
ccf::NodeId tgt_node_id,
const std::vector<uint8_t>& contents)
const std::vector<uint8_t>& contents,
bool dropped = false)
{
const uint8_t* data = contents.data();
size_t size = contents.size();
@@ -200,25 +268,25 @@ public:
case (aft::RaftMsgType::raft_request_vote):
{
auto rv = *(aft::RequestVote*)data;
log_msg_details(node_id, tgt_node_id, rv);
log_msg_details(node_id, tgt_node_id, rv, dropped);
break;
}
case (aft::RaftMsgType::raft_request_vote_response):
{
auto rvr = *(aft::RequestVoteResponse*)data;
log_msg_details(node_id, tgt_node_id, rvr);
log_msg_details(node_id, tgt_node_id, rvr, dropped);
break;
}
case (aft::RaftMsgType::raft_append_entries):
{
auto ae = *(aft::AppendEntries*)data;
log_msg_details(node_id, tgt_node_id, ae);
log_msg_details(node_id, tgt_node_id, ae, dropped);
break;
}
case (aft::RaftMsgType::raft_append_entries_response):
{
auto aer = *(aft::AppendEntriesResponse*)data;
log_msg_details(node_id, tgt_node_id, aer);
log_msg_details(node_id, tgt_node_id, aer, dropped);
break;
}
default:
@@ -257,8 +325,10 @@ public:
{
auto raft = _nodes.at(node_id).raft;
RAFT_DRIVER_OUT << fmt::format(
" Note right of {}: @{}.{} (committed {})",
" Note right of {}: {} @{}.{} (committed {})",
node_id,
raft->is_primary() ? "P" :
(raft->is_follower() ? "F" : "C"),
raft->get_term(),
raft->get_last_idx(),
raft->get_commit_idx())
@@ -292,11 +362,14 @@ public:
}
template <class Messages>
size_t dispatch_one_queue(ccf::NodeId node_id, Messages& messages)
size_t dispatch_one_queue(
ccf::NodeId node_id,
Messages& messages,
const std::optional<size_t>& max_count = std::nullopt)
{
size_t count = 0;
while (messages.size())
while (messages.size() && (!max_count.has_value() || count < *max_count))
{
auto [tgt_node_id, contents] = messages.front();
messages.pop_front();
@@ -310,40 +383,81 @@ public:
const uint8_t* data = contents.data();
auto size = contents.size();
auto msg_type = serialized::peek<aft::RaftMsgType>(data, size);
bool should_send = true;
if (msg_type == aft::raft_append_entries)
{
// Parse the indices to be sent to the recipient.
auto ae = *(aft::AppendEntries*)data;
auto& sender_ledger = _nodes.at(node_id).raft->ledger;
for (auto idx = ae.prev_idx + 1; idx <= ae.idx; ++idx)
auto& sender_raft = _nodes.at(node_id).raft;
const auto payload_opt =
sender_raft->ledger->get_append_entries_payload(ae, sender_raft);
if (!payload_opt.has_value())
{
const auto entry = sender_ledger->get_entry_by_idx(idx);
contents.insert(contents.end(), entry.begin(), entry.end());
// While trying to construct an AppendEntries, we asked for an
// entry that doesn't exist. This is a valid situation - we queued
// the AppendEntries, but rolled back before it was dispatched!
// We abandon this operation here.
// We could log this in Mermaid with the line below, but since
// this does not occur in a real node it is silently ignored. In a
// real node, the AppendEntries and truncate messages are ordered
// and processed by the host in that order. All AppendEntries
// referencing a specific index will be processed before any
// truncation that removes that index.
// RAFT_DRIVER_OUT
// << fmt::format(
// " Note right of {}: Abandoning AppendEntries"
// "containing {} - no longer in ledger",
// node_id,
// idx)
// << std::endl;
should_send = false;
}
else
{
contents.insert(
contents.end(), payload_opt->begin(), payload_opt->end());
}
}
log_msg_details(node_id, tgt_node_id, contents);
_nodes.at(tgt_node_id)
.raft->recv_message(node_id, contents.data(), contents.size());
count++;
if (should_send)
{
log_msg_details(node_id, tgt_node_id, contents);
_nodes.at(tgt_node_id)
.raft->recv_message(node_id, contents.data(), contents.size());
count++;
}
}
}
return count;
}
void dispatch_one(ccf::NodeId node_id)
void dispatch_one(
ccf::NodeId node_id, const std::optional<size_t>& max_count = std::nullopt)
{
auto raft = _nodes.at(node_id).raft;
dispatch_one_queue(node_id, channel_stub_proxy(*raft)->messages);
dispatch_one_queue(node_id, channel_stub_proxy(*raft)->messages, max_count);
}
void dispatch_all_once()
{
// The intent is to dispatch all _current_ messages, but no new ones. If we
// simply iterated, we might dispatch new messages produced on later nodes
// in response to messages from earlier-processed nodes. To avoid that, we
// count how many messages are present initially, and cap processing at
// that many.
std::map<ccf::NodeId, size_t> initial_message_counts;
for (auto& [node_id, driver] : _nodes)
{
initial_message_counts[node_id] =
channel_stub_proxy(*driver.raft)->messages.size();
}
for (auto& node : _nodes)
{
dispatch_one(node.first);
dispatch_one(node.first, initial_message_counts[node.first]);
}
}
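// Aside: scenario scripts drive this via the dispatch_all_once command; the
// lossy-network scenario added in this commit uses a run of dispatch_all_once
// lines to step the message exchange forward one round at a time.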
@@ -413,7 +527,7 @@ public:
if (!opt.has_value())
{
RAFT_DRIVER_OUT << fmt::format(
" Note right of {}: No primary to replicate {}",
" Note left of {}: No primary to replicate {}",
_nodes.begin()->first,
stringify(*data))
<< std::endl;
@@ -487,6 +601,33 @@ public:
}
}
void drop_pending_to(ccf::NodeId from, ccf::NodeId to)
{
auto from_raft = _nodes.at(from).raft;
auto& messages = channel_stub_proxy(*from_raft)->messages;
auto it = messages.begin();
while (it != messages.end())
{
if (it->first == to)
{
log_msg_details(from, to, it->second, true);
it = messages.erase(it);
}
else
{
++it;
}
}
}
void drop_pending(ccf::NodeId from)
{
for (auto& [to, _] : _nodes)
{
drop_pending_to(from, to);
}
}
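// Aside: these correspond to the new drop_pending and drop_pending_to
// scenario commands, e.g. (from the scenarios added in this commit):
//   drop_pending,0
//   drop_pending_to,0,2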
void assert_state_sync()
{
auto [target_id, nd] = *_nodes.begin();

View file

@@ -17,7 +17,7 @@ namespace aft
ccf::NodeId _id;
public:
std::vector<std::shared_ptr<std::vector<uint8_t>>> ledger;
std::vector<std::vector<uint8_t>> ledger;
uint64_t skip_count = 0;
LedgerStubProxy(const ccf::NodeId& id) : _id(id) {}
@@ -27,29 +27,76 @@ namespace aft
bool globally_committable,
bool force_chunk)
{
auto size = data.size();
auto buffer = std::make_shared<std::vector<uint8_t>>(size);
auto ptr = buffer->data();
serialized::write(ptr, size, data.data(), data.size());
ledger.push_back(buffer);
ledger.push_back(data);
}
void skip_entry(const uint8_t*& data, size_t& size)
{
skip_count++;
get_entry(data, size);
++skip_count;
}
std::vector<uint8_t> get_entry(const uint8_t*& data, size_t& size)
{
return {data, data + size};
const auto entry_size = serialized::read<size_t>(data, size);
std::vector<uint8_t> entry(data, data + entry_size);
serialized::skip(data, size, entry_size);
return entry;
}
std::vector<uint8_t> get_entry_by_idx(size_t idx)
std::optional<std::vector<uint8_t>> get_entry_by_idx(size_t idx)
{
// Ledger indices are 1-based, hence the -1
return *(ledger[idx - 1]);
if (idx > 0 && idx <= ledger.size())
{
return ledger[idx - 1];
}
return std::nullopt;
}
template <typename T>
std::optional<std::vector<uint8_t>> get_append_entries_payload(
const aft::AppendEntries& ae, T& term_getter)
{
std::vector<uint8_t> payload;
for (auto idx = ae.prev_idx + 1; idx <= ae.idx; ++idx)
{
auto entry_opt = get_entry_by_idx(idx);
if (!entry_opt.has_value())
{
return std::nullopt;
}
const auto& entry = *entry_opt;
// The payload that we eventually deserialise must include the
// ledger entry as well as the View and Index that identify it. In
// the real entries, they are nested in the payload and the IV. For
// test purposes, we just prefix them manually (to mirror the
// deserialisation in LoggingStubStore::ExecutionWrapper). We also
// size-prefix, so in a buffer of multiple of these messages we can
// extract each with get_entry above
const auto term_of_idx = term_getter->get_term(idx);
const auto size_before = payload.size();
auto additional_size =
sizeof(size_t) + sizeof(term_of_idx) + sizeof(idx);
const auto size_after = size_before + additional_size;
payload.resize(size_after);
{
uint8_t* data = payload.data() + size_before;
serialized::write(
data,
additional_size,
(sizeof(term_of_idx) + sizeof(idx) + entry.size()));
serialized::write(data, additional_size, term_of_idx);
serialized::write(data, additional_size, idx);
}
payload.insert(payload.end(), entry.begin(), entry.end());
}
return payload;
}
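// Aside (illustrative, not part of this diff): a consumer can unpack such a
// payload by repeatedly popping one size-prefixed record with get_entry,
// then reading the fields exactly as LoggingStubStore::ExecutionWrapper does:
//   const uint8_t* data = payload.data();
//   size_t size = payload.size();
//   while (size > 0)
//   {
//     const auto record = get_entry(data, size);
//     const uint8_t* rd = record.data();
//     auto rs = record.size();
//     const auto term = serialized::read<aft::Term>(rd, rs);
//     const auto idx = serialized::read<kv::Version>(rd, rs);
//     // the remaining rs bytes are the raw ledger entry
//   }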
virtual void truncate(Index idx)
@@ -186,63 +233,42 @@ namespace aft
class LoggingStubStore
{
private:
protected:
ccf::NodeId _id;
public:
LoggingStubStore(ccf::NodeId id) : _id(id) {}
virtual void compact(Index i)
{
#ifdef STUB_LOG
std::cout << " Node" << _id << "->>KV" << _id << ": compact i: " << i
<< std::endl;
#endif
}
virtual void compact(Index i) {}
virtual void rollback(const kv::TxID& tx_id, Term t)
{
#ifdef STUB_LOG
std::cout << " Node" << _id << "->>KV" << _id
<< ": rollback i: " << tx_id.version << " term: " << t;
std::cout << std::endl;
#endif
}
virtual void rollback(const kv::TxID& tx_id, Term t) {}
virtual void initialise_term(Term t)
{
#ifdef STUB_LOG
std::cout << " Node" << _id << "->>KV" << _id
<< ": initialise_term t: " << t << std::endl;
#endif
}
virtual void initialise_term(Term t) {}
kv::Version current_version()
{
return kv::NoVersion;
}
virtual kv::ApplyResult deserialise_views(
const std::vector<uint8_t>& data,
kv::ConsensusHookPtrs& hooks,
bool public_only = false,
kv::Term* term = nullptr,
kv::Version* index = nullptr,
kv::Tx* tx = nullptr,
ccf::PrimarySignature* sig = nullptr)
{
return kv::ApplyResult::PASS;
}
template <kv::ApplyResult AR>
class ExecutionWrapper : public kv::AbstractExecutionWrapper
{
private:
std::vector<uint8_t> data;
kv::ConsensusHookPtrs hooks;
aft::Term term;
kv::Version index;
std::vector<uint8_t> entry;
public:
ExecutionWrapper(const std::vector<uint8_t>& data_) : data(data_) {}
ExecutionWrapper(const std::vector<uint8_t>& data_)
{
const uint8_t* data = data_.data();
auto size = data_.size();
term = serialized::read<aft::Term>(data, size);
index = serialized::read<kv::Version>(data, size);
entry = serialized::read(data, size, size);
}
kv::ApplyResult apply() override
{
@@ -256,17 +282,17 @@ namespace aft
const std::vector<uint8_t>& get_entry() override
{
return data;
return entry;
}
Term get_term() override
{
return 0;
return term;
}
kv::Version get_index() override
{
return 0;
return index;
}
kv::Version get_max_conflict_version() override

View file

@@ -25,11 +25,50 @@ std::atomic<uint16_t> threading::ThreadMessaging::thread_count = 1;
std::vector<uint8_t> cert;
const auto request_timeout = ms(10);
aft::ChannelStubProxy* channel_stub_proxy(const TRaft& r)
{
return (aft::ChannelStubProxy*)r.channels.get();
}
void receive_message(
TRaft& sender, TRaft& receiver, std::vector<uint8_t> contents)
{
bool should_send = true;
{
// If this is AppendEntries, then append the serialised ledger entries to
// the message before transmitting
const uint8_t* data = contents.data();
auto size = contents.size();
auto msg_type = serialized::peek<aft::RaftMsgType>(data, size);
if (msg_type == aft::raft_append_entries)
{
// Parse the indices to be sent to the recipient.
auto ae = *(aft::AppendEntries*)data;
TRaft* ps = &sender;
const auto payload_opt =
sender.ledger->get_append_entries_payload(ae, ps);
if (payload_opt.has_value())
{
contents.insert(
contents.end(), payload_opt->begin(), payload_opt->end());
}
else
{
should_send = false;
}
}
}
if (should_send)
{
receiver.recv_message(sender.id(), contents.data(), contents.size());
}
}
DOCTEST_TEST_CASE("Single node startup" * doctest::test_suite("single"))
{
ccf::NodeId node_id = kv::test::PrimaryNodeId;
@@ -49,7 +88,7 @@ DOCTEST_TEST_CASE("Single node startup" * doctest::test_suite("single"))
nullptr,
nullptr,
nullptr,
ms(10),
request_timeout,
election_timeout,
ms(1000));
@@ -94,7 +133,7 @@ DOCTEST_TEST_CASE("Single node commit" * doctest::test_suite("single"))
nullptr,
nullptr,
nullptr,
ms(10),
request_timeout,
election_timeout,
ms(1000));
@@ -216,7 +255,7 @@ DOCTEST_TEST_CASE(
rvc.term_of_last_committable_idx == aft::ViewHistory::InvalidView);
}
r1.recv_message(node_id0, rv_raw->data(), rv_raw->size());
receive_message(r0, r1, *rv_raw);
DOCTEST_INFO("Node 2 receives the request");
@@ -230,7 +269,7 @@ DOCTEST_TEST_CASE(
rvc.term_of_last_committable_idx == aft::ViewHistory::InvalidView);
}
r2.recv_message(node_id0, rv_raw->data(), rv_raw->size());
receive_message(r0, r2, *rv_raw);
DOCTEST_INFO("Node 1 votes for Node 0");
@@ -247,7 +286,7 @@ DOCTEST_TEST_CASE(
DOCTEST_REQUIRE(rvrc.vote_granted);
}
r0.recv_message(node_id1, rvr_raw->data(), rvr_raw->size());
receive_message(r1, r0, *rvr_raw);
DOCTEST_INFO("Node 2 votes for Node 0");
@@ -264,7 +303,7 @@ DOCTEST_TEST_CASE(
DOCTEST_REQUIRE(rvrc.vote_granted);
}
r0.recv_message(node_id2, rvr_raw->data(), rvr_raw->size());
receive_message(r2, r0, *rvr_raw);
DOCTEST_INFO(
"Node 0 is now leader, and sends empty append entries to other nodes");
@@ -296,25 +335,6 @@ DOCTEST_TEST_CASE(
}
}
template <class NodeMap>
static size_t dispatch_all(
NodeMap& nodes,
const ccf::NodeId& from,
aft::ChannelStubProxy::MessageList& messages)
{
size_t count = 0;
while (messages.size())
{
auto message = messages.front();
messages.pop_front();
auto tgt_node_id = get<0>(message);
auto contents = get<1>(message);
nodes[tgt_node_id]->recv_message(from, contents.data(), contents.size());
count++;
}
return count;
}
template <typename AssertionArg, class NodeMap, class Assertion>
static size_t dispatch_all_and_DOCTEST_CHECK(
NodeMap& nodes,
@@ -325,18 +345,33 @@ static size_t dispatch_all_and_DOCTEST_CHECK(
size_t count = 0;
while (messages.size())
{
auto message = messages.front();
auto [tgt_node_id, contents] = messages.front();
messages.pop_front();
auto tgt_node_id = get<0>(message);
auto contents = get<1>(message);
AssertionArg arg = *(AssertionArg*)contents.data();
assertion(arg);
nodes[tgt_node_id]->recv_message(from, contents.data(), contents.size());
{
AssertionArg arg = *(AssertionArg*)contents.data();
assertion(arg);
}
receive_message(*nodes[from], *nodes[tgt_node_id], contents);
count++;
}
return count;
}
template <class NodeMap>
static size_t dispatch_all(
NodeMap& nodes,
const ccf::NodeId& from,
aft::ChannelStubProxy::MessageList& messages)
{
return dispatch_all_and_DOCTEST_CHECK<bool>(
nodes, from, messages, [](const auto&) {
// Pass
});
}
DOCTEST_TEST_CASE(
"Multiple nodes append entries" * doctest::test_suite("multiple"))
{
@@ -459,11 +494,11 @@ DOCTEST_TEST_CASE(
DOCTEST_INFO("Tell the leader to replicate a message");
DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{1, data, true, hooks}}, 1));
DOCTEST_REQUIRE(r0.ledger->ledger.size() == 1);
DOCTEST_REQUIRE(*r0.ledger->ledger.front() == entry);
DOCTEST_REQUIRE(r0.ledger->ledger.front() == entry);
DOCTEST_INFO("The other nodes are not told about this yet");
DOCTEST_REQUIRE(r0c->messages.size() == 0);
r0.periodic(ms(10));
r0.periodic(request_timeout);
DOCTEST_INFO("Now the other nodes are sent append_entries");
DOCTEST_REQUIRE(
@@ -590,7 +625,7 @@ DOCTEST_TEST_CASE("Multiple nodes late join" * doctest::test_suite("multiple"))
auto data = std::make_shared<std::vector<uint8_t>>(first_entry);
auto hooks = std::make_shared<kv::ConsensusHookPtrs>();
DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{1, data, true, hooks}}, 1));
r0.periodic(ms(10));
r0.periodic(request_timeout);
DOCTEST_REQUIRE(
1 ==
@@ -721,12 +756,12 @@ DOCTEST_TEST_CASE("Recv append entries logic" * doctest::test_suite("multiple"))
DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{1, data_1, true, hooks}}, 1));
DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{2, data_2, true, hooks}}, 1));
DOCTEST_REQUIRE(r0.ledger->ledger.size() == 2);
r0.periodic(ms(10));
r0.periodic(request_timeout);
DOCTEST_REQUIRE(r0c->messages.size() == 1);
// Receive append entries (idx: 2, prev_idx: 0)
ae_idx_2 = r0c->messages.front().second;
r1.recv_message(node_id0, ae_idx_2.data(), ae_idx_2.size());
receive_message(r0, r1, ae_idx_2);
DOCTEST_REQUIRE(r1.ledger->ledger.size() == 2);
}
@@ -750,7 +785,8 @@ DOCTEST_TEST_CASE("Recv append entries logic" * doctest::test_suite("multiple"))
r1c->messages.pop_front();
auto aer = *(aft::AppendEntriesResponse*)aer_v.data();
aer.success = aft::AppendEntriesResponseType::FAIL;
r0.recv_message(node_id1, reinterpret_cast<uint8_t*>(&aer), sizeof(aer));
const auto p = reinterpret_cast<uint8_t*>(&aer);
receive_message(r1, r0, {p, p + sizeof(aer)});
DOCTEST_REQUIRE(r0c->messages.size() == 1);
// Only the third entry is deserialised
@@ -763,7 +799,7 @@ DOCTEST_TEST_CASE("Recv append entries logic" * doctest::test_suite("multiple"))
DOCTEST_INFO("Receiving stale append entries has no effect");
{
r1.recv_message(node_id0, ae_idx_2.data(), ae_idx_2.size());
receive_message(r0, r1, ae_idx_2);
DOCTEST_REQUIRE(r1.ledger->ledger.size() == 3);
}
@@ -774,7 +810,7 @@ DOCTEST_TEST_CASE("Recv append entries logic" * doctest::test_suite("multiple"))
auto hooks = std::make_shared<kv::ConsensusHookPtrs>();
DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{4, data, true, hooks}}, 1));
DOCTEST_REQUIRE(r0.ledger->ledger.size() == 4);
r0.periodic(ms(10));
r0.periodic(request_timeout);
DOCTEST_REQUIRE(r0c->messages.size() == 1);
DOCTEST_REQUIRE(1 == dispatch_all(nodes, node_id0, r0c->messages));
DOCTEST_REQUIRE(r1.ledger->ledger.size() == 4);
@@ -788,7 +824,7 @@ DOCTEST_TEST_CASE("Recv append entries logic" * doctest::test_suite("multiple"))
auto hooks = std::make_shared<kv::ConsensusHookPtrs>();
DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{5, data, true, hooks}}, 1));
DOCTEST_REQUIRE(r0.ledger->ledger.size() == 5);
r0.periodic(ms(10));
r0.periodic(request_timeout);
DOCTEST_REQUIRE(r0c->messages.size() == 1);
r0c->messages.pop_front();
@@ -798,7 +834,8 @@ DOCTEST_TEST_CASE("Recv append entries logic" * doctest::test_suite("multiple"))
r1c->messages.pop_front();
auto aer = *(aft::AppendEntriesResponse*)aer_v.data();
aer.success = aft::AppendEntriesResponseType::FAIL;
r0.recv_message(node_id1, reinterpret_cast<uint8_t*>(&aer), sizeof(aer));
const auto p = reinterpret_cast<uint8_t*>(&aer);
receive_message(r1, r0, {p, p + sizeof(aer)});
DOCTEST_REQUIRE(r0c->messages.size() == 1);
// Receive append entries (idx: 5, prev_idx: 3)
@@ -942,6 +979,16 @@ DOCTEST_TEST_CASE("Exceed append entries limit")
dispatch_all(nodes, node_id0, r0c->messages);
}
// Tick to allow any remaining entries to be sent
r0.periodic(request_timeout);
dispatch_all(nodes, node_id0, r0c->messages);
{
DOCTEST_INFO("Nodes 0 and 1 have the same complete ledger");
DOCTEST_REQUIRE(r0.ledger->ledger.size() == individual_entries);
DOCTEST_REQUIRE(r1.ledger->ledger.size() == individual_entries);
}
DOCTEST_INFO("Node 2 joins the ensemble");
aft::Configuration::Nodes config1;
@@ -969,13 +1016,12 @@
}));
DOCTEST_REQUIRE(r2.ledger->ledger.size() == 0);
DOCTEST_REQUIRE(r0.ledger->ledger.size() == individual_entries);
DOCTEST_INFO("Node 2 asks for Node 0 to send all the data up to now");
DOCTEST_REQUIRE(r2c->messages.size() == 1);
auto aer = r2c->messages.front().second;
r2c->messages.pop_front();
r0.recv_message(node_id2, aer.data(), aer.size());
receive_message(r2, r0, aer);
DOCTEST_REQUIRE(r0c->messages.size() > num_small_entries_sent);
DOCTEST_REQUIRE(

View file

@@ -0,0 +1,125 @@
nodes,0,1,2
connect,0,1
connect,1,2
connect,0,2
# Node 0 starts first, and wins the first election
periodic_one,0,110
dispatch_all
periodic_all,10
dispatch_all
# An initial entry is written and successfully replicated
replicate,1,helloworld
periodic_all,10
dispatch_all
periodic_all,10
dispatch_all
state_all
assert_state_sync
# An additional entry takes much longer to replicate, as it is repeatedly lost on the bad connection between nodes
replicate,1,salutonmondo
periodic_all,10
drop_pending,0
dispatch_all
periodic_all,10
drop_pending,0
dispatch_all
periodic_all,10
drop_pending,0
dispatch_all
# Before either follower times out, the message eventually reaches Node 1 (though not Node 2)
periodic_all,10
drop_pending_to,0,2
# N0->N1 "Are you up-to-date?"
dispatch_all_once
# N1->N0 "No, I'm not"
dispatch_all_once
# N0->N1 "Here's the bit you're missing"
dispatch_all_once
# N1->N0 "Ok, I've got it"
drop_pending_to,1,0 #< This is dropped!
dispatch_all_once
# At this point, 1.2 is committed, but nobody knows that yet
# Eventually Node 2 is partitioned for so long that it calls an election
periodic_one,2,100
# But this RequestVote is lost!
periodic_all,10
drop_pending,1 #< The ACKs from 1 are constantly dropped
drop_pending,2
dispatch_all_once
periodic_all,10
drop_pending,1 #< The ACKs from 1 are constantly dropped
drop_pending,2
dispatch_all_once
# Eventually Node 2 is partitioned for so long that it calls many more elections, which nobody hears about
periodic_one,2,100
drop_pending,2
periodic_one,2,100
drop_pending,2
periodic_one,2,100
drop_pending,2
# TODO: Add more precise state assert? Currently 2 is @5.1, and is a Candidate
# Eventually Node 1 stops hearing from Node 0, and calls an election
state_all
drop_pending,1
periodic_one,1,100
dispatch_all_once #< Finally, everyone hears about this!
drop_pending,0 #< Node 0's response (in favour) is dropped
dispatch_all_once #< Node 2's response, denying and giving a higher term, is received
# Node 2 tries yet again for an election, and this time its RequestVote reaches Node 1
periodic_one,2,100
drop_pending_to,2,0
dispatch_all_once
# But Node 1 votes against, because it is further ahead than Node 2
dispatch_all_once
# Now Node 1 tries for an election
periodic_one,1,100
dispatch_all_once
# It gets positive votes from both, but we only deliver the one from Node 2
drop_pending,0
dispatch_all_once
# Node 1 is now primary, though it still doesn't know that 2 is committed
state_all
# Now we allow the network to heal and return to normal
periodic_all,10
dispatch_all_once #< "Where are you?"
periodic_all,10
dispatch_all_once #< "Out-of-date"
periodic_all,10
dispatch_all_once #< "Here's latest"
state_all
assert_state_sync

View file

@@ -1,7 +1,7 @@
nodes,0,1,2
# Node 2 is initially partitioned
connect,0,1
connect,1,2
connect,0,2
# Node 0 starts first, and begins sending messages first
periodic_one,0,110
@@ -16,10 +16,73 @@ replicate,1,helloworld
periodic_all,10
dispatch_all
# Build an uncommitted suffix
replicate,1,salutonmondo
replicate,1,hallo
replicate,1,hullo
replicate,1,ahoyhoy
# Dispatch AppendEntries containing this suffix, which will be received by Node 1, making
# it committed (known by a majority of nodes), though its commit status remains unknown
periodic_all,10
dispatch_one,0
# Receive ACKs from Node 1, allowing commit to advance on Node 0
# NB: This is not strictly necessary - the suffix is retained by Node 1, and protected by the
# election rules, because it _could_ be committed. Here we show that Node 0 could hear this (and
# then advertise this commit point to clients), but even if that didn't happen, the decision taken
# locally by Node 1 and the final result are the same
dispatch_one,1
state_all
# Build a further suffix which is _not_ shared in time
replicate,1,z
replicate,1,zz
replicate,1,zzz
state_all
# Now the partition shifts - 0 is partitioned, but 1 and 2 can communicate
disconnect,0,1
connect,1,2
# Since 2 has been partitioned, it is likely to call an election (though it will lose)
periodic_one,2,110
dispatch_all
# Eventually, Node 1 (the only live/connected node who has the committed suffix), will call
# an election and win
periodic_one,1,110
# Node 1 asks for votes
dispatch_one,1
# Node 2 responds in support
dispatch_one,2
# Node 1 sends an initial AppendEntries probe
dispatch_one,1
# Node 2 indicates it is behind
dispatch_one,2
# Node 1 sends its committable suffix
dispatch_one,1
# Node 2 ACKs all of that, advancing commit on Node 1
dispatch_one,2
# Node 1 sends Node 2 a heartbeat indicating the current commit index
periodic_all,10
dispatch_one,1
state_all
# Node 0 rejoins the network
connect,0,1
connect,0,2
periodic_all,10
dispatch_all

View file

@@ -0,0 +1,103 @@
nodes,0,1,2
connect,0,1
connect,1,2
connect,0,2
# Node 0 starts first, and begins sending messages first
periodic_one,0,110
dispatch_all
periodic_all,10
dispatch_all
state_all
replicate,1,helloworld
periodic_all,10
dispatch_all
replicate,1,salutonmondo
periodic_all,10
dispatch_all
periodic_all,10
dispatch_all
state_all
# Node 0 is idle/disconnected for too long, so Node 1 takes over
periodic_one,1,110
dispatch_all
periodic_all,10
dispatch_all
state_all
assert_state_sync
# NB: Node 1 is now primary in term 2. There is no primary in term 1,
# and attempting to replicate in it will produce an error
replicate,2,my world now
periodic_all,10
dispatch_all
periodic_all,10
dispatch_all
state_all
assert_state_sync
replicate,2,im in charge
periodic_all,10
dispatch_all
periodic_all,10
dispatch_all
state_all
assert_state_sync
# Node 1 is partitioned for a while
disconnect,0,1
disconnect,1,2
# While 1 was partitioned, it continued to receive transactions!
replicate,2,i think i am still in charge
replicate,2,i am going to continue like i am the primary
replicate,2,until i am told otherwise
periodic_all,10
dispatch_all
periodic_all,10
dispatch_all
# The partition lasts long enough that Node 0 starts an election
periodic_one,0,110
# It passes the election
periodic_all,10
dispatch_all
# The network heals
connect,0,1
connect,1,2
periodic_all,10
dispatch_all
periodic_all,10
dispatch_all
# Election has now succeeded, and new primary can replicate in term 3
replicate,3,look at me i am the primary now
periodic_all,10
dispatch_all
periodic_all,10
dispatch_all
state_all
assert_state_sync

View file

@@ -0,0 +1,70 @@
nodes,0,1,2
connect,0,1
connect,1,2
connect,0,2
# Node 0 starts first, and begins sending messages first
periodic_one,0,110
dispatch_all
periodic_all,10
dispatch_all
state_all
replicate,1,hello world
periodic_all,10
dispatch_all
replicate,1,saluton mondo
periodic_all,10
dispatch_all
periodic_all,10
dispatch_all
state_all
# Node 0 is partitioned, and replicates another item
disconnect,0,1
disconnect,0,2
replicate,1,drop me 1
periodic_all,10
dispatch_all
state_all
# Node 1 calls an election, and wins
periodic_one,1,110
dispatch_all
periodic_all,10
dispatch_all #< This AppendEntries starts at a point that Node 1 would accept, but is dropped
state_all
# Node 1 replicates a new entry, resulting in the same seqno as Node 0
replicate,2,keep me 1
periodic_all,10
dispatch_all
replicate,2,keep me 2
periodic_all,10
dispatch_all
state_all
# Network heals
connect,0,1
connect,0,2
# Node 0 is brought up-to-date
periodic_all,10
dispatch_all
periodic_all,10
dispatch_all
state_all
assert_state_sync

View file

@@ -2,7 +2,7 @@
# Licensed under the Apache 2.0 License.
import os
import sys
from random import randrange, choice
from random import randrange, choice, choices
from itertools import combinations, count
@@ -10,7 +10,10 @@ def fully_connected_scenario(nodes, steps):
index = count(start=1)
step_def = {
0: lambda: "dispatch_all",
1: lambda: "periodic_all,{}".format(randrange(500)),
# Most of the time, advance by a small periodic amount. Occasionally time out long enough to trigger an election
1: lambda: "periodic_all,{}".format(
choices([randrange(20), randrange(100, 500)], weights=[10, 1])[0]
),
2: lambda: "replicate,latest,{}".format(f"hello {next(index)}"),
}
@@ -19,6 +22,7 @@ def fully_connected_scenario(nodes, steps):
for first, second in combinations(range(nodes), 2):
lines.append("connect,{},{}".format(first, second))
# Get past the initial election
lines.append("periodic_one,0,110")
lines.append("dispatch_all")
@@ -30,6 +34,22 @@ def fully_connected_scenario(nodes, steps):
lines.append("state_all")
# Allow the network to reconcile, and assert it reaches a stable state
# It is likely this scenario has resulted in a lot of elections and very little commit advancement.
# To reach a stable state, we need to give each node a chance to win an election and share its state.
# In a real network, we expect this to arise from the randomised election timeouts, and it is sufficient
# for one of a quorum of nodes to win and share its state. This exhaustive approach ensures convergence,
# even in the pessimal case.
for node in range(nodes):
lines.append(f"periodic_one,{node},100")
lines.append("dispatch_all")
lines.append("periodic_all,10")
lines.append("dispatch_all")
lines.append("state_all")
lines.append("assert_state_sync")
return "\n".join(lines)

View file

@@ -58,7 +58,7 @@ if __name__ == "__main__":
parser.add_argument("driver", type=str, help="Path to raft_driver binary")
parser.add_argument("--gen-scenarios", action="store_true")
parser.add_argument("files", nargs="+", type=str, help="Path to scenario files")
parser.add_argument("files", nargs="*", type=str, help="Path to scenario files")
args = parser.parse_args()