From 7f38bb8ff5d70c2a9d6c85222a3a7a78d07dd3e4 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Wed, 30 Sep 2020 13:16:57 +0100 Subject: [PATCH] Fix cft election (#1641) --- .azure-pipelines-templates/daily-matrix.yml | 2 +- .azure-pipelines-templates/matrix.yml | 4 +- CMakeLists.txt | 14 + python/requirements.txt | 2 +- src/consensus/aft/raft.h | 91 ++++- src/consensus/aft/raft_consensus.h | 5 + src/consensus/aft/raft_types.h | 8 +- src/consensus/aft/test/driver.h | 4 +- src/consensus/aft/test/main.cpp | 374 +------------------- src/kv/kv_types.h | 1 + src/kv/test/stub_consensus.h | 5 + src/node/history.h | 10 +- src/node/rpc/member_frontend.h | 2 +- src/node/rpc/node_frontend.h | 25 ++ tests/code_update.py | 2 +- tests/committable.py | 63 ++++ tests/infra/network.py | 3 +- tests/memberclient.py | 5 +- tests/reconfiguration.py | 34 +- tests/recovery.py | 6 - tests/rotation.py | 56 +++ 21 files changed, 300 insertions(+), 416 deletions(-) create mode 100644 tests/committable.py create mode 100644 tests/rotation.py diff --git a/.azure-pipelines-templates/daily-matrix.yml b/.azure-pipelines-templates/daily-matrix.yml index 2185e93996..5cf5d61896 100644 --- a/.azure-pipelines-templates/daily-matrix.yml +++ b/.azure-pipelines-templates/daily-matrix.yml @@ -37,7 +37,7 @@ jobs: cmake_args: '${{ parameters.build.debug.cmake_args }} ${{ parameters.build.NoSGX.cmake_args }}' suffix: 'Instrumented' artifact_name: 'NoSGX_Instrumented' - ctest_filter: '-LE "benchmark|perf|long_test"' + ctest_filter: '-LE "benchmark|perf"' ctest_timeout: '300' - template: common.yml diff --git a/.azure-pipelines-templates/matrix.yml b/.azure-pipelines-templates/matrix.yml index 160e9b5feb..049209aeb7 100644 --- a/.azure-pipelines-templates/matrix.yml +++ b/.azure-pipelines-templates/matrix.yml @@ -29,9 +29,9 @@ parameters: test: NoSGX: - ctest_args: '-LE "benchmark|perf|tlstest|long_test"' + ctest_args: '-LE "benchmark|perf|tlstest"' SGX: - ctest_args: '-LE "benchmark|perf|tlstest|long_test"' + ctest_args: '-LE "benchmark|perf|tlstest"' perf: ctest_args: '-L "benchmark|perf"' diff --git a/CMakeLists.txt b/CMakeLists.txt index ff25a8b3bb..425e402509 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -623,6 +623,20 @@ if(BUILD_TESTS) 4000 ) + add_e2e_test( + NAME rotation_test + PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/rotation.py + CONSENSUS cft + ADDITIONAL_ARGS --raft-election-timeout 4000 + ) + + add_e2e_test( + NAME committable_suffix_test + PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/committable.py + CONSENSUS cft + ADDITIONAL_ARGS --raft-election-timeout 4000 + ) + add_e2e_test( NAME lua_e2e_batched PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_batched.py diff --git a/python/requirements.txt b/python/requirements.txt index 9847311cd1..b8e4c52a28 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -1,5 +1,5 @@ msgpack==1.0.0 -loguru==0.5.2 +loguru==0.5.3 requests==2.24.0 requests-http-signature==0.1.0 websocket-client==0.57.0 diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index db647c0c1f..31f8d0191a 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -74,6 +74,13 @@ namespace aft ReplicaState replica_state; std::chrono::milliseconds timeout_elapsed; + // Last (committable) index preceding the node's election, this is + // used to decide when to start issuing signatures. While commit_idx + // hasn't caught up with election_index, a newly elected leader is + // effectively finishing establishing commit over the previous term + // or even previous terms, and can therefore not meaningfully sign + // over the commit level. + kv::Version election_index = 0; // BFT RequestsMap& pbft_requests_map; @@ -184,6 +191,12 @@ namespace aft return replica_state == Follower; } + Index last_committable_index() const + { + return committable_indices.empty() ? state->commit_idx : + committable_indices.back(); + } + void enable_all_domains() { // When receiving append entries as a follower, all security domains will @@ -281,6 +294,20 @@ namespace aft return {get_term_internal(state->commit_idx), state->commit_idx}; } + std::optional> get_signable_commit_term_and_idx() + { + std::lock_guard guard(state->lock); + if (state->commit_idx >= election_index) + { + return std::pair{get_term_internal(state->commit_idx), + state->commit_idx}; + } + else + { + return std::nullopt; + } + } + Term get_term(Index idx) { if (consensus_type == ConsensusType::BFT && is_follower()) @@ -596,11 +623,12 @@ namespace aft } LOG_DEBUG_FMT( - "Received pt: {} pi: {} t: {} i: {}", + "Received pt: {} pi: {} t: {} i: {} toi: {}", r.prev_term, r.prev_idx, r.term, - r.idx); + r.idx, + r.term_of_idx); // Don't check that the sender node ID is valid. Accept anything that // passes the integrity check. This way, entries containing dynamic @@ -621,7 +649,7 @@ namespace aft else if (state->current_view > r.term) { // Reply false, since our term is later than the received term. - LOG_DEBUG_FMT( + LOG_INFO_FMT( "Recv append entries to {} from {} but our term is later ({} > {})", state->my_node_id, r.from_node, @@ -775,11 +803,18 @@ namespace aft case kv::DeserialiseSuccess::PASS_SIGNATURE: { LOG_DEBUG_FMT("Deserialising signature at {}", i); + auto prev_lci = last_committable_index(); committable_indices.push_back(i); if (sig_term) { - state->view_history.update(state->commit_idx + 1, sig_term); + // A signature for sig_term tells us that all transactions from + // the previous signature onwards (at least, if not further back) + // happened in sig_term. We reflect this in the history. + if (r.term_of_idx == aft::ViewHistory::InvalidView) + state->view_history.update(1, r.term); + else + state->view_history.update(prev_lci + 1, sig_term); commit_if_possible(r.leader_commit_idx); } if (consensus_type == ConsensusType::BFT) @@ -808,7 +843,13 @@ namespace aft // After entries have been deserialised, we try to commit the leader's // commit index and update our term history accordingly commit_if_possible(r.leader_commit_idx); - state->view_history.update(state->commit_idx + 1, r.term_of_idx); + + // The term may have changed, and we have not have seen a signature yet. + auto lci = last_committable_index(); + if (r.term_of_idx == aft::ViewHistory::InvalidView) + state->view_history.update(1, r.term); + else + state->view_history.update(lci + 1, r.term_of_idx); send_append_entries_response(r.from_node, true); } @@ -1168,10 +1209,13 @@ namespace aft { LOG_INFO_FMT("Send request vote from {} to {}", state->my_node_id, to); + auto last_committable_idx = last_committable_index(); + CCF_ASSERT(last_committable_idx >= state->commit_idx, "lci < ci"); + RequestVote rv = {{raft_request_vote, state->my_node_id}, state->current_view, - state->commit_idx, - get_term_internal(state->commit_idx)}; + last_committable_idx, + get_term_internal(last_committable_idx)}; channels->send_authenticated(ccf::NodeMsgType::consensus_msg, to, rv); } @@ -1238,12 +1282,17 @@ namespace aft return; } - // If the candidate's log is at least as up-to-date as ours, vote yes - auto last_commit_term = get_term_internal(state->commit_idx); + // If the candidate's committable log is at least as up-to-date as ours, + // vote yes - auto answer = (r.last_commit_term > last_commit_term) || - ((r.last_commit_term == last_commit_term) && - (r.last_commit_idx >= state->commit_idx)); + auto last_committable_idx = last_committable_index(); + auto term_of_last_committable_index = + get_term_internal(last_committable_idx); + + auto answer = + (r.term_of_last_committable_idx > term_of_last_committable_index) || + ((r.term_of_last_committable_idx == term_of_last_committable_index) && + (r.last_committable_idx >= last_committable_idx)); if (answer) { @@ -1380,12 +1429,14 @@ namespace aft void become_leader() { - // Discard any un-committed updates we may hold, + election_index = last_committable_index(); + LOG_DEBUG_FMT("Election index is {}", election_index); + // Discard any un-committable updates we may hold, // since we have no signature for them. Except at startup, // where we do not want to roll back the genesis transaction. if (state->commit_idx) { - rollback(state->commit_idx); + rollback(election_index); } else { @@ -1393,7 +1444,6 @@ namespace aft store->set_term(state->current_view); } - committable_indices.clear(); replica_state = Leader; leader_id = state->my_node_id; @@ -1433,9 +1483,7 @@ namespace aft voted_for = NoNode; votes_for_me.clear(); - // Rollback unreplicated commits. - rollback(state->commit_idx); - committable_indices.clear(); + rollback(last_committable_index()); LOG_INFO_FMT( "Becoming follower {}: {}", state->my_node_id, state->current_view); @@ -1512,6 +1560,11 @@ namespace aft void commit_if_possible(Index idx) { + LOG_DEBUG_FMT( + "Commit if possible {} (ci: {}) (ti {})", + idx, + state->commit_idx, + get_term_internal(idx)); if ( (idx > state->commit_idx) && (get_term_internal(idx) <= state->current_view)) @@ -1695,4 +1748,4 @@ namespace aft } } }; -} \ No newline at end of file +} diff --git a/src/consensus/aft/raft_consensus.h b/src/consensus/aft/raft_consensus.h index 84fc672808..4218ff8f7b 100644 --- a/src/consensus/aft/raft_consensus.h +++ b/src/consensus/aft/raft_consensus.h @@ -72,6 +72,11 @@ namespace aft return aft->get_commit_term_and_idx(); } + std::optional> get_signable_txid() override + { + return aft->get_signable_commit_term_and_idx(); + } + View get_view(SeqNo seqno) override { return aft->get_term(seqno); diff --git a/src/consensus/aft/raft_types.h b/src/consensus/aft/raft_types.h index a5ef92dbd3..85edb0b33e 100644 --- a/src/consensus/aft/raft_types.h +++ b/src/consensus/aft/raft_types.h @@ -186,12 +186,8 @@ namespace aft struct RequestVote : RaftHeader { Term term; - // last_log_idx in vanilla raft but last_commit_idx here to preserve - // verifiability - Index last_commit_idx; - // last_log_term in vanilla raft but last_commit_term here to preserve - // verifiability - Term last_commit_term; + Index last_committable_idx; + Term term_of_last_committable_idx; }; struct RequestVoteResponse : RaftHeader diff --git a/src/consensus/aft/test/driver.h b/src/consensus/aft/test/driver.h index a020214be0..1a20747db1 100644 --- a/src/consensus/aft/test/driver.h +++ b/src/consensus/aft/test/driver.h @@ -88,8 +88,8 @@ public: aft::NodeId node_id, aft::NodeId tgt_node_id, aft::RequestVote rv) { std::ostringstream s; - s << "request_vote t: " << rv.term << ", lli: " << rv.last_commit_idx - << ", llt: " << rv.last_commit_term; + s << "request_vote t: " << rv.term << ", lci: " << rv.last_committable_idx + << ", tolci: " << rv.term_of_last_committable_idx; log(node_id, tgt_node_id, s.str()); } diff --git a/src/consensus/aft/test/main.cpp b/src/consensus/aft/test/main.cpp index 236a38c3c3..aeeed0c6be 100644 --- a/src/consensus/aft/test/main.cpp +++ b/src/consensus/aft/test/main.cpp @@ -195,8 +195,9 @@ DOCTEST_TEST_CASE( DOCTEST_REQUIRE(get<0>(rv) == node_id1); auto rvc = get<1>(rv); DOCTEST_REQUIRE(rvc.term == 1); - DOCTEST_REQUIRE(rvc.last_commit_idx == 0); - DOCTEST_REQUIRE(rvc.last_commit_term == aft::ViewHistory::InvalidView); + DOCTEST_REQUIRE(rvc.last_committable_idx == 0); + DOCTEST_REQUIRE( + rvc.term_of_last_committable_idx == aft::ViewHistory::InvalidView); r1.recv_message(reinterpret_cast(&rvc), sizeof(rvc)); @@ -207,8 +208,9 @@ DOCTEST_TEST_CASE( DOCTEST_REQUIRE(get<0>(rv) == node_id2); rvc = get<1>(rv); DOCTEST_REQUIRE(rvc.term == 1); - DOCTEST_REQUIRE(rvc.last_commit_idx == 0); - DOCTEST_REQUIRE(rvc.last_commit_term == aft::ViewHistory::InvalidView); + DOCTEST_REQUIRE(rvc.last_committable_idx == 0); + DOCTEST_REQUIRE( + rvc.term_of_last_committable_idx == aft::ViewHistory::InvalidView); r2.recv_message(reinterpret_cast(&rvc), sizeof(rvc)); @@ -1040,366 +1042,4 @@ DOCTEST_TEST_CASE("Exceed append entries limit") (sent_entries > num_small_entries_sent && sent_entries <= num_small_entries_sent + num_big_entries)); DOCTEST_REQUIRE(r2.ledger->ledger.size() == individual_entries); -} - -// Reproduces issue described here: https://github.com/microsoft/CCF/issues/521 -// Once this is fixed test will need to be modified since right now it -// DOCTEST_CHECKs that the issue stands -DOCTEST_TEST_CASE( - "Primary gets invalidated if it compacts right before a term change that it " - "doesn't participate in") -{ - auto kv_store0 = std::make_shared(0); - auto kv_store1 = std::make_shared(1); - auto kv_store2 = std::make_shared(2); - - aft::NodeId node_id0(0); - aft::NodeId node_id1(1); - aft::NodeId node_id2(2); - - ms request_timeout(10); - - TRaft r0( - ConsensusType::CFT, - std::make_unique(kv_store0), - std::make_unique(node_id0), - std::make_shared(), - std::make_shared(), - nullptr, - nullptr, - cert, - request_map, - std::make_shared(node_id0), - nullptr, - request_timeout, - ms(20)); - TRaft r1( - ConsensusType::CFT, - std::make_unique(kv_store1), - std::make_unique(node_id1), - std::make_shared(), - std::make_shared(), - nullptr, - nullptr, - cert, - request_map, - std::make_shared(node_id1), - nullptr, - request_timeout, - ms(100)); - TRaft r2( - ConsensusType::CFT, - std::make_unique(kv_store2), - std::make_unique(node_id2), - std::make_shared(), - std::make_shared(), - nullptr, - nullptr, - cert, - request_map, - std::make_shared(node_id2), - nullptr, - request_timeout, - ms(50)); - - aft::Configuration::Nodes config0; - config0[node_id0] = {}; - config0[node_id1] = {}; - config0[node_id2] = {}; - r0.add_configuration(0, config0); - r1.add_configuration(0, config0); - r2.add_configuration(0, config0); - - map nodes; - nodes[node_id0] = &r0; - nodes[node_id1] = &r1; - nodes[node_id2] = &r2; - - r0.periodic(std::chrono::milliseconds(200)); - - DOCTEST_INFO("Initial election for Node 0"); - { - DOCTEST_REQUIRE( - 2 == - dispatch_all( - nodes, ((aft::ChannelStubProxy*)r0.channels.get())->sent_request_vote)); - DOCTEST_REQUIRE( - 1 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r1.channels.get()) - ->sent_request_vote_response)); - - DOCTEST_REQUIRE(r0.is_leader()); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() == - 2); - DOCTEST_REQUIRE( - 2 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() == - 0); - } - - ((aft::ChannelStubProxy*)r1.channels.get()) - ->sent_append_entries_response.clear(); - ((aft::ChannelStubProxy*)r2.channels.get()) - ->sent_append_entries_response.clear(); - - auto first_entry = std::make_shared>(); - for (auto i = 0; i < 3; ++i) - { - first_entry->push_back(1); - } - auto second_entry = std::make_shared>(); - for (auto i = 0; i < 3; ++i) - { - second_entry->push_back(2); - } - - auto third_entry = std::make_shared>(); - for (auto i = 0; i < 3; ++i) - { - third_entry->push_back(3); - } - - DOCTEST_INFO("Node 0 compacts twice but Nodes 1 and 2 only once"); - { - DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{1, first_entry, true}}, 1)); - DOCTEST_REQUIRE(r0.ledger->ledger.size() == 1); - r0.periodic(ms(10)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() == - 2); - - // Nodes 1 and 2 receive append entries and respond - DOCTEST_REQUIRE( - 2 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() == - 0); - - DOCTEST_REQUIRE( - 1 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r1.channels.get()) - ->sent_append_entries_response)); - DOCTEST_REQUIRE( - 1 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r2.channels.get()) - ->sent_append_entries_response)); - - DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{2, second_entry, true}}, 1)); - DOCTEST_REQUIRE(r0.ledger->ledger.size() == 2); - r0.periodic(ms(10)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() == - 2); - - // Nodes 1 and 2 receive append entries and respond - // Node 0 will compact again and be ahead of Node 1 and 2 - DOCTEST_REQUIRE( - 2 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() == - 0); - - DOCTEST_REQUIRE( - 1 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r1.channels.get()) - ->sent_append_entries_response)); - DOCTEST_REQUIRE( - 1 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r2.channels.get()) - ->sent_append_entries_response)); - - DOCTEST_CHECK(r0.get_term() == 1); - DOCTEST_CHECK(r0.get_commit_idx() == 2); - DOCTEST_CHECK(r0.get_last_idx() == 2); - - DOCTEST_CHECK(r1.get_term() == 1); - DOCTEST_CHECK(r1.get_commit_idx() == 1); - DOCTEST_CHECK(r1.get_last_idx() == 2); - - DOCTEST_CHECK(r2.get_term() == 1); - DOCTEST_CHECK(r2.get_commit_idx() == 1); - DOCTEST_CHECK(r2.get_last_idx() == 2); - - // clean up - ((aft::ChannelStubProxy*)r2.channels.get()) - ->sent_request_vote_response.clear(); - - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get())->sent_msg_count() == 0); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r1.channels.get())->sent_msg_count() == 0); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r2.channels.get())->sent_msg_count() == 0); - } - - DOCTEST_INFO("Node 1 exceeds its election timeout and starts an election"); - { - auto by_0 = [](auto const& lhs, auto const& rhs) -> bool { - return get<0>(lhs) < get<0>(rhs); - }; - - r1.periodic(std::chrono::milliseconds(200)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote.size() == - 2); - ((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote.sort(by_0); - - DOCTEST_INFO("Node 2 receives the vote request"); - // pop for first node (node 0) so that it doesn't participate in the - // election - auto vote_req_from_1_to_0 = - ((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote.front(); - ((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote.pop_front(); - - DOCTEST_REQUIRE( - 1 == - dispatch_all_and_DOCTEST_CHECK( - nodes, - ((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote, - [](const auto& msg) { - DOCTEST_REQUIRE(msg.term == 2); - DOCTEST_REQUIRE(msg.last_commit_idx == 1); - DOCTEST_REQUIRE(msg.last_commit_term == 1); - })); - - DOCTEST_INFO("Node 2 votes for Node 1, Node 0 is suspended"); - DOCTEST_REQUIRE( - 1 == - dispatch_all_and_DOCTEST_CHECK( - nodes, - ((aft::ChannelStubProxy*)r2.channels.get())->sent_request_vote_response, - [](const auto& msg) { - DOCTEST_REQUIRE(msg.term == 2); - DOCTEST_REQUIRE(msg.vote_granted); - })); - - DOCTEST_INFO("Node 1 is now leader"); - DOCTEST_REQUIRE(r1.is_leader()); - // pop Node 0's append entries - ((aft::ChannelStubProxy*)r1.channels.get()) - ->sent_append_entries.pop_front(); - DOCTEST_REQUIRE( - 1 == - dispatch_all_and_DOCTEST_CHECK( - nodes, - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries, - [](const auto& msg) { - DOCTEST_REQUIRE(msg.idx == 1); - DOCTEST_REQUIRE(msg.term == 2); - DOCTEST_REQUIRE(msg.prev_idx == 1); - DOCTEST_REQUIRE(msg.prev_term == 1); - DOCTEST_REQUIRE(msg.leader_commit_idx == 1); - })); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() == - 0); - - DOCTEST_REQUIRE( - 1 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r2.channels.get()) - ->sent_append_entries_response)); - } - - DOCTEST_INFO( - "Node 1 and Node 2 proceed to compact at idx 2, where Node 0 has " - "compacted for a previous term"); - { - DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{2, second_entry, true}}, 2)); - DOCTEST_REQUIRE(r1.ledger->ledger.size() == 2); - r1.periodic(ms(10)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() == - 2); - - // Nodes 0 and 2 receive append entries and respond - DOCTEST_REQUIRE( - 2 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() == - 0); - - DOCTEST_REQUIRE( - 1 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r2.channels.get()) - ->sent_append_entries_response)); - - // Node 0 will not respond here since it received an append entries it can - // not process [prev_idex (1) < commit_idx (2)] - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get()) - ->sent_append_entries_response.size() == 0); - - DOCTEST_INFO("Another entry from Node 1 so that Node 2 can also compact"); - DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{3, third_entry, true}}, 2)); - DOCTEST_REQUIRE(r1.ledger->ledger.size() == 3); - r1.periodic(ms(10)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() == - 2); - - // Nodes 0 and 2 receive append entries - DOCTEST_REQUIRE( - 2 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() == - 0); - - // Node 0 will now have an ae response which will return false because - // its log for index 2 has the wrong term (ours: 1, theirs: 2) - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get()) - ->sent_append_entries_response.size() == 1); - DOCTEST_REQUIRE( - 1 == - dispatch_all_and_DOCTEST_CHECK( - nodes, - ((aft::ChannelStubProxy*)r0.channels.get()) - ->sent_append_entries_response, - [](const auto& msg) { - DOCTEST_REQUIRE(msg.last_log_idx == 2); - DOCTEST_REQUIRE(!msg.success); - })); - - DOCTEST_REQUIRE(r1.ledger->ledger.size() == 3); - DOCTEST_REQUIRE(r2.ledger->ledger.size() == 3); - - DOCTEST_CHECK(r1.get_term() == 2); - DOCTEST_CHECK(r1.get_commit_idx() == 2); - DOCTEST_CHECK(r1.get_last_idx() == 3); - - DOCTEST_CHECK(r2.get_term() == 2); - DOCTEST_CHECK(r2.get_commit_idx() == 2); - DOCTEST_CHECK(r2.get_last_idx() == 3); - } -} +} \ No newline at end of file diff --git a/src/kv/kv_types.h b/src/kv/kv_types.h index 2283c34331..5548ba805d 100644 --- a/src/kv/kv_types.h +++ b/src/kv/kv_types.h @@ -290,6 +290,7 @@ namespace kv virtual bool replicate(const BatchVector& entries, View view) = 0; virtual std::pair get_committed_txid() = 0; + virtual std::optional> get_signable_txid() = 0; virtual View get_view(SeqNo seqno) = 0; virtual View get_view() = 0; diff --git a/src/kv/test/stub_consensus.h b/src/kv/test/stub_consensus.h index 990a44ee68..cf303916e7 100644 --- a/src/kv/test/stub_consensus.h +++ b/src/kv/test/stub_consensus.h @@ -93,6 +93,11 @@ namespace kv return {2, 0}; } + std::optional> get_signable_txid() override + { + return get_committed_txid(); + } + SeqNo get_committed_seqno() override { return 0; diff --git a/src/node/history.h b/src/node/history.h index bd18d29878..4e944b8da5 100644 --- a/src/node/history.h +++ b/src/node/history.h @@ -676,8 +676,16 @@ namespace ccf return; } + // Signatures are only emitted when the consensus is establishing commit + // over the node's own transactions + auto signable_txid = consensus->get_signable_txid(); + if (!signable_txid.has_value()) + { + return; + } + + auto commit_txid = signable_txid.value(); auto txid = store.next_txid(); - auto commit_txid = consensus->get_committed_txid(); LOG_DEBUG_FMT( "Signed at {} in view: {} commit was: {}.{}", diff --git a/src/node/rpc/member_frontend.h b/src/node/rpc/member_frontend.h index 7ab14be8d6..d853498ccd 100644 --- a/src/node/rpc/member_frontend.h +++ b/src/node/rpc/member_frontend.h @@ -958,7 +958,7 @@ namespace ccf }; make_endpoint("read", HTTP_POST, json_adapter(read)) // This can be executed locally, but can't currently take ReadOnlyTx due - // to restristions in our lua wrappers + // to restrictions in our lua wrappers .set_forwarding_required(ForwardingRequired::Sometimes) .set_auto_schema() .install(); diff --git a/src/node/rpc/node_frontend.h b/src/node/rpc/node_frontend.h index e8948fc993..cff8438ec1 100644 --- a/src/node/rpc/node_frontend.h +++ b/src/node/rpc/node_frontend.h @@ -360,6 +360,31 @@ namespace ccf make_read_only_endpoint("primary", HTTP_HEAD, is_primary) .set_forwarding_required(ForwardingRequired::Never) .install(); + + auto consensus_config = [this](CommandEndpointContext& args) { + // Query node for configurations, separate current from pending + if (consensus != nullptr) + { + auto cfg = consensus->get_latest_configuration(); + nlohmann::json c; + for (auto& [nid, ninfo] : cfg) + { + nlohmann::json n; + n["address"] = fmt::format("{}:{}", ninfo.hostname, ninfo.port); + c[fmt::format("{}", nid)] = n; + } + args.rpc_ctx->set_response_body(c.dump()); + } + else + { + args.rpc_ctx->set_response_status(HTTP_STATUS_NOT_FOUND); + args.rpc_ctx->set_response_body("No configured consensus"); + } + }; + + make_command_endpoint("config", HTTP_GET, consensus_config) + .set_forwarding_required(ForwardingRequired::Never) + .install(); } }; diff --git a/tests/code_update.py b/tests/code_update.py index 07897925aa..a0757399ea 100644 --- a/tests/code_update.py +++ b/tests/code_update.py @@ -152,7 +152,7 @@ if __name__ == "__main__": parser.add_argument( "-p", "--package", - help="The enclave package to load (e.g., libsimplebank)", + help="The enclave package to load (e.g., liblogging)", default="liblogging", ) parser.add_argument( diff --git a/tests/committable.py b/tests/committable.py new file mode 100644 index 0000000000..909bf6b31c --- /dev/null +++ b/tests/committable.py @@ -0,0 +1,63 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the Apache 2.0 License. +import infra.e2e_args +import infra.network +import infra.proc +import time + +from loguru import logger as LOG + + +def run(args): + hosts = ["localhost"] * 5 + + with infra.network.network( + hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb + ) as network: + network.start_and_join(args) + primary, backups = network.find_nodes() + + # Suspend three of the backups to prevent commit + backups[1].suspend() + backups[2].suspend() + backups[3].stop() + + txs = [] + # Run some transactions that can't be committed + with primary.client("user0") as uc: + for i in range(10): + txs.append( + uc.post("/app/log/private", {"id": 100 + i, "msg": "Hello world"}) + ) + + # Wait for a signature to ensure those transactions are committable + time.sleep(args.sig_tx_interval * 2 / 1000) + + # Kill the primary, restore other backups + primary.stop() + backups[1].resume() + backups[2].resume() + new_primary, new_term = network.wait_for_new_primary(primary.node_id) + LOG.debug(f"New primary is {new_primary.node_id} in term {new_term}") + assert new_primary.node_id == backups[0].node_id + + # Check that uncommitted but committable suffix is preserved + with new_primary.client("user0") as uc: + check_commit = infra.checker.Checker(uc) + for tx in txs: + check_commit(tx) + + +if __name__ == "__main__": + + def add(parser): + parser.add_argument( + "-p", + "--package", + help="The enclave package to load (e.g., liblogging)", + default="liblogging", + ) + + args = infra.e2e_args.cli_args(add) + args.package = args.app_script and "liblua_generic" or "liblogging" + run(args) diff --git a/tests/infra/network.py b/tests/infra/network.py index 6d384632a6..b4c7280978 100644 --- a/tests/infra/network.py +++ b/tests/infra/network.py @@ -624,7 +624,7 @@ class Network: backup = random.choice(backups) return primary, backup - def wait_for_all_nodes_to_catch_up(self, primary, timeout=3): + def wait_for_all_nodes_to_catch_up(self, primary, timeout=10): """ Wait for all nodes to have joined the network and globally replicated all transactions globally executed on the primary (including transactions @@ -649,6 +649,7 @@ class Network: caught_up_nodes = [] for node in self.get_joined_nodes(): with node.client() as c: + c.get("/node/commit") resp = c.get(f"/node/local_tx?view={view}&seqno={seqno}") if resp.status_code != 200: # Node may not have joined the network yet, try again diff --git a/tests/memberclient.py b/tests/memberclient.py index 4533440691..580a791baf 100644 --- a/tests/memberclient.py +++ b/tests/memberclient.py @@ -57,7 +57,10 @@ def test_add_member(network, args): except infra.member.NoRecoveryShareFound as e: assert e.response.body.text() == "Only active members are given recovery shares" - new_member.ack(primary) + r = new_member.ack(primary) + with primary.client() as nc: + check_commit = infra.checker.Checker(nc) + check_commit(r) return network diff --git a/tests/reconfiguration.py b/tests/reconfiguration.py index 07d0b62ccf..dc40f6d56b 100644 --- a/tests/reconfiguration.py +++ b/tests/reconfiguration.py @@ -9,6 +9,26 @@ import time from loguru import logger as LOG +def node_configs(network): + configs = {} + for node in network.nodes: + try: + with node.client() as nc: + configs[node.node_id] = nc.get("/node/config").body.json() + except Exception: + pass + return configs + + +def count_nodes(configs, network): + nodes = set(str(k) for k in configs.keys()) + stopped = {str(n.node_id) for n in network.nodes if n.is_stopped()} + for node_id, node_config in configs.items(): + nodes_in_config = set(node_config.keys()) - stopped + assert nodes == nodes_in_config, f"{nodes} {nodes_in_config} {node_id}" + return len(nodes) + + def check_can_progress(node, timeout=3): with node.client() as c: r = c.get("/node/commit") @@ -102,16 +122,16 @@ def test_retire_backup(network, args): @reqs.description("Retiring the primary") @reqs.can_kill_n_nodes(1) def test_retire_primary(network, args): + pre_count = count_nodes(node_configs(network), network) + primary, backup = network.find_primary_and_any_backup() network.consortium.retire_node(primary, primary) - LOG.debug( - f"Waiting {network.election_duration}s for a new primary to be elected..." - ) - time.sleep(network.election_duration) - new_primary, new_term = network.find_primary() - assert new_primary.node_id != primary.node_id + new_primary, new_term = network.wait_for_new_primary(primary.node_id) LOG.debug(f"New primary is {new_primary.node_id} in term {new_term}") check_can_progress(backup) + network.nodes.remove(primary) + post_count = count_nodes(node_configs(network), network) + assert pre_count == post_count + 1 primary.stop() return network @@ -141,7 +161,7 @@ if __name__ == "__main__": parser.add_argument( "-p", "--package", - help="The enclave package to load (e.g., libsimplebank)", + help="The enclave package to load (e.g., liblogging)", default="liblogging", ) diff --git a/tests/recovery.py b/tests/recovery.py index 1581391970..251fb5495b 100644 --- a/tests/recovery.py +++ b/tests/recovery.py @@ -82,12 +82,6 @@ def test_share_resilience(network, args, from_snapshot=False): ) submitted_shares_count += 1 - # In theory, check_commit should be sufficient to guarantee that the new primary - # will know about all the recovery shares submitted so far. However, because of - # https://github.com/microsoft/CCF/issues/589, we have to wait for all nodes - # to have committed all transactions. - recovered_network.wait_for_all_nodes_to_catch_up(primary) - # Here, we kill the current primary instead of just suspending it. # However, because of https://github.com/microsoft/CCF/issues/99#issuecomment-630875387, # the new primary will most likely be the previous primary, which defies the point of this test. diff --git a/tests/rotation.py b/tests/rotation.py new file mode 100644 index 0000000000..f531a7ba21 --- /dev/null +++ b/tests/rotation.py @@ -0,0 +1,56 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the Apache 2.0 License. +import infra.e2e_args +import infra.network +import infra.proc +import suite.test_requirements as reqs +import reconfiguration + +from loguru import logger as LOG + + +@reqs.description("Suspend and resume primary") +@reqs.at_least_n_nodes(3) +def test_suspend_primary(network, args): + primary, _ = network.find_primary() + primary.suspend() + new_primary, new_term = network.wait_for_new_primary(primary.node_id) + LOG.debug(f"New primary is {new_primary.node_id} in term {new_term}") + reconfiguration.check_can_progress(new_primary) + primary.resume() + reconfiguration.check_can_progress(new_primary) + return network + + +def run(args): + hosts = ["localhost", "localhost"] + + with infra.network.network( + hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb + ) as network: + network.start_and_join(args) + + # Replace primary repeatedly and check the network still operates + for _ in range(10): + reconfiguration.test_add_node(network, args) + reconfiguration.test_retire_primary(network, args) + + reconfiguration.test_add_node(network, args) + # Suspend primary repeatedly and check the network still operates + for _ in range(10): + test_suspend_primary(network, args) + + +if __name__ == "__main__": + + def add(parser): + parser.add_argument( + "-p", + "--package", + help="The enclave package to load (e.g., liblogging)", + default="liblogging", + ) + + args = infra.e2e_args.cli_args(add) + args.package = args.app_script and "liblua_generic" or "liblogging" + run(args)