This commit is contained in:
Amaury Chamayou 2020-09-30 13:16:57 +01:00 коммит произвёл GitHub
Родитель 758f96e5d3
Коммит 7f38bb8ff5
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
21 изменённых файлов: 300 добавлений и 416 удалений

Просмотреть файл

@ -37,7 +37,7 @@ jobs:
cmake_args: '${{ parameters.build.debug.cmake_args }} ${{ parameters.build.NoSGX.cmake_args }}'
suffix: 'Instrumented'
artifact_name: 'NoSGX_Instrumented'
ctest_filter: '-LE "benchmark|perf|long_test"'
ctest_filter: '-LE "benchmark|perf"'
ctest_timeout: '300'
- template: common.yml

Просмотреть файл

@ -29,9 +29,9 @@ parameters:
test:
NoSGX:
ctest_args: '-LE "benchmark|perf|tlstest|long_test"'
ctest_args: '-LE "benchmark|perf|tlstest"'
SGX:
ctest_args: '-LE "benchmark|perf|tlstest|long_test"'
ctest_args: '-LE "benchmark|perf|tlstest"'
perf:
ctest_args: '-L "benchmark|perf"'

Просмотреть файл

@ -623,6 +623,20 @@ if(BUILD_TESTS)
4000
)
add_e2e_test(
NAME rotation_test
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/rotation.py
CONSENSUS cft
ADDITIONAL_ARGS --raft-election-timeout 4000
)
add_e2e_test(
NAME committable_suffix_test
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/committable.py
CONSENSUS cft
ADDITIONAL_ARGS --raft-election-timeout 4000
)
add_e2e_test(
NAME lua_e2e_batched
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_batched.py

Просмотреть файл

@ -1,5 +1,5 @@
msgpack==1.0.0
loguru==0.5.2
loguru==0.5.3
requests==2.24.0
requests-http-signature==0.1.0
websocket-client==0.57.0

Просмотреть файл

@ -74,6 +74,13 @@ namespace aft
ReplicaState replica_state;
std::chrono::milliseconds timeout_elapsed;
// Last (committable) index preceding the node's election, this is
// used to decide when to start issuing signatures. While commit_idx
// hasn't caught up with election_index, a newly elected leader is
// effectively finishing establishing commit over the previous term
// or even previous terms, and can therefore not meaningfully sign
// over the commit level.
kv::Version election_index = 0;
// BFT
RequestsMap& pbft_requests_map;
@ -184,6 +191,12 @@ namespace aft
return replica_state == Follower;
}
// Highest index that may become committed: the most recent
// signature-backed index if one exists, otherwise the current
// commit index (nothing past commit is committable yet).
Index last_committable_index() const
{
  if (committable_indices.empty())
  {
    return state->commit_idx;
  }
  return committable_indices.back();
}
void enable_all_domains()
{
// When receiving append entries as a follower, all security domains will
@ -281,6 +294,20 @@ namespace aft
return {get_term_internal(state->commit_idx), state->commit_idx};
}
// Returns the current commit {term, index} once commit has caught up
// with election_index, i.e. once it is meaningful for this node to
// sign over the commit level; std::nullopt before that point.
// Takes the state lock.
std::optional<std::pair<Term, Index>> get_signable_commit_term_and_idx()
{
  std::lock_guard<SpinLock> guard(state->lock);
  if (state->commit_idx < election_index)
  {
    // Still establishing commit over previous term(s): a signature
    // over the commit level would not be meaningful yet.
    return std::nullopt;
  }
  return std::pair<Term, Index>{
    get_term_internal(state->commit_idx), state->commit_idx};
}
Term get_term(Index idx)
{
if (consensus_type == ConsensusType::BFT && is_follower())
@ -596,11 +623,12 @@ namespace aft
}
LOG_DEBUG_FMT(
"Received pt: {} pi: {} t: {} i: {}",
"Received pt: {} pi: {} t: {} i: {} toi: {}",
r.prev_term,
r.prev_idx,
r.term,
r.idx);
r.idx,
r.term_of_idx);
// Don't check that the sender node ID is valid. Accept anything that
// passes the integrity check. This way, entries containing dynamic
@ -621,7 +649,7 @@ namespace aft
else if (state->current_view > r.term)
{
// Reply false, since our term is later than the received term.
LOG_DEBUG_FMT(
LOG_INFO_FMT(
"Recv append entries to {} from {} but our term is later ({} > {})",
state->my_node_id,
r.from_node,
@ -775,11 +803,18 @@ namespace aft
case kv::DeserialiseSuccess::PASS_SIGNATURE:
{
LOG_DEBUG_FMT("Deserialising signature at {}", i);
auto prev_lci = last_committable_index();
committable_indices.push_back(i);
if (sig_term)
{
state->view_history.update(state->commit_idx + 1, sig_term);
// A signature for sig_term tells us that all transactions from
// the previous signature onwards (at least, if not further back)
// happened in sig_term. We reflect this in the history.
if (r.term_of_idx == aft::ViewHistory::InvalidView)
state->view_history.update(1, r.term);
else
state->view_history.update(prev_lci + 1, sig_term);
commit_if_possible(r.leader_commit_idx);
}
if (consensus_type == ConsensusType::BFT)
@ -808,7 +843,13 @@ namespace aft
// After entries have been deserialised, we try to commit the leader's
// commit index and update our term history accordingly
commit_if_possible(r.leader_commit_idx);
state->view_history.update(state->commit_idx + 1, r.term_of_idx);
// The term may have changed, and we may not have seen a signature yet.
auto lci = last_committable_index();
if (r.term_of_idx == aft::ViewHistory::InvalidView)
state->view_history.update(1, r.term);
else
state->view_history.update(lci + 1, r.term_of_idx);
send_append_entries_response(r.from_node, true);
}
@ -1168,10 +1209,13 @@ namespace aft
{
LOG_INFO_FMT("Send request vote from {} to {}", state->my_node_id, to);
auto last_committable_idx = last_committable_index();
CCF_ASSERT(last_committable_idx >= state->commit_idx, "lci < ci");
RequestVote rv = {{raft_request_vote, state->my_node_id},
state->current_view,
state->commit_idx,
get_term_internal(state->commit_idx)};
last_committable_idx,
get_term_internal(last_committable_idx)};
channels->send_authenticated(ccf::NodeMsgType::consensus_msg, to, rv);
}
@ -1238,12 +1282,17 @@ namespace aft
return;
}
// If the candidate's log is at least as up-to-date as ours, vote yes
auto last_commit_term = get_term_internal(state->commit_idx);
// If the candidate's committable log is at least as up-to-date as ours,
// vote yes
auto answer = (r.last_commit_term > last_commit_term) ||
((r.last_commit_term == last_commit_term) &&
(r.last_commit_idx >= state->commit_idx));
auto last_committable_idx = last_committable_index();
auto term_of_last_committable_index =
get_term_internal(last_committable_idx);
auto answer =
(r.term_of_last_committable_idx > term_of_last_committable_index) ||
((r.term_of_last_committable_idx == term_of_last_committable_index) &&
(r.last_committable_idx >= last_committable_idx));
if (answer)
{
@ -1380,12 +1429,14 @@ namespace aft
void become_leader()
{
// Discard any un-committed updates we may hold,
election_index = last_committable_index();
LOG_DEBUG_FMT("Election index is {}", election_index);
// Discard any un-committable updates we may hold,
// since we have no signature for them. Except at startup,
// where we do not want to roll back the genesis transaction.
if (state->commit_idx)
{
rollback(state->commit_idx);
rollback(election_index);
}
else
{
@ -1393,7 +1444,6 @@ namespace aft
store->set_term(state->current_view);
}
committable_indices.clear();
replica_state = Leader;
leader_id = state->my_node_id;
@ -1433,9 +1483,7 @@ namespace aft
voted_for = NoNode;
votes_for_me.clear();
// Rollback unreplicated commits.
rollback(state->commit_idx);
committable_indices.clear();
rollback(last_committable_index());
LOG_INFO_FMT(
"Becoming follower {}: {}", state->my_node_id, state->current_view);
@ -1512,6 +1560,11 @@ namespace aft
void commit_if_possible(Index idx)
{
LOG_DEBUG_FMT(
"Commit if possible {} (ci: {}) (ti {})",
idx,
state->commit_idx,
get_term_internal(idx));
if (
(idx > state->commit_idx) &&
(get_term_internal(idx) <= state->current_view))
@ -1695,4 +1748,4 @@ namespace aft
}
}
};
}
}

