This commit is contained in:
Amaury Chamayou 2023-07-12 18:47:13 +01:00 коммит произвёл GitHub
Родитель 3773c70d57
Коммит a554a30e01
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
11 изменённых файлов: 166 добавлений и 88 удалений

Просмотреть файл

@ -1 +1 @@
THIS looks like a job for Threading Canar!y!1!
THIS looks like a job for Threading Canar!y!1!..

Просмотреть файл

@ -41,7 +41,8 @@ namespace kv
const MapCollection& new_maps,
const std::optional<Version>& new_maps_conflict_version,
bool track_read_versions,
bool track_deletes_on_missing_keys)
bool track_deletes_on_missing_keys,
const std::optional<Version>& expected_rollback_count = std::nullopt)
{
// All maps with pending writes are locked, transactions are prepared
// and possibly committed, and then all maps with pending writes are
@ -70,7 +71,27 @@ namespace kv
}
bool ok = true;
if (has_writes)
if (expected_rollback_count.has_value() && !changes.empty())
{
// expected_rollback_count is only set on signature transactions
// which always contain some writes, and on which all the maps
// point to the same store.
auto store = changes.begin()->second.map->get_store();
if (store != nullptr)
{
// Note that this is done when holding the lock on at least some maps
// through the combination of the changes not being empty, and the
// acquisition of the map locks on line 69. This guarantees atomicity
// with respect to rollbacks, which would acquire the map lock on all
// maps at once to truncate their roll. The net result is that the
// transaction becomes a noop if a rollback occurred between it being
// committed, and the side effects being applied.
ok = store->check_rollback_count(expected_rollback_count.value());
}
}
if (ok && has_writes)
{
for (auto it = views.begin(); it != views.end(); ++it)
{

Просмотреть файл

@ -391,14 +391,21 @@ namespace kv
// never conflict.
class ReservedTx : public CommittableTx
{
private:
Version rollback_count = 0;
public:
ReservedTx(
AbstractStore* _store, Term read_term, const TxID& reserved_tx_id) :
AbstractStore* _store,
Term read_term,
const TxID& reserved_tx_id,
Version rollback_count_) :
CommittableTx(_store)
{
version = reserved_tx_id.version;
pimpl->commit_view = reserved_tx_id.term;
pimpl->read_txid = TxID(read_term, reserved_tx_id.version - 1);
rollback_count = rollback_count_;
}
// Used by frontend to commit reserved transactions
@ -420,7 +427,8 @@ namespace kv
pimpl->created_maps,
version,
track_read_versions,
track_deletes_on_missing_keys);
track_deletes_on_missing_keys,
rollback_count);
success = c.has_value();
if (!success)

Просмотреть файл

@ -418,7 +418,9 @@ namespace kv
const std::vector<uint8_t>& hash_at_snapshot) = 0;
virtual std::vector<uint8_t> get_raw_leaf(uint64_t index) = 0;
virtual void append(const std::vector<uint8_t>& data) = 0;
virtual void append_entry(const crypto::Sha256Hash& digest) = 0;
virtual void append_entry(
const crypto::Sha256Hash& digest,
std::optional<kv::Term> expected_term = std::nullopt) = 0;
virtual void rollback(
const kv::TxID& tx_id, kv::Term term_of_next_version_) = 0;
virtual void compact(Version v) = 0;
@ -721,6 +723,7 @@ namespace kv
const TxID& txid,
std::unique_ptr<PendingTx> pending_tx,
bool globally_committable) = 0;
virtual bool check_rollback_count(Version count) = 0;
virtual std::unique_ptr<AbstractSnapshot> snapshot_unsafe_maps(
Version v) = 0;

Просмотреть файл

