diff --git a/CMakeLists.txt b/CMakeLists.txt index 68e37ae709..6ca7180800 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -535,6 +535,23 @@ if(BUILD_TESTS) 4000 ) + add_e2e_test( + NAME snapshots_test_suite + PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py + CONSENSUS raft + LABEL suite + ADDITIONAL_ARGS + --test-duration + 150 + --enforce-reqs + --test-suite + snapshots + --raft-election-timeout + 4000 + --snapshot-tx-interval + 5 + ) + add_e2e_test( NAME full_test_suite PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py @@ -571,13 +588,14 @@ if(BUILD_TESTS) NAME reconfiguration_test PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/reconfiguration.py CONSENSUS raft + ADDITIONAL_ARGS --raft-election-timeout 4000 ) add_e2e_test( NAME reconfiguration_snapshot_test PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/reconfiguration.py CONSENSUS raft - ADDITIONAL_ARGS --snapshot-tx-interval 10 + ADDITIONAL_ARGS --snapshot-tx-interval 10 --raft-election-timeout 4000 ) add_e2e_test( diff --git a/src/consensus/ledger_enclave.h b/src/consensus/ledger_enclave.h index 90c887d487..0a7511ff37 100644 --- a/src/consensus/ledger_enclave.h +++ b/src/consensus/ledger_enclave.h @@ -114,5 +114,15 @@ namespace consensus { RINGBUFFER_WRITE_MESSAGE(consensus::ledger_commit, to_host, idx); } + + /** + * Initialise ledger at a given index (e.g. after a snapshot) + * + * @param idx Index to start ledger from + */ + void init(Index idx) + { + RINGBUFFER_WRITE_MESSAGE(consensus::ledger_init, to_host, idx); + } }; } \ No newline at end of file diff --git a/src/consensus/ledger_enclave_types.h b/src/consensus/ledger_enclave_types.h index 9d5b5d3363..3e72291345 100644 --- a/src/consensus/ledger_enclave_types.h +++ b/src/consensus/ledger_enclave_types.h @@ -28,6 +28,7 @@ namespace consensus DEFINE_RINGBUFFER_MSG_TYPE(ledger_append), DEFINE_RINGBUFFER_MSG_TYPE(ledger_truncate), DEFINE_RINGBUFFER_MSG_TYPE(ledger_commit), + DEFINE_RINGBUFFER_MSG_TYPE(ledger_init), /// Create a new snapshot. Enclave -> Host DEFINE_RINGBUFFER_MSG_TYPE(ledger_snapshot), @@ -45,6 +46,7 @@ DECLARE_RINGBUFFER_MESSAGE_PAYLOAD( consensus::ledger_no_entry, consensus::Index, consensus::LedgerRequestPurpose); +DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(consensus::ledger_init, consensus::Index); DECLARE_RINGBUFFER_MESSAGE_PAYLOAD( consensus::ledger_append, bool /* committable */, diff --git a/src/consensus/raft/raft.h b/src/consensus/raft/raft.h index 45eaeb1107..714f54b46f 100644 --- a/src/consensus/raft/raft.h +++ b/src/consensus/raft/raft.h @@ -28,7 +28,7 @@ namespace raft std::vector terms; public: - static constexpr Term InvalidTerm = 0; + static constexpr Term InvalidTerm = ccf::VIEW_UNKNOWN; void initialise(const std::vector& terms_) { @@ -56,7 +56,9 @@ namespace raft } for (int64_t i = terms.size(); i < term; ++i) + { terms.push_back(idx); + } LOG_DEBUG_FMT("Resulting terms: {}", fmt::join(terms, ", ")); } @@ -66,7 +68,9 @@ namespace raft // Indices before the index of the first term are unknown if (it == terms.begin()) + { return InvalidTerm; + } return (it - terms.begin()); } @@ -271,6 +275,23 @@ namespace raft become_leader(); } + void init_as_follower(Index index, Term term) + { + // This should only be called when the node resumes from a snapshot and + // before it has received any append entries. + std::lock_guard guard(lock); + + last_idx = index; + commit_idx = index; + + term_history.update(index, term); + + ledger->init(index); + snapshotter->set_last_snapshot_idx(index); + + become_follower(term); + } + Index get_last_idx() { return last_idx; diff --git a/src/consensus/raft/raft_consensus.h b/src/consensus/raft/raft_consensus.h index 0822db7497..4fcb28a82f 100644 --- a/src/consensus/raft/raft_consensus.h +++ b/src/consensus/raft/raft_consensus.h @@ -54,6 +54,11 @@ namespace raft raft->force_become_leader(seqno, view, terms, commit_seqno); } + void init_as_backup(SeqNo seqno, View view) override + { + raft->init_as_follower(seqno, view); + } + bool replicate(const kv::BatchVector& entries, View view) override { return raft->replicate(entries, view); diff --git a/src/consensus/raft/test/main.cpp b/src/consensus/raft/test/main.cpp index 5cd736203b..0d48cbfce7 100644 --- a/src/consensus/raft/test/main.cpp +++ b/src/consensus/raft/test/main.cpp @@ -160,7 +160,7 @@ DOCTEST_TEST_CASE( auto rvc = get<1>(rv); DOCTEST_REQUIRE(rvc.term == 1); DOCTEST_REQUIRE(rvc.last_commit_idx == 0); - DOCTEST_REQUIRE(rvc.last_commit_term == 0); + DOCTEST_REQUIRE(rvc.last_commit_term == raft::TermHistory::InvalidTerm); r1.recv_message(reinterpret_cast(&rvc), sizeof(rvc)); @@ -172,7 +172,7 @@ DOCTEST_TEST_CASE( rvc = get<1>(rv); DOCTEST_REQUIRE(rvc.term == 1); DOCTEST_REQUIRE(rvc.last_commit_idx == 0); - DOCTEST_REQUIRE(rvc.last_commit_term == 0); + DOCTEST_REQUIRE(rvc.last_commit_term == raft::TermHistory::InvalidTerm); r2.recv_message(reinterpret_cast(&rvc), sizeof(rvc)); @@ -216,7 +216,7 @@ DOCTEST_TEST_CASE( DOCTEST_REQUIRE(aec.idx == 0); DOCTEST_REQUIRE(aec.term == 1); DOCTEST_REQUIRE(aec.prev_idx == 0); - DOCTEST_REQUIRE(aec.prev_term == 0); + DOCTEST_REQUIRE(aec.prev_term == raft::TermHistory::InvalidTerm); DOCTEST_REQUIRE(aec.leader_commit_idx == 0); ae = r0.channels->sent_append_entries.front(); @@ -226,7 +226,7 @@ DOCTEST_TEST_CASE( DOCTEST_REQUIRE(aec.idx == 0); DOCTEST_REQUIRE(aec.term == 1); DOCTEST_REQUIRE(aec.prev_idx == 0); - DOCTEST_REQUIRE(aec.prev_term == 0); + DOCTEST_REQUIRE(aec.prev_term == raft::TermHistory::InvalidTerm); DOCTEST_REQUIRE(aec.leader_commit_idx == 0); } @@ -374,7 +374,7 @@ DOCTEST_TEST_CASE( DOCTEST_REQUIRE(msg.idx == 1); DOCTEST_REQUIRE(msg.term == 1); DOCTEST_REQUIRE(msg.prev_idx == 0); - DOCTEST_REQUIRE(msg.prev_term == 0); + DOCTEST_REQUIRE(msg.prev_term == raft::TermHistory::InvalidTerm); DOCTEST_REQUIRE(msg.leader_commit_idx == 0); })); @@ -472,7 +472,7 @@ DOCTEST_TEST_CASE("Multiple nodes late join" * doctest::test_suite("multiple")) DOCTEST_REQUIRE(msg.idx == 1); DOCTEST_REQUIRE(msg.term == 1); DOCTEST_REQUIRE(msg.prev_idx == 0); - DOCTEST_REQUIRE(msg.prev_term == 0); + DOCTEST_REQUIRE(msg.prev_term == raft::TermHistory::InvalidTerm); DOCTEST_REQUIRE(msg.leader_commit_idx == 0); })); diff --git a/src/enclave/enclave.h b/src/enclave/enclave.h index cf9c657d99..b33650ee47 100644 --- a/src/enclave/enclave.h +++ b/src/enclave/enclave.h @@ -319,7 +319,7 @@ namespace enclave if (start_type == StartType::Join) { - node->join({ccf_config}); + node->join(ccf_config); } else if (start_type == StartType::Recover) { diff --git a/src/enclave/interface.h b/src/enclave/interface.h index 12366c96e0..6c46de4f55 100644 --- a/src/enclave/interface.h +++ b/src/enclave/interface.h @@ -62,7 +62,9 @@ struct CCFConfig std::string target_port; std::vector network_cert; size_t join_timer; - MSGPACK_DEFINE(target_host, target_port, network_cert, join_timer); + std::vector snapshot; + MSGPACK_DEFINE( + target_host, target_port, network_cert, join_timer, snapshot); }; Joining joining = {}; diff --git a/src/host/ledger.h b/src/host/ledger.h index 28d4a15a26..2461c51a85 100644 --- a/src/host/ledger.h +++ b/src/host/ledger.h @@ -647,6 +647,11 @@ namespace asynchost Ledger(const Ledger& that) = delete; + void init_idx(size_t idx) + { + last_idx = idx; + } + std::optional> read_entry(size_t idx) { auto f = get_file_from_idx(idx); @@ -796,6 +801,12 @@ namespace asynchost void register_message_handlers( messaging::Dispatcher& disp) { + DISPATCHER_SET_MESSAGE_HANDLER( + disp, consensus::ledger_init, [this](const uint8_t* data, size_t size) { + auto idx = serialized::read(data, size); + init_idx(idx); + }); + DISPATCHER_SET_MESSAGE_HANDLER( disp, consensus::ledger_append, diff --git a/src/host/main.cpp b/src/host/main.cpp index 539ff95e42..7e5a169fab 100644 --- a/src/host/main.cpp +++ b/src/host/main.cpp @@ -134,7 +134,11 @@ int main(int argc, char** argv) "--rpc-address)"); std::string ledger_dir("ledger"); - app.add_option("--ledger-dir", ledger_dir, "Ledger and snapshots directory") + app.add_option("--ledger-dir", ledger_dir, "Ledger directory") + ->capture_default_str(); + + std::string snapshot_dir("snapshots"); + app.add_option("--snapshot-dir", snapshot_dir, "Snapshots directory") ->capture_default_str(); size_t ledger_chunk_bytes = 5'000'000; @@ -354,7 +358,7 @@ int main(int argc, char** argv) "key)") ->required(); - std::optional recovery_threshold; + std::optional recovery_threshold = std::nullopt; start ->add_option( "--recovery-threshold", @@ -438,7 +442,8 @@ int main(int argc, char** argv) if ((*start || *join) && files::exists(ledger_dir)) { throw std::logic_error(fmt::format( - "On start/join, ledger directory should not exist ({})", ledger_dir)); + "On start and join, ledger directory should not exist ({})", + ledger_dir)); } else if (*recover && !files::exists(ledger_dir)) { @@ -544,8 +549,8 @@ int main(int argc, char** argv) asynchost::Ledger ledger(ledger_dir, writer_factory, ledger_chunk_bytes); ledger.register_message_handlers(bp.get_dispatcher()); - asynchost::SnapshotManager snapshot(ledger_dir); - snapshot.register_message_handlers(bp.get_dispatcher()); + asynchost::SnapshotManager snapshots(snapshot_dir); + snapshots.register_message_handlers(bp.get_dispatcher()); // Begin listening for node-to-node and RPC messages. // This includes DNS resolution and potentially dynamic port assignment (if @@ -634,6 +639,21 @@ int main(int argc, char** argv) ccf_config.joining.target_port = target_rpc_address.port; ccf_config.joining.network_cert = files::slurp(network_cert_file); ccf_config.joining.join_timer = join_timer; + + auto snapshot_file = snapshots.find_latest_snapshot(); + if (snapshot_file.has_value()) + { + ccf_config.joining.snapshot = files::slurp(snapshot_file.value()); + LOG_INFO_FMT( + "Found latest snapshot file: {} (size: {})", + snapshot_file.value(), + ccf_config.joining.snapshot.size()); + } + else + { + LOG_INFO_FMT( + "No snapshot found, node will request transactions from the beginning"); + } } else if (*recover) { diff --git a/src/host/snapshot.h b/src/host/snapshot.h index 7303331c86..dadd60e487 100644 --- a/src/host/snapshot.h +++ b/src/host/snapshot.h @@ -7,6 +7,7 @@ #include #include #include +#include namespace fs = std::filesystem; @@ -49,7 +50,7 @@ namespace asynchost if (fs::is_directory(snapshot_dir)) { LOG_INFO_FMT( - "Snapshots will be stored in existing directory {}", snapshot_dir); + "Snapshots will be stored in existing directory: {}", snapshot_dir); } else if (!fs::create_directory(snapshot_dir)) { @@ -58,6 +59,36 @@ namespace asynchost } } + std::optional find_latest_snapshot() + { + std::optional snapshot_file = std::nullopt; + size_t latest_idx = 0; + + for (auto& f : fs::directory_iterator(snapshot_dir)) + { + auto file_name = f.path().filename().string(); + auto pos = file_name.find(fmt::format("{}.", snapshot_file_prefix)); + if (pos == std::string::npos) + { + LOG_FAIL_FMT( + "Ignoring \"{}\" because it does not start with {}", + file_name, + snapshot_file_prefix); + continue; + } + + pos = file_name.find("."); + size_t snapshot_idx = std::stol(file_name.substr(pos + 1)); + if (snapshot_idx > latest_idx) + { + snapshot_file = f.path().string(); + latest_idx = snapshot_idx; + } + } + + return snapshot_file; + } + void register_message_handlers( messaging::Dispatcher& disp) { @@ -71,4 +102,4 @@ namespace asynchost }); } }; -} \ No newline at end of file +} diff --git a/src/kv/kv_types.h b/src/kv/kv_types.h index 5643eb4aa7..fb42cb79ce 100644 --- a/src/kv/kv_types.h +++ b/src/kv/kv_types.h @@ -266,6 +266,11 @@ namespace kv state = Primary; } + virtual void init_as_backup(SeqNo, View) + { + state = Backup; + } + virtual bool replicate(const BatchVector& entries, View view) = 0; virtual std::pair get_committed_txid() = 0; diff --git a/src/node/call_types.h b/src/node/call_types.h index 0e4424a7e4..13e48d935b 100644 --- a/src/node/call_types.h +++ b/src/node/call_types.h @@ -19,12 +19,4 @@ namespace ccf tls::Pem network_enc_pubk; }; }; - - struct Join - { - struct In - { - CCFConfig config; - }; - }; } diff --git a/src/node/history.h b/src/node/history.h index 9eda52be05..d7a6055625 100644 --- a/src/node/history.h +++ b/src/node/history.h @@ -633,18 +633,21 @@ namespace ccf { auto txid = store.next_txid(); auto commit_txid = consensus->get_committed_txid(); + LOG_DEBUG_FMT( "Signed at {} in view: {} commit was: {}.{}", txid.version, txid.term, commit_txid.first, commit_txid.second); + store.commit( txid, [txid, commit_txid, this]() { kv::Tx sig(txid.version); auto sig_view = sig.get_view(signatures); crypto::Sha256Hash root = replicated_state_tree.get_root(); + Signature sig_value( id, txid.version, @@ -654,6 +657,7 @@ namespace ccf root, kp.sign_hash(root.h.data(), root.h.size()), replicated_state_tree.serialise()); + sig_view->put(0, sig_value); return sig.commit_reserved(); }, diff --git a/src/node/node_state.h b/src/node/node_state.h index 5c91ae7a10..36ad3b3aa9 100644 --- a/src/node/node_state.h +++ b/src/node/node_state.h @@ -292,7 +292,7 @@ namespace ccf accept_network_tls_connections(args.config); - reset_quote(); + reset_data(quote); sm.advance(State::partOfNetwork); return Success( @@ -350,10 +350,9 @@ namespace ccf // // funcs in state "pending" // - void initiate_join(const Join::In& args) + void initiate_join(CCFConfig& config) { - auto network_ca = - std::make_shared(args.config.joining.network_cert); + auto network_ca = std::make_shared(config.joining.network_cert); auto join_client_cert = std::make_unique( network_ca, node_cert, node_sign_kp->private_key_pem()); @@ -362,9 +361,9 @@ namespace ccf rpcsessions->create_client(std::move(join_client_cert)); join_client->connect( - args.config.joining.target_host, - args.config.joining.target_port, - [this, args]( + config.joining.target_host, + config.joining.target_port, + [this, &config]( http_status status, http::HeaderMap&&, std::vector&& data) { std::lock_guard guard(lock); if (!sm.check(State::pending)) @@ -426,9 +425,44 @@ namespace ccf setup_consensus(resp.network_info.public_only); setup_history(); + if (!config.joining.snapshot.empty()) + { + // It is only possible to deserialise the snapshot then, once the + // ledger secrets have been passed in by the network + LOG_DEBUG_FMT( + "Deserialising snapshot ({})", config.joining.snapshot.size()); + auto rc = + network.tables->deserialise_snapshot(config.joining.snapshot); + + if (rc != kv::DeserialiseSuccess::PASS) + { + throw std::logic_error( + fmt::format("Failed to apply snapshot: {}", rc)); + } + + kv::ReadOnlyTx tx; + auto sig_view = tx.get_read_only_view(network.signatures); + auto sig = sig_view->get(0); + if (!sig.has_value()) + { + throw std::logic_error( + fmt::format("No signatures found after applying snapshot")); + } + + auto seqno = network.tables->current_version(); + consensus->init_as_backup(seqno, sig->view); + + reset_data(config.joining.snapshot); + LOG_INFO_FMT( + "Joiner successfully resumed from snapshot at seqno {} and " + "view {}", + seqno, + sig->view); + } + open_member_frontend(); - accept_network_tls_connections(args.config); + accept_network_tls_connections(config); if (resp.network_info.public_only) { @@ -439,7 +473,7 @@ namespace ccf } else { - reset_quote(); + reset_data(quote); sm.advance(State::partOfNetwork); } @@ -463,7 +497,7 @@ namespace ccf // Send RPC request to remote node to join the network. JoinNetworkNodeToNode::In join_params; - join_params.node_info_network = args.config.node_info_network; + join_params.node_info_network = config.node_info_network; join_params.public_encryption_key = node_encrypt_kp->public_key_pem().raw(); join_params.quote = quote; @@ -471,8 +505,8 @@ namespace ccf LOG_DEBUG_FMT( "Sending join request to {}:{}", - args.config.joining.target_host, - args.config.joining.target_port); + config.joining.target_host, + config.joining.target_port); const auto body = serdes::pack(join_params, serdes::Pack::Text); @@ -485,19 +519,19 @@ namespace ccf join_client->send_request(r.build_request()); } - void join(const Join::In& args) + void join(CCFConfig& config) { std::lock_guard guard(lock); sm.expect(State::pending); - initiate_join(args); + initiate_join(config); join_timer = timers.new_timer( - std::chrono::milliseconds(args.config.joining.join_timer), - [this, args]() { + std::chrono::milliseconds(config.joining.join_timer), + [this, &config]() { if (sm.check(State::pending)) { - initiate_join(args); + initiate_join(config); return true; } return false; @@ -741,7 +775,7 @@ namespace ccf } } - reset_quote(); + reset_data(quote); sm.advance(State::partOfNetwork); } @@ -1287,10 +1321,10 @@ namespace ccf .raw(); } - void reset_quote() + void reset_data(std::vector& data) { - quote.clear(); - quote.shrink_to_fit(); + data.clear(); + data.shrink_to_fit(); } void backup_finish_recovery() diff --git a/src/node/rpc/serdes.h b/src/node/rpc/serdes.h index f45c0fdfd5..a2c8c7f89d 100644 --- a/src/node/rpc/serdes.h +++ b/src/node/rpc/serdes.h @@ -51,7 +51,7 @@ namespace serdes { if (input.size() == 0) { - return {}; + return std::nullopt; } if (input[0] == '{') diff --git a/src/node/snapshotter.h b/src/node/snapshotter.h index 705464001f..c437e9b57e 100644 --- a/src/node/snapshotter.h +++ b/src/node/snapshotter.h @@ -108,6 +108,24 @@ namespace ccf next_snapshot_indices.push_back(last_snapshot_idx); } + void set_last_snapshot_idx(consensus::Index idx) + { + std::lock_guard guard(lock); + + // Should only be called once, after a snapshot has been applied + if (last_snapshot_idx != 0) + { + throw std::logic_error( + "Last snapshot index can only be set if no snapshot has been " + "generated"); + } + + last_snapshot_idx = idx; + + next_snapshot_indices.clear(); + next_snapshot_indices.push_back(last_snapshot_idx); + } + void snapshot(consensus::Index idx) { std::lock_guard guard(lock); @@ -119,7 +137,7 @@ namespace ccf idx, last_snapshot_idx); - if (idx - last_snapshot_idx > snapshot_tx_interval) + if (idx - last_snapshot_idx >= snapshot_tx_interval) { auto msg = std::make_unique>(&snapshot_cb); msg->data.self = shared_from_this(); @@ -140,6 +158,11 @@ namespace ccf { next_snapshot_indices.pop_front(); } + + if (next_snapshot_indices.empty()) + { + next_snapshot_indices.push_back(last_snapshot_idx); + } } bool requires_snapshot(consensus::Index idx) diff --git a/tests/infra/network.py b/tests/infra/network.py index 5750ca7381..caa27d3c7c 100644 --- a/tests/infra/network.py +++ b/tests/infra/network.py @@ -161,7 +161,15 @@ class Network: self.nodes.append(node) return node - def _add_node(self, node, lib_name, args, target_node=None, recovery=False): + def _add_node( + self, + node, + lib_name, + args, + target_node=None, + recovery=False, + from_snapshot=False, + ): forwarded_args = { arg: getattr(args, arg) for arg in infra.network.Network.node_args_to_forward @@ -173,12 +181,23 @@ class Network: timeout=args.ledger_recovery_timeout if recovery else 3 ) + snapshot_dir = None + if from_snapshot: + LOG.info("Joining from snapshot") + snapshot_dir = target_node.get_snapshots() + # For now, we must have a snapshot to resume from when attempting + # to join from one + assert ( + len(os.listdir(snapshot_dir)) > 0 + ), f"There are no snapshots to resume from in directory {snapshot_dir}" + node.join( lib_name=lib_name, workspace=args.workspace, label=args.label, common_dir=self.common_dir, target_rpc_address=f"{target_node.host}:{target_node.rpc_port}", + snapshot_dir=snapshot_dir, **forwarded_args, ) @@ -415,14 +434,22 @@ class Network: raise NodeShutdownError("Fatal error found during node shutdown") def create_and_add_pending_node( - self, lib_name, host, args, target_node=None, timeout=JOIN_TIMEOUT + self, + lib_name, + host, + args, + target_node=None, + from_snapshot=False, + timeout=JOIN_TIMEOUT, ): """ Create a new node and add it to the network. Note that the new node still needs to be trusted by members to complete the join protocol. """ new_node = self.create_node(host) - self._add_node(new_node, lib_name, args, target_node) + self._add_node( + new_node, lib_name, args, target_node, from_snapshot=from_snapshot + ) primary, _ = self.find_primary() try: self.consortium.wait_for_node_to_exist_in_store( @@ -450,12 +477,16 @@ class Network: return new_node - def create_and_trust_node(self, lib_name, host, args, target_node=None): + def create_and_trust_node( + self, lib_name, host, args, target_node=None, from_snapshot=False + ): """ Create a new node, add it to the network and let members vote to trust it so that it becomes part of the consensus protocol. """ - new_node = self.create_and_add_pending_node(lib_name, host, args, target_node) + new_node = self.create_and_add_pending_node( + lib_name, host, args, target_node, from_snapshot + ) primary, _ = self.find_primary() try: diff --git a/tests/infra/node.py b/tests/infra/node.py index dd3aa0d40e..0c90cc5e3c 100644 --- a/tests/infra/node.py +++ b/tests/infra/node.py @@ -82,8 +82,7 @@ class Node: workspace, label, common_dir, - None, - members_info, + members_info=members_info, **kwargs, ) self.network_state = NodeNetworkState.joined @@ -96,6 +95,7 @@ class Node: label, common_dir, target_rpc_address, + snapshot_dir, **kwargs, ): self._start( @@ -105,7 +105,8 @@ class Node: workspace, label, common_dir, - target_rpc_address, + target_rpc_address=target_rpc_address, + snapshot_dir=snapshot_dir, **kwargs, ) @@ -130,6 +131,7 @@ class Node: label, common_dir, target_rpc_address=None, + snapshot_dir=None, members_info=None, **kwargs, ): @@ -161,6 +163,7 @@ class Node: common_dir, target_rpc_address, members_info, + snapshot_dir, binary_dir=self.binary_dir, **kwargs, ) @@ -238,6 +241,7 @@ class Node: """ # Until the node has joined, the SSL handshake will fail as the node # is not yet endorsed by the network certificate + try: with self.client(connection_timeout=timeout) as nc: rep = nc.get("/node/commit") @@ -250,6 +254,9 @@ class Node: def get_ledger(self): return self.remote.get_ledger() + def get_snapshots(self): + return self.remote.get_snapshots() + def client(self, identity=None, **kwargs): akwargs = { "cert": os.path.join(self.common_dir, f"{identity}_cert.pem") diff --git a/tests/infra/remote.py b/tests/infra/remote.py index 5756806e4e..41092cba3f 100644 --- a/tests/infra/remote.py +++ b/tests/infra/remote.py @@ -551,6 +551,7 @@ class CCFRemote(object): common_dir, target_rpc_address=None, members_info=None, + snapshot_dir=None, join_timer=None, host_log_level="info", sig_tx_interval=5000, @@ -587,6 +588,11 @@ class CCFRemote(object): if self.ledger_dir else f"{local_node_id}.ledger" ) + self.snapshot_dir = os.path.normpath(snapshot_dir) if snapshot_dir else None + self.snapshot_dir_name = ( + os.path.basename(self.snapshot_dir) if self.snapshot_dir else "snapshots" + ) + self.common_dir = common_dir exe_files = [self.BIN, lib_path] + self.DEPS @@ -679,6 +685,9 @@ class CCFRemote(object): f"--join-timer={join_timer}", ] data_files += [os.path.join(self.common_dir, "networkcert.pem")] + + if snapshot_dir: + data_files += [snapshot_dir] elif start_type == StartType.recover: cmd += ["recover", "--network-cert-file=networkcert.pem"] else: @@ -757,6 +766,10 @@ class CCFRemote(object): self.remote.get(self.ledger_dir_name, self.common_dir) return os.path.join(self.common_dir, self.ledger_dir_name) + def get_snapshots(self): + self.remote.get(self.snapshot_dir_name, self.common_dir) + return os.path.join(self.common_dir, self.snapshot_dir_name) + def ledger_path(self): return os.path.join(self.remote.root, self.ledger_dir_name) diff --git a/tests/reconfiguration.py b/tests/reconfiguration.py index f242c25c4d..c8406af501 100644 --- a/tests/reconfiguration.py +++ b/tests/reconfiguration.py @@ -43,6 +43,16 @@ def test_add_node_from_backup(network, args): return network +@reqs.description("Adding a valid node from snapshot") +@reqs.at_least_n_nodes(2) +def test_add_node_from_snapshot(network, args): + new_node = network.create_and_trust_node( + args.package, "localhost", args, from_snapshot=True + ) + assert new_node + return network + + @reqs.description("Adding as many pending nodes as current number of nodes") @reqs.supports_methods("log/private") def test_add_as_many_pending_nodes(network, args): @@ -113,6 +123,9 @@ def run(args): hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb ) as network: network.start_and_join(args) + if args.snapshot_tx_interval is not None: + test_add_node_from_snapshot(network, args) + test_add_node_from_backup(network, args) test_add_node(network, args) test_add_node_untrusted_code(network, args) diff --git a/tests/suite/test_suite.py b/tests/suite/test_suite.py index 765b475217..8b91c4be5b 100644 --- a/tests/suite/test_suite.py +++ b/tests/suite/test_suite.py @@ -39,7 +39,8 @@ suite_membership_recovery = [ ] suites["membership_recovery"] = suite_membership_recovery -# This suite tests that nodes addition, deletion and primary changes can be interleaved +# This suite tests that nodes addition, deletion and primary changes +# can be interleaved suite_reconfiguration = [ reconfiguration.test_add_node, reconfiguration.test_retire_primary, @@ -53,6 +54,21 @@ suite_reconfiguration = [ ] suites["reconfiguration"] = suite_reconfiguration +# Temporary suite while snapshotting feature is being implemented +# https://github.com/microsoft/CCF/milestone/12 +suite_snapshots = [ + reconfiguration.test_add_node_from_snapshot, + election.test_kill_primary, + # The new primary has no snapshot so issue new entries + # to generate at least one snapshot + e2e_logging.test, + e2e_logging.test, + e2e_logging.test, + e2e_logging.test, + reconfiguration.test_add_node_from_snapshot, +] +suites["snapshots"] = suite_snapshots + all_tests_suite = [ # e2e_logging: e2e_logging.test,