Просмотреть файл

@ -72,6 +72,11 @@ namespace aft
return aft->get_commit_term_and_idx();
}
// Adaptor passthrough: returns the commit {view, seqno} only when the
// underlying Raft implementation reports it is safe to sign over it
// (commit index has caught up with the leader's election index),
// std::nullopt otherwise.
std::optional<std::pair<View, SeqNo>> get_signable_txid() override
{
  return aft->get_signable_commit_term_and_idx();
}
View get_view(SeqNo seqno) override
{
return aft->get_term(seqno);

Просмотреть файл

@ -186,12 +186,8 @@ namespace aft
struct RequestVote : RaftHeader
{
Term term;
// last_log_idx in vanilla raft but last_commit_idx here to preserve
// verifiability
Index last_commit_idx;
// last_log_term in vanilla raft but last_commit_term here to preserve
// verifiability
Term last_commit_term;
Index last_committable_idx;
Term term_of_last_committable_idx;
};
struct RequestVoteResponse : RaftHeader

Просмотреть файл

@ -88,8 +88,8 @@ public:
aft::NodeId node_id, aft::NodeId tgt_node_id, aft::RequestVote rv)
{
std::ostringstream s;
s << "request_vote t: " << rv.term << ", lli: " << rv.last_commit_idx
<< ", llt: " << rv.last_commit_term;
s << "request_vote t: " << rv.term << ", lci: " << rv.last_committable_idx
<< ", tolci: " << rv.term_of_last_committable_idx;
log(node_id, tgt_node_id, s.str());
}

