AFT: receive view-change message & send new-view tx (#1861)

This commit is contained in:
Alex 2020-11-06 14:43:53 +00:00 коммит произвёл GitHub
Родитель 9e7106f50b
Коммит 3576d0b8b4
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
18 изменённых файлов: 498 добавлений и 157 удалений

Просмотреть файл

@ -50,3 +50,4 @@ jobs:
suffix: 'Release'
artifact_name: 'SGX_Release'
ctest_filter: '-LE "benchmark|perf"'

Просмотреть файл

@ -3,6 +3,7 @@
#pragma once
#include "consensus/aft/raft_types.h"
#include "node/view_change.h"
#include <chrono>
#include <map>
@ -14,16 +15,25 @@ namespace aft
{
struct ViewChange
{
std::set<kv::NodeId> received_view_changes;
ViewChange(kv::Consensus::View view_, kv::Consensus::SeqNo seqno_) :
view(view_),
seqno(seqno_),
new_view_sent(false)
{}
kv::Consensus::View view;
kv::Consensus::SeqNo seqno;
bool new_view_sent;
std::map<kv::NodeId, ccf::ViewChangeRequest> received_view_changes;
};
public:
ViewChangeTracker(
kv::NodeId my_node_id_,
kv::Consensus::View current_view,
std::shared_ptr<ccf::ProgressTrackerStore> store_,
std::chrono::milliseconds time_between_attempts_) :
my_node_id(my_node_id_),
last_view_change_sent(current_view),
store(store_),
last_view_change_sent(0),
time_between_attempts(time_between_attempts_)
{}
@ -31,10 +41,7 @@ namespace aft
{
if (time > time_between_attempts + time_previous_view_change_increment)
{
ViewChange vc;
vc.received_view_changes.emplace(my_node_id);
++last_view_change_sent;
view_changes.emplace(last_view_change_sent, std::move(vc));
time_previous_view_change_increment = time;
return true;
}
@ -58,12 +65,76 @@ namespace aft
last_view_change_sent = view;
}
enum class ResultAddView
{
OK,
APPEND_NEW_VIEW_MESSAGE
};
ResultAddView add_request_view_change(
ccf::ViewChangeRequest& v,
kv::NodeId from,
kv::Consensus::View view,
kv::Consensus::SeqNo seqno,
uint32_t node_count)
{
auto it = view_changes.find(view);
if (it == view_changes.end())
{
ViewChange view_change(view, seqno);
std::tie(it, std::ignore) =
view_changes.emplace(view, std::move(view_change));
}
it->second.received_view_changes.emplace(from, v);
if (
should_send_new_view(
it->second.received_view_changes.size(), node_count) &&
it->second.new_view_sent == false)
{
it->second.new_view_sent = true;
return ResultAddView::APPEND_NEW_VIEW_MESSAGE;
}
return ResultAddView::OK;
}
void write_view_change_confirmation_append_entry(kv::Consensus::View view)
{
auto it = view_changes.find(view);
if (it == view_changes.end())
{
throw std::logic_error(fmt::format(
"Cannot write unknown view-change to ledger, view:{}", view));
}
auto& vc = it->second;
ccf::ViewChangeConfirmation nv(vc.view, vc.seqno);
for (auto it : vc.received_view_changes)
{
nv.view_change_messages.emplace(it.first, it.second);
}
store->write_view_change_confirmation(nv);
}
void clear()
{
view_changes.clear();
}
private:
kv::NodeId my_node_id;
std::shared_ptr<ccf::ProgressTrackerStore> store;
std::map<kv::Consensus::View, ViewChange> view_changes;
std::chrono::milliseconds time_previous_view_change_increment =
std::chrono::milliseconds(0);
kv::Consensus::View last_view_change_sent = 0;
const std::chrono::milliseconds time_between_attempts;
bool should_send_new_view(size_t received_requests, size_t node_count) const
{
return received_requests == ccf::get_message_threshold(node_count);
}
};
}

Просмотреть файл

