This commit is contained in:
Alex 2021-06-18 10:24:44 +01:00 коммит произвёл GitHub
Родитель 809644daa8
Коммит 11ddc7919b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
15 изменённых файлов: 178 добавлений и 36 удалений

Просмотреть файл

@ -508,13 +508,6 @@ if(BUILD_TESTS)
--snapshot-tx-interval 10000
)
add_e2e_test(
NAME recovery_test
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/recovery.py
CONSENSUS cft
ADDITIONAL_ARGS --recovery 3
)
add_e2e_test(
NAME recovery_test_curve_256
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/recovery.py
@ -831,6 +824,12 @@ if(BUILD_TESTS)
CONSENSUS ${CONSENSUS}
)
add_e2e_test(
NAME recovery_test_${CONSENSUS}
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/recovery.py
CONSENSUS ${CONSENSUS}
)
add_e2e_test(
NAME vegeta_stress_${CONSENSUS}
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/vegeta_stress.py

Просмотреть файл

@ -756,6 +756,8 @@ namespace loggingapp
.set_auto_schema<void, LoggingGetHistorical::Out>()
.add_query_parameter<size_t>("id")
.set_forwarding_required(ccf::endpoints::ForwardingRequired::Never)
.set_execute_outside_consensus(
ccf::endpoints::ExecuteOutsideConsensus::Locally)
.install();
// SNIPPET_END: get_historical
@ -809,6 +811,8 @@ namespace loggingapp
.set_auto_schema<void, LoggingGetReceipt::Out>()
.add_query_parameter<size_t>("id")
.set_forwarding_required(ccf::endpoints::ForwardingRequired::Never)
.set_execute_outside_consensus(
ccf::endpoints::ExecuteOutsideConsensus::Locally)
.install();
// SNIPPET_END: get_historical_with_receipt
@ -1068,6 +1072,8 @@ namespace loggingapp
"to_seqno", ccf::endpoints::QueryParamPresence::OptionalParameter)
.add_query_parameter<size_t>("id")
.set_forwarding_required(ccf::endpoints::ForwardingRequired::Never)
.set_execute_outside_consensus(
ccf::endpoints::ExecuteOutsideConsensus::Locally)
.install();
auto record_admin_only = [this](

Просмотреть файл

@ -99,6 +99,8 @@ namespace aft
current_view(0),
last_idx(0),
commit_idx(0),
cft_watermark_idx(0),
bft_watermark_idx(0),
new_view_idx(0)
{}
@ -116,5 +118,15 @@ namespace aft
ViewHistory view_history;
kv::Version new_view_idx;
std::optional<ccf::NodeId> requested_evidence_from = std::nullopt;
// When running with BFT replicas do not know which replica to trust as the
// primary during recovery startup. So what we do is just trust the first
// replica that communicated with the replica in the view that it told us is
// correct. This is a liveness issue if there is a failure during recovery
// but CCF maintains integrity because it is derived from the members
// signing the ledger the replica will see and verify before opening the
// service.
std::optional<std::tuple<ccf::NodeId, ccf::View>> initial_recovery_primary =
std::nullopt;
};
}

Просмотреть файл