Просмотреть файл

@ -195,8 +195,9 @@ DOCTEST_TEST_CASE(
DOCTEST_REQUIRE(get<0>(rv) == node_id1);
auto rvc = get<1>(rv);
DOCTEST_REQUIRE(rvc.term == 1);
DOCTEST_REQUIRE(rvc.last_commit_idx == 0);
DOCTEST_REQUIRE(rvc.last_commit_term == aft::ViewHistory::InvalidView);
DOCTEST_REQUIRE(rvc.last_committable_idx == 0);
DOCTEST_REQUIRE(
rvc.term_of_last_committable_idx == aft::ViewHistory::InvalidView);
r1.recv_message(reinterpret_cast<uint8_t*>(&rvc), sizeof(rvc));
@ -207,8 +208,9 @@ DOCTEST_TEST_CASE(
DOCTEST_REQUIRE(get<0>(rv) == node_id2);
rvc = get<1>(rv);
DOCTEST_REQUIRE(rvc.term == 1);
DOCTEST_REQUIRE(rvc.last_commit_idx == 0);
DOCTEST_REQUIRE(rvc.last_commit_term == aft::ViewHistory::InvalidView);
DOCTEST_REQUIRE(rvc.last_committable_idx == 0);
DOCTEST_REQUIRE(
rvc.term_of_last_committable_idx == aft::ViewHistory::InvalidView);
r2.recv_message(reinterpret_cast<uint8_t*>(&rvc), sizeof(rvc));
@ -1040,366 +1042,4 @@ DOCTEST_TEST_CASE("Exceed append entries limit")
(sent_entries > num_small_entries_sent &&
sent_entries <= num_small_entries_sent + num_big_entries));
DOCTEST_REQUIRE(r2.ledger->ledger.size() == individual_entries);
}
// Reproduces issue described here: https://github.com/microsoft/CCF/issues/521
// Once this is fixed test will need to be modified since right now it
// DOCTEST_CHECKs that the issue stands
DOCTEST_TEST_CASE(
"Primary gets invalidated if it compacts right before a term change that it "
"doesn't participate in")
{
auto kv_store0 = std::make_shared<StoreSig>(0);
auto kv_store1 = std::make_shared<StoreSig>(1);
auto kv_store2 = std::make_shared<StoreSig>(2);
aft::NodeId node_id0(0);
aft::NodeId node_id1(1);
aft::NodeId node_id2(2);
ms request_timeout(10);
TRaft r0(
ConsensusType::CFT,
std::make_unique<Adaptor>(kv_store0),
std::make_unique<aft::LedgerStubProxy>(node_id0),
std::make_shared<aft::ChannelStubProxy>(),
std::make_shared<aft::StubSnapshotter>(),
nullptr,
nullptr,
cert,
request_map,
std::make_shared<aft::State>(node_id0),
nullptr,
request_timeout,
ms(20));
TRaft r1(
ConsensusType::CFT,
std::make_unique<Adaptor>(kv_store1),
std::make_unique<aft::LedgerStubProxy>(node_id1),
std::make_shared<aft::ChannelStubProxy>(),
std::make_shared<aft::StubSnapshotter>(),
nullptr,
nullptr,
cert,
request_map,
std::make_shared<aft::State>(node_id1),
nullptr,
request_timeout,
ms(100));
TRaft r2(
ConsensusType::CFT,
std::make_unique<Adaptor>(kv_store2),
std::make_unique<aft::LedgerStubProxy>(node_id2),
std::make_shared<aft::ChannelStubProxy>(),
std::make_shared<aft::StubSnapshotter>(),
nullptr,
nullptr,
cert,
request_map,
std::make_shared<aft::State>(node_id2),
nullptr,
request_timeout,
ms(50));
aft::Configuration::Nodes config0;
config0[node_id0] = {};
config0[node_id1] = {};
config0[node_id2] = {};
r0.add_configuration(0, config0);
r1.add_configuration(0, config0);
r2.add_configuration(0, config0);
map<aft::NodeId, TRaft*> nodes;
nodes[node_id0] = &r0;
nodes[node_id1] = &r1;
nodes[node_id2] = &r2;
r0.periodic(std::chrono::milliseconds(200));
DOCTEST_INFO("Initial election for Node 0");
{
DOCTEST_REQUIRE(
2 ==
dispatch_all(
nodes, ((aft::ChannelStubProxy*)r0.channels.get())->sent_request_vote));
DOCTEST_REQUIRE(
1 ==
dispatch_all(
nodes,
((aft::ChannelStubProxy*)r1.channels.get())
->sent_request_vote_response));
DOCTEST_REQUIRE(r0.is_leader());
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() ==
2);
DOCTEST_REQUIRE(
2 ==
dispatch_all(
nodes,
((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries));
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() ==
0);
}
((aft::ChannelStubProxy*)r1.channels.get())
->sent_append_entries_response.clear();
((aft::ChannelStubProxy*)r2.channels.get())
->sent_append_entries_response.clear();
auto first_entry = std::make_shared<std::vector<uint8_t>>();
for (auto i = 0; i < 3; ++i)
{
first_entry->push_back(1);
}
auto second_entry = std::make_shared<std::vector<uint8_t>>();
for (auto i = 0; i < 3; ++i)
{
second_entry->push_back(2);
}
auto third_entry = std::make_shared<std::vector<uint8_t>>();
for (auto i = 0; i < 3; ++i)
{
third_entry->push_back(3);
}
DOCTEST_INFO("Node 0 compacts twice but Nodes 1 and 2 only once");
{
DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{1, first_entry, true}}, 1));
DOCTEST_REQUIRE(r0.ledger->ledger.size() == 1);
r0.periodic(ms(10));
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() ==
2);
// Nodes 1 and 2 receive append entries and respond
DOCTEST_REQUIRE(
2 ==
dispatch_all(
nodes,
((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries));
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() ==
0);
DOCTEST_REQUIRE(
1 ==
dispatch_all(
nodes,
((aft::ChannelStubProxy*)r1.channels.get())
->sent_append_entries_response));
DOCTEST_REQUIRE(
1 ==
dispatch_all(
nodes,
((aft::ChannelStubProxy*)r2.channels.get())
->sent_append_entries_response));
DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{2, second_entry, true}}, 1));
DOCTEST_REQUIRE(r0.ledger->ledger.size() == 2);
r0.periodic(ms(10));
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() ==
2);
// Nodes 1 and 2 receive append entries and respond
// Node 0 will compact again and be ahead of Node 1 and 2
DOCTEST_REQUIRE(
2 ==
dispatch_all(
nodes,
((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries));
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() ==
0);
DOCTEST_REQUIRE(
1 ==
dispatch_all(
nodes,
((aft::ChannelStubProxy*)r1.channels.get())
->sent_append_entries_response));
DOCTEST_REQUIRE(
1 ==
dispatch_all(
nodes,
((aft::ChannelStubProxy*)r2.channels.get())
->sent_append_entries_response));
DOCTEST_CHECK(r0.get_term() == 1);
DOCTEST_CHECK(r0.get_commit_idx() == 2);
DOCTEST_CHECK(r0.get_last_idx() == 2);
DOCTEST_CHECK(r1.get_term() == 1);
DOCTEST_CHECK(r1.get_commit_idx() == 1);
DOCTEST_CHECK(r1.get_last_idx() == 2);
DOCTEST_CHECK(r2.get_term() == 1);
DOCTEST_CHECK(r2.get_commit_idx() == 1);
DOCTEST_CHECK(r2.get_last_idx() == 2);
// clean up
((aft::ChannelStubProxy*)r2.channels.get())
->sent_request_vote_response.clear();
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r0.channels.get())->sent_msg_count() == 0);
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r1.channels.get())->sent_msg_count() == 0);
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r2.channels.get())->sent_msg_count() == 0);
}
DOCTEST_INFO("Node 1 exceeds its election timeout and starts an election");
{
auto by_0 = [](auto const& lhs, auto const& rhs) -> bool {
return get<0>(lhs) < get<0>(rhs);
};
r1.periodic(std::chrono::milliseconds(200));
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote.size() ==
2);
((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote.sort(by_0);
DOCTEST_INFO("Node 2 receives the vote request");
// pop for first node (node 0) so that it doesn't participate in the
// election
auto vote_req_from_1_to_0 =
((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote.front();
((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote.pop_front();
DOCTEST_REQUIRE(
1 ==
dispatch_all_and_DOCTEST_CHECK(
nodes,
((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote,
[](const auto& msg) {
DOCTEST_REQUIRE(msg.term == 2);
DOCTEST_REQUIRE(msg.last_commit_idx == 1);
DOCTEST_REQUIRE(msg.last_commit_term == 1);
}));
DOCTEST_INFO("Node 2 votes for Node 1, Node 0 is suspended");
DOCTEST_REQUIRE(
1 ==
dispatch_all_and_DOCTEST_CHECK(
nodes,
((aft::ChannelStubProxy*)r2.channels.get())->sent_request_vote_response,
[](const auto& msg) {
DOCTEST_REQUIRE(msg.term == 2);
DOCTEST_REQUIRE(msg.vote_granted);
}));
DOCTEST_INFO("Node 1 is now leader");
DOCTEST_REQUIRE(r1.is_leader());
// pop Node 0's append entries
((aft::ChannelStubProxy*)r1.channels.get())
->sent_append_entries.pop_front();
DOCTEST_REQUIRE(
1 ==
dispatch_all_and_DOCTEST_CHECK(
nodes,
((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries,
[](const auto& msg) {
DOCTEST_REQUIRE(msg.idx == 1);
DOCTEST_REQUIRE(msg.term == 2);
DOCTEST_REQUIRE(msg.prev_idx == 1);
DOCTEST_REQUIRE(msg.prev_term == 1);
DOCTEST_REQUIRE(msg.leader_commit_idx == 1);
}));
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() ==
0);
DOCTEST_REQUIRE(
1 ==
dispatch_all(
nodes,
((aft::ChannelStubProxy*)r2.channels.get())
->sent_append_entries_response));
}
DOCTEST_INFO(
"Node 1 and Node 2 proceed to compact at idx 2, where Node 0 has "
"compacted for a previous term");
{
DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{2, second_entry, true}}, 2));
DOCTEST_REQUIRE(r1.ledger->ledger.size() == 2);
r1.periodic(ms(10));
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() ==
2);
// Nodes 0 and 2 receive append entries and respond
DOCTEST_REQUIRE(
2 ==
dispatch_all(
nodes,
((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries));
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() ==
0);
DOCTEST_REQUIRE(
1 ==
dispatch_all(
nodes,
((aft::ChannelStubProxy*)r2.channels.get())
->sent_append_entries_response));
// Node 0 will not respond here since it received an append entries it can
// not process [prev_idex (1) < commit_idx (2)]
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r0.channels.get())
->sent_append_entries_response.size() == 0);
DOCTEST_INFO("Another entry from Node 1 so that Node 2 can also compact");
DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{3, third_entry, true}}, 2));
DOCTEST_REQUIRE(r1.ledger->ledger.size() == 3);
r1.periodic(ms(10));
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() ==
2);
// Nodes 0 and 2 receive append entries
DOCTEST_REQUIRE(
2 ==
dispatch_all(
nodes,
((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries));
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() ==
0);
// Node 0 will now have an ae response which will return false because
// its log for index 2 has the wrong term (ours: 1, theirs: 2)
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r0.channels.get())
->sent_append_entries_response.size() == 1);
DOCTEST_REQUIRE(
1 ==
dispatch_all_and_DOCTEST_CHECK(
nodes,
((aft::ChannelStubProxy*)r0.channels.get())
->sent_append_entries_response,
[](const auto& msg) {
DOCTEST_REQUIRE(msg.last_log_idx == 2);
DOCTEST_REQUIRE(!msg.success);
}));
DOCTEST_REQUIRE(r1.ledger->ledger.size() == 3);
DOCTEST_REQUIRE(r2.ledger->ledger.size() == 3);
DOCTEST_CHECK(r1.get_term() == 2);
DOCTEST_CHECK(r1.get_commit_idx() == 2);
DOCTEST_CHECK(r1.get_last_idx() == 3);
DOCTEST_CHECK(r2.get_term() == 2);
DOCTEST_CHECK(r2.get_commit_idx() == 2);
DOCTEST_CHECK(r2.get_last_idx() == 3);
}
}
}