@ -88,7 +88,7 @@ namespace aft
std::shared_ptr<aft::State> state;
std::shared_ptr<Executor> executor;
std::shared_ptr<aft::RequestTracker> request_tracker;
ViewChangeTracker view_change_tracker;
std::unique_ptr<aft::ViewChangeTracker> view_change_tracker;
// Timeouts
std::chrono::milliseconds request_timeout;
@ -111,8 +111,8 @@ namespace aft
// Indices that are eligible for global commit, from a Node's perspective
std::deque<Index> committable_indices;
// When this is set, only public domain is deserialised when receving append
// entries
// When this is set, only public domain is deserialised when receiving
// append entries
bool public_only = false;
// Randomness
@ -141,6 +141,7 @@ namespace aft
std::shared_ptr<aft::State> state_,
std::shared_ptr<Executor> executor_,
std::shared_ptr<aft::RequestTracker> request_tracker_,
std::unique_ptr<aft::ViewChangeTracker> view_change_tracker_,
std::chrono::milliseconds request_timeout_,
std::chrono::milliseconds election_timeout_,
std::chrono::milliseconds view_change_timeout_,
@ -156,8 +157,7 @@ namespace aft
state(state_),
executor(executor_),
request_tracker(request_tracker_),
view_change_tracker(
state_->my_node_id, starting_view_change, view_change_timeout_),
view_change_tracker(std::move(view_change_tracker_)),
request_timeout(request_timeout_),
election_timeout(election_timeout_),
@ -176,6 +176,10 @@ namespace aft
{
leader_id = NoNode;
if (view_change_tracker != nullptr)
{
view_change_tracker->set_current_view_change(starting_view_change);
}
if (consensus_type == ConsensusType::BFT)
{
@ -224,6 +228,13 @@ namespace aft
return replica_state == Follower;
}
NodeId get_primary(kv::Consensus::View view)
{
// This will not work once we have reconfiguration support
// https://github.com/microsoft/CCF/issues/1852
return (view - starting_view_change) % active_nodes().size();
}
Index last_committable_index() const
{
return committable_indices.empty() ? state->commit_idx :
@ -424,8 +435,7 @@ namespace aft
for (auto& [index, data, is_globally_committable] : entries)
{
bool globally_committable =
is_globally_committable || consensus_type == ConsensusType::BFT;
bool globally_committable = is_globally_committable;
if (index != state->last_idx + 1)
return false;
@ -528,9 +538,6 @@ namespace aft
void periodic(std::chrono::milliseconds elapsed)
{
std::lock_guard<SpinLock> guard(state->lock);
timeout_elapsed += elapsed;
if (consensus_type == ConsensusType::BFT)
{
auto time = threading::ThreadMessaging::thread_messaging
@ -538,26 +545,25 @@ namespace aft
request_tracker->tick(time);
if (
!view_change_tracker.is_view_change_in_progress(time) &&
!view_change_tracker->is_view_change_in_progress(time) &&
is_follower() && (has_bft_timeout_occurred(time)) &&
view_change_tracker.should_send_view_change(time))
view_change_tracker->should_send_view_change(time))
{
// We have not seen a request executed within an expected period of
// time. We should invoke a view-change.
//
kv::Consensus::View new_view = view_change_tracker.get_target_view();
kv::Consensus::View new_view = view_change_tracker->get_target_view();
kv::Consensus::SeqNo seqno;
crypto::Sha256Hash root;
std::unique_ptr<ccf::ViewChange> vc;
std::unique_ptr<ccf::ViewChangeRequest> vc;
auto progress_tracker = store->get_progress_tracker();
std::tie(vc, seqno, root) =
std::tie(vc, seqno) =
progress_tracker->get_view_change_message(new_view);
size_t vc_size = vc->get_serialized_size();
RequestViewChangeMsg vcm = {
{bft_view_change, state->my_node_id}, new_view, seqno, root};
{bft_view_change, state->my_node_id}, new_view, seqno};
std::vector<uint8_t> m;
m.resize(sizeof(RequestViewChangeMsg) + vc_size);
@ -580,8 +586,20 @@ namespace aft
ccf::NodeMsgType::consensus_msg, send_to, m);
}
}
if (
get_primary(new_view) == id() &&
aft::ViewChangeTracker::ResultAddView::APPEND_NEW_VIEW_MESSAGE ==
view_change_tracker->add_request_view_change(
*vc, id(), new_view, seqno, node_count()))
{
append_new_view(new_view);
}
}
}
std::lock_guard<SpinLock> guard(state->lock);
timeout_elapsed += elapsed;
if (replica_state == Leader)
{
@ -635,13 +653,26 @@ namespace aft
return;
}
ccf::ViewChange v = ccf::ViewChange::deserialize(data, size);
ccf::ViewChangeRequest v =
ccf::ViewChangeRequest::deserialize(data, size);
LOG_INFO_FMT(
"Received view change from:{}, view:{}", r.from_node, r.view);
auto progress_tracker = store->get_progress_tracker();
progress_tracker->apply_view_change_message(
v, r.from_node, r.view, r.seqno, r.root);
if (!progress_tracker->apply_view_change_message(
v, r.from_node, r.view, r.seqno))
{
return;
}
if (
get_primary(r.view) == id() &&
aft::ViewChangeTracker::ResultAddView::APPEND_NEW_VIEW_MESSAGE ==
view_change_tracker->add_request_view_change(
v, r.from_node, r.view, r.seqno, node_count()))
{
append_new_view(r.view);
}
}
bool is_first_request = true;
@ -672,6 +703,16 @@ namespace aft
entries_batch_size = std::max((batch_window_sum / batch_window_size), 1);
}
void append_new_view(kv::Consensus::View view)
{
state->current_view = view;
become_leader();
view_change_tracker->write_view_change_confirmation_append_entry(view);
view_change_tracker->clear();
request_tracker->clear();
}
bool has_bft_timeout_occurred(std::chrono::milliseconds time)
{
auto oldest_entry = request_tracker->oldest_entry();
@ -1010,6 +1051,12 @@ namespace aft
{
break;
}
case kv::DeserialiseSuccess::NEW_VIEW:
{
view_change_tracker->clear();
request_tracker->clear();
break;
}
case kv::DeserialiseSuccess::PASS_BACKUP_SIGNATURE_SEND_ACK:
{
@ -1623,13 +1670,18 @@ namespace aft
LOG_INFO_FMT(
"Becoming candidate {}: {}", state->my_node_id, state->current_view);
if (consensus_type != ConsensusType::BFT)
{
for (auto it = nodes.begin(); it != nodes.end(); ++it)
{
channels->create_channel(
it->first, it->second.node_info.hostname, it->second.node_info.port);
it->first,
it->second.node_info.hostname,
it->second.node_info.port);
send_request_vote(it->first);
}
}
}
void become_leader()
{
@ -1691,8 +1743,12 @@ namespace aft
LOG_INFO_FMT(
"Becoming follower {}: {}", state->my_node_id, state->current_view);
if (consensus_type != ConsensusType::BFT)
{
channels->close_all_outgoing();
}
}
void become_retired()
{

Просмотреть файл

@ -17,5 +17,6 @@ namespace aft
ccf::Tables::AFT_REQUESTS,
ccf::Tables::SIGNATURES,
ccf::Tables::BACKUP_SIGNATURES,
ccf::Tables::NONCES};
ccf::Tables::NONCES,
ccf::Tables::NEW_VIEWS};
}

