[multi-thread] Reset history after verifying snapshot (#2180)

This commit is contained in:
Julien Maffre 2021-02-23 23:51:31 +00:00 коммит произвёл GitHub
Родитель b230f23895
Коммит 6c2eb4caf4
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 120 добавлений и 78 удалений

Просмотреть файл

@ -70,6 +70,7 @@ namespace kv
MapHooks map_hooks;
std::shared_ptr<Consensus> consensus = nullptr;
std::shared_ptr<TxHistory> history = nullptr;
std::shared_ptr<ccf::ProgressTracker> progress_tracker = nullptr;
EncryptorPtr encryptor = nullptr;

Просмотреть файл

@ -482,7 +482,7 @@ namespace ccf
crypto::KeyPairBase& kp_,
size_t sig_tx_interval_ = 0,
size_t sig_ms_interval_ = 0,
bool signature_timer = true) :
bool signature_timer = false) :
store(store_),
id(id_),
kp(kp_),

Просмотреть файл

@ -35,6 +35,21 @@
namespace ccf
{
// Builds a new kv::Store configured for the given consensus type.
//
// ConsensusType::CFT yields a store constructed with
// aft::replicate_type_raft / aft::replicated_tables_raft; every other
// consensus type yields a store constructed with
// aft::replicate_type_bft / aft::replicated_tables_bft.
inline std::shared_ptr<kv::Store> make_store(
  const ConsensusType& consensus_type)
{
  // Handle the non-CFT case first so that the CFT path falls through
  if (consensus_type != ConsensusType::CFT)
  {
    return std::make_shared<kv::Store>(
      aft::replicate_type_bft, aft::replicated_tables_bft);
  }

  return std::make_shared<kv::Store>(
    aft::replicate_type_raft, aft::replicated_tables_raft);
}
struct NetworkTables
{
std::shared_ptr<kv::Store> tables;
@ -103,12 +118,7 @@ namespace ccf
NewViewsMap new_views_map;
NetworkTables(const ConsensusType& consensus_type = ConsensusType::CFT) :
tables(
(consensus_type == ConsensusType::CFT) ?
std::make_shared<kv::Store>(
aft::replicate_type_raft, aft::replicated_tables_raft) :
std::make_shared<kv::Store>(
aft::replicate_type_bft, aft::replicated_tables_bft)),
tables(make_store(consensus_type)),
members(Tables::MEMBERS),
member_certs(Tables::MEMBER_CERT_DERS),
member_digests(Tables::MEMBER_DIGESTS),

Просмотреть файл

@ -158,8 +158,6 @@ namespace ccf
//
NodeInfoNetwork node_info_network;
std::shared_ptr<kv::Store> recovery_store;
std::shared_ptr<kv::TxHistory> recovery_history;
std::shared_ptr<kv::AbstractTxEncryptor> recovery_encryptor;
kv::Version recovery_v;
crypto::Sha256Hash recovery_root;
@ -168,25 +166,37 @@ namespace ccf
RecoveredEncryptedLedgerSecrets recovery_ledger_secrets;
consensus::Index ledger_idx = 0;
//
// JWT key auto-refresh
//
std::shared_ptr<JwtKeyAutoRefresh> jwt_key_auto_refresh;
struct StartupSnapshotInfo
{
std::vector<uint8_t>& raw;
consensus::Index seqno;
consensus::Index evidence_seqno;
bool has_evidence = false;
// Store used to verify a snapshot (either created fresh when a node joins
// from a snapshot or points to the main store when recovering from a
// snapshot)
std::shared_ptr<kv::Store> store = nullptr;
// The snapshot to startup from (on join or recovery) is only valid once a
// signature ledger entry confirms that the snapshot evidence was
// committed
bool has_evidence = false;
bool is_evidence_committed = false;
StartupSnapshotInfo(
const std::shared_ptr<kv::Store>& store_,
std::vector<uint8_t>& raw_,
consensus::Index seqno_,
consensus::Index evidence_seqno_) :
raw(raw_),
seqno(seqno_),
evidence_seqno(evidence_seqno_)
evidence_seqno(evidence_seqno_),
store(store_)
{}
bool is_snapshot_verified()
@ -201,12 +211,47 @@ namespace ccf
};
std::unique_ptr<StartupSnapshotInfo> startup_snapshot_info = nullptr;
void initialise_startup_snapshot()
// Creates a new transaction encryptor.
//
// When USE_NULL_ENCRYPTOR is defined, a kv::NullTxEncryptor is returned.
// Otherwise a NodeEncryptor is built from network.ledger_secrets as it is at
// the time of the call, so callers should set the ledger secrets they want
// before calling this.
std::shared_ptr<kv::AbstractTxEncryptor> make_encryptor()
{
#ifndef USE_NULL_ENCRYPTOR
  return std::make_shared<NodeEncryptor>(network.ledger_secrets);
#else
  return std::make_shared<kv::NullTxEncryptor>();
#endif
}
void initialise_startup_snapshot(bool recovery = false)
{
std::shared_ptr<kv::Store> snapshot_store;
if (!recovery)
{
// Create a new store to verify the snapshot only
network.ledger_secrets = std::make_shared<LedgerSecrets>();
snapshot_store = make_store(network.consensus_type);
auto snapshot_history = std::make_shared<MerkleTxHistory>(
*snapshot_store.get(),
self,
*node_sign_kp,
sig_tx_interval,
sig_ms_interval,
false /* No signature timer on snapshot_history */);
auto snapshot_encryptor = make_encryptor();
snapshot_store->set_history(snapshot_history);
snapshot_store->set_encryptor(snapshot_encryptor);
}
else
{
snapshot_store = network.tables;
}
LOG_INFO_FMT(
"Deserialising public snapshot ({})", config.startup_snapshot.size());
kv::ConsensusHookPtrs hooks;
auto rc = network.tables->deserialise_snapshot(
auto rc = snapshot_store->deserialise_snapshot(
config.startup_snapshot, hooks, &view_history, true);
if (rc != kv::ApplyResult::PASS)
{
@ -216,22 +261,18 @@ namespace ccf
LOG_INFO_FMT(
"Public snapshot deserialised at seqno {}",
network.tables->current_version());
snapshot_store->current_version());
ledger_idx = network.tables->current_version();
ledger_idx = snapshot_store->current_version();
last_recovered_signed_idx = ledger_idx;
startup_snapshot_info = std::make_unique<StartupSnapshotInfo>(
snapshot_store,
config.startup_snapshot,
ledger_idx,
config.startup_snapshot_evidence_seqno);
}
//
// JWT key auto-refresh
//
std::shared_ptr<JwtKeyAutoRefresh> jwt_key_auto_refresh;
public:
NodeState(
ringbuffer::AbstractWriterFactory& writer_factory,
@ -355,16 +396,7 @@ namespace ccf
if (!config.startup_snapshot.empty())
{
setup_history();
// It is necessary to give an encryptor to the store for it to
// deserialise the public domain when recovering the public ledger
network.ledger_secrets = std::make_shared<LedgerSecrets>();
setup_encryptor();
setup_snapshotter();
initialise_startup_snapshot();
sm.advance(State::verifyingSnapshot);
}
else
@ -397,7 +429,7 @@ namespace ccf
if (from_snapshot)
{
initialise_startup_snapshot();
initialise_startup_snapshot(true);
snapshotter->set_last_snapshot_idx(ledger_idx);
}
@ -698,12 +730,21 @@ namespace ccf
void recover_public_ledger_entry(const std::vector<uint8_t>& ledger_entry)
{
std::lock_guard<SpinLock> guard(lock);
if (
!sm.check(State::readingPublicLedger) &&
!sm.check(State::verifyingSnapshot))
std::shared_ptr<kv::Store> store;
if (sm.check(State::readingPublicLedger))
{
// In recovery, use the main store to deserialise public entries
store = network.tables;
}
else if (sm.check(State::verifyingSnapshot))
{
store = startup_snapshot_info->store;
}
else
{
throw std::logic_error(fmt::format(
"Node should be in state {} or {} to recover public ledger entry",
"Node should be in state {} or {} to access public store",
State::readingPublicLedger,
State::verifyingSnapshot));
}
@ -712,12 +753,12 @@ namespace ccf
"Deserialising public ledger entry ({})", ledger_entry.size());
// Deserialise in the store selected above: the main store when reading the
// public ledger, or the snapshot store when verifying a snapshot
auto r = network.tables->apply(ledger_entry, ConsensusType::CFT, true);
auto r = store->apply(ledger_entry, ConsensusType::CFT, true);
auto result = r->execute();
if (result == kv::ApplyResult::FAIL)
{
LOG_FAIL_FMT("Failed to deserialise entry in public ledger");
network.tables->rollback(ledger_idx - 1);
store->rollback(ledger_idx - 1);
if (sm.check(State::verifyingSnapshot))
{
throw std::logic_error(
@ -737,8 +778,8 @@ namespace ccf
if (result == kv::ApplyResult::PASS_SIGNATURE)
{
// If the ledger entry is a signature, it is safe to compact the store
network.tables->compact(ledger_idx);
auto tx = network.tables->create_tx();
store->compact(ledger_idx);
auto tx = store->create_tx();
GenesisGenerator g(network, tx);
auto last_sig = g.get_last_signature();
@ -775,17 +816,20 @@ namespace ccf
startup_snapshot_info->is_evidence_committed = true;
}
// Inform snapshotter of all signature entries so that this node can
// continue generating snapshots at the correct interval once the
// recovery is complete
snapshotter->record_committable(ledger_idx);
snapshotter->commit(ledger_idx);
if (sm.check(State::readingPublicLedger))
{
// Inform snapshotter of all signature entries so that this node can
// continue generating snapshots at the correct interval once the
// recovery is complete
snapshotter->record_committable(ledger_idx);
snapshotter->commit(ledger_idx);
}
}
else if (
result == kv::ApplyResult::PASS_SNAPSHOT_EVIDENCE &&
startup_snapshot_info)
{
auto tx = network.tables->create_read_only_tx();
auto tx = store->create_read_only_tx();
auto snapshot_evidence = tx.ro(network.snapshot_evidence);
if (ledger_idx == startup_snapshot_info->evidence_seqno)
@ -821,7 +865,6 @@ namespace ccf
throw std::logic_error("Snapshot evidence was not committed in ledger");
}
network.tables->clear();
ledger_truncate(startup_snapshot_info->seqno);
sm.advance(State::pending);
@ -989,7 +1032,8 @@ namespace ccf
recovery_v));
}
auto h = dynamic_cast<MerkleTxHistory*>(recovery_history.get());
auto h =
dynamic_cast<MerkleTxHistory*>(recovery_store->get_history().get());
if (h->get_replicated_state_root() != recovery_root)
{
throw std::logic_error(fmt::format(
@ -998,7 +1042,6 @@ namespace ccf
}
network.tables->swap_private_maps(*recovery_store.get());
recovery_history.reset();
recovery_store.reset();
// Raft should deserialise all security domains when network is opened
@ -1103,7 +1146,7 @@ namespace ccf
recovery_store = std::make_shared<kv::Store>(
true /* Check transactions in order */,
true /* Make use of historical secrets */);
recovery_history = std::make_shared<MerkleTxHistory>(
auto recovery_history = std::make_shared<MerkleTxHistory>(
*recovery_store.get(),
self,
*node_sign_kp,
@ -1111,12 +1154,7 @@ namespace ccf
sig_ms_interval,
false /* No signature timer on recovery_history */);
#ifdef USE_NULL_ENCRYPTOR
recovery_encryptor = std::make_shared<kv::NullTxEncryptor>();
#else
recovery_encryptor =
std::make_shared<NodeEncryptor>(network.ledger_secrets);
#endif
auto recovery_encryptor = make_encryptor();
recovery_store->set_history(recovery_history);
recovery_store->set_encryptor(recovery_encryptor);
@ -1770,14 +1808,13 @@ namespace ccf
void setup_history()
{
// This function can be called once the node has started up and before
// it has joined the service.
history = std::make_shared<MerkleTxHistory>(
*network.tables.get(),
self,
*node_sign_kp,
sig_tx_interval,
sig_ms_interval);
sig_ms_interval,
true);
network.tables->set_history(history);
}
@ -1785,14 +1822,8 @@ namespace ccf
void setup_encryptor()
{
// This function makes use of ledger secrets and should be called once
// the node has joined the service (either via start_network() or
// join_network())
#ifdef USE_NULL_ENCRYPTOR
encryptor = std::make_shared<kv::NullTxEncryptor>();
#else
encryptor = std::make_shared<NodeEncryptor>(network.ledger_secrets);
#endif
// the node has joined the service
encryptor = make_encryptor();
network.tables->set_encryptor(encryptor);
}
@ -1815,7 +1846,7 @@ namespace ccf
void setup_snapshotter()
{
snapshotter = std::make_shared<Snapshotter>(
writer_factory, network, config.snapshot_tx_interval);
writer_factory, network.tables, config.snapshot_tx_interval);
}
void setup_tracker_store()

Просмотреть файл

@ -8,6 +8,7 @@
#include "ds/logger.h"
#include "ds/spin_lock.h"
#include "ds/thread_messaging.h"
#include "entities.h"
#include "kv/kv_types.h"
#include "kv/tx.h"
#include "node/network_state.h"
@ -27,7 +28,7 @@ namespace ccf
ringbuffer::WriterPtr to_host;
SpinLock lock;
NetworkState& network;
std::shared_ptr<kv::Store> store;
// Snapshots are never generated by default (e.g. during public recovery)
size_t snapshot_tx_interval = max_tx_interval;
@ -90,11 +91,10 @@ namespace ccf
{
auto snapshot_version = snapshot->get_version();
auto serialised_snapshot =
network.tables->serialise_snapshot(std::move(snapshot));
auto serialised_snapshot = store->serialise_snapshot(std::move(snapshot));
auto tx = network.tables->create_tx();
auto evidence = tx.rw(network.snapshot_evidence);
auto tx = store->create_tx();
auto evidence = tx.rw<SnapshotEvidence>(Tables::SNAPSHOT_EVIDENCE);
auto snapshot_hash = crypto::Sha256Hash(serialised_snapshot);
evidence->put(0, {snapshot_hash, snapshot_version});
@ -130,10 +130,10 @@ namespace ccf
public:
Snapshotter(
ringbuffer::AbstractWriterFactory& writer_factory,
NetworkState& network_,
std::shared_ptr<kv::Store>& store_,
size_t snapshot_tx_interval_) :
to_host(writer_factory.create_writer_to_outside()),
network(network_),
store(store_),
snapshot_tx_interval(snapshot_tx_interval_)
{
next_snapshot_indices.push_back(last_snapshot_idx);
@ -199,7 +199,7 @@ namespace ccf
auto msg =
std::make_unique<threading::Tmsg<SnapshotMsg>>(&snapshot_cb);
msg->data.self = shared_from_this();
msg->data.snapshot = network.tables->snapshot(idx);
msg->data.snapshot = store->snapshot(idx);
static uint32_t generation_count = 0;
threading::ThreadMessaging::thread_messaging.add_task(
@ -278,4 +278,4 @@ namespace ccf
}
}
};
}
}

Просмотреть файл

@ -71,7 +71,7 @@ TEST_CASE("Regular snapshotting")
issue_transactions(network, snapshot_tx_interval * interval_count);
auto snapshotter = std::make_shared<ccf::Snapshotter>(
*writer_factory, network, snapshot_tx_interval);
*writer_factory, network.tables, snapshot_tx_interval);
REQUIRE_FALSE(snapshotter->record_committable(snapshot_tx_interval - 1));
REQUIRE(snapshotter->record_committable(snapshot_tx_interval));
@ -115,7 +115,7 @@ TEST_CASE("Commit snapshot evidence")
issue_transactions(network, snapshot_tx_interval);
auto snapshotter = std::make_shared<ccf::Snapshotter>(
*writer_factory, network, snapshot_tx_interval);
*writer_factory, network.tables, snapshot_tx_interval);
INFO("Generate snapshot");
{
@ -161,7 +161,7 @@ TEST_CASE("Rollback before evidence is committed")
issue_transactions(network, snapshot_tx_interval);
auto snapshotter = std::make_shared<ccf::Snapshotter>(
*writer_factory, network, snapshot_tx_interval);
*writer_factory, network.tables, snapshot_tx_interval);
INFO("Generate snapshot");
{