Initial support for creating KV tables within a transaction (#1507)

This commit is contained in:
Eddy Ashton 2020-08-24 15:09:06 +01:00 коммит произвёл GitHub
Родитель 0c4c648527
Коммит 29eeb309f5
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 959 добавлений и 76 удалений

Просмотреть файл

@ -257,6 +257,7 @@ if(BUILD_TESTS)
${CMAKE_CURRENT_SOURCE_DIR}/src/kv/test/kv_contention.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/kv/test/kv_serialisation.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/kv/test/kv_snapshot.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/kv/test/kv_dynamic_tables.cpp
)
use_client_mbedtls(kv_test)
target_link_libraries(

Просмотреть файл

@ -58,6 +58,18 @@ namespace kv
SECURITY_DOMAIN_MAX
};
static inline SecurityDomain get_security_domain(const std::string& name)
{
constexpr auto public_domain_prefix = "public:";
if (name.rfind(public_domain_prefix, 0) == 0)
{
return SecurityDomain::PUBLIC;
}
return SecurityDomain::PRIVATE;
}
// Note that failed = 0, and all other values are variants of PASS, which
// allows DeserialiseSuccess to be used as a boolean in code that does not
// need any detail about what happened on success
@ -352,6 +364,8 @@ namespace kv
Version version, const std::vector<uint8_t>& raw_ledger_key) = 0;
};
using EncryptorPtr = std::shared_ptr<AbstractTxEncryptor>;
class AbstractTxView
{
public:
@ -365,7 +379,7 @@ namespace kv
};
class AbstractStore;
class AbstractMap
class AbstractMap : public std::enable_shared_from_this<AbstractMap>
{
public:
class Snapshot
@ -415,6 +429,9 @@ namespace kv
virtual ~AbstractStore() {}
virtual void lock() = 0;
virtual void unlock() = 0;
virtual Version next_version() = 0;
virtual TxID next_txid() = 0;
@ -423,9 +440,15 @@ namespace kv
virtual Version commit_version() = 0;
virtual std::shared_ptr<AbstractMap> get_map(
kv::Version v, const std::string& map_name) = 0;
virtual void add_dynamic_map(
kv::Version v, const std::shared_ptr<AbstractMap>& map) = 0;
virtual bool is_map_replicated(const std::string& map_name) = 0;
virtual std::shared_ptr<Consensus> get_consensus() = 0;
virtual std::shared_ptr<TxHistory> get_history() = 0;
virtual std::shared_ptr<AbstractTxEncryptor> get_encryptor() = 0;
virtual EncryptorPtr get_encryptor() = 0;
virtual DeserialiseSuccess deserialise(
const std::vector<uint8_t>& data,
bool public_only = false,
@ -444,5 +467,4 @@ namespace kv
virtual size_t commit_gap() = 0;
};
}

Просмотреть файл