Просмотреть файл

@ -290,6 +290,7 @@ namespace kv
virtual bool replicate(const BatchVector& entries, View view) = 0;
virtual std::pair<View, SeqNo> get_committed_txid() = 0;
virtual std::optional<std::pair<View, SeqNo>> get_signable_txid() = 0;
virtual View get_view(SeqNo seqno) = 0;
virtual View get_view() = 0;

Просмотреть файл

@ -93,6 +93,11 @@ namespace kv
return {2, 0};
}
// Stub consensus: the committed txid is always considered signable,
// so simply forward to get_committed_txid().
std::optional<std::pair<View, SeqNo>> get_signable_txid() override
{
  return get_committed_txid();
}
SeqNo get_committed_seqno() override
{
return 0;

Просмотреть файл

@ -676,8 +676,16 @@ namespace ccf
return;
}
// Signatures are only emitted when the consensus is establishing commit
// over the node's own transactions
auto signable_txid = consensus->get_signable_txid();
if (!signable_txid.has_value())
{
return;
}
auto commit_txid = signable_txid.value();
auto txid = store.next_txid();
auto commit_txid = consensus->get_committed_txid();
LOG_DEBUG_FMT(
"Signed at {} in view: {} commit was: {}.{}",

Просмотреть файл

@ -958,7 +958,7 @@ namespace ccf
};
make_endpoint("read", HTTP_POST, json_adapter(read))
// This can be executed locally, but can't currently take ReadOnlyTx due
// to restristions in our lua wrappers
// to restrictions in our lua wrappers
.set_forwarding_required(ForwardingRequired::Sometimes)
.set_auto_schema<KVRead>()
.install();

