Historical queries: Add support for fetching sets of seqnos (#3221)

This commit is contained in:
Eddy Ashton 2021-12-01 09:25:42 +00:00 коммит произвёл GitHub
Родитель 40275162ab
Коммит f0a7d405ec
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
13 изменённых файлов: 2087 добавлений и 332 удалений

Просмотреть файл

@ -268,6 +268,7 @@ if(BUILD_TESTS)
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/thread_messaging.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/thread_messaging.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/lru.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/lru.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/hex.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/hex.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/contiguous_set.cpp
) )
target_link_libraries(ds_test PRIVATE ${CMAKE_THREAD_LIBS_INIT}) target_link_libraries(ds_test PRIVATE ${CMAKE_THREAD_LIBS_INIT})

Просмотреть файл

@ -338,7 +338,7 @@
"info": { "info": {
"description": "This CCF sample app implements a simple logging application, securely recording messages at client-specified IDs. It demonstrates most of the features available to CCF apps.", "description": "This CCF sample app implements a simple logging application, securely recording messages at client-specified IDs. It demonstrates most of the features available to CCF apps.",
"title": "CCF Sample Logging App", "title": "CCF Sample Logging App",
"version": "1.2.0" "version": "1.3.0"
}, },
"openapi": "3.0.0", "openapi": "3.0.0",
"paths": { "paths": {
@ -714,6 +714,45 @@
] ]
} }
}, },
"/log/private/historical/sparse": {
"get": {
"parameters": [
{
"in": "query",
"name": "seqnos",
"required": true,
"schema": {
"$ref": "#/components/schemas/string"
}
},
{
"in": "query",
"name": "id",
"required": true,
"schema": {
"$ref": "#/components/schemas/uint64"
}
}
],
"responses": {
"200": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/LoggingGetHistoricalRange__Out"
}
}
},
"description": "Default response description"
}
},
"security": [
{
"jwt": []
}
]
}
},
"/log/private/historical_receipt": { "/log/private/historical_receipt": {
"get": { "get": {
"parameters": [ "parameters": [

Просмотреть файл

@ -5,6 +5,7 @@
#include "ccf/receipt.h" #include "ccf/receipt.h"
#include "ccf/tx_id.h" #include "ccf/tx_id.h"
#include "consensus/ledger_enclave_types.h" #include "consensus/ledger_enclave_types.h"
#include "ds/contiguous_set.h"
#include "kv/store.h" #include "kv/store.h"
#include "node/history.h" #include "node/history.h"
#include "node/tx_receipt.h" #include "node/tx_receipt.h"
@ -52,6 +53,8 @@ namespace ccf::historical
using ExpiryDuration = std::chrono::seconds; using ExpiryDuration = std::chrono::seconds;
using SeqNoCollection = ds::ContiguousSet<ccf::SeqNo>;
/** Stores the progress of historical query requests. /** Stores the progress of historical query requests.
* *
* A request will generally need to be made multiple times (with the same * A request will generally need to be made multiple times (with the same
@ -152,6 +155,24 @@ namespace ccf::historical
virtual std::vector<StatePtr> get_state_range( virtual std::vector<StatePtr> get_state_range(
RequestHandle handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno) = 0; RequestHandle handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno) = 0;
/** Retrieve stores for a set of given indices.
*/
virtual std::vector<StorePtr> get_stores_for(
RequestHandle handle,
const SeqNoCollection& seqnos,
ExpiryDuration seconds_until_expiry) = 0;
virtual std::vector<StorePtr> get_stores_for(
RequestHandle handle, const SeqNoCollection& seqnos) = 0;
/** Retrieve states for a set of given indices.
*/
virtual std::vector<StatePtr> get_states_for(
RequestHandle handle,
const SeqNoCollection& seqnos,
ExpiryDuration seconds_until_expiry) = 0;
virtual std::vector<StatePtr> get_states_for(
RequestHandle handle, const SeqNoCollection& seqnos) = 0;
/** Drop state for the given handle. /** Drop state for the given handle.
* *
* May be used to free up space once a historical query has been resolved, * May be used to free up space once a historical query has been resolved,

Просмотреть файл

@ -1072,6 +1072,185 @@ namespace loggingapp
ccf::endpoints::ExecuteOutsideConsensus::Locally) ccf::endpoints::ExecuteOutsideConsensus::Locally)
.install(); .install();
static constexpr auto get_historical_sparse_path =
"/log/private/historical/sparse";
auto get_historical_sparse = [&, this](
ccf::endpoints::EndpointContext& ctx) {
// Parse arguments from query
const auto parsed_query =
http::parse_query(ctx.rpc_ctx->get_request_query());
std::string error_reason;
size_t id;
if (!http::get_query_value(parsed_query, "id", id, error_reason))
{
ctx.rpc_ctx->set_error(
HTTP_STATUS_BAD_REQUEST,
ccf::errors::InvalidQueryParameterValue,
std::move(error_reason));
return;
}
std::vector<size_t> seqnos;
{
std::string seqnos_s;
if (!http::get_query_value(
parsed_query, "seqnos", seqnos_s, error_reason))
{
ctx.rpc_ctx->set_error(
HTTP_STATUS_BAD_REQUEST,
ccf::errors::InvalidQueryParameterValue,
std::move(error_reason));
return;
}
const auto terms = nonstd::split(seqnos_s, ",");
for (const auto& term : terms)
{
size_t val;
const auto [p, ec] = std::from_chars(term.begin(), term.end(), val);
if (ec != std::errc() || p != term.end())
{
ctx.rpc_ctx->set_error(
HTTP_STATUS_BAD_REQUEST,
ccf::errors::InvalidQueryParameterValue,
fmt::format("Unable to parse '{}' as a seqno", term));
return;
}
seqnos.push_back(val);
}
}
// End of range must be committed
if (consensus == nullptr)
{
ctx.rpc_ctx->set_error(
HTTP_STATUS_INTERNAL_SERVER_ERROR,
ccf::errors::InternalError,
"Node is not fully operational");
return;
}
std::sort(seqnos.begin(), seqnos.end());
const auto final_seqno = seqnos.back();
const auto view_of_final_seqno = consensus->get_view(final_seqno);
const auto committed_seqno = consensus->get_committed_seqno();
const auto committed_view = consensus->get_view(committed_seqno);
const auto tx_status = ccf::evaluate_tx_status(
view_of_final_seqno,
final_seqno,
view_of_final_seqno,
committed_view,
committed_seqno);
if (tx_status != ccf::TxStatus::Committed)
{
ctx.rpc_ctx->set_error(
HTTP_STATUS_BAD_REQUEST,
ccf::errors::InvalidInput,
fmt::format(
"Only committed transactions can be queried. Transaction {}.{} "
"is {}",
view_of_final_seqno,
final_seqno,
ccf::tx_status_to_str(tx_status)));
return;
}
// NB: Currently ignoring pagination, as this endpoint is temporary
// Use hash of request as RequestHandle. WARNING: This means identical
// requests from different users will collide, and overwrite each
// other's progress!
auto make_handle = [](size_t begin, size_t end, size_t id) {
auto size = sizeof(begin) + sizeof(end) + sizeof(id);
std::vector<uint8_t> v(size);
auto data = v.data();
serialized::write(data, size, begin);
serialized::write(data, size, end);
serialized::write(data, size, id);
return std::hash<decltype(v)>()(v);
};
ccf::historical::RequestHandle handle;
{
std::hash<size_t> h;
handle = h(id);
for (const auto& seqno : seqnos)
{
ds::hashutils::hash_combine(handle, seqno, h);
}
}
// Fetch the requested range
auto& historical_cache = context.get_historical_state();
ccf::historical::SeqNoCollection seqno_collection(
seqnos.begin(), seqnos.end());
auto stores = historical_cache.get_stores_for(handle, seqno_collection);
if (stores.empty())
{
ctx.rpc_ctx->set_response_status(HTTP_STATUS_ACCEPTED);
static constexpr size_t retry_after_seconds = 3;
ctx.rpc_ctx->set_response_header(
http::headers::RETRY_AFTER, retry_after_seconds);
ctx.rpc_ctx->set_response_header(
http::headers::CONTENT_TYPE, http::headervalues::contenttype::TEXT);
ctx.rpc_ctx->set_response_body(fmt::format(
"Historical transactions are not yet available, fetching now"));
return;
}
// Process the fetched Stores
LoggingGetHistoricalRange::Out response;
for (const auto& store : stores)
{
auto historical_tx = store->create_read_only_tx();
auto records_handle =
historical_tx.template ro<RecordsMap>(PRIVATE_RECORDS);
const auto v = records_handle->get(id);
if (v.has_value())
{
LoggingGetHistoricalRange::Entry e;
e.seqno = store->current_txid().version;
e.id = id;
e.msg = v.value();
response.entries.push_back(e);
}
// This response do not include any entry when the given key wasn't
// modified at this seqno. It could instead indicate that the store
// was checked with an empty tombstone object, but this approach gives
// smaller responses
}
// Construct the HTTP response
nlohmann::json j_response = response;
ctx.rpc_ctx->set_response_status(HTTP_STATUS_OK);
ctx.rpc_ctx->set_response_header(
http::headers::CONTENT_TYPE, http::headervalues::contenttype::JSON);
ctx.rpc_ctx->set_response_body(j_response.dump());
// ALSO: Assume this response makes it all the way to the client, and
// they're finished with it, so we can drop the retrieved state. In a
// real app this may be driven by a separate client request or an LRU
historical_cache.drop_cached_states(handle);
};
make_endpoint(
get_historical_sparse_path,
HTTP_GET,
get_historical_sparse,
auth_policies)
.set_auto_schema<void, LoggingGetHistoricalRange::Out>()
.add_query_parameter<std::string>("seqnos")
.add_query_parameter<size_t>("id")
.set_forwarding_required(ccf::endpoints::ForwardingRequired::Never)
.set_execute_outside_consensus(
ccf::endpoints::ExecuteOutsideConsensus::Locally)
.install();
auto record_admin_only = [this]( auto record_admin_only = [this](
ccf::endpoints::EndpointContext& ctx, ccf::endpoints::EndpointContext& ctx,
nlohmann::json&& params) { nlohmann::json&& params) {
@ -1191,7 +1370,7 @@ namespace loggingapp
"This CCF sample app implements a simple logging application, securely " "This CCF sample app implements a simple logging application, securely "
"recording messages at client-specified IDs. It demonstrates most of " "recording messages at client-specified IDs. It demonstrates most of "
"the features available to CCF apps."; "the features available to CCF apps.";
logger_handlers.openapi_info.document_version = "1.2.0"; logger_handlers.openapi_info.document_version = "1.3.0";
} }
}; };
} }

348
src/ds/contiguous_set.h Normal file
Просмотреть файл

@ -0,0 +1,348 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include <numeric>
#include <vector>
namespace ds
{
// Dense representation of an ordered set of values, assuming it contains
// some contiguous ranges of adjacent values. Stores a sequence of ranges,
// rather than individual values.
template <typename T>
class ContiguousSet
{
public:
// Ranges are represented by their first value, and a count of additional
// values. This disallows negative ranges
using Range = std::pair<T, size_t>;
using Ranges = std::vector<Range>;
// Define an iterator for accessing each contained element, rather than the
// ranges
template <typename RangeIt>
struct TIterator
{
RangeIt it;
size_t offset = 0;
TIterator(RangeIt i, size_t o = 0) : it(i), offset(o) {}
bool operator==(const TIterator& other) const
{
return (it == other.it && offset == other.offset);
}
bool operator!=(const TIterator& other) const
{
return !(*this == other);
}
TIterator& operator++()
{
++offset;
if (offset > it->second)
{
++it;
offset = 0;
}
return (*this);
}
TIterator operator++(int)
{
auto temp(*this);
++(*this);
return temp;
}
T operator*() const
{
return it->first + offset;
}
};
using ConstIterator = TIterator<typename Ranges::const_iterator>;
private:
Ranges ranges;
template <typename It>
void populate_ranges(It first, It end)
{
if (!std::is_sorted(first, end))
{
throw std::logic_error("Range must be sorted");
}
ranges.clear();
while (first != end)
{
auto next = std::adjacent_find(
first, end, [](const T& a, const T& b) { return a + 1 != b; });
if (next == end)
{
ranges.emplace_back(*first, size_t(std::distance(first, end)) - 1);
break;
}
ranges.emplace_back(*first, size_t(std::distance(first, next)));
first = std::next(next);
}
}
void maybe_merge_with_following(typename Ranges::iterator it)
{
if (it != ranges.end())
{
auto next_it = std::next(it);
if (next_it != ranges.end())
{
if (it->first + it->second + 1 == next_it->first)
{
it->second = it->second + 1 + next_it->second;
ranges.erase(next_it);
}
}
}
}
void maybe_merge_with_following(typename Ranges::reverse_iterator it)
{
if (it != ranges.rend())
{
maybe_merge_with_following(std::next(it).base());
}
}
typename Ranges::const_iterator find_internal(const T& t) const
{
Range estimated_range{t, 0};
auto it = std::lower_bound(ranges.begin(), ranges.end(), estimated_range);
if (it != ranges.end())
{
// If lower_bound found {t, n}, then return that result
if (it->first == t)
{
return it;
}
}
// else, most of the time, we found {x, n}, where x > t. Check if there
// is a previous range, and if that contains t
if (it != ranges.begin())
{
it = std::prev(it);
const T& from = it->first;
const T additional = it->second;
if (from + additional >= t)
{
return it;
}
}
return ranges.end();
}
public:
ContiguousSet() = default;
template <typename It>
ContiguousSet(It first, It end)
{
populate_ranges(first, end);
}
ContiguousSet(const T& from, size_t additional)
{
ranges.emplace_back(from, additional);
}
bool operator==(const ContiguousSet& other) const
{
return ranges == other.ranges;
}
bool operator!=(const ContiguousSet& other) const
{
return !(*this == other);
}
const Ranges& get_ranges() const
{
return ranges;
}
size_t size() const
{
return std::accumulate(
ranges.begin(), ranges.end(), 0u, [](size_t n, const Range& r) {
return n + r.second + 1;
});
}
bool empty() const
{
return ranges.empty();
}
bool insert(const T& t)
{
// Search backwards, to find the range with the highest starting point
// lower than this value. Offset by one, to find ranges adjacent to this
// value. eg - if inserting 5 into [{2, 1}, {6, 2}, {10, 2}], we want to
// find {6, 2}, and extend this range down by 1
const Range estimated_range(t + 1, 0);
auto it = std::lower_bound(
ranges.rbegin(), ranges.rend(), estimated_range, std::greater<>());
if (it != ranges.rend())
{
const T& from = it->first;
const T additional = it->second;
if (from <= t && t <= from + additional)
{
// Already present
return false;
}
else if (from + additional + 1 == t)
{
// Adjacent to the end of the existing range
it->second++;
maybe_merge_with_following(it);
return true;
}
else if (t + 1 == from)
{
// Precedes directly, extend this range by 1
it->first = t;
it->second++;
if (it != ranges.rend())
{
maybe_merge_with_following(std::next(it));
}
return true;
}
// Else fall through to emplace new entry
}
auto emplaced_it = ranges.emplace(it.base(), t, 0);
maybe_merge_with_following(emplaced_it);
return true;
}
bool erase(const T& t)
{
Range estimated_range{t, 0};
auto it = std::lower_bound(
ranges.begin(),
ranges.end(),
estimated_range,
// Custom comparator - ignore the second element
[](const Range& left, const Range& right) {
return left.first < right.first;
});
if (it != ranges.begin() && t != it->first)
{
it = std::prev(it);
}
if (it != ranges.end())
{
const T& from = it->first;
const T additional = it->second;
if (from <= t && t <= from + additional)
{
// Contained within this range
if (from == t)
{
if (additional == 0u)
{
// Remove range entirely
ranges.erase(it);
return true;
}
else
{
// Shrink start of range
++it->first;
--it->second;
return true;
}
}
else if (t == from + additional)
{
// Shrink end of range
--it->second;
return true;
}
else
{
const auto before = t - it->first - 1;
const auto after = it->first + it->second - t - 1;
it->second = before;
auto next_it = std::next(it);
ranges.emplace(next_it, t + 1, after);
return true;
}
}
}
return false;
}
void extend(const T& from, size_t additional)
{
for (auto n = from; n <= from + additional; ++n)
{
const auto b = insert(n);
}
}
bool contains(const T& t) const
{
return find_internal(t) != end();
}
ConstIterator find(const T& t) const
{
auto it = find_internal(t);
if (it != ranges.end())
{
return ConstIterator(it, t - it->first);
}
return end();
}
void clear()
{
ranges.clear();
}
T front() const
{
return ranges.front().first;
}
T back() const
{
const auto back = ranges.back();
return back.first + back.second;
}
ConstIterator begin() const
{
return ConstIterator(ranges.begin());
}
ConstIterator end() const
{
return ConstIterator(ranges.end());
}
};
}

Просмотреть файл

@ -0,0 +1,410 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#include "../contiguous_set.h"
#include <doctest/doctest.h>
#include <iostream>
#include <random>
template <typename T>
void test(T from, T to)
{
// Include a random fraction of the range
std::vector<T> sample;
for (auto i = from; i < to; ++i)
{
if (rand() % 3 != 0)
{
sample.emplace_back(i);
}
}
// Insert them in random order
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(sample.begin(), sample.end(), g);
ds::ContiguousSet<T> cs;
for (const auto& n : sample)
{
REQUIRE(cs.insert(n));
}
// Confirm that all are present, and retrieved in-order
REQUIRE(cs.size() == sample.size());
REQUIRE(cs.get_ranges().size() <= cs.size());
std::sort(sample.begin(), sample.end());
auto sample_it = sample.begin();
auto cs_it = cs.begin();
for (T n = from; n <= to; ++n)
{
if (sample_it != sample.end() && *sample_it == n)
{
REQUIRE(cs.contains(n));
REQUIRE(cs.find(n) != cs.end());
REQUIRE(n == *cs_it);
++sample_it;
++cs_it;
}
else
{
REQUIRE_FALSE(cs.contains(n));
REQUIRE(cs.find(n) == cs.end());
}
}
REQUIRE(cs_it == cs.end());
}
TEST_CASE_TEMPLATE(
"Contiguous set API" * doctest::test_suite("contiguousset"), T, size_t, int)
{
ds::ContiguousSet<T> cs;
const auto& ccs = cs;
T a, b, c;
if constexpr (std::is_same_v<T, size_t>)
{
a = 0;
b = 10;
c = 20;
}
else if constexpr (std::is_same_v<T, int>)
{
a = -10;
b = 0;
c = 10;
}
REQUIRE_FALSE(cs.erase(a));
REQUIRE_FALSE(cs.erase(b));
REQUIRE_FALSE(cs.erase(c));
REQUIRE(cs.size() == 0);
REQUIRE(ccs.size() == 0);
REQUIRE(cs.begin() == cs.end());
REQUIRE(ccs.begin() == ccs.end());
REQUIRE(cs.insert(b));
// ccs.insert({}); // insert is non-const
REQUIRE(cs.size() == 1);
REQUIRE(ccs.size() == 1);
REQUIRE(cs.begin() != cs.end());
REQUIRE(ccs.begin() != ccs.end());
REQUIRE(cs.front() == cs.back());
REQUIRE(ccs.front() == ccs.back());
// Insert again makes no change
REQUIRE_FALSE(cs.insert(b));
REQUIRE(cs.size() == 1);
REQUIRE(ccs.size() == 1);
REQUIRE(cs.begin() != cs.end());
REQUIRE(ccs.begin() != ccs.end());
REQUIRE(cs.front() == cs.back());
REQUIRE(ccs.front() == ccs.back());
{
ds::ContiguousSet<T> cs2(ccs);
REQUIRE(cs == cs2);
REQUIRE(cs2.erase(b));
REQUIRE(cs != cs2);
REQUIRE(cs2.insert(b));
REQUIRE(cs == cs2);
}
REQUIRE(cs.insert(a));
REQUIRE(cs.size() == 2);
REQUIRE(ccs.size() == 2);
REQUIRE(cs.begin() != cs.end());
REQUIRE(ccs.begin() != ccs.end());
REQUIRE(cs.front() != cs.back());
REQUIRE(ccs.front() != ccs.back());
REQUIRE(cs.contains(b));
REQUIRE(cs.contains(a));
REQUIRE_FALSE(cs.contains(c));
REQUIRE(cs.insert(c));
REQUIRE(cs.size() == 3);
REQUIRE(ccs.size() == 3);
REQUIRE(cs.begin() != cs.end());
REQUIRE(ccs.begin() != ccs.end());
REQUIRE(cs.front() != cs.back());
REQUIRE(ccs.front() != ccs.back());
REQUIRE(cs.contains(b));
REQUIRE(cs.contains(a));
REQUIRE(cs.contains(c));
REQUIRE(cs.erase(a));
REQUIRE_FALSE(cs.erase(a));
REQUIRE(cs.size() == 2);
REQUIRE(ccs.size() == 2);
REQUIRE(cs.begin() != cs.end());
REQUIRE(ccs.begin() != ccs.end());
REQUIRE(cs.front() != cs.back());
REQUIRE(ccs.front() != ccs.back());
REQUIRE(cs.contains(b));
REQUIRE_FALSE(cs.contains(a));
REQUIRE(cs.contains(c));
REQUIRE(cs.erase(c));
REQUIRE_FALSE(cs.erase(c));
REQUIRE(cs.size() == 1);
REQUIRE(ccs.size() == 1);
REQUIRE(cs.begin() != cs.end());
REQUIRE(ccs.begin() != ccs.end());
REQUIRE(cs.front() == cs.back());
REQUIRE(ccs.front() == ccs.back());
REQUIRE(cs.contains(b));
REQUIRE_FALSE(cs.contains(a));
REQUIRE_FALSE(cs.contains(c));
{
auto it = cs.begin();
REQUIRE(*it == b);
++it;
REQUIRE(it == cs.end());
}
{
auto it = cs.begin();
REQUIRE(*it == b);
it++;
REQUIRE(it == cs.end());
}
{
auto it = ccs.begin();
REQUIRE(*it == b);
++it;
REQUIRE(it == ccs.end());
}
{
auto it = ccs.begin();
REQUIRE(*it == b);
it++;
REQUIRE(it == ccs.end());
}
{
size_t count = 0;
for (auto n : cs)
{
REQUIRE(count++ == 0);
REQUIRE(n == b);
}
}
{
size_t count = 0;
for (auto n : ccs)
{
REQUIRE(count++ == 0);
REQUIRE(n == b);
}
}
REQUIRE(cs.erase(b));
// ccs.erase(b); // erase is non-const
REQUIRE(cs.size() == 0);
REQUIRE(cs.begin() == cs.end());
REQUIRE(cs.insert(b));
REQUIRE(cs.size() == 1);
REQUIRE(cs.begin() != cs.end());
cs.clear();
REQUIRE(cs.size() == 0);
REQUIRE(cs.begin() == cs.end());
REQUIRE_FALSE(cs.contains(b));
}
TEST_CASE("foo" * doctest::test_suite("contiguousset"))
{
ds::ContiguousSet<size_t> cs;
cs.insert(5);
cs.insert(6);
cs.insert(7);
cs.insert(8);
REQUIRE(cs.get_ranges().size() == 1);
}
TEST_CASE("Contiguous set explicit test" * doctest::test_suite("contiguousset"))
{
ds::ContiguousSet<size_t> cs;
REQUIRE(cs.insert(10));
REQUIRE(cs.insert(8));
REQUIRE(cs.insert(12));
REQUIRE(cs.size() == 3);
REQUIRE(cs.get_ranges().size() == 3);
REQUIRE(cs.find(10) != cs.end());
REQUIRE(cs.find(10).it->first == 10);
REQUIRE(cs.find(10).offset == 0);
REQUIRE(cs.find(9) == cs.end());
REQUIRE(cs.insert(11));
REQUIRE(cs.size() == 4);
REQUIRE(cs.get_ranges().size() == 2);
REQUIRE(cs.find(10) != cs.end());
REQUIRE(cs.find(10).it->first == 10);
REQUIRE(cs.find(10).offset == 0);
REQUIRE(cs.find(9) == cs.end());
REQUIRE(cs.insert(9));
REQUIRE(cs.size() == 5);
REQUIRE(cs.get_ranges().size() == 1);
REQUIRE(cs.find(10) != cs.end());
REQUIRE(cs.find(10).it->first == 8);
REQUIRE(cs.find(10).offset == 2);
REQUIRE(cs.find(9) != cs.end());
REQUIRE(cs.find(9).it->first == 8);
REQUIRE(cs.find(9).offset == 1);
REQUIRE(cs.erase(11));
REQUIRE_FALSE(cs.erase(11));
REQUIRE(cs.size() == 4);
REQUIRE(cs.get_ranges().size() == 2);
REQUIRE(cs.find(10) != cs.end());
REQUIRE(cs.erase(10));
REQUIRE_FALSE(cs.erase(10));
REQUIRE(cs.size() == 3);
REQUIRE(cs.get_ranges().size() == 2);
REQUIRE(cs.find(10) == cs.end());
REQUIRE(cs.erase(12));
REQUIRE_FALSE(cs.erase(12));
REQUIRE(cs.size() == 2);
REQUIRE(cs.get_ranges().size() == 1);
REQUIRE(cs.erase(8));
REQUIRE_FALSE(cs.erase(8));
REQUIRE(cs.size() == 1);
REQUIRE(cs.get_ranges().size() == 1);
REQUIRE(cs.insert(5));
REQUIRE(cs.insert(8));
REQUIRE(cs.insert(10));
REQUIRE(cs.insert(11));
REQUIRE(cs.size() == 5);
REQUIRE(cs.get_ranges().size() == 2);
REQUIRE(cs.find(5) != cs.end());
REQUIRE(cs.find(5).it->first == 5);
REQUIRE(cs.find(5).offset == 0);
REQUIRE(cs.find(9) != cs.end());
REQUIRE(cs.find(9).it->first == 8);
REQUIRE(cs.find(9).offset == 1);
cs.clear();
REQUIRE(cs.size() == 0);
REQUIRE(cs.get_ranges().size() == 0);
}
TEST_CASE("Contiguous set correctness" * doctest::test_suite("contiguousset"))
{
for (auto i = 0; i < 10; ++i)
{
test<size_t>(0, 20);
test<int>(0, 20);
test<int>(-20, 20);
}
}
TEST_CASE("Contiguous set extend" * doctest::test_suite("contiguousset"))
{
ds::ContiguousSet<size_t> cs;
// Distinct range at beginning
cs.extend(5, 1);
REQUIRE(cs.size() == 2);
REQUIRE(cs.get_ranges().size() == 1);
// Distinct range in middle
cs.extend(10, 1);
REQUIRE(cs.size() == 4);
REQUIRE(cs.get_ranges().size() == 2);
// Distinct range at end
cs.extend(15, 1);
REQUIRE(cs.size() == 6);
REQUIRE(cs.get_ranges().size() == 3);
SUBCASE("Distinct ranges")
{
cs.extend(1, 1);
REQUIRE(cs.size() == 8);
REQUIRE(cs.get_ranges().size() == 4);
cs.extend(8, 0);
REQUIRE(cs.size() == 9);
REQUIRE(cs.get_ranges().size() == 5);
cs.extend(13, 0);
REQUIRE(cs.size() == 10);
REQUIRE(cs.get_ranges().size() == 6);
cs.extend(20, 1);
REQUIRE(cs.size() == 12);
REQUIRE(cs.get_ranges().size() == 7);
}
SUBCASE("Overlapping ranges")
{
cs.extend(3, 1);
REQUIRE(cs.size() == 8);
REQUIRE(cs.get_ranges().size() == 3);
cs.extend(2, 4);
REQUIRE(cs.size() == 9);
REQUIRE(cs.get_ranges().size() == 3);
cs.extend(7, 1);
REQUIRE(cs.size() == 11);
REQUIRE(cs.get_ranges().size() == 3);
cs.extend(7, 2);
REQUIRE(cs.size() == 12);
REQUIRE(cs.get_ranges().size() == 2);
cs.extend(7, 3);
REQUIRE(cs.size() == 12);
REQUIRE(cs.get_ranges().size() == 2);
REQUIRE_FALSE(cs.contains(1));
for (auto n = 2; n <= 11; ++n)
{
REQUIRE(cs.contains(n));
}
for (auto n = 12; n <= 14; ++n)
{
REQUIRE_FALSE(cs.contains(n));
}
REQUIRE(cs.contains(15));
REQUIRE(cs.contains(16));
REQUIRE_FALSE(cs.contains(17));
cs.extend(9, 6);
REQUIRE(cs.size() == 15);
REQUIRE(cs.get_ranges().size() == 1);
}
SUBCASE("Overlapping and containing ranges")
{
cs.extend(9, 3);
REQUIRE(cs.size() == 8);
REQUIRE(cs.get_ranges().size() == 3);
cs.extend(2, 11);
REQUIRE(cs.size() == 14);
REQUIRE(cs.get_ranges().size() == 2);
cs.extend(1, 20);
REQUIRE(cs.size() == 21);
REQUIRE(cs.get_ranges().size() == 1);
}
}

Просмотреть файл

@ -23,6 +23,10 @@ namespace kv::test
private: private:
std::vector<BatchVector::value_type> replica; std::vector<BatchVector::value_type> replica;
ConsensusType consensus_type; ConsensusType consensus_type;
ccf::TxID committed_txid = {};
ccf::View current_view = 0;
ccf::SeqNo last_signature = 0;
public: public:
aft::ViewHistory view_history; aft::ViewHistory view_history;
@ -39,9 +43,18 @@ namespace kv::test
{ {
replica.push_back(entry); replica.push_back(entry);
const auto& [v, data, committable, hooks] = entry;
// Simplification: all entries are replicated in the same term // Simplification: all entries are replicated in the same term
view_history.update(std::get<0>(entry), view); view_history.update(v, view);
if (committable)
{
// All committable indices are instantly committed
committed_txid = {view, v};
} }
}
current_view = view;
return true; return true;
} }
@ -83,7 +96,7 @@ namespace kv::test
std::pair<ccf::View, ccf::SeqNo> get_committed_txid() override std::pair<ccf::View, ccf::SeqNo> get_committed_txid() override
{ {
return {0, 0}; return {committed_txid.view, committed_txid.seqno};
} }
std::optional<SignableTxIndices> get_signable_txid() override std::optional<SignableTxIndices> get_signable_txid() override
@ -92,13 +105,13 @@ namespace kv::test
SignableTxIndices r; SignableTxIndices r;
r.term = txid.first; r.term = txid.first;
r.version = txid.second; r.version = txid.second;
r.previous_version = 0; r.previous_version = last_signature;
return r; return r;
} }
ccf::SeqNo get_committed_seqno() override ccf::SeqNo get_committed_seqno() override
{ {
return 0; return committed_txid.seqno;
} }
std::optional<NodeId> primary() override std::optional<NodeId> primary() override
@ -128,7 +141,7 @@ namespace kv::test
ccf::View get_view() override ccf::View get_view() override
{ {
return 0; return current_view;
} }
std::vector<ccf::SeqNo> get_view_history(ccf::SeqNo seqno) override std::vector<ccf::SeqNo> get_view_history(ccf::SeqNo seqno) override
@ -203,6 +216,11 @@ namespace kv::test
{ {
return consensus_type; return consensus_type;
} }
void set_last_signature_at(ccf::SeqNo seqno)
{
last_signature = seqno;
}
}; };
class BackupStubConsensus : public StubConsensus class BackupStubConsensus : public StubConsensus

Просмотреть файл

@ -17,6 +17,12 @@
#include <memory> #include <memory>
#include <set> #include <set>
#ifdef ENABLE_HISTORICAL_VERBOSE_LOGGING
# define HISTORICAL_LOG(...) LOG_INFO_FMT(__VA_ARGS__)
#else
# define HISTORICAL_LOG(...)
#endif
namespace ccf::historical namespace ccf::historical
{ {
static std::optional<ccf::PrimarySignature> get_signature( static std::optional<ccf::PrimarySignature> get_signature(
@ -96,8 +102,8 @@ namespace ccf::historical
struct Request struct Request
{ {
ccf::SeqNo first_requested_seqno = 0; ccf::SeqNo first_requested_seqno = 0;
ccf::SeqNo last_requested_seqno = 0; SeqNoCollection requested_seqnos;
std::vector<StoreDetailsPtr> requested_stores; std::map<ccf::SeqNo, StoreDetailsPtr> requested_stores;
std::chrono::milliseconds time_to_expiry; std::chrono::milliseconds time_to_expiry;
bool include_receipts; bool include_receipts;
@ -105,8 +111,7 @@ namespace ccf::historical
// Entries from outside the requested range (such as the next signature) // Entries from outside the requested range (such as the next signature)
// may be needed to produce receipts. They are stored here, distinct from // may be needed to produce receipts. They are stored here, distinct from
// user-requested stores. // user-requested stores.
std::optional<std::pair<ccf::SeqNo, StoreDetailsPtr>> std::map<ccf::SeqNo, StoreDetailsPtr> supporting_signatures;
supporting_signature;
// Only set when recovering ledger secrets // Only set when recovering ledger secrets
std::unique_ptr<LedgerSecretRecoveryInfo> ledger_secret_recovery_info = std::unique_ptr<LedgerSecretRecoveryInfo> ledger_secret_recovery_info =
@ -116,20 +121,16 @@ namespace ccf::historical
StoreDetailsPtr get_store_details(ccf::SeqNo seqno) const StoreDetailsPtr get_store_details(ccf::SeqNo seqno) const
{ {
if (seqno >= first_requested_seqno && seqno <= last_requested_seqno) auto it = requested_stores.find(seqno);
if (it != requested_stores.end())
{ {
const auto offset = seqno - first_requested_seqno; return it->second;
if (static_cast<size_t>(offset) < requested_stores.size())
{
return requested_stores[offset];
}
} }
if ( auto supporting_it = supporting_signatures.find(seqno);
supporting_signature.has_value() && if (supporting_it != supporting_signatures.end())
supporting_signature->first == seqno)
{ {
return supporting_signature->second; return supporting_it->second;
} }
return nullptr; return nullptr;
@ -143,93 +144,83 @@ namespace ccf::historical
// 2 3 4 5 // 2 3 4 5
// and then we adjust to: // and then we adjust to:
// 4 5 // 4 5
// we don't need to fetch anything new; this is a subrange, we just need // we don't need to fetch anything new; this is a subrange. But if we
// to shift where these are in our requested_stores vector. But if we
// adjust to: // adjust to:
// 0 1 2 3 4 5 6 // 0 1 2 3 4 5 6
// we need to shift _and_ start fetching 0, 1, and 6. // we need to start fetching 0, 1, and 6.
std::set<SeqNoRange> adjust_range( SeqNoCollection adjust_ranges(
ccf::SeqNo start_seqno, const SeqNoCollection& new_seqnos, bool should_include_receipts)
size_t num_following_indices,
bool should_include_receipts)
{ {
HISTORICAL_LOG(
"Adjusting ranges, previously {}, new {} ({} vs {})",
requested_seqnos.size(),
new_seqnos.size(),
include_receipts,
should_include_receipts);
if ( if (
start_seqno == first_requested_seqno && new_seqnos == requested_seqnos &&
(num_following_indices + 1) == requested_stores.size() &&
should_include_receipts == include_receipts) should_include_receipts == include_receipts)
{ {
// This is precisely the range we're already tracking - do nothing // This is precisely the request we're already tracking - do nothing
HISTORICAL_LOG("Already have this range");
return {}; return {};
} }
std::set<SeqNoRange> ret; std::set<SeqNo> newly_requested;
std::optional<SeqNoRange> current_range = std::nullopt; std::map<ccf::SeqNo, StoreDetailsPtr> new_stores;
std::vector<StoreDetailsPtr> new_stores(num_following_indices + 1);
for (auto seqno = start_seqno; seqno <= for (auto seqno : new_seqnos)
static_cast<ccf::SeqNo>(start_seqno + num_following_indices);
++seqno)
{ {
auto existing_details = get_store_details(seqno); auto existing_details = get_store_details(seqno);
if (existing_details == nullptr) if (existing_details == nullptr)
{ {
if (current_range.has_value()) newly_requested.insert(seqno);
{ new_stores[seqno] = std::make_shared<StoreDetails>();
if (current_range->second + 1 == seqno) HISTORICAL_LOG("{} is new", seqno);
{
current_range->second = seqno;
} }
else else
{ {
ret.insert(*current_range); new_stores[seqno] = std::move(existing_details);
current_range.reset(); HISTORICAL_LOG("Found {} already", seqno);
} }
} }
if (!current_range.has_value())
{
current_range = std::make_pair(seqno, seqno);
}
new_stores[seqno - start_seqno] = std::make_shared<StoreDetails>();
}
else
{
new_stores[seqno - start_seqno] = std::move(existing_details);
}
}
if (current_range.has_value())
{
ret.insert(*current_range);
}
requested_stores = std::move(new_stores); requested_stores = std::move(new_stores);
first_requested_seqno = start_seqno; first_requested_seqno = new_seqnos.front();
last_requested_seqno = first_requested_seqno + num_following_indices;
// If the final entry in the new range is known and not a signature,
// then we may need a subsequent signature to support it (or an earlier
// entry received out-of-order!) So start fetching subsequent entries to
// find supporting signature. It's possible this was the supporting
// entry we already had, or a signature in the range we already had, but
// working that out is tricky so be pessimistic and refetch instead.
supporting_signature.reset();
if (should_include_receipts)
{
const auto last_details = get_store_details(last_requested_seqno);
if (last_details->store != nullptr && !last_details->is_signature)
{
const auto next_seqno = last_requested_seqno + 1;
supporting_signature =
std::make_pair(next_seqno, std::make_shared<StoreDetails>());
ret.emplace(next_seqno, next_seqno);
}
}
// If the range has changed, forget what ledger secrets we may have been // If the range has changed, forget what ledger secrets we may have been
// fetching - the caller can begin asking for them again // fetching - the caller can begin asking for them again
ledger_secret_recovery_info = nullptr; ledger_secret_recovery_info = nullptr;
const auto newly_requested_receipts =
should_include_receipts && !include_receipts;
requested_seqnos = new_seqnos;
include_receipts = should_include_receipts; include_receipts = should_include_receipts;
return ret; HISTORICAL_LOG(
"Clearing {} supporting signatures", supporting_signatures.size());
supporting_signatures.clear();
if (newly_requested_receipts)
{
// If requesting signatures, populate receipts for each entry that we
// already have. Normally this would be done when each entry was
// received, but in the case that we have the entries already and only
// request signatures now, we delay that work to now.
for (auto seqno : new_seqnos)
{
const auto next_seqno = populate_receipts(seqno);
if (next_seqno.has_value())
{
newly_requested.insert(*next_seqno);
supporting_signatures[*next_seqno] =
std::make_shared<StoreDetails>();
}
}
}
return SeqNoCollection(newly_requested.begin(), newly_requested.end());
} }
enum class PopulateReceiptsResult enum class PopulateReceiptsResult
@ -243,18 +234,28 @@ namespace ccf::historical
FetchNext, FetchNext,
}; };
PopulateReceiptsResult populate_receipts(ccf::SeqNo new_seqno) std::optional<ccf::SeqNo> populate_receipts(ccf::SeqNo new_seqno)
{ {
HISTORICAL_LOG(
"Looking at {}, and populating receipts from it", new_seqno);
auto new_details = get_store_details(new_seqno); auto new_details = get_store_details(new_seqno);
if (new_details->store != nullptr)
{
if (new_details->is_signature) if (new_details->is_signature)
{ {
HISTORICAL_LOG("{} is a signature", new_seqno);
// Iterate through earlier indices. If this signature covers them // Iterate through earlier indices. If this signature covers them
// then create a receipt for them // then create a receipt for them
const auto sig = get_signature(new_details->store); const auto sig = get_signature(new_details->store);
ccf::MerkleTreeHistory tree(get_tree(new_details->store).value()); ccf::MerkleTreeHistory tree(get_tree(new_details->store).value());
for (auto seqno = first_requested_seqno; seqno < new_seqno; ++seqno) for (auto seqno : requested_seqnos)
{ {
if (seqno >= new_seqno)
{
break;
}
if (tree.in_range(seqno)) if (tree.in_range(seqno))
{ {
auto details = get_store_details(seqno); auto details = get_store_details(seqno);
@ -268,83 +269,157 @@ namespace ccf::historical
sig->node, sig->node,
sig->cert); sig->cert);
details->transaction_id = {sig->view, seqno}; details->transaction_id = {sig->view, seqno};
HISTORICAL_LOG(
"Assigned a sig for {} after given signature at {}",
seqno,
new_seqno);
} }
} }
} }
} }
else
{
HISTORICAL_LOG("{} is not a signature", new_seqno);
const auto sig_it = supporting_signatures.find(new_seqno);
if (sig_it != supporting_signatures.end())
{
// This was a search for a supporting signature, but this entry is
// _not_ a signature - fetch the next
// NB: We skip any entries we already have here. It is possible we
// are fetching 10, previously had entries at 13, 14, 15, and the
// signature for all of these is at 20. The supporting signature
// for 10 tries 11, then 12. Next, it should try 16, not 13.
auto next_seqno = new_seqno + 1;
while (requested_seqnos.contains(next_seqno))
{
++next_seqno;
}
HISTORICAL_LOG(
"{} was a supporting signature attempt, fetch next {}",
new_seqno,
next_seqno);
return {next_seqno};
}
else if (new_details->receipt == nullptr) else if (new_details->receipt == nullptr)
{ {
HISTORICAL_LOG(
"{} also has no receipt - looking for later signature",
new_seqno);
// Iterate through later indices, see if there's a signature that // Iterate through later indices, see if there's a signature that
// covers this one // covers this one
const auto& untrusted_digest = new_details->entry_digest; const auto& untrusted_digest = new_details->entry_digest;
bool sig_seen = false; bool sig_seen = false;
for (auto seqno = new_seqno + 1; seqno <= last_requested_seqno; std::optional<ccf::SeqNo> end_of_matching_range = std::nullopt;
for (const auto& [first_seqno, additional] :
requested_seqnos.get_ranges())
{
if (first_seqno + additional < new_seqno)
{
HISTORICAL_LOG(
"Ignoring range starting at {} - too early", first_seqno);
continue;
}
if (!end_of_matching_range.has_value())
{
end_of_matching_range = first_seqno + additional;
}
for (auto seqno = first_seqno;
seqno <= first_seqno + additional;
++seqno) ++seqno)
{ {
if (seqno <= new_seqno)
{
HISTORICAL_LOG("Ignoring {} - too early", seqno);
continue;
}
auto details = get_store_details(seqno); auto details = get_store_details(seqno);
if (details != nullptr) if (details != nullptr)
{ {
if (details->store != nullptr && details->is_signature) if (details->store != nullptr && details->is_signature)
{ {
const auto sig = get_signature(details->store); const auto sig = get_signature(details->store);
ccf::MerkleTreeHistory tree(get_tree(details->store).value()); ccf::MerkleTreeHistory tree(
get_tree(details->store).value());
if (tree.in_range(new_seqno)) if (tree.in_range(new_seqno))
{ {
auto proof = tree.get_proof(new_seqno); auto proof = tree.get_proof(new_seqno);
details->receipt = std::make_shared<TxReceipt>( new_details->receipt = std::make_shared<TxReceipt>(
sig->sig, sig->sig,
proof.get_root(), proof.get_root(),
proof.get_path(), proof.get_path(),
sig->node, sig->node,
sig->cert); sig->cert);
details->transaction_id = {sig->view, new_seqno}; new_details->transaction_id = {sig->view, new_seqno};
return std::nullopt;
} }
// Break here - if this signature doesn't cover us, no later // Break here - if this signature doesn't cover us, no
// one can // later one can
sig_seen = true; sig_seen = true;
HISTORICAL_LOG(
"Found a sig for {} at {}", new_seqno, seqno);
break; break;
} }
} }
} }
if (!sig_seen && supporting_signature.has_value()) if (sig_seen)
{ {
const auto& [seqno, details] = *supporting_signature; break;
}
}
if (!sig_seen)
{
auto sig_it = supporting_signatures.lower_bound(new_seqno);
if (sig_it != supporting_signatures.end())
{
const auto& [sig_seqno, details] = *sig_it;
HISTORICAL_LOG(
"Considering a supporting signature for {} at {}",
new_seqno,
sig_seqno);
if (details->store != nullptr && details->is_signature) if (details->store != nullptr && details->is_signature)
{ {
const auto sig = get_signature(details->store); const auto sig = get_signature(details->store);
ccf::MerkleTreeHistory tree(get_tree(details->store).value()); ccf::MerkleTreeHistory tree(
get_tree(details->store).value());
if (tree.in_range(new_seqno)) if (tree.in_range(new_seqno))
{ {
auto proof = tree.get_proof(new_seqno); auto proof = tree.get_proof(new_seqno);
details->receipt = std::make_shared<TxReceipt>( new_details->receipt = std::make_shared<TxReceipt>(
sig->sig, sig->sig,
proof.get_root(), proof.get_root(),
proof.get_path(), proof.get_path(),
sig->node, sig->node,
sig->cert); sig->cert);
details->transaction_id = {sig->view, new_seqno}; new_details->transaction_id = {sig->view, new_seqno};
}
} }
} }
} }
// If still have no receipt, and this non-signature is the last // If still have no receipt, after considering every larger value
// requested seqno, or a previous attempt at finding supporting // we have, and the best-guess at a supporting signature, then we
// signature, request the _next_ seqno to find supporting signature // may need to fetch another supporting signature. Request the
if (new_details->receipt == nullptr) // first entry after the range
{
if ( if (
new_seqno == last_requested_seqno || new_details->receipt == nullptr &&
(supporting_signature.has_value() && end_of_matching_range.has_value())
supporting_signature->first == new_seqno))
{ {
return PopulateReceiptsResult::FetchNext; HISTORICAL_LOG(
"Still nothing, better fetch {}",
end_of_matching_range.value() + 1);
return {end_of_matching_range.value() + 1};
}
} }
} }
} }
return PopulateReceiptsResult::Continue; return std::nullopt;
} }
}; };
@ -465,8 +540,12 @@ namespace ccf::historical
{ {
// Newly have all required secrets - begin fetching the actual // Newly have all required secrets - begin fetching the actual
// entries // entries
for (const auto& [first_requested_seqno, num_following] :
request.requested_seqnos.get_ranges())
{
fetch_entries_range( fetch_entries_range(
request.first_requested_seqno, request.last_requested_seqno); first_requested_seqno, first_requested_seqno + num_following);
}
} }
// In either case, done with this request, try the next // In either case, done with this request, try the next
@ -508,24 +587,15 @@ namespace ccf::historical
if (request.include_receipts) if (request.include_receipts)
{ {
const auto result = request.populate_receipts(seqno); const auto next_seqno = request.populate_receipts(seqno);
switch (result) if (next_seqno.has_value())
{
case (Request::PopulateReceiptsResult::Continue):
{ {
request.supporting_signatures.erase(seqno);
fetch_entry_at(*next_seqno);
request.supporting_signatures[*next_seqno] =
std::make_shared<StoreDetails>();
}
++request_it; ++request_it;
break;
}
case (Request::PopulateReceiptsResult::FetchNext):
{
const auto next_seqno = seqno + 1;
fetch_entry_at(next_seqno);
request.supporting_signature =
std::make_pair(next_seqno, std::make_shared<StoreDetails>());
++request_it;
break;
}
}
} }
} }
else else
@ -566,15 +636,9 @@ namespace ccf::historical
return true; return true;
} }
std::vector<StatePtr> get_state_range_internal( SeqNoCollection collection_from_single_range(
RequestHandle handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno)
ccf::SeqNo start_seqno,
ccf::SeqNo end_seqno,
ExpiryDuration seconds_until_expiry,
bool include_receipts)
{ {
std::lock_guard<std::mutex> guard(requests_lock);
if (end_seqno < start_seqno) if (end_seqno < start_seqno)
{ {
throw std::logic_error(fmt::format( throw std::logic_error(fmt::format(
@ -583,7 +647,17 @@ namespace ccf::historical
start_seqno)); start_seqno));
} }
const auto num_following_indices = end_seqno - start_seqno; SeqNoCollection c(start_seqno, end_seqno - start_seqno);
return c;
}
std::vector<StatePtr> get_states_internal(
RequestHandle handle,
const SeqNoCollection& seqnos,
ExpiryDuration seconds_until_expiry,
bool include_receipts)
{
std::lock_guard<std::mutex> guard(requests_lock);
const auto ms_until_expiry = const auto ms_until_expiry =
std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::duration_cast<std::chrono::milliseconds>(
@ -594,14 +668,14 @@ namespace ccf::historical
{ {
// This is a new handle - insert a newly created Request for it // This is a new handle - insert a newly created Request for it
it = requests.emplace_hint(it, handle, Request()); it = requests.emplace_hint(it, handle, Request());
HISTORICAL_LOG("First time I've seen handle {}", handle);
} }
Request& request = it->second; Request& request = it->second;
// Update this Request to represent the currently requested range, // Update this Request to represent the currently requested ranges,
// returning any newly requested indices // returning any newly requested indices
auto new_index_ranges = request.adjust_range( auto new_seqnos = request.adjust_ranges(seqnos, include_receipts);
start_seqno, num_following_indices, include_receipts);
// If the earliest target entry cannot be deserialised with the earliest // If the earliest target entry cannot be deserialised with the earliest
// known ledger secret, record the target seqno and begin fetching the // known ledger secret, record the target seqno and begin fetching the
@ -623,9 +697,9 @@ namespace ccf::historical
// If we have sufficiently early secrets, begin fetching any newly // If we have sufficiently early secrets, begin fetching any newly
// requested entries. If we don't fall into this branch, they'll only // requested entries. If we don't fall into this branch, they'll only
// begin to be fetched once the secret arrives. // begin to be fetched once the secret arrives.
for (const auto& [from, to] : new_index_ranges) for (const auto& [start_seqno, additional] : new_seqnos.get_ranges())
{ {
fetch_entries_range(from, to); fetch_entries_range(start_seqno, start_seqno + additional);
} }
} }
@ -634,17 +708,20 @@ namespace ccf::historical
std::vector<StatePtr> trusted_states; std::vector<StatePtr> trusted_states;
for (ccf::SeqNo seqno = start_seqno; seqno <= for (auto seqno : seqnos)
static_cast<ccf::SeqNo>(start_seqno + num_following_indices);
++seqno)
{ {
auto target_details = request.get_store_details(seqno); auto target_details = request.get_store_details(seqno);
if (target_details == nullptr)
{
throw std::logic_error("Request isn't tracking state for seqno");
}
if ( if (
target_details->current_stage == RequestStage::Trusted && target_details->current_stage == RequestStage::Trusted &&
(!request.include_receipts || target_details->receipt != nullptr)) (!request.include_receipts || target_details->receipt != nullptr))
{ {
// Have this store, associated txid and receipt and trust it - add it // Have this store, associated txid and receipt and trust it - add
// to return list // it to return list
StatePtr state = std::make_shared<State>( StatePtr state = std::make_shared<State>(
target_details->store, target_details->store,
target_details->receipt, target_details->receipt,
@ -680,6 +757,16 @@ namespace ccf::historical
} }
} }
std::vector<StorePtr> states_to_stores(const std::vector<StatePtr>& states)
{
std::vector<StorePtr> stores;
for (size_t i = 0; i < states.size(); i++)
{
stores.push_back(states[i]->store);
}
return stores;
}
public: public:
StateCache( StateCache(
kv::Store& store, kv::Store& store,
@ -737,14 +824,11 @@ namespace ccf::historical
ccf::SeqNo end_seqno, ccf::SeqNo end_seqno,
ExpiryDuration seconds_until_expiry) override ExpiryDuration seconds_until_expiry) override
{ {
auto range = get_state_range_internal( return states_to_stores(get_states_internal(
handle, start_seqno, end_seqno, seconds_until_expiry, false); handle,
std::vector<StorePtr> stores; collection_from_single_range(start_seqno, end_seqno),
for (size_t i = 0; i < range.size(); i++) seconds_until_expiry,
{ false));
stores.push_back(range[i]->store);
}
return stores;
} }
std::vector<StorePtr> get_store_range( std::vector<StorePtr> get_store_range(
@ -762,9 +846,11 @@ namespace ccf::historical
ccf::SeqNo end_seqno, ccf::SeqNo end_seqno,
ExpiryDuration seconds_until_expiry) override ExpiryDuration seconds_until_expiry) override
{ {
auto range = get_state_range_internal( return get_states_internal(
handle, start_seqno, end_seqno, seconds_until_expiry, true); handle,
return range; collection_from_single_range(start_seqno, end_seqno),
seconds_until_expiry,
true);
} }
std::vector<StatePtr> get_state_range( std::vector<StatePtr> get_state_range(
@ -776,6 +862,39 @@ namespace ccf::historical
handle, start_seqno, end_seqno, default_expiry_duration); handle, start_seqno, end_seqno, default_expiry_duration);
} }
std::vector<StorePtr> get_stores_for(
RequestHandle handle,
const SeqNoCollection& seqnos,
ExpiryDuration seconds_until_expiry) override
{
return states_to_stores(
get_states_internal(handle, seqnos, seconds_until_expiry, false));
}
std::vector<StorePtr> get_stores_for(
RequestHandle handle, const SeqNoCollection& seqnos) override
{
return get_stores_for(handle, seqnos, default_expiry_duration);
}
std::vector<StatePtr> get_states_for(
RequestHandle handle,
const SeqNoCollection& seqnos,
ExpiryDuration seconds_until_expiry) override
{
if (seqnos.empty())
{
throw std::runtime_error("Cannot request empty range");
}
return get_states_internal(handle, seqnos, seconds_until_expiry, true);
}
std::vector<StatePtr> get_states_for(
RequestHandle handle, const SeqNoCollection& seqnos) override
{
return get_states_for(handle, seqnos, default_expiry_duration);
}
void set_default_expiry_duration(ExpiryDuration duration) override void set_default_expiry_duration(ExpiryDuration duration) override
{ {
default_expiry_duration = duration; default_expiry_duration = duration;
@ -903,7 +1022,7 @@ namespace ccf::historical
const auto is_signature = const auto is_signature =
deserialise_result == kv::ApplyResult::PASS_SIGNATURE; deserialise_result == kv::ApplyResult::PASS_SIGNATURE;
LOG_DEBUG_FMT( HISTORICAL_LOG(
"Processing historical store at {} ({})", "Processing historical store at {} ({})",
seqno, seqno,
(size_t)deserialise_result); (size_t)deserialise_result);

Просмотреть файл

@ -55,7 +55,11 @@ namespace ccf
COMPACT COMPACT
}; };
#ifdef OVERRIDE_MAX_HISTORY_LEN
constexpr int MAX_HISTORY_LEN = OVERRIDE_MAX_HISTORY_LEN;
#else
constexpr int MAX_HISTORY_LEN = 1000; constexpr int MAX_HISTORY_LEN = 1000;
#endif
static std::ostream& operator<<(std::ostream& os, HashOp flag) static std::ostream& operator<<(std::ostream& os, HashOp flag)
{ {

Просмотреть файл

@ -192,6 +192,36 @@ namespace ccf
return {}; return {};
} }
std::vector<historical::StorePtr> get_stores_for(
historical::RequestHandle handle,
const historical::SeqNoCollection& seqnos,
historical::ExpiryDuration seconds_until_expiry)
{
return {};
}
std::vector<historical::StorePtr> get_stores_for(
historical::RequestHandle handle,
const historical::SeqNoCollection& seqnos)
{
return {};
}
std::vector<historical::StatePtr> get_states_for(
historical::RequestHandle handle,
const historical::SeqNoCollection& seqnos,
historical::ExpiryDuration seconds_until_expiry)
{
return {};
}
std::vector<historical::StatePtr> get_states_for(
historical::RequestHandle handle,
const historical::SeqNoCollection& seqnos)
{
return {};
}
bool drop_cached_states(historical::RequestHandle handle) bool drop_cached_states(historical::RequestHandle handle)
{ {
return true; return true;

Просмотреть файл

@ -1,6 +1,10 @@
// Copyright (c) Microsoft Corporation. All rights reserved. // Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License. // Licensed under the Apache 2.0 License.
#define OVERRIDE_MAX_HISTORY_LEN 4
// Uncomment this to aid debugging
//#define ENABLE_HISTORICAL_VERBOSE_LOGGING
#include "node/historical_queries.h" #include "node/historical_queries.h"
#include "crypto/rsa_key_pair.h" #include "crypto/rsa_key_pair.h"
@ -163,6 +167,15 @@ kv::Version write_transactions_and_signature(
kv_store.get_history()->emit_signature(); kv_store.get_history()->emit_signature();
auto consensus =
dynamic_cast<kv::test::StubConsensus*>(kv_store.get_consensus().get());
REQUIRE(consensus != nullptr);
REQUIRE(consensus->get_committed_seqno() == kv_store.current_version());
consensus->set_last_signature_at(kv_store.current_version());
kv_store.compact(kv_store.current_version());
return kv_store.current_version(); return kv_store.current_version();
} }
@ -218,6 +231,20 @@ void validate_business_transaction(
REQUIRE(private_count == 1); REQUIRE(private_count == 1);
} }
void validate_business_transaction(
ccf::historical::StatePtr state, ccf::SeqNo seqno)
{
REQUIRE(state != nullptr);
validate_business_transaction(state->store, seqno);
REQUIRE(state->receipt != nullptr);
const auto state_txid = state->transaction_id;
const auto store_txid = state->store->current_txid();
REQUIRE(state_txid.view == store_txid.term);
REQUIRE(state_txid.seqno == store_txid.version);
}
std::map<ccf::SeqNo, std::vector<uint8_t>> construct_host_ledger( std::map<ccf::SeqNo, std::vector<uint8_t>> construct_host_ledger(
std::shared_ptr<kv::Consensus> c) std::shared_ptr<kv::Consensus> c)
{ {
@ -345,7 +372,7 @@ TEST_CASE("StateCache point queries")
auto state_at_seqno = cache.get_state_at(high_handle, high_seqno); auto state_at_seqno = cache.get_state_at(high_handle, high_seqno);
REQUIRE(state_at_seqno != nullptr); REQUIRE(state_at_seqno != nullptr);
validate_business_transaction(state_at_seqno->store, high_seqno); validate_business_transaction(state_at_seqno, high_seqno);
} }
{ {
@ -593,6 +620,71 @@ TEST_CASE("StateCache get store vs get state")
cache.drop_cached_states(default_handle); cache.drop_cached_states(default_handle);
} }
} }
{
INFO("Switching between range store requests and range state requests");
{
REQUIRE(cache.get_store_range(default_handle, seqno_a, seqno_b).empty());
REQUIRE(provide_ledger_entry_range(seqno_a, seqno_b));
REQUIRE_FALSE(
cache.get_store_range(default_handle, seqno_a, seqno_b).empty());
REQUIRE(cache.get_state_range(default_handle, seqno_a, seqno_b).empty());
REQUIRE(provide_ledger_entry_range(seqno_b + 1, signature_transaction));
auto states = cache.get_state_range(default_handle, seqno_a, seqno_b);
REQUIRE_FALSE(states.empty());
for (auto& state : states)
{
REQUIRE(state != nullptr);
REQUIRE(state->receipt != nullptr);
}
cache.drop_cached_states(default_handle);
}
{
REQUIRE(cache.get_state_range(default_handle, seqno_a, seqno_b).empty());
REQUIRE(provide_ledger_entry_range(seqno_a, signature_transaction));
auto states = cache.get_state_range(default_handle, seqno_a, seqno_b);
REQUIRE_FALSE(states.empty());
for (auto& state : states)
{
REQUIRE(state != nullptr);
REQUIRE(state->receipt != nullptr);
}
REQUIRE_FALSE(
cache.get_store_range(default_handle, seqno_a, seqno_b).empty());
states = cache.get_state_range(default_handle, seqno_a, seqno_b);
REQUIRE_FALSE(states.empty());
for (auto& state : states)
{
REQUIRE(state != nullptr);
REQUIRE(state->receipt != nullptr);
}
cache.drop_cached_states(default_handle);
}
{
REQUIRE(
cache.get_store_range(default_handle, seqno_a, signature_transaction)
.empty());
REQUIRE(provide_ledger_entry_range(seqno_a, signature_transaction));
REQUIRE_FALSE(
cache.get_store_range(default_handle, seqno_a, signature_transaction)
.empty());
auto states =
cache.get_state_range(default_handle, seqno_a, signature_transaction);
REQUIRE_FALSE(states.empty());
for (auto& state : states)
{
REQUIRE(state != nullptr);
REQUIRE(state->receipt != nullptr);
}
cache.drop_cached_states(default_handle);
}
}
} }
TEST_CASE("StateCache range queries") TEST_CASE("StateCache range queries")
@ -671,11 +763,10 @@ TEST_CASE("StateCache range queries")
const auto range_size = to_provide.size(); const auto range_size = to_provide.size();
REQUIRE(stores.size() == range_size); REQUIRE(stores.size() == range_size);
for (size_t i = 0; i < stores.size(); ++i) for (auto& store : stores)
{ {
auto& store = stores[i];
REQUIRE(store != nullptr); REQUIRE(store != nullptr);
const auto seqno = range_start + i; const auto seqno = store->current_version();
// Don't validate anything about signature transactions, just the // Don't validate anything about signature transactions, just the
// business transactions between them // business transactions between them
@ -715,6 +806,138 @@ TEST_CASE("StateCache range queries")
} }
} }
TEST_CASE("StateCache sparse queries")
{
auto state = create_and_init_state();
auto& kv_store = *state.kv_store;
std::vector<kv::Version> signature_versions;
const auto begin_seqno = kv_store.current_version() + 1;
{
INFO("Build some interesting state in the store");
for (size_t batch_size : {10, 5, 2, 20, 5})
{
signature_versions.push_back(
write_transactions_and_signature(kv_store, batch_size));
}
}
const auto end_seqno = kv_store.current_version();
ccf::historical::StateCache cache(
kv_store, state.ledger_secrets, std::make_shared<StubWriter>());
auto ledger = construct_host_ledger(state.kv_store->get_consensus());
auto provide_ledger_entry = [&](size_t i) {
bool accepted = cache.handle_ledger_entry(i, ledger.at(i));
return accepted;
};
auto signing_version = [&signature_versions](kv::Version seqno) {
const auto begin = signature_versions.begin();
const auto end = signature_versions.end();
const auto exact_it = std::find(begin, end, seqno);
if (exact_it != end)
{
return seqno;
}
const auto next_sig_it = std::upper_bound(begin, end, seqno);
REQUIRE(next_sig_it != end);
return *next_sig_it;
};
std::random_device rd;
std::mt19937 g(rd());
auto next_handle = 0;
auto fetch_and_validate_sparse_set =
[&](const ccf::historical::SeqNoCollection& seqnos) {
const auto this_handle = next_handle++;
{
auto stores = cache.get_stores_for(this_handle, seqnos);
REQUIRE(stores.empty());
}
// Cache is robust to receiving these out-of-order, so stress that by
// submitting out-of-order
std::vector<ccf::SeqNo> to_provide;
for (auto it = seqnos.begin(); it != seqnos.end(); ++it)
{
to_provide.emplace_back(*it);
}
std::shuffle(to_provide.begin(), to_provide.end(), g);
for (const auto seqno : to_provide)
{
// Some of these may be unrequested since they overlapped with the
// previous range so are already known. Provide them all blindly for
// simplicity, and make no assertion on the return code.
provide_ledger_entry(seqno);
}
{
auto stores = cache.get_stores_for(this_handle, seqnos);
REQUIRE(!stores.empty());
const auto range_size = to_provide.size();
REQUIRE(stores.size() == range_size);
for (auto& store : stores)
{
REQUIRE(store != nullptr);
const auto seqno = store->current_version();
// Don't validate anything about signature transactions, just the
// business transactions between them
if (
std::find(
signature_versions.begin(), signature_versions.end(), seqno) ==
signature_versions.end())
{
validate_business_transaction(store, seqno);
}
}
}
};
{
INFO("Fetch a single explicit sparse set");
ccf::historical::SeqNoCollection seqnos;
seqnos.insert(4);
seqnos.insert(5);
seqnos.insert(7);
seqnos.insert(9);
seqnos.insert(10);
seqnos.insert(11);
seqnos.insert(12);
seqnos.insert(13);
fetch_and_validate_sparse_set(seqnos);
}
{
INFO(
"Fetch sparse sets of various sizes, including across multiple "
"signatures");
for (size_t n = 0; n < 10; ++n)
{
ccf::historical::SeqNoCollection seqnos;
for (auto seqno = begin_seqno; seqno < end_seqno; ++seqno)
{
if (rand() % 3 == 0)
{
seqnos.insert(seqno);
}
}
fetch_and_validate_sparse_set(seqnos);
}
}
}
TEST_CASE("StateCache concurrent access") TEST_CASE("StateCache concurrent access")
{ {
auto state = create_and_init_state(); auto state = create_and_init_state();
@ -794,102 +1017,62 @@ TEST_CASE("StateCache concurrent access")
} }
}); });
constexpr auto per_thread_queries = 20; constexpr auto per_thread_queries = 30;
using Clock = std::chrono::system_clock; using Clock = std::chrono::system_clock;
// Add a watchdog timeout. Even in Debug+SAN this entire test takes <3 secs, // Add a watchdog timeout. Even in Debug+SAN this entire test takes <3 secs,
// so 10 seconds for any single entry is surely deadlock // so 10 seconds for any single entry is surely deadlock
const auto too_long = std::chrono::seconds(10); const auto too_long = std::chrono::seconds(3);
auto query_random_point = [&](size_t handle) { auto fetch_until_timeout = [&](
for (size_t i = 0; i < per_thread_queries; ++i) const auto& fetch_result,
{ const auto& check_result,
const auto target_seqno = random_seqno(); const auto& error_printer) {
ccf::historical::StatePtr state;
const auto start_time = Clock::now(); const auto start_time = Clock::now();
while (true) while (true)
{ {
state = cache.get_state_at(handle, target_seqno); fetch_result();
if (state != nullptr) if (check_result())
{ {
break; break;
} }
if (Clock::now() - start_time > too_long) if (Clock::now() - start_time > too_long)
{ {
error_printer();
return false;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
return true;
};
auto default_error_printer =
[&](
size_t handle,
size_t i,
const std::vector<std::string>& previously_requested) {
std::cout << fmt::format( std::cout << fmt::format(
"Thread <{}>, i [{}]: {} - still no answer!", "Thread <{}>, i [{}]: {} - still no answer!",
handle, handle,
i, i,
target_seqno) previously_requested.back())
<< std::endl;
REQUIRE(false);
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
if (
std::find(
signature_versions.begin(), signature_versions.end(), target_seqno) ==
signature_versions.end())
{
validate_business_transaction(state->store, target_seqno);
}
}
};
auto query_random_range = [&](size_t handle) {
std::vector<std::pair<size_t, size_t>> requested;
for (size_t i = 0; i < per_thread_queries; ++i)
{
auto range_start = random_seqno();
auto range_end = random_seqno();
if (range_start > range_end)
{
std::swap(range_start, range_end);
}
requested.push_back(std::make_pair(range_start, range_end));
std::vector<ccf::historical::StorePtr> stores;
const auto start_time = Clock::now();
while (true)
{
stores = cache.get_store_range(handle, range_start, range_end);
if (!stores.empty())
{
break;
}
if (Clock::now() - start_time > too_long)
{
std::cout << fmt::format(
"Thread <{}>, i [{}]: {}-{} - still no answer!",
handle,
i,
range_start,
range_end)
<< std::endl; << std::endl;
std::cout << fmt::format( std::cout << fmt::format(
"I've previously used handle {} to request:", handle) "I've previously used handle {} to request:", handle)
<< std::endl; << std::endl;
for (const auto& [a, b] : requested) for (const auto& s : previously_requested)
{ {
std::cout << fmt::format(" {} to {}", a, b) << std::endl; std::cout << " " << s << std::endl;
}
REQUIRE(false);
} }
};
std::this_thread::sleep_for(std::chrono::milliseconds(1)); auto validate_all_stores =
} [&](const std::vector<ccf::historical::StorePtr>& stores) {
for (auto& store : stores)
REQUIRE(stores.size() == range_end - range_start + 1);
for (size_t i = 0; i < stores.size(); ++i)
{ {
auto& store = stores[i];
REQUIRE(store != nullptr); REQUIRE(store != nullptr);
const auto seqno = store->current_version(); const auto seqno = store->current_version();
if ( if (
@ -900,22 +1083,316 @@ TEST_CASE("StateCache concurrent access")
validate_business_transaction(store, seqno); validate_business_transaction(store, seqno);
} }
} }
};
auto validate_all_states =
[&](const std::vector<ccf::historical::StatePtr>& states) {
for (auto& state : states)
{
REQUIRE(state != nullptr);
const auto seqno = state->store->current_version();
if (
std::find(
signature_versions.begin(), signature_versions.end(), seqno) ==
signature_versions.end())
{
validate_business_transaction(state, seqno);
}
} }
}; };
const auto num_threads = 20; auto query_random_point_store =
std::atomic<size_t> next_handle = 0; [&](ccf::SeqNo target_seqno, size_t handle, const auto& error_printer) {
std::vector<std::thread> random_queries; ccf::historical::StorePtr store;
for (size_t i = 0; i < num_threads; ++i) auto fetch_result = [&]() {
store = cache.get_store_at(handle, target_seqno);
};
auto check_result = [&]() { return store != nullptr; };
REQUIRE(fetch_until_timeout(fetch_result, check_result, error_printer));
REQUIRE(store != nullptr);
validate_all_stores({store});
};
auto query_random_point_state =
[&](ccf::SeqNo target_seqno, size_t handle, const auto& error_printer) {
ccf::historical::StatePtr state;
auto fetch_result = [&]() {
state = cache.get_state_at(handle, target_seqno);
};
auto check_result = [&]() { return state != nullptr; };
REQUIRE(fetch_until_timeout(fetch_result, check_result, error_printer));
REQUIRE(state != nullptr);
validate_all_states({state});
};
auto query_random_range_stores = [&](
ccf::SeqNo range_start,
ccf::SeqNo range_end,
size_t handle,
const auto& error_printer) {
std::vector<ccf::historical::StorePtr> stores;
auto fetch_result = [&]() {
stores = cache.get_store_range(handle, range_start, range_end);
};
auto check_result = [&]() { return !stores.empty(); };
REQUIRE(fetch_until_timeout(fetch_result, check_result, error_printer));
REQUIRE(stores.size() == range_end - range_start + 1);
validate_all_stores(stores);
};
auto query_random_range_states = [&](
ccf::SeqNo range_start,
ccf::SeqNo range_end,
size_t handle,
const auto& error_printer) {
std::vector<ccf::historical::StatePtr> states;
auto fetch_result = [&]() {
states = cache.get_state_range(handle, range_start, range_end);
};
auto check_result = [&]() { return !states.empty(); };
REQUIRE(fetch_until_timeout(fetch_result, check_result, error_printer));
REQUIRE(states.size() == range_end - range_start + 1);
validate_all_states(states);
};
auto query_random_sparse_set_stores =
[&](
const ccf::historical::SeqNoCollection& seqnos,
size_t handle,
const auto& error_printer) {
std::vector<ccf::historical::StorePtr> stores;
auto fetch_result = [&]() {
stores = cache.get_stores_for(handle, seqnos);
};
auto check_result = [&]() { return !stores.empty(); };
REQUIRE(fetch_until_timeout(fetch_result, check_result, error_printer));
REQUIRE(stores.size() == seqnos.size());
validate_all_stores(stores);
};
auto query_random_sparse_set_states =
[&](
const ccf::historical::SeqNoCollection& seqnos,
size_t handle,
const auto& error_printer) {
std::vector<ccf::historical::StatePtr> states;
auto fetch_result = [&]() {
states = cache.get_states_for(handle, seqnos);
};
auto check_result = [&]() { return !states.empty(); };
REQUIRE(fetch_until_timeout(fetch_result, check_result, error_printer));
REQUIRE(states.size() == seqnos.size());
validate_all_states(states);
};
auto run_n_queries = [&](size_t handle) {
std::vector<std::string> previously_requested;
for (size_t i = 0; i < per_thread_queries; ++i)
{ {
if (i % 3 == 0) auto error_printer = [&]() {
default_error_printer(handle, i, previously_requested);
};
const auto query_kind = rand() % 3;
const bool store_or_state = rand() % 2;
const auto ss = store_or_state ? "Stores" : "States";
switch (query_kind)
{ {
random_queries.emplace_back(query_random_range, ++next_handle); case 0:
{
// Fetch a single point
const auto target_seqno = random_seqno();
previously_requested.push_back(
fmt::format("Point {} [{}]", target_seqno, ss));
if (store_or_state)
{
query_random_point_store(target_seqno, handle, error_printer);
} }
else else
{ {
random_queries.emplace_back(query_random_point, ++next_handle); query_random_point_store(target_seqno, handle, error_printer);
} }
break;
}
case 1:
{
// Fetch a single range
auto range_start = random_seqno();
auto range_end = random_seqno();
if (range_start > range_end)
{
std::swap(range_start, range_end);
}
previously_requested.push_back(
fmt::format("Range {}->{} [{}]", range_start, range_end, ss));
if (store_or_state)
{
query_random_range_stores(
range_start, range_end, handle, error_printer);
}
else
{
query_random_range_states(
range_start, range_end, handle, error_printer);
}
break;
}
case 2:
{
// Fetch a sparse set of ranges
auto range_start = random_seqno();
auto range_end = random_seqno();
if (range_start > range_end)
{
std::swap(range_start, range_end);
}
ccf::historical::SeqNoCollection seqnos;
seqnos.insert(range_start);
for (auto i = range_start; i != range_end; ++i)
{
if (i % 3 != 0)
{
seqnos.insert(i);
}
}
seqnos.insert(range_end);
std::vector<std::string> range_descriptions;
for (const auto& [from, additional] : seqnos.get_ranges())
{
range_descriptions.push_back(
fmt::format("{}->{}", from, from + additional));
}
previously_requested.push_back(fmt::format(
"Ranges {} [{}]", fmt::join(range_descriptions, ", "), ss));
if (store_or_state)
{
query_random_sparse_set_stores(seqnos, handle, error_printer);
}
else
{
query_random_sparse_set_states(seqnos, handle, error_printer);
}
break;
}
default:
{
throw std::logic_error("Oops, miscounted!");
}
}
}
};
// Explicitly test some problematic cases
{
std::vector<std::string> previously_requested;
const auto i = 0;
const auto handle = 42;
auto error_printer = [&]() {
default_error_printer(handle, i, previously_requested);
};
previously_requested.push_back("A");
query_random_range_states(9, 12, handle, error_printer);
ccf::historical::SeqNoCollection seqnos;
seqnos.insert(3);
seqnos.insert(9);
seqnos.insert(12);
previously_requested.push_back("B");
query_random_sparse_set_states(seqnos, handle, error_printer);
}
{
std::vector<std::string> previously_requested;
const auto i = 0;
const auto handle = 42;
auto error_printer = [&]() {
default_error_printer(handle, i, previously_requested);
};
previously_requested.push_back("A");
query_random_range_stores(3, 23, handle, error_printer);
previously_requested.push_back("B");
query_random_range_states(14, 17, handle, error_printer);
}
{
std::vector<std::string> previously_requested;
const auto i = 0;
const auto handle = 42;
auto error_printer = [&]() {
default_error_printer(handle, i, previously_requested);
};
ccf::historical::SeqNoCollection seqnos;
seqnos.insert(4);
seqnos.insert(5);
seqnos.insert(7);
seqnos.insert(8);
seqnos.insert(10);
seqnos.insert(11);
seqnos.insert(13);
seqnos.insert(14);
seqnos.insert(16);
previously_requested.push_back("A");
query_random_sparse_set_states(seqnos, handle, error_printer);
}
{
std::vector<std::string> previously_requested;
const auto i = 0;
const auto handle = 42;
auto error_printer = [&]() {
default_error_printer(handle, i, previously_requested);
};
{
ccf::historical::SeqNoCollection seqnos;
seqnos.insert(14);
seqnos.insert(16);
seqnos.insert(17);
seqnos.insert(19);
seqnos.insert(20);
seqnos.insert(21);
previously_requested.push_back("A");
query_random_sparse_set_states(seqnos, handle, error_printer);
}
{
ccf::historical::SeqNoCollection seqnos;
seqnos.insert(6);
seqnos.insert(7);
seqnos.insert(8);
seqnos.insert(10);
seqnos.insert(11);
seqnos.insert(12);
seqnos.insert(13);
seqnos.insert(14);
seqnos.insert(16);
seqnos.insert(17);
seqnos.insert(19);
seqnos.insert(20);
seqnos.insert(22);
previously_requested.push_back("B");
query_random_sparse_set_states(seqnos, handle, error_printer);
}
}
{
std::vector<std::string> previously_requested;
const auto i = 0;
const auto handle = 42;
auto error_printer = [&]() {
default_error_printer(handle, i, previously_requested);
};
ccf::historical::SeqNoCollection seqnos;
seqnos.insert(22);
seqnos.insert(23);
previously_requested.push_back("A");
query_random_sparse_set_states(seqnos, handle, error_printer);
previously_requested.push_back("B");
query_random_range_states(20, 23, handle, error_printer);
}
srand(time(NULL));
const auto num_threads = 30;
std::vector<std::thread> random_queries;
for (size_t i = 0; i < num_threads; ++i)
{
random_queries.emplace_back(run_n_queries, i);
} }
for (auto& thread : random_queries) for (auto& thread : random_queries)
@ -995,7 +1472,7 @@ TEST_CASE("Recover historical ledger secrets")
auto historical_state = cache.get_state_at(default_handle, third_seqno); auto historical_state = cache.get_state_at(default_handle, third_seqno);
REQUIRE(historical_state != nullptr); REQUIRE(historical_state != nullptr);
validate_business_transaction(historical_state->store, third_seqno); validate_business_transaction(historical_state, third_seqno);
} }
{ {
@ -1022,7 +1499,7 @@ TEST_CASE("Recover historical ledger secrets")
auto historical_state = cache.get_state_at(default_handle, second_seqno); auto historical_state = cache.get_state_at(default_handle, second_seqno);
REQUIRE(historical_state != nullptr); REQUIRE(historical_state != nullptr);
validate_business_transaction(historical_state->store, second_seqno); validate_business_transaction(historical_state, second_seqno);
} }
{ {
@ -1043,6 +1520,6 @@ TEST_CASE("Recover historical ledger secrets")
auto historical_state = cache.get_state_at(default_handle, first_seqno); auto historical_state = cache.get_state_at(default_handle, first_seqno);
REQUIRE(historical_state != nullptr); REQUIRE(historical_state != nullptr);
validate_business_transaction(historical_state->store, first_seqno); validate_business_transaction(historical_state, first_seqno);
} }
} }

Просмотреть файл

@ -794,6 +794,106 @@ def test_historical_query_range(network, args):
return network return network
@reqs.description("Read state at multiple distinct historical points")
@reqs.supports_methods("log/private", "log/private/historical/sparse")
def test_historical_query_sparse(network, args):
idx = 142
seqnos = []
primary, _ = network.find_primary()
with primary.client("user0") as c:
# Submit many transactions, overwriting the same ID
# Need to submit through network.txs so these can be verified at shutdown, but also need to submit one at a
# time to retrieve the submitted transactions
msgs = {}
n_entries = 100
for _ in range(n_entries):
network.txs.issue(
network,
repeat=True,
idx=idx,
wait_for_sync=False,
log_capture=[],
send_public=False,
)
_, tx = network.txs.get_last_tx(idx=idx)
msg = tx["msg"]
seqno = tx["seqno"]
view = tx["view"]
msgs[seqno] = msg
seqnos.append(seqno)
ccf.commit.wait_for_commit(c, seqno=seqnos[-1], view=view, timeout=3)
def get_sparse(client, target_id, seqnos, timeout=3):
seqnos_s = ",".join(str(n) for n in seqnos)
LOG.info(f"Getting historical entries: {seqnos_s}")
logs = []
start_time = time.time()
end_time = start_time + timeout
entries = {}
path = (
f"/app/log/private/historical/sparse?id={target_id}&seqnos={seqnos_s}"
)
while time.time() < end_time:
r = client.get(path, log_capture=logs)
if r.status_code == http.HTTPStatus.OK:
j_body = r.body.json()
for entry in j_body["entries"]:
assert entry["id"] == target_id, entry
entries[entry["seqno"]] = entry["msg"]
duration = time.time() - start_time
LOG.info(
f"Done! Fetched {len(entries)} entries in {duration:0.2f}s"
)
return entries, duration
elif r.status_code == http.HTTPStatus.ACCEPTED:
# Ignore retry-after header, retry soon
time.sleep(0.1)
continue
else:
LOG.error("Printing historical/sparse logs on unexpected status")
flush_info(logs, None)
raise ValueError(
f"Unexpected status code from historical sparse query: {r.status_code}"
)
LOG.error("Printing historical/sparse logs on timeout")
flush_info(logs, None)
raise TimeoutError(
f"Historical sparse query not available after {timeout}s"
)
entries_all, _ = get_sparse(c, idx, seqnos)
seqnos_a = [s for s in seqnos if random.random() < 0.7]
entries_a, _ = get_sparse(c, idx, seqnos_a)
seqnos_b = [s for s in seqnos if random.random() < 0.5]
entries_b, _ = get_sparse(c, idx, seqnos_b)
small_range = len(seqnos) // 20
seqnos_c = seqnos[:small_range] + seqnos[-small_range:]
entries_c, _ = get_sparse(c, idx, seqnos_c)
def check_presence(expected, entries, seqno):
if seqno in expected:
assert seqno in entries, f"Missing result for {seqno}"
assert (
entries[seqno] == msgs[seqno]
), f"{entries[seqno]} != {msgs[seqno]}"
for seqno in seqnos:
check_presence(seqnos, entries_all, seqno)
check_presence(seqnos_a, entries_a, seqno)
check_presence(seqnos_b, entries_b, seqno)
check_presence(seqnos_c, entries_c, seqno)
return network
def escaped_query_tests(c, endpoint): def escaped_query_tests(c, endpoint):
samples = [ samples = [
{"this": "that"}, {"this": "that"},
@ -1336,6 +1436,7 @@ def run(args):
network = test_random_receipts(network, args, False) network = test_random_receipts(network, args, False)
if args.package == "samples/apps/logging/liblogging": if args.package == "samples/apps/logging/liblogging":
network = test_receipts(network, args) network = test_receipts(network, args)
network = test_historical_query_sparse(network, args)
network = test_historical_receipts(network, args) network = test_historical_receipts(network, args)

Просмотреть файл

@ -79,6 +79,8 @@ class LoggingTxs:
idx=None, idx=None,
wait_for_sync=True, wait_for_sync=True,
log_capture=None, log_capture=None,
send_private=True,
send_public=True,
): ):
self.network = network self.network = network
remote_node, _ = network.find_primary(log_capture=log_capture) remote_node, _ = network.find_primary(log_capture=log_capture)
@ -100,6 +102,7 @@ class LoggingTxs:
if target_idx is None: if target_idx is None:
target_idx = self.idx target_idx = self.idx
if send_private:
priv_msg = f"Private message at idx {target_idx} [{len(self.priv[target_idx])}]" priv_msg = f"Private message at idx {target_idx} [{len(self.priv[target_idx])}]"
rep_priv = c.post( rep_priv = c.post(
"/app/log/private", "/app/log/private",
@ -111,12 +114,16 @@ class LoggingTxs:
log_capture=log_capture, log_capture=log_capture,
) )
self.priv[target_idx].append( self.priv[target_idx].append(
{"msg": priv_msg, "seqno": rep_priv.seqno, "view": rep_priv.view} {
"msg": priv_msg,
"seqno": rep_priv.seqno,
"view": rep_priv.view,
}
) )
wait_point = rep_priv
pub_msg = ( if send_public:
f"Public message at idx {target_idx} [{len(self.pub[target_idx])}]" pub_msg = f"Public message at idx {target_idx} [{len(self.pub[target_idx])}]"
)
rep_pub = c.post( rep_pub = c.post(
"/app/log/public", "/app/log/public",
{ {
@ -129,12 +136,13 @@ class LoggingTxs:
self.pub[target_idx].append( self.pub[target_idx].append(
{"msg": pub_msg, "seqno": rep_pub.seqno, "view": rep_pub.view} {"msg": pub_msg, "seqno": rep_pub.seqno, "view": rep_pub.view}
) )
wait_point = rep_pub
if number_txs and wait_for_sync: if number_txs and wait_for_sync:
check_commit(rep_pub, result=True) check_commit(wait_point, result=True)
if wait_for_sync: if wait_for_sync:
network.wait_for_all_nodes_to_commit( network.wait_for_all_nodes_to_commit(
tx_id=TxID(rep_pub.view, rep_pub.seqno) tx_id=TxID(wait_point.view, wait_point.seqno)
) )
def verify( def verify(