@ -7,6 +7,7 @@
#include "kv_types.h"
#include "map.h"
#include "snapshot.h"
#include "tx.h"
#include "view_containers.h"
#include <fmt/format.h>
@ -17,13 +18,16 @@ namespace kv
{
private:
// All collections of Map must be ordered so that we lock their contained
// maps in a stable order. The order here is by map name
using Maps = std::map<std::string, std::unique_ptr<AbstractMap>>;
// maps in a stable order. The order here is by map name. The version
// indicates the version at which the Map was created, or kv::NoVersion for
// 'static' maps created by Store.create
using Maps = std::
map<std::string, std::pair<kv::Version, std::shared_ptr<AbstractMap>>>;
Maps maps;
std::shared_ptr<Consensus> consensus = nullptr;
std::shared_ptr<TxHistory> history = nullptr;
std::shared_ptr<AbstractTxEncryptor> encryptor = nullptr;
EncryptorPtr encryptor = nullptr;
Version version = 0;
Version compacted = 0;
Term term = 0;
@ -45,9 +49,11 @@ namespace kv
// Tables, but its versioning invariants are ignored.
const bool strict_versions = true;
DeserialiseSuccess commit_deserialised(OrderedViews& views, Version& v)
DeserialiseSuccess commit_deserialised(
OrderedViews& views, Version& v, const MapCollection& new_maps)
{
auto c = apply_views(views, [v]() { return v; });
auto c = apply_views(
views, [v]() { return v; }, new_maps);
if (!c.has_value())
{
LOG_FAIL_FMT("Failed to commit deserialised Tx at version {}", v);
@ -61,6 +67,15 @@ namespace kv
return DeserialiseSuccess::PASS;
}
bool has_map_internal(const std::string& name)
{
auto search = maps.find(name);
if (search != maps.end())
return true;
return false;
}
public:
void clone_schema(Store& from)
{
@ -69,9 +84,11 @@ namespace kv
if ((maps.size() != 0) || (version != 0))
throw std::logic_error("Cannot clone schema on a non-empty store");
for (auto& [name, map] : from.maps)
for (auto& [name, pair] : from.maps)
{
maps[name] = std::unique_ptr<AbstractMap>(map->clone(this));
auto& [v, map] = pair;
maps[name] =
std::make_pair(v, std::unique_ptr<AbstractMap>(map->clone(this)));
}
}
@ -108,12 +125,12 @@ namespace kv
history = history_;
}
void set_encryptor(std::shared_ptr<AbstractTxEncryptor> encryptor_)
void set_encryptor(const EncryptorPtr& encryptor_)
{
encryptor = encryptor_;
}
std::shared_ptr<AbstractTxEncryptor> get_encryptor() override
EncryptorPtr get_encryptor() override
{
return encryptor;
}
@ -138,7 +155,7 @@ namespace kv
auto search = maps.find(name);
if (search != maps.end())
{
auto result = dynamic_cast<M*>(search->second.get());
auto result = dynamic_cast<M*>(search->second.second.get());
if (result == nullptr)
return nullptr;
@ -199,25 +216,68 @@ namespace kv
{
std::lock_guard<SpinLock> mguard(maps_lock);
auto search = maps.find(name);
if (search != maps.end())
if (has_map_internal(name))
throw std::logic_error("Map already exists");
auto replicated = true;
if (replicate_type == kv::ReplicateType::NONE)
auto result = std::make_shared<M>(
this, name, security_domain, is_map_replicated(name));
maps[name] = std::make_pair(NoVersion, result);
return *result;
}
std::shared_ptr<AbstractMap> get_map(
kv::Version v, const std::string& map_name) override
{
auto search = maps.find(map_name);
if (search != maps.end())
{
replicated = false;
}
else if (replicate_type == kv::ReplicateType::SOME)
{
if (replicated_tables.find(name) == replicated_tables.end())
const auto& [map_creation_version, map_ptr] = search->second;
if (v >= map_creation_version || map_creation_version == NoVersion)
{
replicated = false;
return map_ptr;
}
}
auto result = new M(this, name, security_domain, replicated);
maps[name] = std::unique_ptr<AbstractMap>(result);
return *result;
return nullptr;
}
void add_dynamic_map(
kv::Version v, const std::shared_ptr<AbstractMap>& map) override
{
const auto map_name = map->get_name();
if (get_map(v, map_name) != nullptr)
{
throw std::logic_error(fmt::format(
"Can't add dynamic map - already have a map named {}", map_name));
}
maps[map_name] = std::make_pair(v, map);
}
bool is_map_replicated(const std::string& name) override
{
switch (replicate_type)
{
case (kv::ReplicateType::ALL):
{
return true;
}
case (kv::ReplicateType::NONE):
{
return false;
}
case (kv::ReplicateType::SOME):
{
return replicated_tables.find(name) != replicated_tables.end();
}
default:
{
throw std::logic_error("Unhandled ReplicateType value");
}
}
}
std::unique_ptr<AbstractSnapshot> snapshot(Version v) override
@ -241,14 +301,16 @@ namespace kv
{
std::lock_guard<SpinLock> mguard(maps_lock);
for (auto& map : maps)
for (auto& it : maps)
{
map.second->lock();
auto& [_, map] = it.second;
map->lock();
}
for (auto& map : maps)
for (auto& it : maps)
{
snapshot->add_map_snapshot(map.second->snapshot(v));
auto& [_, map] = it.second;
snapshot->add_map_snapshot(map->snapshot(v));
}
auto h = get_history();
@ -257,9 +319,10 @@ namespace kv
snapshot->add_hash_at_snapshot(h->get_raw_leaf(v));
}
for (auto& map : maps)
for (auto& it : maps)
{
map.second->unlock();
auto& [_, map] = it.second;
map->unlock();
}
}
@ -287,6 +350,14 @@ namespace kv
}
auto v = v_.value();
std::lock_guard<SpinLock> mguard(maps_lock);
for (auto& it : maps)
{
auto& [_, map] = it.second;
map->lock();
}
std::vector<uint8_t> hash_at_snapshot;
auto h = get_history();
if (h)
@ -294,19 +365,32 @@ namespace kv
hash_at_snapshot = d.deserialise_raw();
}
std::lock_guard<SpinLock> mguard(maps_lock);
OrderedViews views;
MapCollection new_maps;
for (auto r = d.start_map(); r.has_value(); r = d.start_map())
{
const auto map_name = r.value();
std::shared_ptr<AbstractMap> map = nullptr;
auto search = maps.find(map_name);
if (search == maps.end())
{
LOG_FAIL_FMT("Failed to deserialise snapshot at version {}", v);
LOG_DEBUG_FMT("No such map in store {}", map_name);
return DeserialiseSuccess::FAILED;
map = std::make_shared<kv::untyped::Map>(
this,
map_name,
get_security_domain(map_name),
is_map_replicated(map_name));
new_maps[map_name] = map;
LOG_DEBUG_FMT(
"Creating map {} while deserialising snapshot at version {}",
map_name,
v);
}
else
{
map = search->second.second;
}
auto view_search = views.find(map_name);
@ -317,14 +401,18 @@ namespace kv
return DeserialiseSuccess::FAILED;
}
auto deserialise_snapshot_view =
search->second->deserialise_snapshot(d);
auto deserialise_snapshot_view = map->deserialise_snapshot(d);
// Take ownership of the produced view, store it to be committed
// later
views[map_name] = {
search->second.get(),
std::unique_ptr<AbstractTxView>(deserialise_snapshot_view)};
map, std::unique_ptr<AbstractTxView>(deserialise_snapshot_view)};
}
for (auto& it : maps)
{
auto& [_, map] = it.second;
map->unlock();
}
if (!d.end())
@ -336,7 +424,8 @@ namespace kv
// Each map is committed at a different version, independently of the
// overall snapshot version. The commit versions for each map are
// contained in the snapshot and applied when the snapshot is committed.
auto c = apply_views(views, []() { return NoVersion; });
auto c = apply_views(
views, []() { return NoVersion; }, new_maps);
if (!c.has_value())
{
LOG_FAIL_FMT("Failed to commit deserialised snapshot at version {}", v);
@ -373,19 +462,22 @@ namespace kv
return;
}
for (auto& map : maps)
for (auto& it : maps)
{
map.second->lock();
auto& [_, map] = it.second;
map->lock();
}
for (auto& map : maps)
for (auto& it : maps)
{
map.second->compact(v);
auto& [_, map] = it.second;
map->compact(v);
}
for (auto& map : maps)
for (auto& it : maps)
{
map.second->unlock();
auto& [_, map] = it.second;
map->unlock();
}
{
@ -405,9 +497,10 @@ namespace kv
}
}
for (auto& map : maps)
for (auto& it : maps)
{
map.second->post_compact();
auto& [_, map] = it.second;
map->post_compact();
}
}
@ -434,14 +527,37 @@ namespace kv
v,
commit_version()));
for (auto& map : maps)
map.second->lock();
for (auto& it : maps)
{
auto& [_, map] = it.second;
map->lock();
}
for (auto& map : maps)
map.second->rollback(v);
auto it = maps.begin();
while (it != maps.end())
{
auto& [map_creation_version, map] = it->second;
// Rollback this map whether we're forgetting about it or not. Anyone
// else still holding it should see it has rolled back
map->rollback(v);
if (map_creation_version > v)
{
// Map was created more recently; its creation is being forgotten.
// Erase our knowledge of it
map->unlock();
it = maps.erase(it);
}
else
{
++it;
}
}
for (auto& map : maps)
map.second->unlock();
for (auto& it : maps)
{
auto& [_, map] = it.second;
map->unlock();
}
std::lock_guard<SpinLock> vguard(version_lock);
version = v;
@ -515,20 +631,29 @@ namespace kv
// Deserialised transactions express read dependencies as versions,
// rather than with the actual value read. As a result, they don't
// need snapshot isolation on the map state, and so do not need to
// lock all the maps before creating the transaction.
// lock each of the maps before creating the transaction.
std::lock_guard<SpinLock> mguard(maps_lock);
OrderedViews views;
MapCollection new_maps;
for (auto r = d.start_map(); r.has_value(); r = d.start_map())
{
const auto map_name = r.value();
auto search = maps.find(map_name);
if (search == maps.end())
auto map = get_map(v, map_name);
if (map == nullptr)
{
LOG_FAIL_FMT("Failed to deserialise transaction at version {}", v);
LOG_DEBUG_FMT("No such map in store {}", map_name);
return DeserialiseSuccess::FAILED;
auto map_shared = std::make_shared<kv::untyped::Map>(
this,
map_name,
get_security_domain(map_name),
is_map_replicated(map_name));
map = map_shared;
new_maps[map_name] = map_shared;
LOG_DEBUG_FMT(
"Creating map {} while deserialising transaction at version {}",
map_name,
v);
}
auto view_search = views.find(map_name);
@ -543,12 +668,11 @@ namespace kv
// otherwise the view will be considered as having a committed
// version
auto deserialise_version = (commit ? v : NoVersion);
auto deserialised_view =
search->second->deserialise(d, deserialise_version);
auto deserialised_view = map->deserialise(d, deserialise_version);
// Take ownership of the produced view, store it to be applied
// later
views[map_name] = {search->second.get(),
views[map_name] = {map,
std::unique_ptr<AbstractTxView>(deserialised_view)};
}
@ -562,7 +686,7 @@ namespace kv
if (commit)
{
success = commit_deserialised(views, v);
success = commit_deserialised(views, v, new_maps);
if (success == DeserialiseSuccess::FAILED)
{
return success;
@ -658,7 +782,13 @@ namespace kv
if (search == that.maps.end())
return false;
if (*it->second != *search->second)
auto& [this_v, this_map] = it->second;
auto& [that_v, that_map] = search->second;
if (this_v != that_v)
return false;
if (*this_map != *that_map)
return false;
}
@ -793,6 +923,16 @@ namespace kv
}
}
void lock() override
{
maps_lock.lock();
}
void unlock() override
{
maps_lock.unlock();
}
Version next_version() override
{
std::lock_guard<SpinLock> vguard(version_lock);
@ -844,8 +984,9 @@ namespace kv
using MapEntry = std::tuple<std::string, AbstractMap*, AbstractMap*>;
std::vector<MapEntry> entries;
for (auto& [name, map] : maps)
for (auto& [name, pair] : maps)
{
auto& [_, map] = pair;
if (map->get_security_domain() == SecurityDomain::PRIVATE)
{
map->lock();
@ -854,8 +995,9 @@ namespace kv
}
auto entry = entries.begin();
for (auto& [name, map] : store.maps)
for (auto& [name, pair] : store.maps)
{
auto& [_, map] = pair;
if (map->get_security_domain() == SecurityDomain::PRIVATE)
{
if (entry == entries.end())
@ -892,5 +1034,20 @@ namespace kv
rhs->unlock();
}
}
ReadOnlyTx create_read_only_tx()
{
return ReadOnlyTx(this);
}
Tx create_tx()
{
return Tx(this);
}
ReservedTx create_reserved_tx(Version v)
{
return ReservedTx(this, v);
}
};
}

