From a554a30e01ab30386f087da7bcc1d76e5772a255 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Wed, 12 Jul 2023 18:47:13 +0100 Subject: [PATCH] Store deadlock fix (#5413) --- .threading_canary | 2 +- src/kv/apply_changes.h | 25 ++++++- src/kv/committable_tx.h | 12 +++- src/kv/kv_types.h | 5 +- src/kv/store.h | 101 ++++++++++++++++----------- src/kv/test/stub_consensus.h | 1 - src/node/history.h | 27 +++++-- src/node/test/historical_queries.cpp | 8 +++ src/node/test/history.cpp | 66 ++++++++--------- src/node/test/snapshot.cpp | 4 +- src/node/test/snapshotter.cpp | 3 + 11 files changed, 166 insertions(+), 88 deletions(-) diff --git a/.threading_canary b/.threading_canary index 6f719b358..ff262be76 100644 --- a/.threading_canary +++ b/.threading_canary @@ -1 +1 @@ -THIS looks like a job for Threading Canar!y!1! +THIS looks like a job for Threading Canar!y!1!.. diff --git a/src/kv/apply_changes.h b/src/kv/apply_changes.h index bc39fc438..ce6bf58be 100644 --- a/src/kv/apply_changes.h +++ b/src/kv/apply_changes.h @@ -41,7 +41,8 @@ namespace kv const MapCollection& new_maps, const std::optional& new_maps_conflict_version, bool track_read_versions, - bool track_deletes_on_missing_keys) + bool track_deletes_on_missing_keys, + const std::optional& expected_rollback_count = std::nullopt) { // All maps with pending writes are locked, transactions are prepared // and possibly committed, and then all maps with pending writes are @@ -70,7 +71,27 @@ namespace kv } bool ok = true; - if (has_writes) + + if (expected_rollback_count.has_value() && !changes.empty()) + { + // expected_rollback_count is only set on signature transactions + // which always contain some writes, and on which all the maps + // point to the same store. + auto store = changes.begin()->second.map->get_store(); + if (store != nullptr) + { + // Note that this is done when holding the lock on at least some maps + // through the combination of the changes not being empty, and the + // acquisition of the map locks on line 69. This guarantees atomicity + // with respect to rollbacks, which would acquire the map lock on all + // maps at once to truncate their roll. The net result is that the + // transaction becomes a noop if a rollback occurred between it being + // committed, and the side effects being applied. + ok = store->check_rollback_count(expected_rollback_count.value()); + } + } + + if (ok && has_writes) { for (auto it = views.begin(); it != views.end(); ++it) { diff --git a/src/kv/committable_tx.h b/src/kv/committable_tx.h index e7dd9ea16..065f29b59 100644 --- a/src/kv/committable_tx.h +++ b/src/kv/committable_tx.h @@ -391,14 +391,21 @@ namespace kv // never conflict. class ReservedTx : public CommittableTx { + private: + Version rollback_count = 0; + public: ReservedTx( - AbstractStore* _store, Term read_term, const TxID& reserved_tx_id) : + AbstractStore* _store, + Term read_term, + const TxID& reserved_tx_id, + Version rollback_count_) : CommittableTx(_store) { version = reserved_tx_id.version; pimpl->commit_view = reserved_tx_id.term; pimpl->read_txid = TxID(read_term, reserved_tx_id.version - 1); + rollback_count = rollback_count_; } // Used by frontend to commit reserved transactions @@ -420,7 +427,8 @@ namespace kv pimpl->created_maps, version, track_read_versions, - track_deletes_on_missing_keys); + track_deletes_on_missing_keys, + rollback_count); success = c.has_value(); if (!success) diff --git a/src/kv/kv_types.h b/src/kv/kv_types.h index d0dba597e..31aa2e373 100644 --- a/src/kv/kv_types.h +++ b/src/kv/kv_types.h @@ -418,7 +418,9 @@ namespace kv const std::vector& hash_at_snapshot) = 0; virtual std::vector get_raw_leaf(uint64_t index) = 0; virtual void append(const std::vector& data) = 0; - virtual void append_entry(const crypto::Sha256Hash& digest) = 0; + virtual void append_entry( + const crypto::Sha256Hash& digest, + std::optional expected_term = std::nullopt) = 0; virtual void rollback( const kv::TxID& tx_id, kv::Term term_of_next_version_) = 0; virtual void compact(Version v) = 0; @@ -721,6 +723,7 @@ namespace kv const TxID& txid, std::unique_ptr pending_tx, bool globally_committable) = 0; + virtual bool check_rollback_count(Version count) = 0; virtual std::unique_ptr snapshot_unsafe_maps( Version v) = 0; diff --git a/src/kv/store.h b/src/kv/store.h index 6b2768876..f334512aa 100644 --- a/src/kv/store.h +++ b/src/kv/store.h @@ -894,6 +894,10 @@ namespace kv Version previous_rollback_count = 0; ccf::View replication_view = 0; + std::vector, bool>> + contiguous_pending_txs; + auto h = get_history(); + { std::lock_guard vguard(version_lock); if (txid.term != term_of_next_version && get_consensus()->is_primary()) @@ -918,8 +922,6 @@ namespace kv std::make_tuple(std::move(pending_tx), globally_committable)}); LOG_TRACE_FMT("Inserting pending tx at {}", txid.version); - - auto h = get_history(); auto c = get_consensus(); for (Version offset = 1; true; ++offset) @@ -938,53 +940,62 @@ namespace kv break; } - auto& [pending_tx_, committable_] = search->second; - auto - [success_, data_, claims_digest_, commit_evidence_digest_, hooks_] = - pending_tx_->call(); - auto data_shared = - std::make_shared>(std::move(data_)); - auto hooks_shared = - std::make_shared(std::move(hooks_)); - - // NB: this cannot happen currently. Regular Tx only make it here if - // they did succeed, and signatures cannot conflict because they - // execute in order with a read_version that's version - 1, so even - // two contiguous signatures are fine - if (success_ != CommitResult::SUCCESS) - { - LOG_DEBUG_FMT("Failed Tx commit {}", last_replicated + offset); - } - - if (h) - { - h->append_entry(ccf::entry_leaf( - *data_shared, commit_evidence_digest_, claims_digest_)); - } - - LOG_DEBUG_FMT( - "Batching {} ({}) during commit of {}.{}", - last_replicated + offset, - data_shared->size(), - txid.term, - txid.version); - - batch.emplace_back( - last_replicated + offset, data_shared, committable_, hooks_shared); + contiguous_pending_txs.emplace_back(std::move(search->second)); pending_txs.erase(search); } - if (batch.size() == 0) - { - return CommitResult::SUCCESS; - } - previous_rollback_count = rollback_count; previous_last_replicated = last_replicated; - next_last_replicated = last_replicated + batch.size(); + next_last_replicated = last_replicated + contiguous_pending_txs.size(); replication_view = term_of_next_version; } + // Release version lock + + if (contiguous_pending_txs.size() == 0) + { + return CommitResult::SUCCESS; + } + + size_t offset = 1; + for (auto& [pending_tx_, committable_] : contiguous_pending_txs) + { + auto + [success_, data_, claims_digest_, commit_evidence_digest_, hooks_] = + pending_tx_->call(); + auto data_shared = + std::make_shared>(std::move(data_)); + auto hooks_shared = + std::make_shared(std::move(hooks_)); + + // NB: this cannot happen currently. Regular Tx only make it here if + // they did succeed, and signatures cannot conflict because they + // execute in order with a read_version that's version - 1, so even + // two contiguous signatures are fine + if (success_ != CommitResult::SUCCESS) + { + LOG_DEBUG_FMT("Failed Tx commit {}", last_replicated + offset); + } + + if (h) + { + h->append_entry( + ccf::entry_leaf( + *data_shared, commit_evidence_digest_, claims_digest_), + replication_view); + } + + LOG_DEBUG_FMT( + "Batching {} ({}) during commit of {}.{}", + last_replicated + offset, + data_shared->size(), + txid.term, + txid.version); + + batch.emplace_back( + last_replicated + offset, data_shared, committable_, hooks_shared); + offset++; + } if (c->replicate(batch, replication_view)) { @@ -1037,6 +1048,12 @@ namespace kv maps_lock.unlock(); } + bool check_rollback_count(Version count) override + { + std::lock_guard vguard(version_lock); + return rollback_count == count; + } + std::tuple next_version(bool commit_new_map) override { std::lock_guard vguard(version_lock); @@ -1234,7 +1251,7 @@ namespace kv { // version_lock should already been acquired in case term_of_last_version // is incremented. - return ReservedTx(this, term_of_last_version, tx_id); + return ReservedTx(this, term_of_last_version, tx_id, rollback_count); } virtual void set_flag(Flag f) override diff --git a/src/kv/test/stub_consensus.h b/src/kv/test/stub_consensus.h index 0f26bca4c..978634741 100644 --- a/src/kv/test/stub_consensus.h +++ b/src/kv/test/stub_consensus.h @@ -23,7 +23,6 @@ namespace kv::test std::vector replica; ccf::TxID committed_txid = {}; ccf::View current_view = 0; - ccf::SeqNo last_signature = 0; aft::ViewHistory view_history; diff --git a/src/node/history.h b/src/node/history.h index 1f9103854..2608aa9e9 100644 --- a/src/node/history.h +++ b/src/node/history.h @@ -134,7 +134,9 @@ namespace ccf version++; } - void append_entry(const crypto::Sha256Hash& digest) override + void append_entry( + const crypto::Sha256Hash& digest, + std::optional term_of_next_version = std::nullopt) override { version++; } @@ -677,8 +679,15 @@ namespace ccf std::vector serialise_tree(size_t to) override { std::lock_guard guard(state_lock); - return replicated_state_tree.serialise( - replicated_state_tree.begin_index(), to); + if (to <= replicated_state_tree.end_index()) + { + return replicated_state_tree.serialise( + replicated_state_tree.begin_index(), to); + } + else + { + return {}; + } } void set_term(kv::Term t) override @@ -784,10 +793,20 @@ namespace ccf replicated_state_tree.append(rh); } - void append_entry(const crypto::Sha256Hash& digest) override + void append_entry( + const crypto::Sha256Hash& digest, + std::optional expected_term_of_next_version = + std::nullopt) override { log_hash(digest, APPEND); std::lock_guard guard(state_lock); + if (expected_term_of_next_version.has_value()) + { + if (expected_term_of_next_version.value() != term_of_next_version) + { + return; + } + } replicated_state_tree.append(digest); } diff --git a/src/node/test/historical_queries.cpp b/src/node/test/historical_queries.cpp index 1fa6da881..0536a851a 100644 --- a/src/node/test/historical_queries.cpp +++ b/src/node/test/historical_queries.cpp @@ -58,6 +58,7 @@ TestState create_and_init_state(bool initialise_ledger_rekey = true) std::make_shared(*ts.kv_store, node_id, *ts.node_kp); h->set_endorsed_certificate({}); ts.kv_store->set_history(h); + ts.kv_store->initialise_term(2); { INFO("Store the signing node's key"); @@ -122,6 +123,7 @@ kv::Version write_transactions(kv::Store& kv_store, size_t tx_count) REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); } + REQUIRE(kv_store.current_version() == end); return kv_store.current_version(); } @@ -215,6 +217,7 @@ std::map> construct_host_ledger( std::map> ledger; auto next_ledger_entry = consensus->pop_oldest_entry(); + auto version = std::get<0>(next_ledger_entry.value()); while (next_ledger_entry.has_value()) { const auto ib = ledger.insert(std::make_pair( @@ -222,6 +225,11 @@ std::map> construct_host_ledger( *std::get<1>(next_ledger_entry.value()))); REQUIRE(ib.second); next_ledger_entry = consensus->pop_oldest_entry(); + if (next_ledger_entry.has_value()) + { + REQUIRE(version + 1 == std::get<0>(next_ledger_entry.value())); + version = std::get<0>(next_ledger_entry.value()); + } } return ledger; diff --git a/src/node/test/history.cpp b/src/node/test/history.cpp index 4a8298df8..9ffce5d61 100644 --- a/src/node/test/history.cpp +++ b/src/node/test/history.cpp @@ -3,6 +3,7 @@ #include "node/history.h" #include "ccf/app_interface.h" +#include "ccf/ds/logger.h" #include "ccf/service/tables/nodes.h" #include "crypto/certs.h" #include "ds/x509_time_fmt.h" @@ -72,37 +73,38 @@ TEST_CASE("Check signature verification") { auto encryptor = std::make_shared(); - kv::Store primary_store; - primary_store.set_encryptor(encryptor); - - kv::Store backup_store; - backup_store.set_encryptor(encryptor); - - ccf::Nodes nodes(ccf::Tables::NODES); - ccf::Signatures signatures(ccf::Tables::SIGNATURES); - auto kp = crypto::make_key_pair(); - const auto self_signed = kp->self_sign("CN=Node", valid_from, valid_to); - std::shared_ptr consensus = - std::make_shared(&backup_store); - primary_store.set_consensus(consensus); - std::shared_ptr null_consensus = - std::make_shared(nullptr); - backup_store.set_consensus(null_consensus); - + kv::Store primary_store; + primary_store.set_encryptor(encryptor); + constexpr auto store_term = 2; std::shared_ptr primary_history = std::make_shared( primary_store, kv::test::PrimaryNodeId, *kp); primary_history->set_endorsed_certificate(self_signed); primary_store.set_history(primary_history); + primary_store.initialise_term(store_term); + kv::Store backup_store; + backup_store.set_encryptor(encryptor); std::shared_ptr backup_history = std::make_shared( backup_store, kv::test::FirstBackupNodeId, *kp); backup_history->set_endorsed_certificate(self_signed); backup_store.set_history(backup_history); + backup_store.initialise_term(store_term); + + ccf::Nodes nodes(ccf::Tables::NODES); + ccf::Signatures signatures(ccf::Tables::SIGNATURES); + + std::shared_ptr consensus = + std::make_shared(&backup_store); + primary_store.set_consensus(consensus); + + std::shared_ptr null_consensus = + std::make_shared(nullptr); + backup_store.set_consensus(null_consensus); INFO("Write certificate"); { @@ -135,21 +137,31 @@ TEST_CASE("Check signature verification") TEST_CASE("Check signing works across rollback") { auto encryptor = std::make_shared(); + + auto kp = crypto::make_key_pair(); + const auto self_signed = kp->self_sign("CN=Node", valid_from, valid_to); + kv::Store primary_store; primary_store.set_encryptor(encryptor); constexpr auto store_term = 2; + std::shared_ptr primary_history = + std::make_shared( + primary_store, kv::test::PrimaryNodeId, *kp); + primary_history->set_endorsed_certificate(self_signed); + primary_store.set_history(primary_history); primary_store.initialise_term(store_term); kv::Store backup_store; + std::shared_ptr backup_history = + std::make_shared( + backup_store, kv::test::FirstBackupNodeId, *kp); + backup_history->set_endorsed_certificate(self_signed); + backup_store.set_history(backup_history); backup_store.set_encryptor(encryptor); backup_store.initialise_term(store_term); ccf::Nodes nodes(ccf::Tables::NODES); - auto kp = crypto::make_key_pair(); - - const auto self_signed = kp->self_sign("CN=Node", valid_from, valid_to); - std::shared_ptr consensus = std::make_shared(&backup_store); primary_store.set_consensus(consensus); @@ -157,18 +169,6 @@ TEST_CASE("Check signing works across rollback") std::make_shared(nullptr); backup_store.set_consensus(null_consensus); - std::shared_ptr primary_history = - std::make_shared( - primary_store, kv::test::PrimaryNodeId, *kp); - primary_history->set_endorsed_certificate(self_signed); - primary_store.set_history(primary_history); - - std::shared_ptr backup_history = - std::make_shared( - backup_store, kv::test::FirstBackupNodeId, *kp); - backup_history->set_endorsed_certificate(self_signed); - backup_store.set_history(backup_history); - INFO("Write certificate"); { auto txs = primary_store.create_tx(); diff --git a/src/node/test/snapshot.cpp b/src/node/test/snapshot.cpp index 367b3b6d5..139c83650 100644 --- a/src/node/test/snapshot.cpp +++ b/src/node/test/snapshot.cpp @@ -30,8 +30,8 @@ TEST_CASE("Snapshot with merkle tree" * doctest::test_suite("snapshot")) auto source_history = std::make_shared( source_store, source_node_id, *source_node_kp); source_history->set_endorsed_certificate({}); - source_store.set_history(source_history); + source_store.initialise_term(2); kv::Map string_map("public:string_map"); @@ -79,7 +79,7 @@ TEST_CASE("Snapshot with merkle tree" * doctest::test_suite("snapshot")) target_tree.append(ccf::entry_leaf( serialised_signature, - crypto::Sha256Hash("ce:0.4:"), + crypto::Sha256Hash("ce:2.4:"), ccf::empty_claims())); REQUIRE( target_tree.get_root() == source_history->get_replicated_state_root()); diff --git a/src/node/test/snapshotter.cpp b/src/node/test/snapshotter.cpp index 5c48d2ebd..5ea06f53d 100644 --- a/src/node/test/snapshotter.cpp +++ b/src/node/test/snapshotter.cpp @@ -142,6 +142,7 @@ TEST_CASE("Regular snapshotting") auto history = std::make_shared( *network.tables.get(), kv::test::PrimaryNodeId, *kp); network.tables->set_history(history); + network.tables->initialise_term(2); network.tables->set_consensus(consensus); auto encryptor = std::make_shared(); network.tables->set_encryptor(encryptor); @@ -304,6 +305,7 @@ TEST_CASE("Rollback before snapshot is committed") auto history = std::make_shared( *network.tables.get(), kv::test::PrimaryNodeId, *kp); network.tables->set_history(history); + network.tables->initialise_term(2); network.tables->set_consensus(consensus); auto encryptor = std::make_shared(); network.tables->set_encryptor(encryptor); @@ -434,6 +436,7 @@ TEST_CASE("Rekey ledger while snapshot is in progress") auto history = std::make_shared( *network.tables.get(), kv::test::PrimaryNodeId, *kp); network.tables->set_history(history); + network.tables->initialise_term(2); network.tables->set_consensus(consensus); auto ledger_secrets = std::make_shared(); ledger_secrets->init();