зеркало из https://github.com/microsoft/CCF.git
Join from an existing snapshot (#1532)
This commit is contained in:
Родитель
136f64e598
Коммит
e42b511983
|
@ -535,6 +535,23 @@ if(BUILD_TESTS)
|
|||
4000
|
||||
)
|
||||
|
||||
# End-to-end run of the "snapshots" suite from tests/e2e_suite.py under raft.
# Each flag is kept on the same line as its value so the pairs are easy to
# read and edit.
add_e2e_test(
  NAME snapshots_test_suite
  PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py
  CONSENSUS raft
  LABEL suite
  ADDITIONAL_ARGS
    --test-duration 150
    --enforce-reqs
    --test-suite snapshots
    --raft-election-timeout 4000
    --snapshot-tx-interval 5
)
|
||||
|
||||
add_e2e_test(
|
||||
NAME full_test_suite
|
||||
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py
|
||||
|
@ -571,13 +588,14 @@ if(BUILD_TESTS)
|
|||
NAME reconfiguration_test
|
||||
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/reconfiguration.py
|
||||
CONSENSUS raft
|
||||
ADDITIONAL_ARGS --raft-election-timeout 4000
|
||||
)
|
||||
|
||||
# Runs tests/reconfiguration.py under raft with a snapshot interval set.
# ADDITIONAL_ARGS is given exactly once: the shorter, stale occurrence that
# only carried --snapshot-tx-interval has been dropped in favour of the one
# that also sets the raft election timeout.
add_e2e_test(
  NAME reconfiguration_snapshot_test
  PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/reconfiguration.py
  CONSENSUS raft
  ADDITIONAL_ARGS --snapshot-tx-interval 10 --raft-election-timeout 4000
)
|
||||
|
||||
add_e2e_test(
|
||||
|
|
|
@ -114,5 +114,15 @@ namespace consensus
|
|||
{
|
||||
RINGBUFFER_WRITE_MESSAGE(consensus::ledger_commit, to_host, idx);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialise ledger at a given index (e.g. after a snapshot)
|
||||
*
|
||||
* @param idx Index to start ledger from
|
||||
*/
|
||||
void init(Index idx)
|
||||
{
|
||||
RINGBUFFER_WRITE_MESSAGE(consensus::ledger_init, to_host, idx);
|
||||
}
|
||||
};
|
||||
}
|
|
@ -28,6 +28,7 @@ namespace consensus
|
|||
DEFINE_RINGBUFFER_MSG_TYPE(ledger_append),
|
||||
DEFINE_RINGBUFFER_MSG_TYPE(ledger_truncate),
|
||||
DEFINE_RINGBUFFER_MSG_TYPE(ledger_commit),
|
||||
DEFINE_RINGBUFFER_MSG_TYPE(ledger_init),
|
||||
|
||||
/// Create a new snapshot. Enclave -> Host
|
||||
DEFINE_RINGBUFFER_MSG_TYPE(ledger_snapshot),
|
||||
|
@ -45,6 +46,7 @@ DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
|
|||
consensus::ledger_no_entry,
|
||||
consensus::Index,
|
||||
consensus::LedgerRequestPurpose);
|
||||
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(consensus::ledger_init, consensus::Index);
|
||||
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
|
||||
consensus::ledger_append,
|
||||
bool /* committable */,
|
||||
|
|
|
@ -28,7 +28,7 @@ namespace raft
|
|||
std::vector<Index> terms;
|
||||
|
||||
public:
|
||||
static constexpr Term InvalidTerm = 0;
|
||||
static constexpr Term InvalidTerm = ccf::VIEW_UNKNOWN;
|
||||
|
||||
void initialise(const std::vector<Index>& terms_)
|
||||
{
|
||||
|
@ -56,7 +56,9 @@ namespace raft
|
|||
}
|
||||
|
||||
for (int64_t i = terms.size(); i < term; ++i)
|
||||
{
|
||||
terms.push_back(idx);
|
||||
}
|
||||
LOG_DEBUG_FMT("Resulting terms: {}", fmt::join(terms, ", "));
|
||||
}
|
||||
|
||||
|
@ -66,7 +68,9 @@ namespace raft
|
|||
|
||||
// Indices before the index of the first term are unknown
|
||||
if (it == terms.begin())
|
||||
{
|
||||
return InvalidTerm;
|
||||
}
|
||||
|
||||
return (it - terms.begin());
|
||||
}
|
||||
|
@ -271,6 +275,23 @@ namespace raft
|
|||
become_leader();
|
||||
}
|
||||
|
||||
/**
 * Initialise this node as a follower starting at the given index and term.
 *
 * Intended to be called only when the node resumes from a snapshot, before
 * it has received any append entries.
 *
 * @param index Index reached by the snapshot; becomes both the last and the
 * committed index
 * @param term Term recorded for that index and used when becoming a follower
 */