Просмотреть файл

@ -360,6 +360,31 @@ namespace ccf
make_read_only_endpoint("primary", HTTP_HEAD, is_primary)
.set_forwarding_required(ForwardingRequired::Never)
.install();
auto consensus_config = [this](CommandEndpointContext& args) {
// Query node for configurations, separate current from pending
if (consensus != nullptr)
{
auto cfg = consensus->get_latest_configuration();
nlohmann::json c;
for (auto& [nid, ninfo] : cfg)
{
nlohmann::json n;
n["address"] = fmt::format("{}:{}", ninfo.hostname, ninfo.port);
c[fmt::format("{}", nid)] = n;
}
args.rpc_ctx->set_response_body(c.dump());
}
else
{
args.rpc_ctx->set_response_status(HTTP_STATUS_NOT_FOUND);
args.rpc_ctx->set_response_body("No configured consensus");
}
};
make_command_endpoint("config", HTTP_GET, consensus_config)
.set_forwarding_required(ForwardingRequired::Never)
.install();
}
};

Просмотреть файл

@ -152,7 +152,7 @@ if __name__ == "__main__":
parser.add_argument(
"-p",
"--package",
help="The enclave package to load (e.g., libsimplebank)",
help="The enclave package to load (e.g., liblogging)",
default="liblogging",
)
parser.add_argument(

63
tests/committable.py Normal file
Просмотреть файл

@ -0,0 +1,63 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import infra.e2e_args
import infra.network
import infra.proc
import time
from loguru import logger as LOG
def run(args):
    """
    Check that a committable (signed) but not yet committed suffix of
    transactions survives the loss of the primary: a majority of
    backups is suspended/stopped to prevent commit, transactions are
    submitted and signed on the primary, then the primary is killed
    and the suffix must still be committed by the new primary.
    """
    # Not imported at module level in this file; needed for Checker below.
    import infra.checker

    hosts = ["localhost"] * 5

    with infra.network.network(
        hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
    ) as network:
        network.start_and_join(args)
        primary, backups = network.find_nodes()

        # Suspend three of the backups to prevent commit
        backups[1].suspend()
        backups[2].suspend()
        backups[3].stop()
        txs = []
        # Run some transactions that can't be committed
        with primary.client("user0") as uc:
            for i in range(10):
                txs.append(
                    uc.post("/app/log/private", {"id": 100 + i, "msg": "Hello world"})
                )

        # Wait for a signature to ensure those transactions are committable
        time.sleep(args.sig_tx_interval * 2 / 1000)

        # Kill the primary, restore other backups
        primary.stop()
        backups[1].resume()
        backups[2].resume()
        new_primary, new_term = network.wait_for_new_primary(primary.node_id)
        LOG.debug(f"New primary is {new_primary.node_id} in term {new_term}")
        # The only backup that was never suspended must win the election
        assert new_primary.node_id == backups[0].node_id

        # Check that uncommitted but committable suffix is preserved
        with new_primary.client("user0") as uc:
            check_commit = infra.checker.Checker(uc)
            for tx in txs:
                check_commit(tx)
if __name__ == "__main__":

    def add(parser):
        # Test-specific command-line arguments
        parser.add_argument(
            "-p",
            "--package",
            help="The enclave package to load (e.g., liblogging)",
            default="liblogging",
        )

    args = infra.e2e_args.cli_args(add)
    args.package = "liblua_generic" if args.app_script else "liblogging"
    run(args)

Просмотреть файл

@ -624,7 +624,7 @@ class Network:
backup = random.choice(backups)
return primary, backup
def wait_for_all_nodes_to_catch_up(self, primary, timeout=3):
def wait_for_all_nodes_to_catch_up(self, primary, timeout=10):
"""
Wait for all nodes to have joined the network and globally replicated
all transactions globally executed on the primary (including transactions
@ -649,6 +649,7 @@ class Network:
caught_up_nodes = []
for node in self.get_joined_nodes():
with node.client() as c:
c.get("/node/commit")
resp = c.get(f"/node/local_tx?view={view}&seqno={seqno}")
if resp.status_code != 200:
# Node may not have joined the network yet, try again

Просмотреть файл

@ -57,7 +57,10 @@ def test_add_member(network, args):
except infra.member.NoRecoveryShareFound as e:
assert e.response.body.text() == "Only active members are given recovery shares"
new_member.ack(primary)
r = new_member.ack(primary)
with primary.client() as nc:
check_commit = infra.checker.Checker(nc)
check_commit(r)
return network

Просмотреть файл

@ -9,6 +9,26 @@ import time
from loguru import logger as LOG
def node_configs(network):
    # Query every node for its view of the consensus configuration
    # (/node/config). Best effort by design: nodes that cannot be
    # reached (e.g. stopped or suspended) are simply omitted from the
    # result rather than failing the whole query.
    configs = {}
    for node in network.nodes:
        try:
            with node.client() as nc:
                configs[node.node_id] = nc.get("/node/config").body.json()
        except Exception:
            pass
    return configs
def count_nodes(configs, network):
    # Check that every responding node reports the same membership:
    # the set of nodes we got configs from, excluding stopped nodes,
    # must match the node set in each reported configuration.
    # Returns the number of live nodes.
    expected = {str(node_id) for node_id in configs.keys()}
    stopped = {str(n.node_id) for n in network.nodes if n.is_stopped()}
    for node_id, node_config in configs.items():
        reported = set(node_config.keys()) - stopped
        assert expected == reported, f"{expected} {reported} {node_id}"
    return len(expected)
def check_can_progress(node, timeout=3):
with node.client() as c:
r = c.get("/node/commit")
@ -102,16 +122,16 @@ def test_retire_backup(network, args):
@reqs.description("Retiring the primary")
@reqs.can_kill_n_nodes(1)
def test_retire_primary(network, args):
pre_count = count_nodes(node_configs(network), network)
primary, backup = network.find_primary_and_any_backup()
network.consortium.retire_node(primary, primary)
LOG.debug(
f"Waiting {network.election_duration}s for a new primary to be elected..."
)
time.sleep(network.election_duration)
new_primary, new_term = network.find_primary()
assert new_primary.node_id != primary.node_id
new_primary, new_term = network.wait_for_new_primary(primary.node_id)
LOG.debug(f"New primary is {new_primary.node_id} in term {new_term}")
check_can_progress(backup)
network.nodes.remove(primary)
post_count = count_nodes(node_configs(network), network)
assert pre_count == post_count + 1
primary.stop()
return network
@ -141,7 +161,7 @@ if __name__ == "__main__":
parser.add_argument(
"-p",
"--package",
help="The enclave package to load (e.g., libsimplebank)",
help="The enclave package to load (e.g., liblogging)",
default="liblogging",
)

Просмотреть файл

@ -82,12 +82,6 @@ def test_share_resilience(network, args, from_snapshot=False):
)
submitted_shares_count += 1
# In theory, check_commit should be sufficient to guarantee that the new primary
# will know about all the recovery shares submitted so far. However, because of
# https://github.com/microsoft/CCF/issues/589, we have to wait for all nodes
# to have committed all transactions.
recovered_network.wait_for_all_nodes_to_catch_up(primary)
# Here, we kill the current primary instead of just suspending it.
# However, because of https://github.com/microsoft/CCF/issues/99#issuecomment-630875387,
# the new primary will most likely be the previous primary, which defies the point of this test.

56
tests/rotation.py Normal file
Просмотреть файл

@ -0,0 +1,56 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import infra.e2e_args
import infra.network
import infra.proc
import suite.test_requirements as reqs
import reconfiguration
from loguru import logger as LOG
@reqs.description("Suspend and resume primary")
@reqs.at_least_n_nodes(3)
def test_suspend_primary(network, args):
    # Suspend the current primary, wait for a new primary to be
    # elected, and check the network still makes progress. Then resume
    # the old primary and check progress again.
    primary, _ = network.find_primary()
    primary.suspend()
    new_primary, new_term = network.wait_for_new_primary(primary.node_id)
    LOG.debug(f"New primary is {new_primary.node_id} in term {new_term}")
    reconfiguration.check_can_progress(new_primary)
    primary.resume()
    reconfiguration.check_can_progress(new_primary)
    return network
def run(args):
    # Starts a two-node network, then repeatedly rotates the primary:
    # first by growing the network and retiring the primary, then by
    # suspending/resuming it, checking liveness throughout.
    hosts = ["localhost", "localhost"]

    with infra.network.network(
        hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
    ) as network:
        network.start_and_join(args)

        # Replace primary repeatedly and check the network still operates
        for _ in range(10):
            reconfiguration.test_add_node(network, args)
            reconfiguration.test_retire_primary(network, args)

        # Extra node so the suspension tests below have enough nodes
        reconfiguration.test_add_node(network, args)

        # Suspend primary repeatedly and check the network still operates
        for _ in range(10):
            test_suspend_primary(network, args)
if __name__ == "__main__":

    def add(parser):
        # Test-specific command-line arguments
        parser.add_argument(
            "-p",
            "--package",
            help="The enclave package to load (e.g., liblogging)",
            default="liblogging",
        )

    args = infra.e2e_args.cli_args(add)
    args.package = "liblua_generic" if args.app_script else "liblogging"
    run(args)