Просмотреть файл

@ -202,7 +202,6 @@ namespace aft
{
kv::Consensus::View view = 0;
kv::Consensus::SeqNo seqno = 0;
crypto::Sha256Hash root;
};
struct RequestVote : RaftHeader

Просмотреть файл

@ -57,6 +57,7 @@ public:
std::make_shared<aft::State>(node_id),
nullptr,
std::make_shared<aft::RequestTracker>(),
nullptr,
ms(10),
ms(i * 100),
ms(i * 100));

Просмотреть файл

@ -43,6 +43,7 @@ DOCTEST_TEST_CASE("Single node startup" * doctest::test_suite("single"))
std::make_shared<aft::State>(node_id),
nullptr,
nullptr,
nullptr,
ms(10),
election_timeout,
ms(1000));
@ -87,6 +88,7 @@ DOCTEST_TEST_CASE("Single node commit" * doctest::test_suite("single"))
std::make_shared<aft::State>(node_id),
nullptr,
nullptr,
nullptr,
ms(10),
election_timeout,
ms(1000));
@ -140,6 +142,7 @@ DOCTEST_TEST_CASE(
std::make_shared<aft::State>(node_id0),
nullptr,
nullptr,
nullptr,
request_timeout,
ms(20),
ms(1000));
@ -155,6 +158,7 @@ DOCTEST_TEST_CASE(
std::make_shared<aft::State>(node_id1),
nullptr,
nullptr,
nullptr,
request_timeout,
ms(100),
ms(1000));
@ -170,6 +174,7 @@ DOCTEST_TEST_CASE(
std::make_shared<aft::State>(node_id2),
nullptr,
nullptr,
nullptr,
request_timeout,
ms(50),
ms(1000));
@ -346,6 +351,7 @@ DOCTEST_TEST_CASE(
std::make_shared<aft::State>(node_id0),
nullptr,
nullptr,
nullptr,
request_timeout,
ms(20),
ms(1000));
@ -362,6 +368,7 @@ DOCTEST_TEST_CASE(
std::make_shared<aft::State>(node_id1),
nullptr,
nullptr,
nullptr,
request_timeout,
ms(100),
ms(1000));
@ -377,6 +384,7 @@ DOCTEST_TEST_CASE(
std::make_shared<aft::State>(node_id2),
nullptr,
nullptr,
nullptr,
request_timeout,
ms(50),
ms(1000));
@ -523,6 +531,7 @@ DOCTEST_TEST_CASE("Multiple nodes late join" * doctest::test_suite("multiple"))
std::make_shared<aft::State>(node_id0),
nullptr,
nullptr,
nullptr,
request_timeout,
ms(20),
ms(1000));
@ -539,6 +548,7 @@ DOCTEST_TEST_CASE("Multiple nodes late join" * doctest::test_suite("multiple"))
std::make_shared<aft::State>(node_id1),
nullptr,
nullptr,
nullptr,
request_timeout,
ms(100),
ms(1000));
@ -555,6 +565,7 @@ DOCTEST_TEST_CASE("Multiple nodes late join" * doctest::test_suite("multiple"))
std::make_shared<aft::State>(node_id2),
nullptr,
nullptr,
nullptr,
request_timeout,
ms(50),
ms(1000));
@ -683,6 +694,7 @@ DOCTEST_TEST_CASE("Recv append entries logic" * doctest::test_suite("multiple"))
std::make_shared<aft::State>(node_id0),
nullptr,
nullptr,
nullptr,
request_timeout,
ms(20),
ms(1000));
@ -699,6 +711,7 @@ DOCTEST_TEST_CASE("Recv append entries logic" * doctest::test_suite("multiple"))
std::make_shared<aft::State>(node_id1),
nullptr,
nullptr,
nullptr,
request_timeout,
ms(100),
ms(1000));
@ -899,6 +912,7 @@ DOCTEST_TEST_CASE("Exceed append entries limit")
std::make_shared<aft::State>(node_id0),
nullptr,
nullptr,
nullptr,
request_timeout,
ms(20),
ms(1000));
@ -915,6 +929,7 @@ DOCTEST_TEST_CASE("Exceed append entries limit")
std::make_shared<aft::State>(node_id1),
nullptr,
nullptr,
nullptr,
request_timeout,
ms(100),
ms(1000));
@ -931,6 +946,7 @@ DOCTEST_TEST_CASE("Exceed append entries limit")
std::make_shared<aft::State>(node_id2),
nullptr,
nullptr,
nullptr,
request_timeout,
ms(50),
ms(1000));

Просмотреть файл

@ -130,7 +130,8 @@ namespace kv
PASS_SIGNATURE = 2,
PASS_BACKUP_SIGNATURE = 3,
PASS_BACKUP_SIGNATURE_SEND_ACK = 4,
PASS_NONCES = 5
PASS_NONCES = 5,
NEW_VIEW = 6
};
enum ReplicateType

Просмотреть файл