@ -894,6 +894,10 @@ namespace kv
Version previous_rollback_count = 0;
ccf::View replication_view = 0;
std::vector<std::tuple<std::unique_ptr<PendingTx>, bool>>
contiguous_pending_txs;
auto h = get_history();
{
std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
if (txid.term != term_of_next_version && get_consensus()->is_primary())
@ -918,8 +922,6 @@ namespace kv
std::make_tuple(std::move(pending_tx), globally_committable)});
LOG_TRACE_FMT("Inserting pending tx at {}", txid.version);
auto h = get_history();
auto c = get_consensus();
for (Version offset = 1; true; ++offset)
@ -938,53 +940,62 @@ namespace kv
break;
}
auto& [pending_tx_, committable_] = search->second;
auto
[success_, data_, claims_digest_, commit_evidence_digest_, hooks_] =
pending_tx_->call();
auto data_shared =
std::make_shared<std::vector<uint8_t>>(std::move(data_));
auto hooks_shared =
std::make_shared<kv::ConsensusHookPtrs>(std::move(hooks_));
// NB: this cannot happen currently. Regular Tx only make it here if
// they did succeed, and signatures cannot conflict because they
// execute in order with a read_version that's version - 1, so even
// two contiguous signatures are fine
if (success_ != CommitResult::SUCCESS)
{
LOG_DEBUG_FMT("Failed Tx commit {}", last_replicated + offset);
}
if (h)
{
h->append_entry(ccf::entry_leaf(
*data_shared, commit_evidence_digest_, claims_digest_));
}
LOG_DEBUG_FMT(
"Batching {} ({}) during commit of {}.{}",
last_replicated + offset,
data_shared->size(),
txid.term,
txid.version);
batch.emplace_back(
last_replicated + offset, data_shared, committable_, hooks_shared);
contiguous_pending_txs.emplace_back(std::move(search->second));
pending_txs.erase(search);
}
if (batch.size() == 0)
{
return CommitResult::SUCCESS;
}
previous_rollback_count = rollback_count;
previous_last_replicated = last_replicated;
next_last_replicated = last_replicated + batch.size();
next_last_replicated = last_replicated + contiguous_pending_txs.size();
replication_view = term_of_next_version;
}
// Release version lock
if (contiguous_pending_txs.size() == 0)
{
return CommitResult::SUCCESS;
}
size_t offset = 1;
for (auto& [pending_tx_, committable_] : contiguous_pending_txs)
{
auto
[success_, data_, claims_digest_, commit_evidence_digest_, hooks_] =
pending_tx_->call();
auto data_shared =
std::make_shared<std::vector<uint8_t>>(std::move(data_));
auto hooks_shared =
std::make_shared<kv::ConsensusHookPtrs>(std::move(hooks_));
// NB: this cannot happen currently. Regular Tx only make it here if
// they did succeed, and signatures cannot conflict because they
// execute in order with a read_version that's version - 1, so even
// two contiguous signatures are fine
if (success_ != CommitResult::SUCCESS)
{
LOG_DEBUG_FMT("Failed Tx commit {}", last_replicated + offset);
}
if (h)
{
h->append_entry(
ccf::entry_leaf(
*data_shared, commit_evidence_digest_, claims_digest_),
replication_view);
}
LOG_DEBUG_FMT(
"Batching {} ({}) during commit of {}.{}",
last_replicated + offset,
data_shared->size(),
txid.term,
txid.version);
batch.emplace_back(
last_replicated + offset, data_shared, committable_, hooks_shared);
offset++;
}
if (c->replicate(batch, replication_view))
{
@ -1037,6 +1048,12 @@ namespace kv
maps_lock.unlock();
}
bool check_rollback_count(Version count) override
{
std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
return rollback_count == count;
}
std::tuple<Version, Version> next_version(bool commit_new_map) override
{
std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
@ -1234,7 +1251,7 @@ namespace kv
{
// version_lock should already been acquired in case term_of_last_version
// is incremented.
return ReservedTx(this, term_of_last_version, tx_id);
return ReservedTx(this, term_of_last_version, tx_id, rollback_count);
}
virtual void set_flag(Flag f) override

Просмотреть файл

@ -23,7 +23,6 @@ namespace kv::test
std::vector<BatchVector::value_type> replica;
ccf::TxID committed_txid = {};
ccf::View current_view = 0;
ccf::SeqNo last_signature = 0;
aft::ViewHistory view_history;

Просмотреть файл

@ -134,7 +134,9 @@ namespace ccf
version++;
}
void append_entry(const crypto::Sha256Hash& digest) override
void append_entry(
const crypto::Sha256Hash& digest,
std::optional<kv::Term> term_of_next_version = std::nullopt) override
{
version++;
}
@ -677,8 +679,15 @@ namespace ccf
std::vector<uint8_t> serialise_tree(size_t to) override
{
std::lock_guard<ccf::pal::Mutex> guard(state_lock);
return replicated_state_tree.serialise(
replicated_state_tree.begin_index(), to);
if (to <= replicated_state_tree.end_index())
{
return replicated_state_tree.serialise(
replicated_state_tree.begin_index(), to);
}
else
{
return {};
}
}
void set_term(kv::Term t) override
@ -784,10 +793,20 @@ namespace ccf
replicated_state_tree.append(rh);
}
void append_entry(const crypto::Sha256Hash& digest) override
void append_entry(
const crypto::Sha256Hash& digest,
std::optional<kv::Term> expected_term_of_next_version =
std::nullopt) override
{
log_hash(digest, APPEND);
std::lock_guard<ccf::pal::Mutex> guard(state_lock);
if (expected_term_of_next_version.has_value())
{
if (expected_term_of_next_version.value() != term_of_next_version)
{
return;
}
}
replicated_state_tree.append(digest);
}