@ -227,6 +227,17 @@ namespace aft
view_change_tracker->set_current_view_change(starting_view_change);
}
auto progress_tracker = store->get_progress_tracker();
if (progress_tracker != nullptr)
{
progress_tracker->set_is_public_only(public_only);
}
if (request_tracker != nullptr && !public_only)
{
request_tracker->start_tracking_requests();
}
if (consensus_type == ConsensusType::BFT)
{
// Initialize view history for bft. We start on view 2 and the first
@ -326,6 +337,15 @@ namespace aft
// be deserialised
std::lock_guard<std::mutex> guard(state->lock);
public_only = false;
auto progress_tracker = store->get_progress_tracker();
if (progress_tracker != nullptr)
{
progress_tracker->set_is_public_only(public_only);
}
if (request_tracker != nullptr)
{
request_tracker->start_tracking_requests();
}
}
void force_become_leader()
@ -1246,7 +1266,14 @@ namespace aft
bool confirm_evidence = false;
if (consensus_type == ConsensusType::BFT)
{
if (active_nodes().size() == 0)
if (!state->initial_recovery_primary.has_value())
{
state->initial_recovery_primary = std::make_tuple(from, r.term);
}
if (
active_nodes().size() == 0 ||
(std::get<0>(state->initial_recovery_primary.value()) == from &&
std::get<1>(state->initial_recovery_primary.value()) == r.term))
{
// The replica is just starting up, we want to check that this replica
// is part of the network we joined but that is dependent on Byzantine
@ -1364,14 +1391,19 @@ namespace aft
// Third, check index consistency, making sure entries are not in the past
// or in the future
if (r.prev_idx < state->commit_idx)
if (
(consensus_type == ConsensusType::CFT &&
r.prev_idx < state->commit_idx) ||
r.prev_idx < state->bft_watermark_idx)
{
LOG_DEBUG_FMT(
"Recv append entries to {} from {} but prev_idx ({}) < commit_idx "
"Recv append entries to {} from {} but prev_idx ({}), bft_watermark "
"({}) < commit_idx "
"({})",
state->my_node_id,
from,
r.prev_idx,
state->bft_watermark_idx,
state->commit_idx);
return;
}
@ -1846,7 +1878,10 @@ namespace aft
}
case kv::ApplyResult::PASS_APPLY:
{
executor->mark_request_executed(ds->get_request(), request_tracker);
if (!ds->is_public_only())
{
executor->mark_request_executed(ds->get_request(), request_tracker);
}
break;
}
case kv::ApplyResult::PASS_ENCRYPTED_PAST_LEDGER_SECRET:

Просмотреть файл

@ -287,6 +287,11 @@ namespace aft
{
return false;
}
bool is_public_only() override
{
return false;
}
};
virtual std::unique_ptr<kv::AbstractExecutionWrapper> deserialize(

Просмотреть файл

@ -164,6 +164,11 @@ namespace kv
{
return false;
}
bool is_public_only() override
{
return public_only;
}
};
class BFTExecutionWrapper : public AbstractExecutionWrapper
@ -245,6 +250,11 @@ namespace kv
{
return false;
}
virtual bool is_public_only() override
{
return public_only;
}
};
class SignatureBFTExec : public BFTExecutionWrapper
@ -567,14 +577,17 @@ namespace kv
history->append(data);
}
tx->set_change_list(std::move(changes), term);
if (!public_only)
{
tx->set_change_list(std::move(changes), term);
auto aft_requests = tx->rw<aft::RequestsMap>(ccf::Tables::AFT_REQUESTS);
auto req_v = aft_requests->get(0);
CCF_ASSERT(
req_v.has_value(),
"Deserialised append entry, but requests map is empty");
req = req_v.value();
auto aft_requests = tx->rw<aft::RequestsMap>(ccf::Tables::AFT_REQUESTS);
auto req_v = aft_requests->get(0);
CCF_ASSERT(
req_v.has_value(),
"Deserialised append entry, but requests map is empty");
req = req_v.value();
}
return ApplyResult::PASS_APPLY;
}

Просмотреть файл

@ -582,6 +582,7 @@ namespace kv
virtual aft::Request& get_request() = 0;
virtual kv::Version get_max_conflict_version() = 0;
virtual bool support_async_execution() = 0;
virtual bool is_public_only() = 0;
};
class AbstractStore

Просмотреть файл