void init_as_follower(Index index, Term term)
{
  std::lock_guard<SpinLock> guard(lock);

  // The snapshot index is treated as already committed.
  commit_idx = index;
  last_idx = index;

  term_history.update(index, term);

  // Align the ledger and the snapshotter with the same starting index.
  ledger->init(index);
  snapshotter->set_last_snapshot_idx(index);

  become_follower(term);
}
|
||||
|
||||
Index get_last_idx()
|
||||
{
|
||||
return last_idx;
|
||||
|
|
|
@ -54,6 +54,11 @@ namespace raft
|
|||
raft->force_become_leader(seqno, view, terms, commit_seqno);
|
||||
}
|
||||
|
||||
// Start consensus as a backup at the given seqno and view by forwarding
// both values to raft's init_as_follower.
void init_as_backup(SeqNo seqno, View view) override
{
  raft->init_as_follower(seqno, view);
}
|
||||
|
||||
bool replicate(const kv::BatchVector& entries, View view) override
|
||||
{
|
||||
return raft->replicate(entries, view);
|
||||
|
|
|
@ -160,7 +160,7 @@ DOCTEST_TEST_CASE(
|
|||
auto rvc = get<1>(rv);
|
||||
DOCTEST_REQUIRE(rvc.term == 1);
|
||||
DOCTEST_REQUIRE(rvc.last_commit_idx == 0);
|
||||
DOCTEST_REQUIRE(rvc.last_commit_term == 0);
|
||||
DOCTEST_REQUIRE(rvc.last_commit_term == raft::TermHistory::InvalidTerm);
|
||||
|
||||
r1.recv_message(reinterpret_cast<uint8_t*>(&rvc), sizeof(rvc));
|
||||
|
||||
|
@ -172,7 +172,7 @@ DOCTEST_TEST_CASE(
|
|||
rvc = get<1>(rv);
|
||||
DOCTEST_REQUIRE(rvc.term == 1);
|
||||
DOCTEST_REQUIRE(rvc.last_commit_idx == 0);
|
||||
DOCTEST_REQUIRE(rvc.last_commit_term == 0);
|
||||
DOCTEST_REQUIRE(rvc.last_commit_term == raft::TermHistory::InvalidTerm);
|
||||
|
||||
r2.recv_message(reinterpret_cast<uint8_t*>(&rvc), sizeof(rvc));
|
||||
|
||||
|
@ -216,7 +216,7 @@ DOCTEST_TEST_CASE(
|
|||
DOCTEST_REQUIRE(aec.idx == 0);
|
||||
DOCTEST_REQUIRE(aec.term == 1);
|
||||
DOCTEST_REQUIRE(aec.prev_idx == 0);
|
||||
DOCTEST_REQUIRE(aec.prev_term == 0);
|
||||
DOCTEST_REQUIRE(aec.prev_term == raft::TermHistory::InvalidTerm);
|
||||
DOCTEST_REQUIRE(aec.leader_commit_idx == 0);
|
||||
|
||||
ae = r0.channels->sent_append_entries.front();
|
||||
|
@ -226,7 +226,7 @@ DOCTEST_TEST_CASE(
|
|||
DOCTEST_REQUIRE(aec.idx == 0);
|
||||
DOCTEST_REQUIRE(aec.term == 1);
|
||||
DOCTEST_REQUIRE(aec.prev_idx == 0);
|
||||
DOCTEST_REQUIRE(aec.prev_term == 0);
|
||||
DOCTEST_REQUIRE(aec.prev_term == raft::TermHistory::InvalidTerm);
|
||||
DOCTEST_REQUIRE(aec.leader_commit_idx == 0);
|
||||
}
|
||||
|
||||
|
@ -374,7 +374,7 @@ DOCTEST_TEST_CASE(
|
|||
DOCTEST_REQUIRE(msg.idx == 1);
|
||||
DOCTEST_REQUIRE(msg.term == 1);
|
||||
DOCTEST_REQUIRE(msg.prev_idx == 0);
|
||||
DOCTEST_REQUIRE(msg.prev_term == 0);
|
||||
DOCTEST_REQUIRE(msg.prev_term == raft::TermHistory::InvalidTerm);
|
||||
DOCTEST_REQUIRE(msg.leader_commit_idx == 0);
|
||||
}));
|
||||
|
||||
|
@ -472,7 +472,7 @@ DOCTEST_TEST_CASE("Multiple nodes late join" * doctest::test_suite("multiple"))
|
|||
DOCTEST_REQUIRE(msg.idx == 1);
|
||||
DOCTEST_REQUIRE(msg.term == 1);
|
||||
DOCTEST_REQUIRE(msg.prev_idx == 0);
|
||||
DOCTEST_REQUIRE(msg.prev_term == 0);
|
||||
DOCTEST_REQUIRE(msg.prev_term == raft::TermHistory::InvalidTerm);
|
||||
DOCTEST_REQUIRE(msg.leader_commit_idx == 0);
|
||||
}));
|
||||
|
||||
|
|
|
@ -319,7 +319,7 @@ namespace enclave
|
|||
|
||||
if (start_type == StartType::Join)
|
||||
{
|
||||
node->join({ccf_config});
|
||||
node->join(ccf_config);
|
||||
}
|
||||
else if (start_type == StartType::Recover)
|
||||
{
|
||||
|
|
|
@ -62,7 +62,9 @@ struct CCFConfig
|
|||
std::string target_port;
|
||||
std::vector<uint8_t> network_cert;
|
||||
size_t join_timer;
|
||||
MSGPACK_DEFINE(target_host, target_port, network_cert, join_timer);
|
||||
std::vector<uint8_t> snapshot;
|
||||
MSGPACK_DEFINE(
|
||||
target_host, target_port, network_cert, join_timer, snapshot);
|
||||
};
|
||||
Joining joining = {};
|
||||
|
||||
|
|
|
@ -647,6 +647,11 @@ namespace asynchost
|
|||
|
||||
Ledger(const Ledger& that) = delete;
|
||||
|
||||
void init_idx(size_t idx)
|
||||
{
|
||||
last_idx = idx;
|
||||
}
|
||||
|
||||
std::optional<std::vector<uint8_t>> read_entry(size_t idx)
|
||||
{
|
||||
auto f = get_file_from_idx(idx);
|
||||
|
@ -796,6 +801,12 @@ namespace asynchost
|
|||
void register_message_handlers(
|
||||
messaging::Dispatcher<ringbuffer::Message>& disp)
|
||||
{
|
||||
DISPATCHER_SET_MESSAGE_HANDLER(
|
||||
disp, consensus::ledger_init, [this](const uint8_t* data, size_t size) {
|
||||
auto idx = serialized::read<consensus::Index>(data, size);
|
||||
init_idx(idx);
|
||||
});
|
||||
|
||||
DISPATCHER_SET_MESSAGE_HANDLER(
|
||||
disp,
|
||||
consensus::ledger_append,
|
||||
|
|
|
@ -134,7 +134,11 @@ int main(int argc, char** argv)
|
|||
"--rpc-address)");
|
||||
|
||||
std::string ledger_dir("ledger");
|
||||
app.add_option("--ledger-dir", ledger_dir, "Ledger and snapshots directory")
|
||||
app.add_option("--ledger-dir", ledger_dir, "Ledger directory")
|
||||
->capture_default_str();
|
||||
|
||||
std::string snapshot_dir("snapshots");
|
||||
app.add_option("--snapshot-dir", snapshot_dir, "Snapshots directory")
|
||||
->capture_default_str();
|
||||
|
||||
size_t ledger_chunk_bytes = 5'000'000;
|
||||
|
@ -354,7 +358,7 @@ int main(int argc, char** argv)
|
|||
"key)")
|
||||
->required();
|
||||
|
||||
std::optional<size_t> recovery_threshold;
|
||||
std::optional<size_t> recovery_threshold = std::nullopt;
|
||||
start
|
||||
->add_option(
|
||||
"--recovery-threshold",
|
||||
|
@ -438,7 +442,8 @@ int main(int argc, char** argv)
|
|||
if ((*start || *join) && files::exists(ledger_dir))
|
||||
{
|
||||
throw std::logic_error(fmt::format(
|
||||
"On start/join, ledger directory should not exist ({})", ledger_dir));
|
||||
"On start and join, ledger directory should not exist ({})",
|
||||
ledger_dir));
|
||||
}
|
||||
else if (*recover && !files::exists(ledger_dir))
|
||||
{
|
||||
|
@ -544,8 +549,8 @@ int main(int argc, char** argv)
|
|||
asynchost::Ledger ledger(ledger_dir, writer_factory, ledger_chunk_bytes);
|
||||
ledger.register_message_handlers(bp.get_dispatcher());
|
||||
|
||||
asynchost::SnapshotManager snapshot(ledger_dir);
|
||||
snapshot.register_message_handlers(bp.get_dispatcher());
|
||||
asynchost::SnapshotManager snapshots(snapshot_dir);
|
||||
snapshots.register_message_handlers(bp.get_dispatcher());
|
||||
|
||||
// Begin listening for node-to-node and RPC messages.
|
||||
// This includes DNS resolution and potentially dynamic port assignment (if
|
||||
|
@ -634,6 +639,21 @@ int main(int argc, char** argv)
|
|||
ccf_config.joining.target_port = target_rpc_address.port;
|
||||
ccf_config.joining.network_cert = files::slurp(network_cert_file);
|
||||
ccf_config.joining.join_timer = join_timer;
|
||||
|
||||
auto snapshot_file = snapshots.find_latest_snapshot();
|
||||
if (snapshot_file.has_value())
|
||||
{
|
||||
ccf_config.joining.snapshot = files::slurp(snapshot_file.value());
|
||||
LOG_INFO_FMT(
|
||||
"Found latest snapshot file: {} (size: {})",
|
||||
snapshot_file.value(),
|
||||
ccf_config.joining.snapshot.size());
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_INFO_FMT(
|
||||
"No snapshot found, node will request transactions from the beginning");
|
||||
}
|
||||
}
|
||||
else if (*recover)
|
||||
{
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
#include <filesystem>
|
||||
#include <fstream>
|
||||
#include <iostream>
|
||||
#include <optional>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
|
@ -49,7 +50,7 @@ namespace asynchost
|
|||
if (fs::is_directory(snapshot_dir))
|
||||
{
|
||||
LOG_INFO_FMT(
|
||||
"Snapshots will be stored in existing directory {}", snapshot_dir);
|
||||
"Snapshots will be stored in existing directory: {}", snapshot_dir);
|
||||
}
|
||||
else if (!fs::create_directory(snapshot_dir))
|
||||
{
|
||||
|
@ -58,6 +59,36 @@ namespace asynchost
|
|||
}
|
||||
}
|
||||
|
||||
std::optional<std::string> find_latest_snapshot()
|
||||
{
|
||||
std::optional<std::string> snapshot_file = std::nullopt;
|
||||
size_t latest_idx = 0;
|
||||
|
||||
for (auto& f : fs::directory_iterator(snapshot_dir))
|
||||
{
|
||||
auto file_name = f.path().filename().string();
|
||||
auto pos = file_name.find(fmt::format("{}.", snapshot_file_prefix));
|
||||
if (pos == std::string::npos)
|
||||
{
|
||||
LOG_FAIL_FMT(
|
||||
"Ignoring \"{}\" because it does not start with {}",
|
||||
file_name,
|
||||
snapshot_file_prefix);
|
||||
continue;
|
||||
}
|
||||
|
||||
pos = file_name.find(".");
|
||||
size_t snapshot_idx = std::stol(file_name.substr(pos + 1));
|
||||
if (snapshot_idx > latest_idx)
|
||||
{
|
||||
snapshot_file = f.path().string();
|
||||
latest_idx = snapshot_idx;
|
||||
}
|
||||
}
|
||||
|
||||
return snapshot_file;
|
||||
}
|
||||
|
||||
void register_message_handlers(
|
||||
messaging::Dispatcher<ringbuffer::Message>& disp)
|
||||
{
|
||||
|
|
|
@ -266,6 +266,11 @@ namespace kv
|
|||
state = Primary;
|
||||
}
|
||||
|
||||
// Default implementation: only records that this node is a Backup. The
// seqno and view arguments are not used here.
virtual void init_as_backup(SeqNo, View)
{
  state = Backup;
}
|
||||
|
||||
virtual bool replicate(const BatchVector& entries, View view) = 0;
|
||||
virtual std::pair<View, SeqNo> get_committed_txid() = 0;
|
||||
|
||||
|
|
|
@ -19,12 +19,4 @@ namespace ccf
|
|||
tls::Pem network_enc_pubk;
|
||||
};
|
||||
};
|
||||
|
||||
struct Join
|
||||
{
|
||||
struct In
|
||||
{
|
||||
CCFConfig config;
|
||||
};
|
||||
};
|
||||
}
|
||||
|
|
|
@ -633,18 +633,21 @@ namespace ccf
|
|||
{
|
||||
auto txid = store.next_txid();
|
||||
auto commit_txid = consensus->get_committed_txid();
|
||||
|
||||
LOG_DEBUG_FMT(
|
||||
"Signed at {} in view: {} commit was: {}.{}",
|
||||
txid.version,
|
||||
txid.term,
|
||||
commit_txid.first,
|
||||
commit_txid.second);
|
||||
|
||||
store.commit(
|
||||
txid,
|
||||
[txid, commit_txid, this]() {
|
||||
kv::Tx sig(txid.version);
|
||||
auto sig_view = sig.get_view(signatures);
|
||||
crypto::Sha256Hash root = replicated_state_tree.get_root();
|
||||
|
||||
Signature sig_value(
|
||||
id,
|
||||
txid.version,
|
||||
|
@ -654,6 +657,7 @@ namespace ccf
|
|||
root,
|
||||
kp.sign_hash(root.h.data(), root.h.size()),
|
||||
replicated_state_tree.serialise());
|
||||
|
||||
sig_view->put(0, sig_value);
|
||||
return sig.commit_reserved();
|
||||
},
|
||||
|
|
|
@ -292,7 +292,7 @@ namespace ccf
|
|||
|
||||
accept_network_tls_connections(args.config);
|
||||
|
||||
reset_quote();
|
||||
reset_data(quote);
|
||||
sm.advance(State::partOfNetwork);
|
||||
|
||||
return Success<CreateNew::Out>(
|
||||
|
@ -350,10 +350,9 @@ namespace ccf
|
|||
//
|
||||
// funcs in state "pending"
|
||||
//
|
||||
void initiate_join(const Join::In& args)
|
||||
void initiate_join(CCFConfig& config)
|
||||
{
|
||||
auto network_ca =
|
||||
std::make_shared<tls::CA>(args.config.joining.network_cert);
|
||||
auto network_ca = std::make_shared<tls::CA>(config.joining.network_cert);
|
||||
auto join_client_cert = std::make_unique<tls::Cert>(
|
||||
network_ca, node_cert, node_sign_kp->private_key_pem());
|
||||
|
||||
|
@ -362,9 +361,9 @@ namespace ccf
|
|||
rpcsessions->create_client(std::move(join_client_cert));
|
||||
|
||||
join_client->connect(
|
||||
args.config.joining.target_host,
|
||||
args.config.joining.target_port,
|
||||
[this, args](
|
||||
config.joining.target_host,
|
||||
config.joining.target_port,
|
||||
[this, &config](
|
||||
http_status status, http::HeaderMap&&, std::vector<uint8_t>&& data) {
|
||||
std::lock_guard<SpinLock> guard(lock);
|
||||
if (!sm.check(State::pending))
|
||||
|
@ -426,9 +425,44 @@ namespace ccf
|
|||
setup_consensus(resp.network_info.public_only);
|
||||
setup_history();
|
||||
|
||||
if (!config.joining.snapshot.empty())
|
||||
{
|
||||
// It is only possible to deserialise the snapshot then, once the
|
||||
// ledger secrets have been passed in by the network
|
||||
LOG_DEBUG_FMT(
|
||||
"Deserialising snapshot ({})", config.joining.snapshot.size());
|
||||
auto rc =
|
||||
network.tables->deserialise_snapshot(config.joining.snapshot);
|
||||
|
||||
if (rc != kv::DeserialiseSuccess::PASS)
|
||||
{
|
||||
throw std::logic_error(
|
||||
fmt::format("Failed to apply snapshot: {}", rc));
|
||||
}
|
||||
|
||||
kv::ReadOnlyTx tx;
|
||||
auto sig_view = tx.get_read_only_view(network.signatures);
|
||||
auto sig = sig_view->get(0);
|
||||
if (!sig.has_value())
|
||||
{
|
||||
throw std::logic_error(
|
||||
fmt::format("No signatures found after applying snapshot"));
|
||||
}
|
||||
|
||||
auto seqno = network.tables->current_version();
|
||||
consensus->init_as_backup(seqno, sig->view);
|
||||
|
||||
reset_data(config.joining.snapshot);
|
||||
LOG_INFO_FMT(
|
||||
"Joiner successfully resumed from snapshot at seqno {} and "
|
||||
"view {}",
|
||||
seqno,
|
||||
sig->view);
|
||||
}
|
||||
|
||||
open_member_frontend();
|
||||
|
||||
accept_network_tls_connections(args.config);
|
||||
accept_network_tls_connections(config);
|
||||
|
||||
if (resp.network_info.public_only)
|
||||
{
|
||||
|
@ -439,7 +473,7 @@ namespace ccf
|
|||
}
|
||||
else
|
||||
{
|
||||
reset_quote();
|
||||
reset_data(quote);
|
||||
sm.advance(State::partOfNetwork);
|
||||
}
|
||||
|
||||
|
@ -463,7 +497,7 @@ namespace ccf
|
|||
// Send RPC request to remote node to join the network.
|
||||
JoinNetworkNodeToNode::In join_params;
|
||||
|
||||
join_params.node_info_network = args.config.node_info_network;
|
||||
join_params.node_info_network = config.node_info_network;
|
||||
join_params.public_encryption_key =
|
||||
node_encrypt_kp->public_key_pem().raw();
|
||||
join_params.quote = quote;
|
||||
|
@ -471,8 +505,8 @@ namespace ccf
|
|||
|
||||
LOG_DEBUG_FMT(
|
||||
"Sending join request to {}:{}",
|
||||
args.config.joining.target_host,
|
||||
args.config.joining.target_port);
|
||||
config.joining.target_host,
|
||||
config.joining.target_port);
|
||||
|
||||
const auto body = serdes::pack(join_params, serdes::Pack::Text);
|
||||
|
||||
|
@ -485,19 +519,19 @@ namespace ccf
|
|||
join_client->send_request(r.build_request());
|
||||
}
|
||||
|
||||
void join(const Join::In& args)
|
||||
void join(CCFConfig& config)
|
||||
{
|
||||
std::lock_guard<SpinLock> guard(lock);
|
||||
sm.expect(State::pending);
|
||||
|
||||
initiate_join(args);
|
||||
initiate_join(config);
|
||||
|
||||
join_timer = timers.new_timer(
|
||||
std::chrono::milliseconds(args.config.joining.join_timer),
|
||||
[this, args]() {
|
||||
std::chrono::milliseconds(config.joining.join_timer),
|
||||
[this, &config]() {
|
||||
if (sm.check(State::pending))
|
||||
{
|
||||
initiate_join(args);
|
||||
initiate_join(config);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
@ -741,7 +775,7 @@ namespace ccf
|
|||
}
|
||||
}
|
||||
|
||||
reset_quote();
|
||||
reset_data(quote);
|
||||
sm.advance(State::partOfNetwork);
|
||||
}
|
||||
|
||||
|
@ -1287,10 +1321,10 @@ namespace ccf
|
|||
.raw();
|
||||
}
|
||||
|
||||
void reset_quote()
|
||||
void reset_data(std::vector<uint8_t>& data)
|
||||
{
|
||||
quote.clear();
|
||||
quote.shrink_to_fit();
|
||||
data.clear();
|
||||
data.shrink_to_fit();
|
||||
}
|
||||
|
||||
void backup_finish_recovery()
|
||||
|
|
|
@ -51,7 +51,7 @@ namespace serdes
|
|||
{
|
||||
if (input.size() == 0)
|
||||
{
|
||||
return {};
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
if (input[0] == '{')
|
||||
|
|
|
@ -108,6 +108,24 @@ namespace ccf
|
|||
next_snapshot_indices.push_back(last_snapshot_idx);
|
||||
}
|
||||
|
||||
/**
 * Record the index of a snapshot that has just been applied.
 *
 * Meant to be called once, while no snapshot index has been recorded yet.
 *
 * @param idx Index of the applied snapshot
 * @throws std::logic_error if a last snapshot index is already set
 */
void set_last_snapshot_idx(consensus::Index idx)
{
  std::lock_guard<SpinLock> guard(lock);

  const bool already_set = (last_snapshot_idx != 0);
  if (already_set)
  {
    throw std::logic_error(
      "Last snapshot index can only be set if no snapshot has been "
      "generated");
  }

  last_snapshot_idx = idx;

  // Restart the list of snapshot indices from the applied snapshot.
  next_snapshot_indices.clear();
  next_snapshot_indices.push_back(last_snapshot_idx);
}
|
||||
|
||||
void snapshot(consensus::Index idx)
|
||||
{
|
||||
std::lock_guard<SpinLock> guard(lock);
|
||||
|
@ -119,7 +137,7 @@ namespace ccf
|
|||
idx,
|
||||
last_snapshot_idx);
|
||||
|
||||
if (idx - last_snapshot_idx > snapshot_tx_interval)
|
||||
if (idx - last_snapshot_idx >= snapshot_tx_interval)
|
||||
{
|
||||
auto msg = std::make_unique<threading::Tmsg<SnapshotMsg>>(&snapshot_cb);
|
||||
msg->data.self = shared_from_this();
|
||||
|
@ -140,6 +158,11 @@ namespace ccf
|
|||
{
|
||||
next_snapshot_indices.pop_front();
|
||||
}
|
||||
|
||||
if (next_snapshot_indices.empty())
|
||||
{
|
||||
next_snapshot_indices.push_back(last_snapshot_idx);
|
||||
}
|
||||
}
|
||||
|
||||
bool requires_snapshot(consensus::Index idx)
|
||||
|
|
|
@ -161,7 +161,15 @@ class Network:
|
|||
self.nodes.append(node)
|
||||
return node
|
||||
|
||||
def _add_node(self, node, lib_name, args, target_node=None, recovery=False):
|
||||
def _add_node(
|
||||
self,
|
||||
node,
|
||||
lib_name,
|
||||
args,
|
||||
target_node=None,
|
||||
recovery=False,
|
||||
from_snapshot=False,
|
||||
):
|
||||
forwarded_args = {
|
||||
arg: getattr(args, arg)
|
||||
for arg in infra.network.Network.node_args_to_forward
|
||||
|
@ -173,12 +181,23 @@ class Network:
|
|||
timeout=args.ledger_recovery_timeout if recovery else 3
|
||||
)
|
||||
|
||||
snapshot_dir = None
|
||||
if from_snapshot:
|
||||
LOG.info("Joining from snapshot")
|
||||
snapshot_dir = target_node.get_snapshots()
|
||||
# For now, we must have a snapshot to resume from when attempting
|
||||
# to join from one
|
||||
assert (
|
||||
len(os.listdir(snapshot_dir)) > 0
|
||||
), f"There are no snapshots to resume from in directory {snapshot_dir}"
|
||||
|
||||
node.join(
|
||||
lib_name=lib_name,
|
||||
workspace=args.workspace,
|
||||
label=args.label,
|
||||
common_dir=self.common_dir,
|
||||
target_rpc_address=f"{target_node.host}:{target_node.rpc_port}",
|
||||
snapshot_dir=snapshot_dir,
|
||||
**forwarded_args,
|
||||
)
|
||||
|
||||
|
@ -415,14 +434,22 @@ class Network:
|
|||
raise NodeShutdownError("Fatal error found during node shutdown")
|
||||
|
||||
def create_and_add_pending_node(
|
||||
self, lib_name, host, args, target_node=None, timeout=JOIN_TIMEOUT
|
||||
self,
|
||||
lib_name,
|
||||
host,
|
||||
args,
|
||||
target_node=None,
|
||||
from_snapshot=False,
|
||||
timeout=JOIN_TIMEOUT,
|
||||
):
|
||||
"""
|
||||
Create a new node and add it to the network. Note that the new node
|
||||
still needs to be trusted by members to complete the join protocol.
|
||||
"""
|
||||
new_node = self.create_node(host)
|
||||
self._add_node(new_node, lib_name, args, target_node)
|
||||
self._add_node(
|
||||
new_node, lib_name, args, target_node, from_snapshot=from_snapshot
|
||||
)
|
||||
primary, _ = self.find_primary()
|
||||
try:
|
||||
self.consortium.wait_for_node_to_exist_in_store(
|
||||
|
@ -450,12 +477,16 @@ class Network:
|
|||
|
||||
return new_node
|
||||
|
||||
def create_and_trust_node(self, lib_name, host, args, target_node=None):
|
||||
def create_and_trust_node(
|
||||
self, lib_name, host, args, target_node=None, from_snapshot=False
|
||||
):
|
||||
"""
|
||||
Create a new node, add it to the network and let members vote to trust
|
||||
it so that it becomes part of the consensus protocol.
|
||||
"""
|
||||
new_node = self.create_and_add_pending_node(lib_name, host, args, target_node)
|
||||
new_node = self.create_and_add_pending_node(
|
||||
lib_name, host, args, target_node, from_snapshot
|
||||
)
|
||||
|
||||
primary, _ = self.find_primary()
|
||||
try:
|
||||
|
|
|
@ -82,8 +82,7 @@ class Node:
|
|||
workspace,
|
||||
label,
|
||||
common_dir,
|
||||
None,
|
||||
members_info,
|
||||
members_info=members_info,
|
||||
**kwargs,
|
||||
)
|
||||
self.network_state = NodeNetworkState.joined
|
||||
|
@ -96,6 +95,7 @@ class Node:
|
|||
label,
|
||||
common_dir,
|
||||
target_rpc_address,
|
||||
snapshot_dir,
|
||||
**kwargs,
|
||||
):
|
||||
self._start(
|
||||
|
@ -105,7 +105,8 @@ class Node:
|
|||
workspace,
|
||||
label,
|
||||
common_dir,
|
||||
target_rpc_address,
|
||||
target_rpc_address=target_rpc_address,
|
||||
snapshot_dir=snapshot_dir,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
@ -130,6 +131,7 @@ class Node:
|
|||
label,
|
||||
common_dir,
|
||||
target_rpc_address=None,
|
||||
snapshot_dir=None,
|
||||
members_info=None,
|
||||
**kwargs,
|
||||
):
|
||||
|
@ -161,6 +163,7 @@ class Node:
|
|||
common_dir,
|
||||
target_rpc_address,
|
||||
members_info,
|
||||
snapshot_dir,
|
||||
binary_dir=self.binary_dir,
|
||||
**kwargs,
|
||||
)
|
||||
|
@ -238,6 +241,7 @@ class Node:
|
|||
"""
|
||||
# Until the node has joined, the SSL handshake will fail as the node
|
||||
# is not yet endorsed by the network certificate
|
||||
|
||||
try:
|
||||
with self.client(connection_timeout=timeout) as nc:
|
||||
rep = nc.get("/node/commit")
|
||||
|
@ -250,6 +254,9 @@ class Node:
|
|||
def get_ledger(self):
|
||||
return self.remote.get_ledger()
|
||||
|
||||
def get_snapshots(self):
    """Return whatever the underlying remote's get_snapshots() returns."""
    snapshots = self.remote.get_snapshots()
    return snapshots
|
||||
|
||||
def client(self, identity=None, **kwargs):
|
||||
akwargs = {
|
||||
"cert": os.path.join(self.common_dir, f"{identity}_cert.pem")
|
||||
|
|
|
@ -551,6 +551,7 @@ class CCFRemote(object):
|
|||
common_dir,
|
||||
target_rpc_address=None,
|
||||
members_info=None,
|
||||
snapshot_dir=None,
|
||||
join_timer=None,
|
||||
host_log_level="info",
|
||||
sig_tx_interval=5000,
|
||||
|
@ -587,6 +588,11 @@ class CCFRemote(object):
|
|||
if self.ledger_dir
|
||||
else f"{local_node_id}.ledger"
|
||||
)
|
||||
self.snapshot_dir = os.path.normpath(snapshot_dir) if snapshot_dir else None
|
||||
self.snapshot_dir_name = (
|
||||
os.path.basename(self.snapshot_dir) if self.snapshot_dir else "snapshots"
|
||||
)
|
||||
|
||||
self.common_dir = common_dir
|
||||
|
||||
exe_files = [self.BIN, lib_path] + self.DEPS
|
||||
|
@ -679,6 +685,9 @@ class CCFRemote(object):
|
|||
f"--join-timer={join_timer}",
|
||||
]
|
||||
data_files += [os.path.join(self.common_dir, "networkcert.pem")]
|
||||
|
||||
if snapshot_dir:
|
||||
data_files += [snapshot_dir]
|
||||
elif start_type == StartType.recover:
|
||||
cmd += ["recover", "--network-cert-file=networkcert.pem"]
|
||||
else:
|
||||
|
@ -757,6 +766,10 @@ class CCFRemote(object):
|
|||
self.remote.get(self.ledger_dir_name, self.common_dir)
|
||||
return os.path.join(self.common_dir, self.ledger_dir_name)
|
||||
|
||||
def get_snapshots(self):
    """
    Fetch the snapshot directory from the remote into common_dir and return
    the resulting local path.
    """
    dir_name = self.snapshot_dir_name
    destination = self.common_dir
    self.remote.get(dir_name, destination)
    return os.path.join(destination, dir_name)
|
||||
|
||||
def ledger_path(self):
|
||||
return os.path.join(self.remote.root, self.ledger_dir_name)
|
||||
|
||||
|
|
|
@ -43,6 +43,16 @@ def test_add_node_from_backup(network, args):
|
|||
return network
|
||||
|
||||
|
||||
@reqs.description("Adding a valid node from snapshot")
@reqs.at_least_n_nodes(2)
def test_add_node_from_snapshot(network, args):
    """
    Create a new node on localhost, have it join with from_snapshot=True and
    get it trusted. Returns the network so that tests can be chained.
    """
    joined_node = network.create_and_trust_node(
        args.package, "localhost", args, from_snapshot=True
    )
    # create_and_trust_node is expected to hand back the new node.
    assert joined_node
    return network
|
||||
|
||||
|
||||
@reqs.description("Adding as many pending nodes as current number of nodes")
|
||||
@reqs.supports_methods("log/private")
|
||||
def test_add_as_many_pending_nodes(network, args):
|
||||
|
@ -113,6 +123,9 @@ def run(args):
|
|||
hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
|
||||
) as network:
|
||||
network.start_and_join(args)
|
||||
if args.snapshot_tx_interval is not None:
|
||||
test_add_node_from_snapshot(network, args)
|
||||
|
||||
test_add_node_from_backup(network, args)
|
||||
test_add_node(network, args)
|
||||
test_add_node_untrusted_code(network, args)
|
||||
|
|
|
@ -39,7 +39,8 @@ suite_membership_recovery = [
|
|||
]
|
||||
suites["membership_recovery"] = suite_membership_recovery
|
||||
|
||||
# This suite tests that nodes addition, deletion and primary changes can be interleaved
|
||||
# This suite tests that nodes addition, deletion and primary changes
|
||||
# can be interleaved
|
||||
suite_reconfiguration = [
|
||||
reconfiguration.test_add_node,
|
||||
reconfiguration.test_retire_primary,
|
||||
|
@ -53,6 +54,21 @@ suite_reconfiguration = [
|
|||
]
|
||||
suites["reconfiguration"] = suite_reconfiguration
|
||||
|
||||
# Temporary suite while snapshotting feature is being implemented
|
||||
# https://github.com/microsoft/CCF/milestone/12
|
||||
suite_snapshots = [
|
||||
reconfiguration.test_add_node_from_snapshot,
|
||||
election.test_kill_primary,
|
||||
# The new primary has no snapshot so issue new entries
|
||||
# to generate at least one snapshot
|
||||
e2e_logging.test,
|
||||
e2e_logging.test,
|
||||
e2e_logging.test,
|
||||
e2e_logging.test,
|
||||
reconfiguration.test_add_node_from_snapshot,
|
||||
]
|
||||
suites["snapshots"] = suite_snapshots
|
||||
|
||||
all_tests_suite = [
|
||||
# e2e_logging:
|
||||
e2e_logging.test,
|
||||
|
|
Загрузка…
Ссылка в новой задаче