Просмотреть файл

@ -0,0 +1,501 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#include "kv/store.h"
#include "kv/test/null_encryptor.h"
#include "kv/test/stub_consensus.h"
#include <doctest/doctest.h>
struct MapTypes
{
using StringString = kv::Map<std::string, std::string>;
using NumNum = kv::Map<size_t, size_t>;
using NumString = kv::Map<size_t, std::string>;
using StringNum = kv::Map<std::string, size_t>;
};
TEST_CASE("Basic dynamic table" * doctest::test_suite("dynamic"))
{
kv::Store kv_store;
auto encryptor = std::make_shared<kv::NullTxEncryptor>();
kv_store.set_encryptor(encryptor);
constexpr auto map_name = "mapA";
INFO("Dynamically created maps can be used like normal maps");
{
auto map_a = kv_store.get<MapTypes::StringString>(map_name);
REQUIRE(map_a == nullptr);
}
{
auto tx = kv_store.create_tx();
auto view = tx.get_view2<MapTypes::StringString>(map_name);
view->put("foo", "bar");
REQUIRE(tx.commit() == kv::CommitSuccess::OK);
}
{
INFO("Old style access");
// NB: Don't access these maps old-style, because you need to know this
// implementation detail that the map is _actually_ untyped
auto map_a_wrong = kv_store.get<MapTypes::StringString>(map_name);
REQUIRE(map_a_wrong == nullptr);
auto map_a = kv_store.get<kv::untyped::Map>(map_name);
REQUIRE(map_a != nullptr);
}
{
INFO("New style access");
auto tx = kv_store.create_tx();
auto view = tx.get_view2<MapTypes::StringString>(map_name);
const auto it = view->get("foo");
REQUIRE(it.has_value());
REQUIRE(it.value() == "bar");
}
{
INFO("Dynamic tables remain through compaction");
kv_store.compact(kv_store.current_version());
auto tx = kv_store.create_tx();
auto view = tx.get_view2<MapTypes::StringString>(map_name);
const auto it = view->get("foo");
REQUIRE(it.has_value());
REQUIRE(it.value() == "bar");
}
const auto version_before = kv_store.current_version();
constexpr auto new_map1 = "new_map1";
constexpr auto new_map2 = "new_map2";
constexpr auto new_map3 = "new_map3";
{
INFO("Multiple dynamic tables can be created in a single tx");
auto tx = kv_store.create_tx();
auto [v1, v2] = tx.get_view2<MapTypes::StringString, MapTypes::StringNum>(
new_map1, new_map2);
auto [v2a, v3] = tx.get_view2<MapTypes::StringNum, MapTypes::NumString>(
new_map2, new_map3);
REQUIRE(v2 == v2a);
v1->put("foo", "bar");
v3->put(42, "hello");
auto va = tx.get_view2<MapTypes::StringString>(map_name);
va->put("foo", "baz");
REQUIRE(tx.commit() == kv::CommitSuccess::OK);
{
auto check_tx = kv_store.create_tx();
auto check_va = check_tx.get_view2<MapTypes::StringString>(map_name);
const auto v = check_va->get("foo");
REQUIRE(v.has_value());
REQUIRE(v.value() == "baz");
}
REQUIRE(kv_store.get<kv::untyped::Map>(new_map1) != nullptr);
REQUIRE(kv_store.get<kv::untyped::Map>(new_map3) != nullptr);
// No writes => map is not created
REQUIRE(kv_store.get<kv::untyped::Map>(new_map2) == nullptr);
}
{
INFO("Rollback can delete dynamic tables");
kv_store.rollback(version_before);
REQUIRE(kv_store.get<kv::untyped::Map>(new_map1) == nullptr);
REQUIRE(kv_store.get<kv::untyped::Map>(new_map2) == nullptr);
REQUIRE(kv_store.get<kv::untyped::Map>(new_map3) == nullptr);
// Previously created map is retained
REQUIRE(kv_store.get<kv::untyped::Map>(map_name) != nullptr);
{
INFO("Retained dynamic maps have their state rolled back");
auto check_tx = kv_store.create_tx();
auto check_va = check_tx.get_view2<MapTypes::StringString>(map_name);
const auto v = check_va->get("foo");
REQUIRE(v.has_value());
REQUIRE(v.value() == "bar");
}
}
}
TEST_CASE("Dynamic table opacity" * doctest::test_suite("dynamic"))
{
kv::Store kv_store;
auto encryptor = std::make_shared<kv::NullTxEncryptor>();
kv_store.set_encryptor(encryptor);
constexpr auto map_name = "dynamic_map";
auto tx1 = kv_store.create_tx();
auto tx2 = kv_store.create_tx();
auto view1 = tx1.get_view2<MapTypes::StringString>(map_name);
view1->put("foo", "bar");
REQUIRE(view1->get("foo").value() == "bar");
auto view2 = tx2.get_view2<MapTypes::StringString>(map_name);
view2->put("foo", "baz");
REQUIRE(view2->get("foo").value() == "baz");
{
INFO("Maps are not visible externally until commit");
REQUIRE(kv_store.get<MapTypes::StringString>(map_name) == nullptr);
}
{
INFO("First transaction commits successfully");
REQUIRE(tx1.commit() == kv::CommitSuccess::OK);
}
{
INFO("Committed transaction results are persisted");
auto txx = kv_store.create_tx();
auto view = txx.get_view2<MapTypes::StringString>(map_name);
const auto v = view->get("foo");
REQUIRE(v.has_value());
REQUIRE(v.value() == "bar");
}
{
INFO("Second transaction conflicts");
REQUIRE(tx2.commit() == kv::CommitSuccess::CONFLICT);
}
{
INFO("Conflicting transaction can be rerun, on existing map");
auto tx3 = kv_store.create_tx();
auto view3 = tx3.get_view2<MapTypes::StringString>(map_name);
const auto v = view3->get("foo");
REQUIRE(v.has_value());
view3->put("foo", "baz");
REQUIRE(view3->get("foo").value() == "baz");
REQUIRE(tx3.commit() == kv::CommitSuccess::OK);
}
{
REQUIRE(kv_store.get<kv::untyped::Map>(map_name) != nullptr);
}
{
INFO("Subsequent transactions over dynamic map are persisted");
auto tx4 = kv_store.create_tx();
auto view4 = tx4.get_view2<MapTypes::StringString>(map_name);
const auto v = view4->get("foo");
REQUIRE(v.has_value());
REQUIRE(v.value() == "baz");
}
}
TEST_CASE(
"Dynamic table visibility by version" * doctest::test_suite("dynamic"))
{
kv::Store kv_store;
auto encryptor = std::make_shared<kv::NullTxEncryptor>();
kv_store.set_encryptor(encryptor);
auto& static_map = kv_store.create<MapTypes::StringString>("static_map");
constexpr auto map_name = "dynamic_map";
auto tx1 = kv_store.create_tx();
auto tx2 = kv_store.create_tx();
auto tx3 = kv_store.create_tx();
auto tx4 = kv_store.create_tx();
auto view1 = tx1.get_view2<MapTypes::StringString>(map_name);
view1->put("foo", "bar");
// Map created in tx1 is not visible
auto view2 = tx2.get_view2<MapTypes::StringString>(map_name);
REQUIRE(!view2->get("foo").has_value());
// tx3 takes a read dependency at an early version, before the map is visible
auto view3_static = tx3.get_view(static_map);
REQUIRE(tx1.commit() == kv::CommitSuccess::OK);
// Even after commit, the new map is not visible to tx3 because it is reading
// from an earlier version
auto view3 = tx3.get_view2<MapTypes::StringString>(map_name);
REQUIRE(!view3->get("foo").has_value());
// Map created in tx1 is visible, because tx4 first _reads_ (creates a
// view) after tx1 has committed
auto view4 = tx4.get_view2<MapTypes::StringString>(map_name);
REQUIRE(view4->get("foo").has_value());
}
TEST_CASE("Mixed map dependencies" * doctest::test_suite("dynamic"))
{
kv::Store kv_store;
auto encryptor = std::make_shared<kv::NullTxEncryptor>();
kv_store.set_encryptor(encryptor);
constexpr auto dynamic_map_a = "dynamic_map_a";
constexpr auto dynamic_map_b = "dynamic_map_b";
auto& static_map = kv_store.create<MapTypes::StringString>("static_map");
SUBCASE("Parallel independent map creation")
{
auto tx1 = kv_store.create_tx();
auto tx2 = kv_store.create_tx();
auto view1 = tx1.get_view2<MapTypes::NumString>(dynamic_map_a);
auto view2 = tx2.get_view2<MapTypes::StringNum>(dynamic_map_b);
view1->put(42, "hello");
view2->put("hello", 42);
REQUIRE(tx1.commit() == kv::CommitSuccess::OK);
REQUIRE(tx2.commit() == kv::CommitSuccess::OK);
}
SUBCASE("Map creation blocked by standard conflict")
{
constexpr auto key = "foo";
auto tx1 = kv_store.create_tx();
{
auto view1 = tx1.get_view(static_map);
const auto v = view1->get(key); // Introduce read-dependency
view1->put(key, "bar");
auto dynamic_view = tx1.get_view2<MapTypes::NumString>(dynamic_map_a);
dynamic_view->put(42, "hello world");
}
auto tx2 = kv_store.create_tx();
{
auto view2 = tx2.get_view(static_map);
const auto v = view2->get(key); // Introduce read-dependency
view2->put(key, "bar");
auto dynamic_view = tx2.get_view2<MapTypes::StringNum>(dynamic_map_b);
dynamic_view->put("hello world", 42);
}
REQUIRE(tx1.commit() == kv::CommitSuccess::OK);
REQUIRE(tx2.commit() == kv::CommitSuccess::CONFLICT);
{
auto tx3 = kv_store.create_tx();
auto [view1, view2] =
tx3.get_view2<MapTypes::NumString, MapTypes::StringNum>(
dynamic_map_a, dynamic_map_b);
const auto v1 = view1->get(42);
REQUIRE(v1.has_value());
REQUIRE(v1.value() == "hello world");
const auto v2 = view2->get("hello world");
REQUIRE_FALSE(v2.has_value());
}
REQUIRE(kv_store.get<kv::untyped::Map>(dynamic_map_a) != nullptr);
REQUIRE(kv_store.get<MapTypes::StringNum>(dynamic_map_b) == nullptr);
}
}
TEST_CASE("Dynamic map serialisation" * doctest::test_suite("dynamic"))
{
auto consensus = std::make_shared<kv::StubConsensus>();
auto encryptor = std::make_shared<kv::NullTxEncryptor>();
kv::Store kv_store(consensus);
kv_store.set_encryptor(encryptor);
kv::Store kv_store_target;
kv_store_target.set_encryptor(encryptor);
const auto map_name = "new_map";
const auto key = "foo";
const auto value = "bar";
{
INFO("Commit a map creation in source store");
auto tx = kv_store.create_tx();
auto view = tx.get_view2<MapTypes::StringString>(map_name);
view->put(key, value);
REQUIRE(tx.commit() == kv::CommitSuccess::OK);
}
{
INFO("Deserialise transaction in target store");
const auto latest_data = consensus->get_latest_data();
REQUIRE(latest_data.has_value());
REQUIRE(
kv_store_target.deserialise(latest_data.value()) ==
kv::DeserialiseSuccess::PASS);
auto tx_target = kv_store_target.create_tx();
auto view_target = tx_target.get_view2<MapTypes::StringString>(map_name);
const auto v = view_target->get(key);
REQUIRE(v.has_value());
REQUIRE(v.value() == value);
}
}
TEST_CASE("Dynamic map snapshot serialisation" * doctest::test_suite("dynamic"))
{
kv::Store store;
auto encryptor = std::make_shared<kv::NullTxEncryptor>();
store.set_encryptor(encryptor);
constexpr auto map_name = "string_map";
kv::Version snapshot_version;
INFO("Create maps in original store");
{
auto tx1 = store.create_tx();
auto view_1 = tx1.get_view2<MapTypes::StringString>(map_name);
view_1->put("foo", "foo");
REQUIRE(tx1.commit() == kv::CommitSuccess::OK);
auto tx2 = store.create_tx();
auto view_2 = tx2.get_view2<MapTypes::StringString>(map_name);
view_2->put("bar", "bar");
REQUIRE(tx2.commit() == kv::CommitSuccess::OK);
snapshot_version = tx2.commit_version();
}
INFO("Create snapshot of original store");
auto snapshot = store.snapshot(snapshot_version);
auto serialised_snapshot = store.serialise_snapshot(std::move(snapshot));
INFO("Apply snapshot to create maps in new store");
{
kv::Store new_store;
new_store.set_encryptor(encryptor);
new_store.deserialise_snapshot(serialised_snapshot);
auto tx = new_store.create_tx();
auto view = tx.get_view2<MapTypes::StringString>(map_name);
const auto foo_v = view->get("foo");
REQUIRE(foo_v.has_value());
REQUIRE(foo_v.value() == "foo");
const auto bar_v = view->get("bar");
REQUIRE(bar_v.has_value());
REQUIRE(bar_v.value() == "bar");
}
}
TEST_CASE("Mid rollback safety" * doctest::test_suite("dynamic"))
{
kv::Store kv_store;
auto encryptor = std::make_shared<kv::NullTxEncryptor>();
kv_store.set_encryptor(encryptor);
constexpr auto map_name = "my_new_map";
const auto version_before = kv_store.current_version();
{
auto tx = kv_store.create_tx();
auto view = tx.get_view2<MapTypes::StringString>(map_name);
view->put("foo", "bar");
REQUIRE(tx.commit() == kv::CommitSuccess::OK);
}
{
auto tx = kv_store.create_tx();
auto view = tx.get_view2<MapTypes::StringString>(map_name);
const auto v_0 = view->get("foo");
REQUIRE(v_0.has_value());
REQUIRE(v_0.value() == "bar");
// Rollbacks may happen while a tx is executing, and these can delete the
// maps this tx is executing over
kv_store.rollback(version_before);
const auto v_1 = view->get("foo");
REQUIRE(v_0.has_value());
REQUIRE(v_0.value() == "bar");
auto view_after = tx.get_view2<MapTypes::StringString>(map_name);
REQUIRE(view_after == view);
view->put("foo", "baz");
const auto result = tx.commit();
REQUIRE(result == kv::CommitSuccess::CONFLICT);
}
}
TEST_CASE(
"Security domain is determined by map name" * doctest::test_suite("dynamic"))
{
kv::Store kv_store;
auto encryptor = std::make_shared<kv::NullTxEncryptor>();
kv_store.set_encryptor(encryptor);
{
auto tx = kv_store.create_tx();
auto view = tx.get_view2<MapTypes::StringString>("public:foo");
view->put("foo", "bar");
REQUIRE(tx.commit() == kv::CommitSuccess::OK);
}
{
auto tx = kv_store.create_tx();
auto view = tx.get_view2<MapTypes::StringString>("foo");
view->put("hello", "world");
REQUIRE(tx.commit() == kv::CommitSuccess::OK);
}
{
auto public_map = kv_store.get<kv::untyped::Map>("public:foo");
REQUIRE(public_map != nullptr);
REQUIRE(public_map->get_security_domain() == kv::SecurityDomain::PUBLIC);
auto private_map = kv_store.get<kv::untyped::Map>("foo");
REQUIRE(private_map != nullptr);
REQUIRE(private_map->get_security_domain() == kv::SecurityDomain::PRIVATE);
}
{
auto tx = kv_store.create_tx();
auto [public_view, private_view] =
tx.get_view2<MapTypes::StringString, MapTypes::StringString>(
"public:foo", "foo");
// These are _different views_ over _different maps_
REQUIRE(public_view != private_view);
const auto pub_v = public_view->get("foo");
REQUIRE(pub_v.has_value());
REQUIRE(pub_v.value() == "bar");
const auto priv_v = private_view->get("hello");
REQUIRE(priv_v.has_value());
REQUIRE(priv_v.value() == "world");
}
}

