зеркало из https://github.com/microsoft/CCF.git
Add clear method to KV store (#1911)
This commit is contained in:
Родитель
5bf119fe7c
Коммит
4cffd1f83e
|
@ -17,16 +17,49 @@
|
|||
|
||||
namespace kv
|
||||
{
|
||||
class Store : public AbstractStore
|
||||
class StoreState
|
||||
{
|
||||
private:
|
||||
protected:
|
||||
// All collections of Map must be ordered so that we lock their contained
|
||||
// maps in a stable order. The order here is by map name. The version
|
||||
// indicates the version at which the Map was created.
|
||||
using Maps = std::
|
||||
map<std::string, std::pair<kv::Version, std::shared_ptr<untyped::Map>>>;
|
||||
SpinLock maps_lock;
|
||||
Maps maps;
|
||||
|
||||
SpinLock version_lock;
|
||||
Version version = 0;
|
||||
Version compacted = 0;
|
||||
Term term = 0;
|
||||
Version last_replicated = 0;
|
||||
Version last_committable = 0;
|
||||
Version rollback_count = 0;
|
||||
|
||||
std::unordered_map<Version, std::pair<PendingTx, bool>> pending_txs;
|
||||
|
||||
public:
|
||||
void clear()
|
||||
{
|
||||
std::lock_guard<SpinLock> mguard(maps_lock);
|
||||
std::lock_guard<SpinLock> vguard(version_lock);
|
||||
|
||||
maps.clear();
|
||||
pending_txs.clear();
|
||||
|
||||
version = 0;
|
||||
compacted = 0;
|
||||
term = 0;
|
||||
|
||||
last_replicated = 0;
|
||||
last_committable = 0;
|
||||
rollback_count = 0;
|
||||
}
|
||||
};
|
||||
|
||||
class Store : public AbstractStore, public StoreState
|
||||
{
|
||||
private:
|
||||
using Hooks = std::map<std::string, kv::untyped::Map::CommitHook>;
|
||||
Hooks local_hooks;
|
||||
Hooks global_hooks;
|
||||
|
@ -35,17 +68,7 @@ namespace kv
|
|||
std::shared_ptr<TxHistory> history = nullptr;
|
||||
std::shared_ptr<ccf::ProgressTracker> progress_tracker = nullptr;
|
||||
EncryptorPtr encryptor = nullptr;
|
||||
Version version = 0;
|
||||
Version compacted = 0;
|
||||
Term term = 0;
|
||||
|
||||
SpinLock maps_lock;
|
||||
SpinLock version_lock;
|
||||
|
||||
std::unordered_map<Version, std::pair<PendingTx, bool>> pending_txs;
|
||||
Version last_replicated = 0;
|
||||
Version last_committable = 0;
|
||||
Version rollback_count = 0;
|
||||
kv::ReplicateType replicate_type = kv::ReplicateType::ALL;
|
||||
std::unordered_set<std::string> replicated_tables;
|
||||
|
||||
|
|
|
@ -1403,4 +1403,56 @@ TEST_CASE("Mid-tx compaction")
|
|||
// In real operation, this transaction would be re-executed and hope to not
|
||||
// intersect a compaction
|
||||
}
|
||||
}
|
||||
|
||||
TEST_CASE("Store clear")
|
||||
{
|
||||
kv::Store kv_store;
|
||||
kv_store.set_term(42);
|
||||
|
||||
auto map_a_name = "public:A";
|
||||
auto map_b_name = "public:B";
|
||||
MapTypes::StringNum map_a(map_a_name);
|
||||
MapTypes::StringNum map_b(map_b_name);
|
||||
|
||||
INFO("Apply transactions and compact store");
|
||||
{
|
||||
size_t tx_count = 10;
|
||||
for (int i = 0; i < tx_count; i++)
|
||||
{
|
||||
auto tx = kv_store.create_tx();
|
||||
auto [view_a, view_b] = tx.get_view(map_a, map_b);
|
||||
|
||||
view_a->put("key" + std::to_string(i), 42);
|
||||
view_b->put("key" + std::to_string(i), 42);
|
||||
REQUIRE(tx.commit() == kv::CommitSuccess::OK);
|
||||
}
|
||||
|
||||
auto current_version = kv_store.current_version();
|
||||
kv_store.compact(current_version);
|
||||
|
||||
REQUIRE(kv_store.get_map(current_version, map_a_name) != nullptr);
|
||||
REQUIRE(kv_store.get_map(current_version, map_b_name) != nullptr);
|
||||
|
||||
REQUIRE(kv_store.current_version() != 0);
|
||||
REQUIRE(kv_store.commit_version() != 0);
|
||||
auto tx_id = kv_store.current_txid();
|
||||
REQUIRE(tx_id.term != 0);
|
||||
REQUIRE(tx_id.version != 0);
|
||||
}
|
||||
|
||||
INFO("Verify that store state is cleared");
|
||||
{
|
||||
kv_store.clear();
|
||||
auto current_version = kv_store.current_version();
|
||||
|
||||
REQUIRE(kv_store.get_map(current_version, map_a_name) == nullptr);
|
||||
REQUIRE(kv_store.get_map(current_version, map_b_name) == nullptr);
|
||||
|
||||
REQUIRE(kv_store.current_version() == 0);
|
||||
REQUIRE(kv_store.commit_version() == 0);
|
||||
auto tx_id = kv_store.current_txid();
|
||||
REQUIRE(tx_id.term == 0);
|
||||
REQUIRE(tx_id.version == 0);
|
||||
}
|
||||
}
|
Загрузка…
Ссылка в новой задаче