@ -888,6 +888,26 @@ namespace kv
h->append(data.data(), data.size());
success = DeserialiseSuccess::PASS_NONCES;
}
else if (changes.find(ccf::Tables::NEW_VIEWS) != changes.end())
{
LOG_INFO_FMT("Applying new view");
success = commit_deserialised(changes, v, new_maps);
if (success == DeserialiseSuccess::FAILED)
{
return success;
}
if (!progress_tracker->apply_new_view(consensus->primary()))
{
LOG_FAIL_FMT("apply_new_view Failed");
LOG_DEBUG_FMT("NewView in transaction {} failed to verify", v);
return DeserialiseSuccess::FAILED;
}
auto h = get_history();
h->append(data.data(), data.size());
success = DeserialiseSuccess::NEW_VIEW;
}
else if (changes.find(ccf::Tables::AFT_REQUESTS) == changes.end())
{
// we have deserialised an entry that didn't belong to the bft

Просмотреть файл

@ -98,6 +98,7 @@ namespace ccf
// Consensus specific tables
static constexpr auto AFT_REQUESTS = "public:ccf.gov.aft.requests";
static constexpr auto NEW_VIEWS = "public:ccf.internal.new_views";
};
}

Просмотреть файл

@ -95,6 +95,7 @@ namespace ccf
aft::RequestsMap bft_requests_map;
BackupSignaturesMap backup_signatures_map;
aft::RevealedNoncesMap revealed_nonces_map;
NewViewsMap new_views_map;
NetworkTables(const ConsensusType& consensus_type = ConsensusType::CFT) :
tables(
@ -130,7 +131,8 @@ namespace ccf
snapshot_evidence(Tables::SNAPSHOT_EVIDENCE),
bft_requests_map(Tables::AFT_REQUESTS),
backup_signatures_map(Tables::BACKUP_SIGNATURES),
revealed_nonces_map(Tables::NONCES)
revealed_nonces_map(Tables::NONCES),
new_views_map(Tables::NEW_VIEWS)
{}
/** Returns a tuple of all tables that are possibly accessible from scripts

Просмотреть файл

@ -154,6 +154,7 @@ namespace ccf
std::shared_ptr<kv::TxHistory> history;
std::shared_ptr<ccf::ProgressTracker> progress_tracker;
std::shared_ptr<ccf::ProgressTrackerStoreAdapter> tracker_store;
std::shared_ptr<kv::AbstractTxEncryptor> encryptor;
ShareManager& share_manager;
@ -1631,8 +1632,12 @@ namespace ccf
{
setup_n2n_channels();
setup_cmd_forwarder();
setup_tracker_store();
auto request_tracker = std::make_shared<aft::RequestTracker>();
auto view_change_tracker = std::make_unique<aft::ViewChangeTracker>(
tracker_store,
std::chrono::milliseconds(consensus_config.raft_election_timeout));
auto shared_state = std::make_shared<aft::State>(self);
auto raft = std::make_unique<RaftType>(
network.consensus_type,
@ -1647,6 +1652,7 @@ namespace ccf
shared_state,
std::make_shared<aft::ExecutorImpl>(shared_state, rpc_map, rpcsessions),
request_tracker,
std::move(view_change_tracker),
std::chrono::milliseconds(consensus_config.raft_request_timeout),
std::chrono::milliseconds(consensus_config.raft_election_timeout),
std::chrono::milliseconds(consensus_config.pbft_view_change_timeout),
@ -1762,16 +1768,24 @@ namespace ccf
{
if (network.consensus_type == ConsensusType::BFT)
{
auto store = std::make_unique<ccf::ProgressTrackerStoreAdapter>(
setup_tracker_store();
progress_tracker =
std::make_shared<ccf::ProgressTracker>(tracker_store, self);
network.tables->set_progress_tracker(progress_tracker);
}
}
void setup_tracker_store()
{
if (tracker_store == nullptr)
{
tracker_store = std::make_shared<ccf::ProgressTrackerStoreAdapter>(
*network.tables.get(),
*node_sign_kp,
network.nodes,
network.backup_signatures_map,
network.revealed_nonces_map);
progress_tracker =
std::make_shared<ccf::ProgressTracker>(std::move(store), self);
network.tables->set_progress_tracker(progress_tracker);
network.revealed_nonces_map,
network.new_views_map);
}
}

Просмотреть файл

@ -19,13 +19,13 @@ namespace ccf
{
public:
ProgressTracker(
std::unique_ptr<ProgressTrackerStore> store_, kv::NodeId id_) :
store(std::move(store_)),
std::shared_ptr<ProgressTrackerStore> store_, kv::NodeId id_) :
store(store_),
id(id_),
entropy(tls::create_entropy())
{}
std::unique_ptr<ProgressTrackerStore> store;
std::shared_ptr<ProgressTrackerStore> store;
kv::TxHistory::Result add_signature(
kv::TxID tx_id,
@ -555,10 +555,7 @@ namespace ccf
return highest_commit_level;
}
std::tuple<
std::unique_ptr<ViewChange>,
kv::Consensus::SeqNo,
crypto::Sha256Hash>
std::tuple<std::unique_ptr<ViewChangeRequest>, kv::Consensus::SeqNo>
get_view_change_message(kv::Consensus::View view)
{
auto it = certificates.find(highest_prepared_level.version);
@ -571,27 +568,24 @@ namespace ccf
}
auto& cert = it->second;
auto m = std::make_unique<ViewChange>();
auto m = std::make_unique<ViewChangeRequest>();
for (const auto& sig : cert.sigs)
{
m->signatures.push_back(sig.second);
}
store->sign_view_change(
*m, view, highest_prepared_level.version, cert.root);
return std::make_tuple(
std::move(m), highest_prepared_level.version, cert.root);
store->sign_view_change_request(*m, view, highest_prepared_level.version);
return std::make_tuple(std::move(m), highest_prepared_level.version);
}
bool apply_view_change_message(
ViewChange& view_change,
ViewChangeRequest& view_change,
kv::NodeId from,
kv::Consensus::View view,
kv::Consensus::SeqNo seqno,
crypto::Sha256Hash& root)
kv::Consensus::SeqNo seqno)
{
if (!store->verify_view_change(view_change, from, view, seqno, root))
if (!store->verify_view_change_request(view_change, from, view, seqno))
{
LOG_FAIL_FMT("Failed to verify view-change from:{}", from);
return false;
@ -611,16 +605,6 @@ namespace ccf
return false;
}
if (it->second.root != root)
{
LOG_FAIL_FMT(
"Roots do not match, view-change from:{}, view:{}, seqno:{}",
from,
view,
seqno);
return false;
}
bool verified_signatures = true;
for (auto& sig : view_change.signatures)
@ -650,6 +634,39 @@ namespace ccf
return verified_signatures;
}
bool apply_new_view(kv::NodeId from) const
{
auto new_view = store->get_new_view();
CCF_ASSERT(new_view.has_value(), "new view does not have a value");
kv::Consensus::View view = new_view->view;
kv::Consensus::SeqNo seqno = new_view->seqno;
for (auto& vcp : new_view->view_change_messages)
{
kv::NodeId id = vcp.first;
ccf::ViewChangeRequest& vc = vcp.second;
if (!store->verify_view_change_request(vc, id, view, seqno))
{
LOG_FAIL_FMT(
"Failed to verify view-change id:{},view:{}, seqno:{}",
id,
view,
seqno);
return false;
}
}
if (!store->verify_view_change_request_confirmation(
new_view.value(), from))
{
LOG_INFO_FMT("Failed to verify from:{}", from);
return false;
}
return true;
}
private:
kv::NodeId id;
std::shared_ptr<tls::Entropy> entropy;
@ -708,15 +725,6 @@ namespace ccf
return std::equal(n_1.h.begin(), n_1.h.end(), n_2.h.begin());
}
uint32_t get_message_threshold(uint32_t node_count)
{
uint32_t f = 0;
for (; 3 * f + 1 < node_count; ++f)
;
return 2 * f + 1;
}
bool can_send_sig_ack(
CommitCert& cert, const kv::TxID& tx_id, uint32_t node_count)
{

Просмотреть файл

@ -56,6 +56,7 @@ namespace ccf
virtual ~ProgressTrackerStore() = default;
virtual void write_backup_signatures(ccf::BackupSignatures& sig_value) = 0;
virtual std::optional<ccf::BackupSignatures> get_backup_signatures() = 0;
virtual std::optional<ccf::ViewChangeConfirmation> get_new_view() = 0;
virtual void write_nonces(aft::RevealedNonces& nonces) = 0;
virtual std::optional<aft::RevealedNonces> get_nonces() = 0;
virtual bool verify_signature(
@ -63,17 +64,19 @@ namespace ccf
crypto::Sha256Hash& root,
uint32_t sig_size,
uint8_t* sig) = 0;
virtual void sign_view_change(
ViewChange& view_change,
virtual void sign_view_change_request(
ViewChangeRequest& view_change,
kv::Consensus::View view,
kv::Consensus::SeqNo seqno,
crypto::Sha256Hash& root) = 0;
virtual bool verify_view_change(
ViewChange& view_change,
kv::Consensus::SeqNo seqno) = 0;
virtual bool verify_view_change_request(
ViewChangeRequest& view_change,
kv::NodeId from,
kv::Consensus::View view,
kv::Consensus::SeqNo seqno,
crypto::Sha256Hash& root) = 0;
kv::Consensus::SeqNo seqno) = 0;
virtual void write_view_change_confirmation(
ccf::ViewChangeConfirmation& new_view) = 0;
virtual bool verify_view_change_request_confirmation(
ccf::ViewChangeConfirmation& new_view, kv::NodeId from) = 0;
};
class ProgressTrackerStoreAdapter : public ProgressTrackerStore
@ -84,12 +87,14 @@ namespace ccf
tls::KeyPair& kp_,
ccf::Nodes& nodes_,
ccf::BackupSignaturesMap& backup_signatures_,
aft::RevealedNoncesMap& revealed_nonces_) :
aft::RevealedNoncesMap& revealed_nonces_,
ccf::NewViewsMap& new_views_) :
store(store_),
kp(kp_),
nodes(nodes_),
backup_signatures(backup_signatures_),
revealed_nonces(revealed_nonces_)
revealed_nonces(revealed_nonces_),
new_views(new_views_)
{}
void write_backup_signatures(ccf::BackupSignatures& sig_value) override
@ -119,6 +124,19 @@ namespace ccf
return sigs;
}
std::optional<ccf::ViewChangeConfirmation> get_new_view() override
{
kv::Tx tx(&store);
auto new_views_tv = tx.get_view(new_views);
auto new_view = new_views_tv->get(0);
if (!new_view.has_value())
{
LOG_FAIL_FMT("No new_view found in new_view map");
throw ccf::ccf_logic_error("No new_view found in new_view map");
}
return new_view;
}
void write_nonces(aft::RevealedNonces& nonces) override
{
kv::Tx tx(&store);
@ -173,26 +191,22 @@ namespace ccf
root.h.data(), root.h.size(), sig, sig_size);
}
void sign_view_change(
ViewChange& view_change,
void sign_view_change_request(
ViewChangeRequest& view_change,
kv::Consensus::View view,
kv::Consensus::SeqNo seqno,
crypto::Sha256Hash& root) override
kv::Consensus::SeqNo seqno) override
{
crypto::Sha256Hash h = hash_view_change(view_change, view, seqno, root);
crypto::Sha256Hash h = hash_view_change(view_change, view, seqno);
view_change.signature = kp.sign_hash(h.h.data(), h.h.size());
}
bool verify_view_change(
ViewChange& view_change,
bool verify_view_change_request(
ViewChangeRequest& view_change,
kv::NodeId from,
kv::Consensus::View view,
kv::Consensus::SeqNo seqno,
crypto::Sha256Hash& root
) override
kv::Consensus::SeqNo seqno) override
{
crypto::Sha256Hash h = hash_view_change(view_change, view, seqno, root);
crypto::Sha256Hash h = hash_view_change(view_change, view, seqno);
kv::Tx tx(&store);
auto ni_tv = tx.get_view(nodes);
@ -211,24 +225,82 @@ namespace ccf
view_change.signature.size());
}
bool verify_view_change_request_confirmation(
ViewChangeConfirmation& new_view, kv::NodeId from) override
{
kv::Tx tx(&store);
auto ni_tv = tx.get_view(nodes);
auto ni = ni_tv->get(from);
if (!ni.has_value())
{
LOG_FAIL_FMT("No node info, and therefore no cert for node {}", from);
return false;
}
tls::VerifierPtr from_cert = tls::make_verifier(ni.value().cert);
auto h = hash_new_view(new_view);
return from_cert->verify_hash(
h.h.data(),
h.h.size(),
new_view.signature.data(),
new_view.signature.size());
}
void write_view_change_confirmation(
ccf::ViewChangeConfirmation& new_view) override
{
kv::Tx tx(&store);
auto new_views_tv = tx.get_view(new_views);
crypto::Sha256Hash h = hash_new_view(new_view);
new_view.signature = kp.sign_hash(h.h.data(), h.h.size());
new_views_tv->put(0, new_view);
auto r = tx.commit();
if (r != kv::CommitSuccess::OK)
{
LOG_FAIL_FMT(
"Failed to write new_view, view:{}, seqno:{}",
new_view.view,
new_view.seqno);
throw ccf::ccf_logic_error(fmt::format(
"Failed to write new_view, view:{}, seqno:{}",
new_view.view,
new_view.seqno));
}
}
crypto::Sha256Hash hash_new_view(ccf::ViewChangeConfirmation& new_view)
{
crypto::CSha256Hash ch;
ch.update(new_view.view);
ch.update(new_view.seqno);
for (auto it : new_view.view_change_messages)
{
ch.update(it.second.signature);
}
return ch.finalize();
}
private:
kv::AbstractStore& store;
tls::KeyPair& kp;
ccf::Nodes& nodes;
ccf::BackupSignaturesMap& backup_signatures;
aft::RevealedNoncesMap& revealed_nonces;
ccf::NewViewsMap& new_views;
crypto::Sha256Hash hash_view_change(
const ViewChange& v,
const ViewChangeRequest& v,
kv::Consensus::View view,
kv::Consensus::SeqNo seqno,
crypto::Sha256Hash& root) const
kv::Consensus::SeqNo seqno) const
{
crypto::CSha256Hash ch;
ch.update(view);
ch.update(seqno);
ch.update(root);
for (auto& s : v.signatures)
{
@ -238,4 +310,13 @@ namespace ccf
return ch.finalize();
}
};
static constexpr uint32_t get_message_threshold(uint32_t node_count)
{
uint32_t f = 0;
for (; 3 * f + 1 < node_count; ++f)
;
return 2 * f + 1;
}
}

Просмотреть файл

@ -132,6 +132,15 @@ namespace aft
return {seqno_last_signature, time_last_signature};
}
void clear()
{
requests.clear();
requests_list.clear();
hashes_without_requests.clear();
hashes_without_requests_list.clear();
}
private:
std::multiset<Request*, RequestComp> requests;
snmalloc::DLList<Request, std::nullptr_t, true> requests_list;

Просмотреть файл

@ -19,28 +19,36 @@ public:
MAKE_MOCK1(write_backup_signatures, void(ccf::BackupSignatures&), override);
MAKE_MOCK0(
get_backup_signatures, std::optional<ccf::BackupSignatures>(), override);
MAKE_MOCK0(
get_new_view, std::optional<ccf::ViewChangeConfirmation>(), override);
MAKE_MOCK1(write_nonces, void(aft::RevealedNonces&), override);
MAKE_MOCK0(get_nonces, std::optional<aft::RevealedNonces>(), override);
MAKE_MOCK4(
verify_signature,
bool(kv::NodeId, crypto::Sha256Hash&, uint32_t, uint8_t*),
override);
MAKE_MOCK4(
sign_view_change,
MAKE_MOCK3(
sign_view_change_request,
void(
ccf::ViewChange& view_change,
ccf::ViewChangeRequest& view_change,
kv::Consensus::View view,
kv::Consensus::SeqNo seqno,
crypto::Sha256Hash& root),
kv::Consensus::SeqNo seqno),
override);
MAKE_MOCK5(
verify_view_change,
MAKE_MOCK4(
verify_view_change_request,
bool(
ccf::ViewChange& view_change,
ccf::ViewChangeRequest& view_change,
kv::NodeId from,
kv::Consensus::View view,
kv::Consensus::SeqNo seqno,
crypto::Sha256Hash& root),
kv::Consensus::SeqNo seqno),
override);
MAKE_MOCK2(
verify_view_change_request_confirmation,
bool(ccf::ViewChangeConfirmation& new_view, kv::NodeId from),
override);
MAKE_MOCK1(
write_view_change_confirmation,
void(ccf::ViewChangeConfirmation& new_view),
override);
};
@ -348,7 +356,8 @@ TEST_CASE("View Changes")
REQUIRE_CALL(store_mock, verify_signature(_, _, _, _))
.RETURN(true)
.TIMES(AT_LEAST(2));
REQUIRE_CALL(store_mock, sign_view_change(_, _, _, _)).TIMES(AT_LEAST(2));
REQUIRE_CALL(store_mock, sign_view_change_request(_, _, _))
.TIMES(AT_LEAST(2));
auto result = pt.record_primary(
{view, seqno}, 0, root, primary_sig, hashed_nonce, node_count);
REQUIRE(result == kv::TxHistory::Result::OK);
@ -387,7 +396,8 @@ TEST_CASE("View Changes")
REQUIRE_CALL(store_mock, verify_signature(_, _, _, _))
.RETURN(true)
.TIMES(AT_LEAST(2));
REQUIRE_CALL(store_mock, sign_view_change(_, _, _, _)).TIMES(AT_LEAST(2));
REQUIRE_CALL(store_mock, sign_view_change_request(_, _, _))
.TIMES(AT_LEAST(2));
auto result = pt.record_primary(
{view, new_seqno}, 0, root, primary_sig, hashed_nonce, node_count);
REQUIRE(result == kv::TxHistory::Result::OK);
@ -428,7 +438,8 @@ TEST_CASE("View Changes")
REQUIRE_CALL(store_mock, verify_signature(_, _, _, _))
.RETURN(true)
.TIMES(AT_LEAST(2));
REQUIRE_CALL(store_mock, sign_view_change(_, _, _, _)).TIMES(AT_LEAST(2));
REQUIRE_CALL(store_mock, sign_view_change_request(_, _, _))
.TIMES(AT_LEAST(2));
auto result = pt.record_primary(
{view, new_seqno}, 0, root, primary_sig, hashed_nonce, node_count);
REQUIRE(result == kv::TxHistory::Result::OK);
@ -459,7 +470,7 @@ TEST_CASE("Serialization")
std::vector<uint8_t> serialized;
INFO("view-change serialization");
{
ccf::ViewChange v;
ccf::ViewChangeRequest v;
for (uint32_t i = 10; i < 110; i += 10)
{
@ -482,7 +493,7 @@ TEST_CASE("Serialization")
{
const uint8_t* data = serialized.data();
size_t size = serialized.size();
ccf::ViewChange v = ccf::ViewChange::deserialize(data, size);
ccf::ViewChangeRequest v = ccf::ViewChangeRequest::deserialize(data, size);
REQUIRE(v.signatures.size() == 10);
for (uint32_t i = 1; i < 11; ++i)
@ -501,11 +512,11 @@ TEST_CASE("Serialization")
}
}
TEST_CASE("view-change-tracker tests")
TEST_CASE("view-change-tracker timeout tests")
{
INFO("Check timeout works correctly");
{
aft::ViewChangeTracker vct(0, 0, std::chrono::seconds(10));
aft::ViewChangeTracker vct(0, std::chrono::seconds(10));
REQUIRE(vct.should_send_view_change(std::chrono::seconds(1)) == false);
REQUIRE(vct.get_target_view() == 0);
REQUIRE(vct.should_send_view_change(std::chrono::seconds(11)));
@ -517,12 +528,47 @@ TEST_CASE("view-change-tracker tests")
}
}
TEST_CASE("view-change-tracker statemachine tests")
{
ccf::ViewChangeRequest v;
kv::Consensus::View view = 1;
kv::Consensus::SeqNo seqno = 1;
uint32_t node_count = 4;
INFO("Can trigger view change");
{
aft::ViewChangeTracker vct(0, std::chrono::seconds(10));
for (uint32_t i = 0; i < node_count; ++i)
{
auto r = vct.add_request_view_change(v, i, view, seqno, node_count);
if (i == 2)
{
REQUIRE(
r == aft::ViewChangeTracker::ResultAddView::APPEND_NEW_VIEW_MESSAGE);
}
else
{
REQUIRE(r == aft::ViewChangeTracker::ResultAddView::OK);
}
}
}
INFO("Can differentiate view change for different view");
{
aft::ViewChangeTracker vct(0, std::chrono::seconds(10));
for (uint32_t i = 0; i < node_count; ++i)
{
auto r = vct.add_request_view_change(v, i, i, seqno, node_count);
REQUIRE(r == aft::ViewChangeTracker::ResultAddView::OK);
}
}
}
TEST_CASE("test progress_tracker apply_view_change")
{
using trompeloeil::_;
uint32_t node_id = 1;
crypto::Sha256Hash root;
auto store = std::make_unique<StoreMock>();
StoreMock& store_mock = *store.get();
auto pt = std::make_unique<ccf::ProgressTracker>(std::move(store), node_id);
@ -537,50 +583,44 @@ TEST_CASE("test progress_tracker apply_view_change")
INFO("View-change signature does not verify");
{
REQUIRE_CALL(store_mock, verify_view_change(_, _, _, _, _)).RETURN(false);
ccf::ViewChange v;
bool result = pt->apply_view_change_message(v, 1, 1, 1, root);
REQUIRE_CALL(store_mock, verify_view_change_request(_, _, _, _))
.RETURN(false);
ccf::ViewChangeRequest v;
bool result = pt->apply_view_change_message(v, 1, 1, 1);
REQUIRE(result == false);
}
INFO("Unknown seqno");
{
REQUIRE_CALL(store_mock, verify_view_change(_, _, _, _, _)).RETURN(true);
ccf::ViewChange v;
bool result = pt->apply_view_change_message(v, 1, 1, 999, root);
REQUIRE(result == false);
}
INFO("Incorrect root");
{
REQUIRE_CALL(store_mock, verify_view_change(_, _, _, _, _)).RETURN(true);
ccf::ViewChange v;
crypto::Sha256Hash rootA;
rootA.h.fill(1);
bool result = pt->apply_view_change_message(v, 1, 1, 42, rootA);
REQUIRE_CALL(store_mock, verify_view_change_request(_, _, _, _))
.RETURN(true);
ccf::ViewChangeRequest v;
bool result = pt->apply_view_change_message(v, 1, 1, 999);
REQUIRE(result == false);
}
INFO("View-change matches - known node");
{
REQUIRE_CALL(store_mock, verify_view_change(_, _, _, _, _)).RETURN(true);
REQUIRE_CALL(store_mock, verify_view_change_request(_, _, _, _))
.RETURN(true);
REQUIRE_CALL(store_mock, verify_signature(_, _, _, _)).RETURN(true);
ccf::ViewChange v;
ccf::ViewChangeRequest v;
v.signatures.push_back(ccf::NodeSignature(0));
bool result = pt->apply_view_change_message(v, 1, 1, 42, root);
bool result = pt->apply_view_change_message(v, 1, 1, 42);
REQUIRE(result);
}
INFO("View-change matches - unknown node");
{
REQUIRE_CALL(store_mock, verify_view_change(_, _, _, _, _)).RETURN(true);
REQUIRE_CALL(store_mock, verify_view_change_request(_, _, _, _))
.RETURN(true);
REQUIRE_CALL(store_mock, verify_signature(_, _, _, _)).RETURN(false);
ccf::ViewChange v;
ccf::ViewChangeRequest v;
v.signatures.push_back(ccf::NodeSignature(5));
bool result = pt->apply_view_change_message(v, 1, 1, 42, root);
bool result = pt->apply_view_change_message(v, 1, 1, 42);
REQUIRE(result == false);
}
}

Просмотреть файл

@ -11,12 +11,12 @@
namespace ccf
{
struct ViewChange
struct ViewChangeRequest
{
std::vector<NodeSignature> signatures;
std::vector<uint8_t> signature;
ViewChange() = default;
ViewChangeRequest() = default;
size_t get_serialized_size() const
{
@ -46,9 +46,9 @@ namespace ccf
serialized::write(data, size, signature.data(), sig_size);
}
static ViewChange deserialize(const uint8_t*& data, size_t& size)
static ViewChangeRequest deserialize(const uint8_t*& data, size_t& size)
{
ViewChange v;
ViewChangeRequest v;
size_t num_sigs = serialized::read<size_t>(data, size);
for (size_t i = 0; i < num_sigs; ++i)
{
@ -60,5 +60,31 @@ namespace ccf
return v;
}
MSGPACK_DEFINE(signatures, signature);
};
DECLARE_JSON_TYPE(ViewChangeRequest);
DECLARE_JSON_REQUIRED_FIELDS(ViewChangeRequest, signatures, signature);
struct ViewChangeConfirmation
{
kv::Consensus::View view = 0;
kv::Consensus::SeqNo seqno = 0;
std::vector<uint8_t> signature;
std::map<kv::NodeId, ViewChangeRequest> view_change_messages;
ViewChangeConfirmation() = default;
ViewChangeConfirmation(
kv::Consensus::View view_, kv::Consensus::SeqNo seqno_) :
view(view_),
seqno(seqno_)
{}
MSGPACK_DEFINE(view, seqno, signature, view_change_messages);
};
DECLARE_JSON_TYPE(ViewChangeConfirmation);
DECLARE_JSON_REQUIRED_FIELDS(
ViewChangeConfirmation, view, seqno, signature, view_change_messages);
using NewViewsMap = kv::Map<ObjectId, ViewChangeConfirmation>;
}

Просмотреть файл

@ -79,19 +79,13 @@ def run(args):
# Number of nodes F to stop until network cannot make progress
nodes_to_stop = math.ceil(len(args.nodes) / 2)
if args.consensus == "bft":
# nodes_to_stop = math.ceil(len(args.nodes) / 3)
LOG.error("aaaaaaa {}", math.ceil(len(args.nodes) / 3))
# View Change implementation is in progress.
# https://github.com/microsoft/CCF/issues/1709
nodes_to_stop = 1
nodes_to_stop = math.ceil(len(args.nodes) / 3)
primary_is_known = True
for node_to_stop in range(nodes_to_stop):
# Note that for the first iteration, the primary is known in advance anyway
LOG.debug("Find freshly elected primary")
# After a view change in pbft, finding the new primary takes longer
# After a view change in bft, finding the new primary takes longer
primary, current_view = network.find_primary(
timeout=(30 if args.consensus == "bft" else 3)
)