Просмотреть файл

@ -1029,4 +1029,4 @@ TEST_CASE("Conflict resolution")
// Re-running a _committed_ transaction is exceptionally bad
REQUIRE_THROWS(tx1.commit());
REQUIRE_THROWS(tx2.commit());
}
}

Просмотреть файл

@ -14,6 +14,8 @@ namespace kv
class BaseTx : public AbstractViewContainer
{
protected:
AbstractStore* store = nullptr;
OrderedViews view_list;
bool committed = false;
bool success = false;
@ -23,6 +25,8 @@ namespace kv
kv::TxHistory::RequestID req_id;
std::map<std::string, std::shared_ptr<AbstractMap>> created_maps;
template <class M>
std::tuple<typename M::TxView*> get_tuple(M& m)
{
@ -70,7 +74,104 @@ namespace kv
"View over map {} is not an AbstractTxView", m.get_name()));
}
view_list[m.get_name()] = {
&m, std::unique_ptr<AbstractTxView>(abstract_view)};
m.shared_from_this(), std::unique_ptr<AbstractTxView>(abstract_view)};
return std::make_tuple(typed_view);
}
template <class M>
std::tuple<typename M::TxView*> get_tuple2(const std::string& map_name)
{
if (store == nullptr)
{
throw std::logic_error("New form called on old-style Tx");
}
using MapView = typename M::TxView;
// If the M is present, its AbstractTxView should be an M::TxView. This
// invariant could be broken by set_view_list, which will produce an error
// here
auto search = view_list.find(map_name);
if (search != view_list.end())
{
auto view = dynamic_cast<MapView*>(search->second.view.get());
if (view == nullptr)
{
throw std::logic_error(
fmt::format("View over map {} is not of expected type", map_name));
}
return std::make_tuple(view);
}
if (read_version == NoVersion)
{
// Grab opacity version that all Maps should be queried at.
auto txid = store->current_txid();
term = txid.term;
read_version = txid.version;
}
MapView* typed_view = nullptr;
auto abstract_map = store->get_map(read_version, map_name);
if (abstract_map == nullptr)
{
// Store doesn't know this map yet - create it dynamically
{
const auto map_it = created_maps.find(map_name);
if (map_it != created_maps.end())
{
throw std::logic_error("Created map without creating view over it");
}
}
// NB: The created maps are always untyped. Only the views over them are
// typed
auto new_map = std::make_shared<kv::untyped::Map>(
store,
map_name,
kv::get_security_domain(map_name),
store->is_map_replicated(map_name));
created_maps[map_name] = new_map;
LOG_DEBUG_FMT("Creating new map '{}'", map_name);
abstract_map = new_map;
typed_view = new_map->template create_view<MapView>(read_version);
}
else
{
auto* am = abstract_map.get();
auto typed_map = dynamic_cast<M*>(am);
if (typed_map == nullptr)
{
auto untyped_map = dynamic_cast<kv::untyped::Map*>(am);
if (untyped_map == nullptr)
{
throw std::logic_error(
fmt::format("Map {} has unexpected type", map_name));
}
else
{
typed_view =
untyped_map->template create_view<MapView>(read_version);
}
}
else
{
typed_view = typed_map->template create_view<MapView>(read_version);
}
}
auto abstract_view = dynamic_cast<AbstractTxView*>(typed_view);
if (abstract_view == nullptr)
{
throw std::logic_error(
fmt::format("View over map {} is not an AbstractTxView", map_name));
}
view_list[map_name] = {abstract_map,
std::unique_ptr<AbstractTxView>(abstract_view)};
return std::make_tuple(typed_view);
}
@ -81,6 +182,14 @@ namespace kv
return std::tuple_cat(get_tuple(m), get_tuple(ms...));
}
template <class M, class... Ms, class... Ts>
std::tuple<typename M::TxView*, typename Ms::TxView*...> get_tuple2(
const std::string& map_names, const Ts&... names)
{
return std::tuple_cat(
get_tuple2<M>(map_names), get_tuple<Ms...>(names...));
}
void reset()
{
view_list.clear();
@ -93,6 +202,7 @@ namespace kv
public:
BaseTx() : view_list() {}
BaseTx(AbstractStore* _store) : store(_store) {}
BaseTx(const BaseTx& that) = delete;
@ -152,8 +262,18 @@ namespace kv
}
auto store = view_list.begin()->second.map->get_store();
auto c =
apply_views(view_list, [store]() { return store->next_version(); });
// If this transaction may create maps, ensure that commit gets a
// consistent view of the existing maps
if (!created_maps.empty())
this->store->lock();
auto c = apply_views(
view_list, [store]() { return store->next_version(); }, created_maps);
if (!created_maps.empty())
this->store->unlock();
success = c.has_value();
if (!success)
@ -360,6 +480,12 @@ namespace kv
return std::get<0>(get_tuple(m));
}
template <class M>
typename M::TxView* get_view2(const std::string& map_name)
{
return std::get<0>(get_tuple2<M>(map_name));
}
/** Get transaction views over multiple maps.
*
* @param m Map
@ -371,5 +497,48 @@ namespace kv
{
return std::tuple_cat(get_tuple(m), get_tuple(ms...));
}
template <class M, class... Ms, class... Ts>
std::tuple<typename M::TxView*, typename Ms::TxView*...> get_view2(
const std::string& map_name, const Ts&... names)
{
return std::tuple_cat(
get_tuple2<M>(map_name), get_tuple2<Ms...>(names...));
}
};
// Used by frontend for reserved transactions. These are constructed with a
// pre-reserved Version, and _must succeed_ to fulfil this version, else
// creating a hole in the history
class ReservedTx : public Tx
{
public:
ReservedTx(AbstractStore* _store, Version reserved)
{
store = _store;
committed = false;
success = false;
read_version = reserved - 1;
version = reserved;
}
// Used by frontend to commit reserved transactions
PendingTxInfo commit_reserved()
{
if (committed)
throw std::logic_error("Transaction already committed");
if (view_list.empty())
throw std::logic_error("Reserved transaction cannot be empty");
auto c = apply_views(view_list, [this]() { return version; });
success = c.has_value();
if (!success)
throw std::logic_error("Failed to commit reserved transaction");
committed = true;
return {CommitSuccess::OK, {0, 0, 0}, serialise()};
}
};
}

Просмотреть файл

@ -11,8 +11,8 @@ namespace kv
{
struct MapView
{
// Weak pointer to source map
AbstractMap* map;
// Shared ownership over source map
std::shared_ptr<AbstractMap> map;
// Owning pointer of TxView over that map
std::unique_ptr<AbstractTxView> view;
@ -22,6 +22,10 @@ namespace kv
// stable order to avoid deadlocks. This ordered map will claim in name-order
using OrderedViews = std::map<std::string, MapView>;
// All collections of Map must be ordered so that we lock their contained
// maps in a stable order. The order here is by map name
using MapCollection = std::map<std::string, std::shared_ptr<AbstractMap>>;
struct AbstractViewContainer
{
virtual ~AbstractViewContainer() = default;
@ -32,7 +36,9 @@ namespace kv
// to their underlying Maps. Calls f() at most once, iff the writes are
// applied, to retrieve a unique Version for the write set.
static inline std::optional<Version> apply_views(
OrderedViews& views, std::function<Version()> f)
OrderedViews& views,
std::function<Version()> f,
const MapCollection& new_maps = {})
{
// All maps with pending writes are locked, transactions are prepared
// and possibly committed, and then all maps with pending writes are
@ -61,11 +67,36 @@ namespace kv
}
}
for (const auto& [map_name, map_ptr] : new_maps)
{
// Check that none of these pending maps have already been created.
// It is possible for non-conflicting other transactions to commit here
// and increment the version, so we may ask this question at different
// versions. This is fine - none can create maps (ie - change their
// conflict set with this operation) while we hold the store lock. Assume
// that the caller is currently holding store->lock()
auto store = map_ptr->get_store();
if (store->get_map(store->current_version(), map_name) != nullptr)
{
ok = false;
break;
}
}
if (ok && has_writes)
{
// Get the version number to be used for this commit.
version = f();
// Transfer ownership of these new maps to their target stores, iff we
// have writes to them
for (const auto& [map_name, map_ptr] : new_maps)
{
const auto it = views.find(map_name);
if (it != views.end() && it->second.view->has_writes())
map_ptr->get_store()->add_dynamic_map(version, map_ptr);
}
for (auto it = views.begin(); it != views.end(); ++it)
it->second.view->commit(version);
@ -76,7 +107,9 @@ namespace kv
for (auto it = views.begin(); it != views.end(); ++it)
{
if (it->second.view->has_writes())
{
it->second.map->unlock();
}
}
if (!ok)