@ -972,6 +972,20 @@ namespace ccf
g.create_service(network.identity->cert);
g.retire_active_nodes();
if (network.consensus_type == ConsensusType::BFT)
{
// BFT consensus requires a stable order of node IDs so that the
// primary node in a given view can be computed deterministically by
// all nodes in the network
// See https://github.com/microsoft/CCF/issues/1852
// Pad node id string to avoid memory alignment issues on
// node-to-node messages
auto values = tx.ro(network.values);
auto id = values->get(0);
self = NodeId(fmt::format("{:#064}", id.value()));
}
g.add_node(
self,
{node_info_network,
@ -1014,6 +1028,7 @@ namespace ccf
}
setup_consensus(true);
setup_progress_tracker();
auto_refresh_jwt_keys();
LOG_DEBUG_FMT(

Просмотреть файл

@ -20,10 +20,13 @@ namespace ccf
{
public:
ProgressTracker(
std::shared_ptr<ProgressTrackerStore> store_, const NodeId& id_) :
std::shared_ptr<ProgressTrackerStore> store_,
const NodeId& id_,
bool is_public_only_ = false) :
store(store_),
id(id_),
entropy(crypto::create_entropy())
entropy(crypto::create_entropy()),
is_public_only(is_public_only_)
{}
std::shared_ptr<ProgressTrackerStore> store;
@ -150,7 +153,8 @@ namespace ccf
if (node_count > 0 && can_send_sig_ack(cert, tx_id, node_count))
{
return kv::TxHistory::Result::SEND_SIG_RECEIPT_ACK;
return !is_public_only ? kv::TxHistory::Result::SEND_SIG_RECEIPT_ACK :
kv::TxHistory::Result::OK;
}
return kv::TxHistory::Result::OK;
}
@ -241,7 +245,9 @@ namespace ccf
}
else if (r == kv::TxHistory::Result::SEND_SIG_RECEIPT_ACK)
{
success = kv::TxHistory::Result::SEND_SIG_RECEIPT_ACK;
success = !is_public_only ?
kv::TxHistory::Result::SEND_SIG_RECEIPT_ACK :
kv::TxHistory::Result::OK;
}
}
else
@ -351,7 +357,8 @@ namespace ccf
if (can_send_reply_and_nonce(cert, node_count))
{
return kv::TxHistory::Result::SEND_REPLY_AND_NONCE;
return !is_public_only ? kv::TxHistory::Result::SEND_REPLY_AND_NONCE :
kv::TxHistory::Result::OK;
}
return kv::TxHistory::Result::OK;
}
@ -669,6 +676,12 @@ namespace ccf
return highest_commit_level;
}
void set_is_public_only(bool public_only)
{
std::unique_lock<std::mutex> guard(lock);
is_public_only = public_only;
}
private:
NodeId id;
std::shared_ptr<crypto::Entropy> entropy;
@ -676,6 +689,7 @@ namespace ccf
ccf::TxID highest_prepared_level = {0, 0};
std::map<ccf::SeqNo, CommitCert> certificates;
bool is_public_only;
mutable std::mutex lock;
kv::TxHistory::Result add_signature_internal(
@ -775,7 +789,8 @@ namespace ccf
store->write_backup_signatures(sig_value);
cert.wrote_sig_to_ledger = true;
}
return kv::TxHistory::Result::SEND_SIG_RECEIPT_ACK;
return !is_public_only ? kv::TxHistory::Result::SEND_SIG_RECEIPT_ACK :
kv::TxHistory::Result::OK;
}
return kv::TxHistory::Result::OK;
}
@ -858,11 +873,15 @@ namespace ccf
{
if (tx_id.seqno > highest_prepared_level.seqno)
{
CCF_ASSERT_FMT(
tx_id.view >= highest_prepared_level.view,
"Prepared terms are moving backwards new_term:{}, current_term:{}",
tx_id.view,
highest_prepared_level.view);
if (tx_id.view < highest_prepared_level.view)
{
LOG_INFO_FMT(
"Prepared terms are moving backwards new_term:{}, "
"current_term:{}",
tx_id.view,
highest_prepared_level.view);
return false;
}
highest_prepared_level = tx_id;
}

Просмотреть файл

@ -59,6 +59,11 @@ namespace aft
void insert(const crypto::Sha256Hash& hash, std::chrono::milliseconds time)
{
std::unique_lock<std::mutex> guard(lock);
if (!tracking_requests)
{
return;
}
if (remove(hash, hashes_without_requests, hashes_without_requests_list))
{
return;
@ -70,6 +75,10 @@ namespace aft
const crypto::Sha256Hash& hash, std::chrono::milliseconds time)
{
std::unique_lock<std::mutex> guard(lock);
if (!tracking_requests)
{
return;
}
#ifndef NDEBUG
Request r(hash);
CCF_ASSERT_FMT(
@ -83,6 +92,10 @@ namespace aft
bool remove(const crypto::Sha256Hash& hash)
{
std::unique_lock<std::mutex> guard(lock);
if (!tracking_requests)
{
return false;
}
return remove(hash, requests, requests_list);
}
@ -149,6 +162,12 @@ namespace aft
hashes_without_requests_list.clear();
}
void start_tracking_requests()
{
std::unique_lock<std::mutex> guard(lock);
tracking_requests = true;
}
private:
std::multiset<Request*, RequestComp> requests;
snmalloc::DLList<Request, std::nullptr_t, true> requests_list;
@ -160,6 +179,7 @@ namespace aft
ccf::SeqNo seqno_last_signature = ccf::SEQNO_UNKNOWN;
std::chrono::milliseconds time_last_signature =
std::chrono::milliseconds(0);
bool tracking_requests = false;
mutable std::mutex lock;
static void insert(

Просмотреть файл

@ -216,6 +216,11 @@ TEST_CASE("Request tracker")
aft::RequestTracker t;
crypto::Sha256Hash h;
h.h.fill(0);
t.insert(h, std::chrono::milliseconds(0));
REQUIRE(!t.oldest_entry().has_value());
t.start_tracking_requests();
for (uint32_t i = 0; i < 10; ++i)
{
h.h[0] = i;
@ -239,6 +244,7 @@ TEST_CASE("Request tracker")
INFO("Entry that was deleted is not tracked after it is added");
{
aft::RequestTracker t;
t.start_tracking_requests();
crypto::Sha256Hash h;
h.h.fill(0);
REQUIRE(t.oldest_entry().has_value() == false);
@ -268,6 +274,7 @@ TEST_CASE("Request tracker")
INFO("Can enter multiple items");
{
aft::RequestTracker t;
t.start_tracking_requests();
crypto::Sha256Hash h;
h.h.fill(0);
@ -307,6 +314,7 @@ TEST_CASE("Request tracker")
INFO("Verify seqno and time of last sig stored correctly");
{
aft::RequestTracker t;
t.start_tracking_requests();
auto r = t.get_seqno_time_last_request();
REQUIRE(std::get<0>(r) == ccf::SEQNO_UNKNOWN);

Просмотреть файл

@ -448,9 +448,7 @@ const actions = new Map([
[
"transition_service_to_open",
new Action(
function (args) {
checkNone(args);
},
function (args) {},
function (args) {
ccf.node.transitionServiceToOpen();

Просмотреть файл

@ -7,6 +7,7 @@ import http
import json
import random
import re
import uuid
import infra.network
import infra.proc
import infra.checker
@ -27,6 +28,7 @@ class Consortium:
common_dir,
key_generator,
share_script,
consensus,
members_info=None,
curve=None,
public_state=None,
@ -36,6 +38,7 @@ class Consortium:
self.members = []
self.key_generator = key_generator
self.share_script = share_script
self.consensus = consensus
self.members = []
self.recovery_threshold = None
self.authenticate_session = authenticate_session
@ -447,7 +450,10 @@ class Consortium:
if r.body.json()["state"] == infra.node.State.PART_OF_NETWORK.value:
is_recovery = False
proposal_body, careful_vote = self.make_proposal("transition_service_to_open")
proposal_body, careful_vote = self.make_proposal(
"transition_service_to_open",
args=None if self.consensus == "cft" else {"nonce": str(uuid.uuid4())},
)
proposal = self.get_any_active_member().propose(remote_node, proposal_body)
self.vote_using_majority(
remote_node, proposal, careful_vote, wait_for_global_commit=True

Просмотреть файл

@ -401,6 +401,7 @@ class Network:
self.common_dir,
self.key_generator,
self.share_script,
args.consensus,
initial_members_info,
args.participants_curve,
authenticate_session=not args.disable_member_session_auth,
@ -475,6 +476,7 @@ class Network:
common_dir,
self.key_generator,
self.share_script,
args.consensus,
public_state=public_state,
)

Просмотреть файл

@ -122,9 +122,12 @@ def run(args):
# Alternate between recovery with primary change and stable primary-ship,
# with and without snapshots
if i % 2 == 0:
recovered_network = test_share_resilience(
network, args, from_snapshot=True
)
if args.consensus != "bft":
recovered_network = test_share_resilience(
network, args, from_snapshot=True
)
else:
recovered_network = network
else:
recovered_network = test(network, args, from_snapshot=False)
network = recovered_network