Просмотреть файл

@ -58,6 +58,7 @@ TestState create_and_init_state(bool initialise_ledger_rekey = true)
std::make_shared<ccf::MerkleTxHistory>(*ts.kv_store, node_id, *ts.node_kp);
h->set_endorsed_certificate({});
ts.kv_store->set_history(h);
ts.kv_store->initialise_term(2);
{
INFO("Store the signing node's key");
@ -122,6 +123,7 @@ kv::Version write_transactions(kv::Store& kv_store, size_t tx_count)
REQUIRE(tx.commit() == kv::CommitResult::SUCCESS);
}
REQUIRE(kv_store.current_version() == end);
return kv_store.current_version();
}
@ -215,6 +217,7 @@ std::map<ccf::SeqNo, std::vector<uint8_t>> construct_host_ledger(
std::map<ccf::SeqNo, std::vector<uint8_t>> ledger;
auto next_ledger_entry = consensus->pop_oldest_entry();
auto version = std::get<0>(next_ledger_entry.value());
while (next_ledger_entry.has_value())
{
const auto ib = ledger.insert(std::make_pair(
@ -222,6 +225,11 @@ std::map<ccf::SeqNo, std::vector<uint8_t>> construct_host_ledger(
*std::get<1>(next_ledger_entry.value())));
REQUIRE(ib.second);
next_ledger_entry = consensus->pop_oldest_entry();
if (next_ledger_entry.has_value())
{
REQUIRE(version + 1 == std::get<0>(next_ledger_entry.value()));
version = std::get<0>(next_ledger_entry.value());
}
}
return ledger;

Просмотреть файл

@ -3,6 +3,7 @@
#include "node/history.h"
#include "ccf/app_interface.h"
#include "ccf/ds/logger.h"
#include "ccf/service/tables/nodes.h"
#include "crypto/certs.h"
#include "ds/x509_time_fmt.h"
@ -72,37 +73,38 @@ TEST_CASE("Check signature verification")
{
auto encryptor = std::make_shared<kv::NullTxEncryptor>();
kv::Store primary_store;
primary_store.set_encryptor(encryptor);
kv::Store backup_store;
backup_store.set_encryptor(encryptor);
ccf::Nodes nodes(ccf::Tables::NODES);
ccf::Signatures signatures(ccf::Tables::SIGNATURES);
auto kp = crypto::make_key_pair();
const auto self_signed = kp->self_sign("CN=Node", valid_from, valid_to);
std::shared_ptr<kv::Consensus> consensus =
std::make_shared<DummyConsensus>(&backup_store);
primary_store.set_consensus(consensus);
std::shared_ptr<kv::Consensus> null_consensus =
std::make_shared<DummyConsensus>(nullptr);
backup_store.set_consensus(null_consensus);
kv::Store primary_store;
primary_store.set_encryptor(encryptor);
constexpr auto store_term = 2;
std::shared_ptr<kv::TxHistory> primary_history =
std::make_shared<ccf::MerkleTxHistory>(
primary_store, kv::test::PrimaryNodeId, *kp);
primary_history->set_endorsed_certificate(self_signed);
primary_store.set_history(primary_history);
primary_store.initialise_term(store_term);
kv::Store backup_store;
backup_store.set_encryptor(encryptor);
std::shared_ptr<kv::TxHistory> backup_history =
std::make_shared<ccf::MerkleTxHistory>(
backup_store, kv::test::FirstBackupNodeId, *kp);
backup_history->set_endorsed_certificate(self_signed);
backup_store.set_history(backup_history);
backup_store.initialise_term(store_term);
ccf::Nodes nodes(ccf::Tables::NODES);
ccf::Signatures signatures(ccf::Tables::SIGNATURES);
std::shared_ptr<kv::Consensus> consensus =
std::make_shared<DummyConsensus>(&backup_store);
primary_store.set_consensus(consensus);
std::shared_ptr<kv::Consensus> null_consensus =
std::make_shared<DummyConsensus>(nullptr);
backup_store.set_consensus(null_consensus);
INFO("Write certificate");
{
@ -135,21 +137,31 @@ TEST_CASE("Check signature verification")
TEST_CASE("Check signing works across rollback")
{
auto encryptor = std::make_shared<kv::NullTxEncryptor>();
auto kp = crypto::make_key_pair();
const auto self_signed = kp->self_sign("CN=Node", valid_from, valid_to);
kv::Store primary_store;
primary_store.set_encryptor(encryptor);
constexpr auto store_term = 2;
std::shared_ptr<kv::TxHistory> primary_history =
std::make_shared<ccf::MerkleTxHistory>(
primary_store, kv::test::PrimaryNodeId, *kp);
primary_history->set_endorsed_certificate(self_signed);
primary_store.set_history(primary_history);
primary_store.initialise_term(store_term);
kv::Store backup_store;
std::shared_ptr<kv::TxHistory> backup_history =
std::make_shared<ccf::MerkleTxHistory>(
backup_store, kv::test::FirstBackupNodeId, *kp);
backup_history->set_endorsed_certificate(self_signed);
backup_store.set_history(backup_history);
backup_store.set_encryptor(encryptor);
backup_store.initialise_term(store_term);
ccf::Nodes nodes(ccf::Tables::NODES);
auto kp = crypto::make_key_pair();
const auto self_signed = kp->self_sign("CN=Node", valid_from, valid_to);
std::shared_ptr<kv::Consensus> consensus =
std::make_shared<DummyConsensus>(&backup_store);
primary_store.set_consensus(consensus);
@ -157,18 +169,6 @@ TEST_CASE("Check signing works across rollback")
std::make_shared<DummyConsensus>(nullptr);
backup_store.set_consensus(null_consensus);
std::shared_ptr<kv::TxHistory> primary_history =
std::make_shared<ccf::MerkleTxHistory>(
primary_store, kv::test::PrimaryNodeId, *kp);
primary_history->set_endorsed_certificate(self_signed);
primary_store.set_history(primary_history);
std::shared_ptr<kv::TxHistory> backup_history =
std::make_shared<ccf::MerkleTxHistory>(
backup_store, kv::test::FirstBackupNodeId, *kp);
backup_history->set_endorsed_certificate(self_signed);
backup_store.set_history(backup_history);
INFO("Write certificate");
{
auto txs = primary_store.create_tx();

Просмотреть файл

@ -30,8 +30,8 @@ TEST_CASE("Snapshot with merkle tree" * doctest::test_suite("snapshot"))
auto source_history = std::make_shared<ccf::MerkleTxHistory>(
source_store, source_node_id, *source_node_kp);
source_history->set_endorsed_certificate({});
source_store.set_history(source_history);
source_store.initialise_term(2);
kv::Map<std::string, std::string> string_map("public:string_map");
@ -79,7 +79,7 @@ TEST_CASE("Snapshot with merkle tree" * doctest::test_suite("snapshot"))
target_tree.append(ccf::entry_leaf(
serialised_signature,
crypto::Sha256Hash("ce:0.4:"),
crypto::Sha256Hash("ce:2.4:"),
ccf::empty_claims()));
REQUIRE(
target_tree.get_root() == source_history->get_replicated_state_root());

Просмотреть файл

@ -142,6 +142,7 @@ TEST_CASE("Regular snapshotting")
auto history = std::make_shared<ccf::MerkleTxHistory>(
*network.tables.get(), kv::test::PrimaryNodeId, *kp);
network.tables->set_history(history);
network.tables->initialise_term(2);
network.tables->set_consensus(consensus);
auto encryptor = std::make_shared<kv::NullTxEncryptor>();
network.tables->set_encryptor(encryptor);
@ -304,6 +305,7 @@ TEST_CASE("Rollback before snapshot is committed")
auto history = std::make_shared<ccf::MerkleTxHistory>(
*network.tables.get(), kv::test::PrimaryNodeId, *kp);
network.tables->set_history(history);
network.tables->initialise_term(2);
network.tables->set_consensus(consensus);
auto encryptor = std::make_shared<kv::NullTxEncryptor>();
network.tables->set_encryptor(encryptor);
@ -434,6 +436,7 @@ TEST_CASE("Rekey ledger while snapshot is in progress")
auto history = std::make_shared<ccf::MerkleTxHistory>(
*network.tables.get(), kv::test::PrimaryNodeId, *kp);
network.tables->set_history(history);
network.tables->initialise_term(2);
network.tables->set_consensus(consensus);
auto ledger_secrets = std::make_shared<ccf::LedgerSecrets>();
ledger_secrets->init();