Add a system for retrieving, verifying, and caching historical state in each node (#1267)

This commit is contained in:
Eddy Ashton 2020-06-11 09:59:30 +01:00 committed by GitHub
Parent be1f8bdcbc
Commit 605cf504ed
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
27 changed files: 1167 additions and 130 deletions

View file

@ -307,6 +307,11 @@ if(BUILD_TESTS)
use_client_mbedtls(encryptor_test)
target_link_libraries(encryptor_test PRIVATE secp256k1.host)
add_unit_test(
historical_queries_test ${CMAKE_CURRENT_SOURCE_DIR}/src/node/test/historical_queries.cpp
)
target_link_libraries(historical_queries_test PRIVATE secp256k1.host)
add_unit_test(
msgpack_serialization_test
${CMAKE_CURRENT_SOURCE_DIR}/src/node/test/msgpack_serialization.cpp

View file

@ -486,7 +486,7 @@ namespace ccfapp
};
std::shared_ptr<ccf::UserRpcFrontend> get_rpc_handler(
NetworkTables& nwt, AbstractNotifier& notifier)
NetworkTables& nwt, ccfapp::AbstractNodeContext& context)
{
return make_shared<SmallBank>(*nwt.tables);
}

View file

@ -299,7 +299,7 @@ namespace ccfapp
};
std::shared_ptr<ccf::UserRpcFrontend> get_rpc_handler(
NetworkTables& network, AbstractNotifier& notifier)
NetworkTables& network, ccfapp::AbstractNodeContext& context)
{
return make_shared<JS>(network);
}

View file

@ -29,6 +29,7 @@ namespace loggingapp
static constexpr auto LOG_RECORD_PREFIX_CERT = "LOG_record_prefix_cert";
static constexpr auto LOG_RECORD_ANONYMOUS_CALLER = "LOG_record_anonymous";
static constexpr auto LOG_RECORD_RAW_TEXT = "LOG_record_raw_text";
static constexpr auto LOG_GET_HISTORICAL = "LOG_get_historical";
};
// SNIPPET: table_definition
@ -70,7 +71,8 @@ namespace loggingapp
public:
// SNIPPET_START: constructor
LoggerHandlers(ccf::NetworkTables& nwt, ccf::AbstractNotifier& notifier) :
LoggerHandlers(
ccf::NetworkTables& nwt, ccfapp::AbstractNodeContext& context) :
UserHandlerRegistry(nwt),
records(
nwt.tables->create<Table>("records", kv::SecurityDomain::PRIVATE)),
@ -257,6 +259,80 @@ namespace loggingapp
};
// SNIPPET_END: log_record_text
auto& historical_state = context.get_historical_state();
auto get_historical = [this, &historical_state](ccf::RequestArgs& args) {
const auto params =
nlohmann::json::parse(args.rpc_ctx->get_request_body());
const auto in = params.get<LoggingGetHistorical::In>();
// Check that the requested transaction ID is committed
{
const auto tx_view = consensus->get_view(in.seqno);
const auto committed_seqno = consensus->get_committed_seqno();
const auto committed_view = consensus->get_view(committed_seqno);
const auto tx_status = ccf::get_tx_status(
in.view, in.seqno, tx_view, committed_view, committed_seqno);
if (tx_status != ccf::TxStatus::Committed)
{
args.rpc_ctx->set_response_status(HTTP_STATUS_BAD_REQUEST);
args.rpc_ctx->set_response_header(
http::headers::CONTENT_TYPE,
http::headervalues::contenttype::TEXT);
args.rpc_ctx->set_response_body(fmt::format(
"Only committed transactions can be retrieved historically. "
"Transaction {}.{} is {}",
in.view,
in.seqno,
ccf::tx_status_to_str(tx_status)));
return;
}
}
auto historical_store = historical_state.get_store_at(in.seqno);
if (historical_store == nullptr)
{
args.rpc_ctx->set_response_status(HTTP_STATUS_ACCEPTED);
static constexpr size_t retry_after_seconds = 3;
args.rpc_ctx->set_response_header(
http::headers::RETRY_AFTER, retry_after_seconds);
args.rpc_ctx->set_response_header(
http::headers::CONTENT_TYPE, http::headervalues::contenttype::TEXT);
args.rpc_ctx->set_response_body(fmt::format(
"Historical transaction {}.{} is not currently available.",
in.view,
in.seqno));
return;
}
auto* historical_map = historical_store->get(records);
if (historical_map == nullptr)
{
args.rpc_ctx->set_response_status(HTTP_STATUS_INTERNAL_SERVER_ERROR);
args.rpc_ctx->set_response_header(
http::headers::CONTENT_TYPE, http::headervalues::contenttype::TEXT);
args.rpc_ctx->set_response_body(fmt::format(
"Unable to get historical table '{}'.", records.get_name()));
return;
}
kv::Tx historical_tx;
auto view = historical_tx.get_view(*historical_map);
auto v = view->get(in.id);
if (v.has_value())
{
args.rpc_ctx->set_response_status(HTTP_STATUS_OK);
args.rpc_ctx->set_response_header(
http::headers::CONTENT_TYPE, http::headervalues::contenttype::TEXT);
args.rpc_ctx->set_response_body(std::move(v.value()));
}
else
{
args.rpc_ctx->set_response_status(HTTP_STATUS_NO_CONTENT);
}
};
install(Procs::LOG_RECORD, ccf::json_adapter(record), Write)
.set_auto_schema<LoggingRecord::In, bool>();
// SNIPPET_START: install_get
@ -282,7 +358,9 @@ namespace loggingapp
.set_auto_schema<LoggingRecord::In, bool>()
.set_require_client_identity(false);
install(Procs::LOG_RECORD_RAW_TEXT, log_record_text, Write);
install(Procs::LOG_GET_HISTORICAL, get_historical, Read);
auto& notifier = context.get_notifier();
nwt.signatures.set_global_hook(
[this,
&notifier](kv::Version version, const ccf::Signatures::Write& w) {
@ -302,9 +380,9 @@ namespace loggingapp
LoggerHandlers logger_handlers;
public:
Logger(ccf::NetworkTables& network, ccf::AbstractNotifier& notifier) :
Logger(ccf::NetworkTables& network, ccfapp::AbstractNodeContext& context) :
ccf::UserRpcFrontend(*network.tables, logger_handlers),
logger_handlers(network, notifier)
logger_handlers(network, context)
{}
};
}
@ -313,9 +391,9 @@ namespace ccfapp
{
// SNIPPET_START: rpc_handler
std::shared_ptr<ccf::UserRpcFrontend> get_rpc_handler(
ccf::NetworkTables& nwt, ccf::AbstractNotifier& notifier)
ccf::NetworkTables& nwt, ccfapp::AbstractNodeContext& context)
{
return make_shared<loggingapp::Logger>(nwt, notifier);
return make_shared<loggingapp::Logger>(nwt, context);
}
// SNIPPET_END: rpc_handler
}

View file

@ -39,6 +39,19 @@ namespace loggingapp
DECLARE_JSON_REQUIRED_FIELDS(LoggingGet::Out, msg);
// SNIPPET_END: macro_validation_macros
struct LoggingGetHistorical
{
struct In
{
size_t view;
size_t seqno;
size_t id;
};
};
DECLARE_JSON_TYPE(LoggingGetHistorical::In);
DECLARE_JSON_REQUIRED_FIELDS(LoggingGetHistorical::In, view, seqno, id);
// Public record/get
// Manual schemas, verified then parsed in handler
static const std::string j_record_public_in = R"!!!(

View file

@ -208,7 +208,7 @@ namespace ccfapp
};
std::shared_ptr<ccf::UserRpcFrontend> get_rpc_handler(
NetworkTables& network, AbstractNotifier& notifier)
NetworkTables& network, ccfapp::AbstractNodeContext& context)
{
return std::make_shared<Lua>(network);
}

View file

@ -101,6 +101,22 @@ void set_lua_logger()
logger::config::loggers().emplace_back(std::make_unique<LuaLogger>());
}
struct NodeContext : public ccfapp::AbstractNodeContext
{
StubNotifier notifier;
ccf::historical::StubStateCache historical_state;
AbstractNotifier& get_notifier() override
{
return notifier;
}
ccf::historical::AbstractStateCache& get_historical_state() override
{
return historical_state;
}
};
auto user_caller = kp -> self_sign("CN=name");
auto user_caller_der = tls::make_verifier(user_caller) -> der_cert_data();
std::vector<uint8_t> dummy_key_share = {1, 2, 3};
@ -108,7 +124,7 @@ std::vector<uint8_t> dummy_key_share = {1, 2, 3};
auto init_frontend(
NetworkTables& network,
GenesisGenerator& gen,
StubNotifier& notifier,
ccfapp::AbstractNodeContext& context,
const int n_users,
const int n_members)
{
@ -136,7 +152,7 @@ auto init_frontend(
gen.set_app_scripts(lua::Interpreter().invoke<nlohmann::json>(env_script));
gen.finalize();
return get_rpc_handler(network, notifier);
return get_rpc_handler(network, context);
}
void set_handler(NetworkTables& network, const string& method, const Script& h)
@ -183,9 +199,9 @@ TEST_CASE("simple lua apps")
GenesisGenerator gen(network, gen_tx);
gen.init_values();
gen.create_service({});
StubNotifier notifier;
NodeContext context;
// create network with 1 user and 3 active members
auto frontend = init_frontend(network, gen, notifier, 1, 3);
auto frontend = init_frontend(network, gen, context, 1, 3);
set_lua_logger();
auto user_session = std::make_shared<enclave::SessionContext>(
enclave::InvalidSessionId, user_caller_der);
@ -320,9 +336,9 @@ TEST_CASE("simple bank")
GenesisGenerator gen(network, gen_tx);
gen.init_values();
gen.create_service({});
StubNotifier notifier;
NodeContext context;
// create network with 1 user and 3 active members
auto frontend = init_frontend(network, gen, notifier, 1, 3);
auto frontend = init_frontend(network, gen, context, 1, 3);
set_lua_logger();
auto user_session = std::make_shared<enclave::SessionContext>(
enclave::InvalidSessionId, user_caller_der);
@ -437,9 +453,9 @@ TEST_CASE("pre-populated environment")
GenesisGenerator gen(network, gen_tx);
gen.init_values();
gen.create_service({});
StubNotifier notifier;
NodeContext context;
// create network with 1 user and 3 active members
auto frontend = init_frontend(network, gen, notifier, 1, 3);
auto frontend = init_frontend(network, gen, context, 1, 3);
set_lua_logger();
auto user_session = std::make_shared<enclave::SessionContext>(
enclave::InvalidSessionId, user_caller_der);

View file

@ -7,30 +7,44 @@
namespace consensus
{
using Index = uint64_t;
enum LedgerRequestPurpose : uint8_t
{
Recovery,
HistoricalQuery,
};
/// Consensus-related ringbuffer messages
enum : ringbuffer::Message
{
/// Request individual log entries. Enclave -> Host
/// Request individual ledger entries. Enclave -> Host
DEFINE_RINGBUFFER_MSG_TYPE(ledger_get),
///@{
/// Respond to log_get. Host -> Enclave
/// Respond to ledger_get. Host -> Enclave
DEFINE_RINGBUFFER_MSG_TYPE(ledger_entry),
DEFINE_RINGBUFFER_MSG_TYPE(ledger_no_entry),
///@}
///@{
/// Modify the local log. Enclave -> Host
/// Modify the local ledger. Enclave -> Host
DEFINE_RINGBUFFER_MSG_TYPE(ledger_append),
DEFINE_RINGBUFFER_MSG_TYPE(ledger_truncate),
///@}
};
}
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(consensus::ledger_get, consensus::Index);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
consensus::ledger_entry, std::vector<uint8_t>);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(consensus::ledger_no_entry);
consensus::ledger_get, consensus::Index, consensus::LedgerRequestPurpose);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
consensus::ledger_entry,
consensus::Index,
consensus::LedgerRequestPurpose,
std::vector<uint8_t>);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
consensus::ledger_no_entry,
consensus::Index,
consensus::LedgerRequestPurpose);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
consensus::ledger_append, std::vector<uint8_t>);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(

View file

@ -211,11 +211,11 @@ void populate_entries(
while (true)
{
auto ret = consensus->pop_oldest_data();
if (!ret.second)
if (!ret.has_value())
{
break;
}
entries.emplace_back(ret.first);
entries.emplace_back(ret.value());
}
}
@ -637,8 +637,8 @@ TEST_CASE("Verify prepare proof")
prepare_node_info.general_info.principal_info[0]);
auto ret = consensus->pop_oldest_data();
REQUIRE(ret.second); // deserialized OK
auto second_pre_prepare = deserialize_pre_prepare(ret.first, pbft_state);
REQUIRE(ret.has_value());
auto second_pre_prepare = deserialize_pre_prepare(ret.value(), pbft_state);
// validate the signature in the proof here
Prepared_cert new_node_prepared_cert;

View file

@ -2,19 +2,29 @@
// Licensed under the Apache 2.0 License.
#pragma once
#include "node/historical_queries_interface.h"
#include "node/rpc/notifier_interface.h"
#include "node/rpc/user_frontend.h"
namespace ccfapp
{
struct AbstractNodeContext
{
virtual ~AbstractNodeContext() = default;
virtual ccf::AbstractNotifier& get_notifier() = 0;
virtual ccf::historical::AbstractStateCache& get_historical_state() = 0;
};
// SNIPPET_START: rpc_handler
/** To be implemented by the application to be registered by CCF.
*
* @param network Access to the network's tables
* @param notifier Access to host notification service
* @param network Access to the network's replicated tables
* @param context Access to node and host services
*
* @return Shared pointer to the application handler instance
*/
std::shared_ptr<ccf::UserRpcFrontend> get_rpc_handler(
ccf::NetworkTables& network, ccf::AbstractNotifier& notifier);
ccf::NetworkTables& network, AbstractNodeContext& context);
// SNIPPET_END: rpc_handler
}

View file

@ -8,6 +8,7 @@
#include "enclave_time.h"
#include "interface.h"
#include "node/entities.h"
#include "node/historical_queries.h"
#include "node/network_state.h"
#include "node/node_state.h"
#include "node/nodetypes.h"
@ -29,7 +30,6 @@ namespace enclave
ccf::NetworkState network;
ccf::ShareManager share_manager;
std::shared_ptr<ccf::NodeToNode> n2n_channels;
ccf::Notifier notifier;
ccf::Timers timers;
std::shared_ptr<RPCMap> rpc_map;
std::shared_ptr<RPCSessions> rpcsessions;
@ -40,6 +40,27 @@ namespace enclave
StartType start_type;
ConsensusType consensus_type;
struct NodeContext : public ccfapp::AbstractNodeContext
{
ccf::Notifier notifier;
ccf::historical::StateCache historical_state_cache;
NodeContext(ccf::Notifier&& n, ccf::historical::StateCache&& hsc) :
notifier(std::move(n)),
historical_state_cache(std::move(hsc))
{}
ccf::AbstractNotifier& get_notifier() override
{
return notifier;
}
ccf::historical::AbstractStateCache& get_historical_state() override
{
return historical_state_cache;
}
} context;
public:
Enclave(
EnclaveConfig* enclave_config,
@ -51,15 +72,23 @@ namespace enclave
writer_factory(basic_writer_factory, enclave_config->writer_config),
network(consensus_type_),
n2n_channels(std::make_shared<ccf::NodeToNode>(writer_factory)),
notifier(writer_factory),
rpc_map(std::make_shared<RPCMap>()),
rpcsessions(std::make_shared<RPCSessions>(writer_factory, rpc_map)),
share_manager(network),
node(
writer_factory, network, rpcsessions, notifier, timers, share_manager),
writer_factory,
network,
rpcsessions,
context.notifier,
timers,
share_manager),
cmd_forwarder(std::make_shared<ccf::Forwarder<ccf::NodeToNode>>(
rpcsessions, n2n_channels, rpc_map)),
consensus_type(consensus_type_)
consensus_type(consensus_type_),
context(
ccf::Notifier(writer_factory),
ccf::historical::StateCache(
*network.tables, writer_factory.create_writer_to_outside()))
{
logger::config::msg() = AdminMessage::log_msg;
logger::config::writer() = writer_factory.create_writer_to_outside();
@ -70,7 +99,7 @@ namespace enclave
std::make_unique<ccf::MemberRpcFrontend>(network, node, share_manager));
REGISTER_FRONTEND(
rpc_map, users, ccfapp::get_rpc_handler(network, notifier));
rpc_map, users, ccfapp::get_rpc_handler(network, context));
REGISTER_FRONTEND(
rpc_map, nodes, std::make_unique<ccf::NodeRpcFrontend>(network, node));
@ -221,22 +250,55 @@ namespace enclave
bp,
consensus::ledger_entry,
[this](const uint8_t* data, size_t size) {
auto [body] =
const auto [index, purpose, body] =
ringbuffer::read_message<consensus::ledger_entry>(data, size);
if (node.is_reading_public_ledger())
node.recover_public_ledger_entry(body);
else if (node.is_reading_private_ledger())
node.recover_private_ledger_entry(body);
else
LOG_FAIL_FMT("Cannot recover ledger entry: Unexpected state");
switch (purpose)
{
case consensus::LedgerRequestPurpose::Recovery:
{
if (node.is_reading_public_ledger())
node.recover_public_ledger_entry(body);
else if (node.is_reading_private_ledger())
node.recover_private_ledger_entry(body);
else
LOG_FAIL_FMT("Cannot recover ledger entry: Unexpected state");
break;
}
case consensus::LedgerRequestPurpose::HistoricalQuery:
{
context.historical_state_cache.handle_ledger_entry(index, body);
break;
}
default:
{
LOG_FAIL_FMT("Unhandled purpose: {}", purpose);
}
}
});
DISPATCHER_SET_MESSAGE_HANDLER(
bp,
consensus::ledger_no_entry,
[this](const uint8_t* data, size_t size) {
ringbuffer::read_message<consensus::ledger_no_entry>(data, size);
node.recover_ledger_end();
const auto [index, purpose] =
ringbuffer::read_message<consensus::ledger_no_entry>(data, size);
switch (purpose)
{
case consensus::LedgerRequestPurpose::Recovery:
{
node.recover_ledger_end();
break;
}
case consensus::LedgerRequestPurpose::HistoricalQuery:
{
context.historical_state_cache.handle_no_entry(index);
break;
}
default:
{
LOG_FAIL_FMT("Unhandled purpose: {}", purpose);
}
}
});
rpcsessions->register_message_handlers(bp.get_dispatcher());

View file

@ -209,7 +209,7 @@ namespace asynchost
DISPATCHER_SET_MESSAGE_HANDLER(
disp, consensus::ledger_get, [&](const uint8_t* data, size_t size) {
// The enclave has asked for a ledger entry.
auto [idx] =
auto [idx, purpose] =
ringbuffer::read_message<consensus::ledger_get>(data, size);
auto& entry = read_entry(idx);
@ -217,11 +217,12 @@ namespace asynchost
if (entry.size() > 0)
{
RINGBUFFER_WRITE_MESSAGE(
consensus::ledger_entry, to_enclave, entry);
consensus::ledger_entry, to_enclave, idx, purpose, entry);
}
else
{
RINGBUFFER_WRITE_MESSAGE(consensus::ledger_no_entry, to_enclave);
RINGBUFFER_WRITE_MESSAGE(
consensus::ledger_no_entry, to_enclave, idx, purpose);
}
});
}

View file

@ -13,6 +13,7 @@ namespace http
static constexpr auto CONTENT_TYPE = "content-type";
static constexpr auto DIGEST = "digest";
static constexpr auto LOCATION = "location";
static constexpr auto RETRY_AFTER = "retry-after";
static constexpr auto WWW_AUTHENTICATE = "www-authenticate";
static constexpr auto CCF_TX_SEQNO = "x-ccf-tx-seqno";

View file

@ -33,6 +33,13 @@ namespace kv
kv::ReplicateType replicate_type = kv::ReplicateType::ALL;
std::unordered_set<std::string> replicated_tables;
// Generally we will only accept deserialised views if they are contiguous -
// at Version N we reject everything but N+1. The exception is when a Store
// is used for historical queries, where it may deserialise arbitrary
// transactions. In this case the Store is a useful container for a set of
// Tables, but its versioning invariants are ignored.
const bool strict_versions = true;
DeserialiseSuccess commit_deserialised(OrderedViews& views, Version& v)
{
auto c = apply_views(views, [v]() { return v; });
@ -63,7 +70,7 @@ namespace kv
}
}
Store() {}
Store(bool strict_versions_ = true) : strict_versions(strict_versions_) {}
Store(
const ReplicateType& replicate_type_,
@ -107,7 +114,7 @@ namespace kv
}
template <class K, class V>
Map<K, V>* get(std::string name)
Map<K, V>* get(const std::string& name)
{
return get<Map<K, V>>(name);
}
@ -119,7 +126,7 @@ namespace kv
* @return Map
*/
template <class M>
M* get(std::string name)
M* get(const std::string& name)
{
std::lock_guard<SpinLock> mguard(maps_lock);
@ -137,6 +144,21 @@ namespace kv
return nullptr;
}
/** Get Map by type and name
*
* Using type and name of other Map, retrieve the equivalent Map from this
* Store
*
* @param other Other map
*
* @return Map
*/
template <class M>
M* get(const M& other)
{
return get<M>(other.get_name());
}
/** Create a Map
*
* Note this call will throw a logic_error if a map by that name already
@ -302,13 +324,16 @@ namespace kv
// consensus.
rollback(v - 1);
// Make sure this is the next transaction.
auto cv = current_version();
if (cv != (v - 1))
if (strict_versions)
{
LOG_FAIL_FMT(
"Tried to deserialise {} but current_version is {}", v, cv);
return DeserialiseSuccess::FAILED;
// Make sure this is the next transaction.
auto cv = current_version();
if (cv != (v - 1))
{
LOG_FAIL_FMT(
"Tried to deserialise {} but current_version is {}", v, cv);
return DeserialiseSuccess::FAILED;
}
}
// Deserialised transactions express read dependencies as versions,
@ -377,31 +402,35 @@ namespace kv
{
return success;
}
auto h = get_history();
if (h)
{
auto search = views.find("ccf.signatures");
if (search != views.end())
{
// Transactions containing a signature must only contain
// a signature and must be verified
if (views.size() > 1)
{
LOG_FAIL_FMT("Failed to deserialize");
LOG_DEBUG_FMT(
"Unexpected contents in signature transaction {}", v);
return DeserialiseSuccess::FAILED;
}
auto h = get_history();
auto search = views.find("ccf.signatures");
if (search != views.end())
{
// Transactions containing a signature must only contain
// a signature and must be verified
if (views.size() > 1)
{
LOG_FAIL_FMT("Failed to deserialize");
LOG_DEBUG_FMT("Unexpected contents in signature transaction {}", v);
return DeserialiseSuccess::FAILED;
}
if (h)
{
if (!h->verify(term))
{
LOG_FAIL_FMT("Failed to deserialize");
LOG_DEBUG_FMT("Signature in transaction {} failed to verify", v);
return DeserialiseSuccess::FAILED;
}
success = DeserialiseSuccess::PASS_SIGNATURE;
}
success = DeserialiseSuccess::PASS_SIGNATURE;
}
if (h)
{
h->append(data.data(), data.size());
}
}

View file

@ -112,7 +112,7 @@ static void deserialise(picobench::state& s)
tx.commit();
s.start_timer();
auto rc = kv_store2.deserialise(consensus->get_latest_data().first);
auto rc = kv_store2.deserialise(consensus->get_latest_data().value());
if (rc != kv::DeserialiseSuccess::PASS)
throw std::logic_error(
"Transaction deserialisation failed: " + std::to_string(rc));

View file

@ -49,10 +49,11 @@ TEST_CASE(
INFO("Deserialise transaction in target store");
{
REQUIRE(consensus->get_latest_data().second);
REQUIRE(!consensus->get_latest_data().first.empty());
const auto latest_data = consensus->get_latest_data();
REQUIRE(latest_data.has_value());
REQUIRE(!latest_data.value().empty());
REQUIRE(
kv_store_target.deserialise(consensus->get_latest_data().first) ==
kv_store_target.deserialise(latest_data.value()) ==
kv::DeserialiseSuccess::PASS);
kv::Tx tx_target;
@ -101,8 +102,10 @@ TEST_CASE(
INFO("Deserialise transaction in target store");
{
const auto latest_data = consensus->get_latest_data();
REQUIRE(latest_data.has_value());
REQUIRE(
kv_store_target.deserialise(consensus->get_latest_data().first) ==
kv_store_target.deserialise(latest_data.value()) ==
kv::DeserialiseSuccess::PASS);
kv::Tx tx_target;
@ -145,8 +148,10 @@ TEST_CASE(
INFO("Deserialise transaction in target store");
{
const auto latest_data = consensus->get_latest_data();
REQUIRE(latest_data.has_value());
REQUIRE(
kv_store_target.deserialise(consensus->get_latest_data().first) !=
kv_store_target.deserialise(latest_data.value()) !=
kv::DeserialiseSuccess::FAILED);
kv::Tx tx;
@ -180,8 +185,10 @@ TEST_CASE(
view_priv->put("privk1", "privv1");
REQUIRE(tx.commit() == kv::CommitSuccess::OK);
const auto latest_data = consensus->get_latest_data();
REQUIRE(latest_data.has_value());
REQUIRE(
kv_store_target.deserialise(consensus->get_latest_data().first) !=
kv_store_target.deserialise(latest_data.value()) !=
kv::DeserialiseSuccess::FAILED);
kv::Tx tx_target;
@ -201,8 +208,10 @@ TEST_CASE(
auto view_priv2 = tx2.get_view(priv_map);
REQUIRE(view_priv2->get("privk1").has_value() == false);
const auto latest_data = consensus->get_latest_data();
REQUIRE(latest_data.has_value());
REQUIRE(
kv_store_target.deserialise(consensus->get_latest_data().first) !=
kv_store_target.deserialise(latest_data.value()) !=
kv::DeserialiseSuccess::FAILED);
kv::Tx tx_target;
@ -412,12 +421,13 @@ TEST_CASE("Integrity" * doctest::test_suite("serialisation"))
auto rc = tx.commit();
// Tamper with serialised public data
auto serialised_tx = consensus->get_latest_data().first;
auto latest_data = consensus->get_latest_data();
REQUIRE(latest_data.has_value());
std::vector<uint8_t> value_to_corrupt(pub_value.begin(), pub_value.end());
REQUIRE(corrupt_serialised_tx(serialised_tx, value_to_corrupt));
REQUIRE(corrupt_serialised_tx(latest_data.value(), value_to_corrupt));
REQUIRE(
kv_store_target.deserialise(serialised_tx) ==
kv_store_target.deserialise(latest_data.value()) ==
kv::DeserialiseSuccess::FAILED);
}
}
@ -442,9 +452,10 @@ TEST_CASE("nlohmann (de)serialisation" * doctest::test_suite("serialisation"))
tx.get_view(t)->put(k1, v1);
REQUIRE(tx.commit() == kv::CommitSuccess::OK);
const auto latest_data = consensus->get_latest_data();
REQUIRE(latest_data.has_value());
REQUIRE(
s1.deserialise(consensus->get_latest_data().first) !=
kv::DeserialiseSuccess::FAILED);
s1.deserialise(latest_data.value()) != kv::DeserialiseSuccess::FAILED);
}
SUBCASE("nlohmann")
@ -460,9 +471,10 @@ TEST_CASE("nlohmann (de)serialisation" * doctest::test_suite("serialisation"))
tx.get_view(t)->put(k1, v1);
REQUIRE(tx.commit() == kv::CommitSuccess::OK);
const auto latest_data = consensus->get_latest_data();
REQUIRE(latest_data.has_value());
REQUIRE(
s1.deserialise(consensus->get_latest_data().first) !=
kv::DeserialiseSuccess::FAILED);
s1.deserialise(latest_data.value()) != kv::DeserialiseSuccess::FAILED);
}
}

View file

@ -13,7 +13,7 @@ namespace kv
class StubConsensus : public Consensus
{
private:
std::vector<std::shared_ptr<std::vector<uint8_t>>> replica;
std::vector<kv::BatchVector::value_type> replica;
ConsensusType consensus_type;
public:
@ -25,32 +25,50 @@ namespace kv
bool replicate(const BatchVector& entries) override
{
for (auto&& [index, data, globally_committable] : entries)
for (const auto& entry : entries)
{
replica.push_back(data);
replica.push_back(entry);
}
return true;
}
std::pair<std::vector<uint8_t>, bool> get_latest_data()
{
if (!replica.empty())
return std::make_pair(*replica.back(), true);
else
return std::make_pair(std::vector<uint8_t>(), false);
}
std::pair<std::vector<uint8_t>, bool> pop_oldest_data()
std::optional<std::vector<uint8_t>> get_latest_data()
{
if (!replica.empty())
{
auto pair = std::make_pair(*replica.front(), true);
replica.erase(replica.begin());
return pair;
return *std::get<1>(replica.back());
}
else
{
return std::make_pair(std::vector<uint8_t>(), false);
return std::nullopt;
}
}
std::optional<std::vector<uint8_t>> pop_oldest_data()
{
if (!replica.empty())
{
auto data = *std::get<1>(replica.front());
replica.erase(replica.begin());
return data;
}
else
{
return std::nullopt;
}
}
std::optional<kv::BatchVector::value_type> pop_oldest_entry()
{
if (!replica.empty())
{
auto entry = replica.front();
replica.erase(replica.begin());
return entry;
}
else
{
return std::nullopt;
}
}

View file

@ -0,0 +1,345 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include "consensus/ledger_enclave_types.h"
#include "kv/store.h"
#include "node/historical_queries_interface.h"
#include "node/history.h"
#include "node/rpc/node_interface.h"
#include <list>
#include <map>
#include <memory>
#include <set>
namespace ccf::historical
{
class StateCache : public AbstractStateCache
{
protected:
kv::Store& source_store;
ringbuffer::WriterPtr to_host;
enum class RequestStage
{
Fetching,
Untrusted,
Trusted,
};
using LedgerEntry = std::vector<uint8_t>;
struct Request
{
RequestStage current_stage = RequestStage::Fetching;
crypto::Sha256Hash entry_hash = {};
StorePtr store = nullptr;
};
// These constitute a simple LRU, where only user queries will refresh an
// entry's priority
static constexpr size_t MAX_ACTIVE_REQUESTS = 10;
std::map<consensus::Index, Request> requests;
std::list<consensus::Index> recent_requests;
// To trust an index, we currently need to fetch a sequence of entries
// around it - these aren't user requests, so we don't store them, but we do
// need to distinguish things-we-asked-for from junk-from-the-host
std::set<consensus::Index> pending_fetches;
void request_entry_at(consensus::Index idx)
{
// To avoid duplicates, remove index if it was already requested
recent_requests.remove(idx);
// Add request to front of list, most recently requested
recent_requests.emplace_front(idx);
// Cull old requests
while (recent_requests.size() > MAX_ACTIVE_REQUESTS)
{
const auto old_idx = recent_requests.back();
recent_requests.pop_back();
requests.erase(old_idx);
}
// Try to insert new request
const auto ib = requests.insert(std::make_pair(idx, Request{}));
if (ib.second)
{
// If its a new request, begin fetching it
fetch_entry_at(idx);
}
}
void fetch_entry_at(consensus::Index idx)
{
const auto it =
std::find(pending_fetches.begin(), pending_fetches.end(), idx);
if (it != pending_fetches.end())
{
// Already fetching this index
return;
}
RINGBUFFER_WRITE_MESSAGE(
consensus::ledger_get,
to_host,
idx,
consensus::LedgerRequestPurpose::HistoricalQuery);
pending_fetches.insert(idx);
}
std::optional<ccf::Signature> get_signature(const StorePtr& sig_store)
{
kv::Tx tx;
auto sig_table = sig_store->get<ccf::Signatures>(ccf::Tables::SIGNATURES);
if (sig_table == nullptr)
{
throw std::logic_error(
"Missing signatures table in signature transaction");
}
auto sig_view = tx.get_view(*sig_table);
return sig_view->get(0);
}
std::optional<ccf::NodeInfo> get_node_info(ccf::NodeId node_id)
{
// Current solution: Use current state of Nodes table from real store.
// This only works while entries are never deleted from this table, and
// makes no check that the signing node was active at the point it
// produced this signature
kv::Tx tx;
auto nodes_table = source_store.get<ccf::Nodes>(ccf::Tables::NODES);
if (nodes_table == nullptr)
{
throw std::logic_error("Missing nodes table");
}
auto nodes_view = tx.get_view(*nodes_table);
return nodes_view->get(node_id);
}
void handle_signature_transaction(
consensus::Index sig_idx, const StorePtr& sig_store)
{
const auto sig = get_signature(sig_store);
if (!sig.has_value())
{
throw std::logic_error(
"Missing signature value in signature transaction");
}
// Build tree from signature
ccf::MerkleTreeHistory tree(sig->tree);
const auto real_root = tree.get_root();
if (real_root != sig->root)
{
throw std::logic_error("Invalid signature: invalid root");
}
const auto node_info = get_node_info(sig->node);
if (!node_info.has_value())
{
throw std::logic_error(fmt::format(
"Signature {} claims it was produced by node {}: This node is "
"unknown",
sig_idx,
sig->node));
}
auto verifier = tls::make_verifier(node_info->cert);
const auto verified = verifier->verify_hash(
real_root.h.data(),
real_root.h.size(),
sig->sig.data(),
sig->sig.size());
if (!verified)
{
throw std::logic_error(
fmt::format("Signature at {} is invalid", sig_idx));
}
auto it = requests.begin();
while (it != requests.end())
{
auto& request = it->second;
if (request.current_stage == RequestStage::Untrusted)
{
const auto& untrusted_idx = it->first;
const auto& untrusted_hash = request.entry_hash;
const auto& untrusted_store = request.store;
// Use try-catch to find whether this signature covers the target
// transaction ID
ccf::Receipt receipt;
try
{
receipt = tree.get_receipt(untrusted_idx);
LOG_DEBUG_FMT(
"From signature at {}, constructed a receipt for {}",
sig_idx,
untrusted_idx);
}
catch (const std::exception& e)
{
// This signature doesn't cover this untrusted idx, try the next
// request
++it;
continue;
}
// This is where full verification should go, checking that the entry
// we have is in the signed merkle tree
// Move stores from untrusted to trusted
LOG_DEBUG_FMT(
"Now trusting {} due to signature at {}", untrusted_idx, sig_idx);
request.current_stage = RequestStage::Trusted;
++it;
}
else
{
// Already trusted or still fetching, skip it
++it;
}
}
}
// Deserialise a single raw ledger entry into a fresh store, then update any
// in-flight request for this index with the result.
// @param idx Ledger index this entry was fetched for
// @param entry Raw serialised ledger entry received from the host
// @throws std::logic_error if the entry fails to deserialise, or the
//         deserialise result is unrecognised
void deserialise_ledger_entry(
  consensus::Index idx, const LedgerEntry& entry)
{
  // Build a store sharing the source store's encryptor and schema, so the
  // entry is decrypted and deserialised exactly as the live node would
  StorePtr store = std::make_shared<kv::Store>(false);
  store->set_encryptor(source_store.get_encryptor());
  store->clone_schema(source_store);

  const auto deserialise_result = store->deserialise_views(entry);

  switch (deserialise_result)
  {
    case kv::DeserialiseSuccess::FAILED:
    {
      // NB: no break needed - throw already leaves the switch
      throw std::logic_error("Deserialise failed!");
    }
    case kv::DeserialiseSuccess::PASS:
    case kv::DeserialiseSuccess::PASS_SIGNATURE:
    {
      LOG_DEBUG_FMT("Processed transaction at {}", idx);

      auto request_it = requests.find(idx);
      if (request_it != requests.end())
      {
        auto& request = request_it->second;
        if (request.current_stage == RequestStage::Fetching)
        {
          // We were looking for this entry. Store the produced store
          request.current_stage = RequestStage::Untrusted;
          request.entry_hash = crypto::Sha256Hash(entry);
          request.store = store;
        }
        else
        {
          LOG_DEBUG_FMT(
            "Not fetching ledger entry {}: already have it in stage {}",
            request_it->first,
            request.current_stage);
        }
      }

      if (deserialise_result == kv::DeserialiseSuccess::PASS_SIGNATURE)
      {
        // This looks like a valid signature - try to use this signature to
        // move some stores from untrusted to trusted
        handle_signature_transaction(idx, store);
      }
      else
      {
        // This is not a signature - try the next transaction
        fetch_entry_at(idx + 1);
      }
      break;
    }
    default:
    {
      throw std::logic_error("Unexpected deserialise result");
    }
  }
}
public:
// Construct a cache which deserialises fetched entries against the given
// source store (for encryptor and schema), and requests ledger entries
// from the host over host_writer
StateCache(kv::Store& store, const ringbuffer::WriterPtr& host_writer) :
source_store(store),
to_host(host_writer)
{}
// Return the store holding the state at idx, if it has been fetched and
// verified. Otherwise return nullptr, treating the call as a hint to
// begin (or continue) fetching that entry from the host.
StorePtr get_store_at(consensus::Index idx) override
{
  const auto search = requests.find(idx);

  if (search == requests.end())
  {
    // Unknown index - treat this as a hint and start fetching it
    request_entry_at(idx);
    return nullptr;
  }

  if (search->second.current_stage != RequestStage::Trusted)
  {
    // Still fetching this store or don't trust it yet
    return nullptr;
  }

  // Have this store and trust it
  return search->second.store;
}
// Process a ledger entry received from the host in response to an earlier
// fetch request.
// @param idx Index the host claims this entry is at
// @param data Raw serialised ledger entry
// @return true if the entry was expected and deserialised successfully,
//         false if it was unrequested or malformed
bool handle_ledger_entry(consensus::Index idx, const LedgerEntry& data)
{
  // Use the container's own key lookup rather than a linear std::find -
  // pending_fetches is associative (it is erased by key in handle_no_entry)
  const auto it = pending_fetches.find(idx);
  if (it == pending_fetches.end())
  {
    // Unexpected entry - ignore it?
    return false;
  }

  pending_fetches.erase(it);

  try
  {
    deserialise_ledger_entry(idx, data);
  }
  catch (const std::exception& e)
  {
    // Malformed entries are logged and rejected rather than fatal, since
    // the data comes from the (untrusted) host
    LOG_FAIL_FMT("Unable to deserialise entry {}: {}", idx, e.what());
    return false;
  }

  return true;
}
// Called when the host fails or refuses to provide the entry at idx.
// Currently we just forget about it - there is no mechanism for
// remembering this failure and reporting it to users.
void handle_no_entry(consensus::Index idx)
{
  const auto request_it = requests.find(idx);
  if (
    request_it != requests.end() &&
    request_it->second.current_stage == RequestStage::Fetching)
  {
    // Drop the request that was waiting on this entry
    requests.erase(request_it);
  }

  pending_fetches.erase(idx);
}
};
}

Просмотреть файл

@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include "consensus/ledger_enclave_types.h"
#include "kv/store.h"
#include <memory>
namespace ccf::historical
{
using StorePtr = std::shared_ptr<kv::Store>;
/** Interface for retrieving historical state.
 *
 * Implementations may fetch and verify the requested state asynchronously,
 * so get_store_at can return nullptr until the state becomes available.
 */
class AbstractStateCache
{
public:
virtual ~AbstractStateCache() = default;

// Return a store containing the state as of the given index, or nullptr
// if that state is not (yet) available
virtual StorePtr get_store_at(consensus::Index idx) = 0;
};
// Placeholder implementation which never has any historical state
// available - every lookup returns nullptr
class StubStateCache : public AbstractStateCache
{
public:
StorePtr get_store_at(consensus::Index idx) override
{
return nullptr;
}
};
}

Просмотреть файл

@ -203,12 +203,28 @@ namespace ccf
uint64_t index;
uint32_t max_index;
crypto::Sha256Hash root;
hash_vec* path;
// RAII wrapper around the merkle tree library's path allocation, so that
// shared ownership (via std::shared_ptr<Path>) frees the underlying
// allocation exactly once
struct Path
{
  // Owning pointer to the library-allocated path
  hash_vec* raw;

  Path()
  {
    raw = init_path();
  }

  // Non-copyable: a copied Path would free_path(raw) twice. Instances are
  // shared through std::shared_ptr<Path> instead.
  Path(const Path&) = delete;
  Path& operator=(const Path&) = delete;

  ~Path()
  {
    free_path(raw);
  }
};
std::shared_ptr<Path> path;
public:
Receipt()
{
path = init_path();
path = std::make_shared<Path>();
}
static Receipt from_v(const std::vector<uint8_t>& v)
@ -223,7 +239,7 @@ namespace ccf
s -= r.root.h.size();
for (size_t i = 0; i < s; i += r.root.SIZE)
{
path_insert(r.path, const_cast<uint8_t*>(buf + i));
path_insert(r.path->raw, const_cast<uint8_t*>(buf + i));
}
return r;
}
@ -231,44 +247,40 @@ namespace ccf
Receipt(merkle_tree* tree, uint64_t index_)
{
index = index_;
path = init_path();
path = std::make_shared<Path>();
if (!mt_get_path_pre(tree, index, path, root.h.data()))
if (!mt_get_path_pre(tree, index, path->raw, root.h.data()))
{
free_path(path);
throw std::logic_error("Precondition to mt_get_path violated");
}
max_index = mt_get_path(tree, index, path, root.h.data());
max_index = mt_get_path(tree, index, path->raw, root.h.data());
}
bool verify(merkle_tree* tree) const
{
if (!mt_verify_pre(tree, index, max_index, path, (uint8_t*)root.h.data()))
if (!mt_verify_pre(
tree, index, max_index, path->raw, (uint8_t*)root.h.data()))
{
throw std::logic_error("Precondition to mt_verify violated");
}
return mt_verify(tree, index, max_index, path, (uint8_t*)root.h.data());
}
~Receipt()
{
free_path(path);
return mt_verify(
tree, index, max_index, path->raw, (uint8_t*)root.h.data());
}
std::vector<uint8_t> to_v() const
{
size_t vs = sizeof(index) + sizeof(max_index) + root.h.size() +
(root.h.size() * path->sz);
(root.h.size() * path->raw->sz);
std::vector<uint8_t> v(vs);
uint8_t* buf = v.data();
serialized::write(buf, vs, index);
serialized::write(buf, vs, max_index);
serialized::write(buf, vs, root.h.data(), root.h.size());
for (size_t i = 0; i < path->sz; ++i)
for (size_t i = 0; i < path->raw->sz; ++i)
{
serialized::write(buf, vs, *(path->vs + i), root.h.size());
serialized::write(buf, vs, *(path->raw->vs + i), root.h.size());
}
return v;
}
@ -298,9 +310,9 @@ namespace ccf
mt_free(tree);
}
void append(const crypto::Sha256Hash& hash)
void append(crypto::Sha256Hash& hash)
{
uint8_t* h = const_cast<uint8_t*>(hash.h.data());
uint8_t* h = hash.h.data();
if (!mt_insert_pre(tree, h))
{
throw std::logic_error("Precondition to mt_insert violated");

Просмотреть файл

@ -1666,7 +1666,11 @@ namespace ccf
void read_ledger_idx(consensus::Index idx)
{
RINGBUFFER_WRITE_MESSAGE(consensus::ledger_get, to_host, idx);
RINGBUFFER_WRITE_MESSAGE(
consensus::ledger_get,
to_host,
idx,
consensus::LedgerRequestPurpose::Recovery);
}
void ledger_truncate(consensus::Index idx)

Просмотреть файл

@ -11,7 +11,6 @@
#include "forwarder.h"
#include "node/client_signatures.h"
#include "node/nodes.h"
#include "notifier_interface.h"
#include "rpc_exception.h"
#include "tls/verifier.h"

Просмотреть файл

@ -3,6 +3,7 @@
#pragma once
#include "node/rpc/node_interface.h"
#include "node/rpc/notifier_interface.h"
#include "node/share_manager.h"
namespace ccf

Просмотреть файл

@ -14,12 +14,39 @@ namespace ccf
Invalid,
};
// Map a TxStatus to its canonical string name (also used by the JSON enum
// serialisation below)
constexpr char const* tx_status_to_str(TxStatus status)
{
  switch (status)
  {
    case TxStatus::Unknown:
      return "UNKNOWN";
    case TxStatus::Pending:
      return "PENDING";
    case TxStatus::Committed:
      return "COMMITTED";
    case TxStatus::Invalid:
      return "INVALID";
    default:
      return "Unhandled value";
  }
}
DECLARE_JSON_ENUM(
TxStatus,
{{TxStatus::Unknown, "UNKNOWN"},
{TxStatus::Pending, "PENDING"},
{TxStatus::Committed, "COMMITTED"},
{TxStatus::Invalid, "INVALID"}});
{{TxStatus::Unknown, tx_status_to_str(TxStatus::Unknown)},
{TxStatus::Pending, tx_status_to_str(TxStatus::Pending)},
{TxStatus::Committed, tx_status_to_str(TxStatus::Committed)},
{TxStatus::Invalid, tx_status_to_str(TxStatus::Invalid)}});
constexpr size_t VIEW_UNKNOWN = 0;
@ -75,7 +102,7 @@ namespace ccf
// this node believes locally that this tx id is impossible, but does not
// have a global commit to back this up - it will eventually receive
// either a global commit confirming this belief, or an election and
// global commit where this tx id is valid
// global commit making this tx id invalid
return TxStatus::Unknown;
}
}

Просмотреть файл

@ -0,0 +1,284 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#include "node/historical_queries.h"
#include "ds/messaging.h"
#include "kv/test/null_encryptor.h"
#include "kv/test/stub_consensus.h"
#include "node/history.h"
#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
#include <doctest/doctest.h>
// Fake ringbuffer writer which records every message written through it,
// so tests can inspect the outbound traffic produced by the code under test
struct StubWriter : public ringbuffer::AbstractWriter
{
public:
  // One recorded write: the message type, whether finish() was called, and
  // the accumulated payload bytes
  struct Write
  {
    ringbuffer::Message m;
    bool finished;
    std::vector<uint8_t> contents;
  };
  std::vector<Write> writes;

  // Resolve a marker to its recorded write, asserting it is valid
  Write& get_write(const WriteMarker& marker)
  {
    REQUIRE(marker.has_value());
    REQUIRE(marker.value() < writes.size());
    return writes[marker.value()];
  }

  // Return the most recent write, asserting it was completed
  Write& get_last_message()
  {
    REQUIRE(writes.size() > 0);
    auto& last_write = writes.back();
    REQUIRE(last_write.finished);
    return last_write;
  }

  WriteMarker prepare(
    ringbuffer::Message m,
    size_t size,
    bool wait = true,
    size_t* identifier = nullptr) override
  {
    // The marker is simply the position of the new (unfinished) record
    const auto next_marker = writes.size();
    writes.push_back(Write{m, false, {}});
    return next_marker;
  }

  void finish(const WriteMarker& marker) override
  {
    get_write(marker).finished = true;
  }

  WriteMarker write_bytes(
    const WriteMarker& marker, const uint8_t* bytes, size_t size) override
  {
    auto& pending = get_write(marker);
    pending.contents.insert(pending.contents.end(), bytes, bytes + size);
    return marker;
  }
};
// End-to-end exercise of ccf::historical::StateCache: build a store with
// signed history, replay its ledger through the cache, and check that
// historical state only becomes retrievable once a covering signature has
// been provided and verified.
TEST_CASE("StateCache")
{
  auto encryptor = std::make_shared<kv::NullTxEncryptor>();
  auto consensus = std::make_shared<kv::StubConsensus>();

  kv::Store store(consensus);
  store.set_encryptor(encryptor);

  // Make history to produce signatures
  auto& signatures = store.create<ccf::Signatures>(
    ccf::Tables::SIGNATURES, kv::SecurityDomain::PUBLIC);
  auto& nodes =
    store.create<ccf::Nodes>(ccf::Tables::NODES, kv::SecurityDomain::PUBLIC);

  const auto node_id = 0;

  auto kp = tls::make_key_pair();

  auto history = std::make_shared<ccf::MerkleTxHistory>(
    store, node_id, *kp, signatures, nodes);
  store.set_history(history);

  using NumToString = kv::Map<size_t, std::string>;

  // Signatures will be emitted at these two transaction indices
  constexpr size_t low_signature_transaction = 3;
  constexpr size_t high_signature_transaction = 100;

  // low/high are covered by the signatures; unsigned_index is beyond the
  // last signature and can never be verified
  constexpr size_t low_index = low_signature_transaction + 2;
  constexpr size_t high_index = high_signature_transaction - 3;
  constexpr size_t unsigned_index = high_signature_transaction + 5;

  {
    INFO("Build some interesting state in the store");

    {
      INFO("Store the signing node's key");
      kv::Tx tx;
      auto view = tx.get_view(nodes);
      ccf::NodeInfo ni;
      ni.cert = kp->self_sign("CN=Test node");
      ni.status = ccf::NodeStatus::TRUSTED;
      view->put(node_id, ni);
      REQUIRE(tx.commit() == kv::CommitSuccess::OK);
    }

    auto& public_table =
      store.create<NumToString>("public", kv::SecurityDomain::PUBLIC);
    auto& private_table =
      store.create<NumToString>("private", kv::SecurityDomain::PRIVATE);

    {
      // Write i -> to_string(i) in both tables at every index, except the
      // two indices where a signature transaction is emitted instead
      for (size_t i = 1; i < high_signature_transaction; ++i)
      {
        if (
          i == low_signature_transaction - 1 ||
          i == high_signature_transaction - 1)
        {
          history->emit_signature();
          store.compact(store.current_version());
        }
        else
        {
          kv::Tx tx;
          auto [public_view, private_view] =
            tx.get_view(public_table, private_table);
          const auto s = std::to_string(i);
          public_view->put(i, s);
          private_view->put(i, s);
          REQUIRE(tx.commit() == kv::CommitSuccess::OK);
        }
      }
    }

    REQUIRE(store.current_version() == high_signature_transaction);
  }

  std::map<consensus::Index, std::vector<uint8_t>> ledger;
  {
    INFO("Rebuild ledger as seen by host");
    // Drain the stub consensus to recover every serialised entry, keyed by
    // its index, as the host's ledger would hold them
    auto next_ledger_entry = consensus->pop_oldest_entry();
    while (next_ledger_entry.has_value())
    {
      const auto ib = ledger.insert(std::make_pair(
        std::get<0>(next_ledger_entry.value()),
        *std::get<1>(next_ledger_entry.value())));
      REQUIRE(ib.second);
      next_ledger_entry = consensus->pop_oldest_entry();
    }

    REQUIRE(ledger.size() == high_signature_transaction);
  }

  // Now we actually get to the historical queries
  // Capture every ledger_get the cache writes to the (fake) host
  std::vector<consensus::Index> requested_ledger_entries = {};
  messaging::BufferProcessor bp("historical_queries");
  DISPATCHER_SET_MESSAGE_HANDLER(
    bp,
    consensus::ledger_get,
    [&requested_ledger_entries](const uint8_t* data, size_t size) {
      auto [idx, purpose] =
        ringbuffer::read_message<consensus::ledger_get>(data, size);

      REQUIRE(purpose == consensus::LedgerRequestPurpose::HistoricalQuery);
      requested_ledger_entries.push_back(idx);
    });

  constexpr size_t buffer_size = 1 << 12;
  ringbuffer::Reader rr(buffer_size);
  auto rw = std::make_shared<ringbuffer::Writer>(rr);
  ccf::historical::StateCache cache(store, rw);

  {
    INFO(
      "Initially, no stores are available, even if they're requested multiple "
      "times");
    REQUIRE(cache.get_store_at(low_index) == nullptr);
    REQUIRE(cache.get_store_at(low_index) == nullptr);
    REQUIRE(cache.get_store_at(high_index) == nullptr);
    REQUIRE(cache.get_store_at(low_index) == nullptr);
    REQUIRE(cache.get_store_at(unsigned_index) == nullptr);
    REQUIRE(cache.get_store_at(high_index) == nullptr);
    REQUIRE(cache.get_store_at(low_index) == nullptr);
  }

  {
    INFO("The host sees one request for each index");
    // Repeated get_store_at calls must not produce duplicate fetches
    const auto read = bp.read_n(100, rr);
    REQUIRE(read == 3);

    REQUIRE(requested_ledger_entries.size() == 3);
    REQUIRE(
      requested_ledger_entries ==
      std::vector<consensus::Index>{low_index, high_index, unsigned_index});
  }

  auto provide_ledger_entry = [&](size_t i) {
    bool accepted = cache.handle_ledger_entry(i, ledger.at(i));
    // Pump outbound ringbuffer to clear messages
    bp.read_n(100, rr);
    return accepted;
  };

  {
    INFO("Cache doesn't accept arbitrary entries");
    REQUIRE(!provide_ledger_entry(high_index - 1));
    REQUIRE(!provide_ledger_entry(high_index + 1));
  }

  {
    INFO(
      "Cache accepts requested entries, and then range of supporting entries");
    REQUIRE(provide_ledger_entry(high_index));

    // Count up to next signature
    for (size_t i = high_index + 1; i < high_signature_transaction; ++i)
    {
      REQUIRE(provide_ledger_entry(i));
      // Not trusted until the covering signature arrives
      REQUIRE(cache.get_store_at(high_index) == nullptr);
    }

    REQUIRE(provide_ledger_entry(high_signature_transaction));
    REQUIRE(cache.get_store_at(high_index) != nullptr);
  }

  {
    INFO("Historical state can be retrieved from provided entries");
    auto store_at_index = cache.get_store_at(high_index);
    REQUIRE(store_at_index != nullptr);

    {
      auto& public_table = *store_at_index->get<NumToString>("public");
      auto& private_table = *store_at_index->get<NumToString>("private");

      kv::Tx tx;
      auto [public_view, private_view] =
        tx.get_view(public_table, private_table);

      // The write at high_index was (high_index - 1) -> its string form
      const auto k = high_index - 1;
      const auto v = std::to_string(k);

      auto public_v = public_view->get(k);
      REQUIRE(public_v.has_value());
      REQUIRE(*public_v == v);

      auto private_v = private_view->get(k);
      REQUIRE(private_v.has_value());
      REQUIRE(*private_v == v);

      // Each table should contain exactly one key at this point in history
      size_t public_count = 0;
      public_view->foreach([&public_count](const auto& k, const auto& v) {
        REQUIRE(public_count++ == 0);
        return true;
      });

      size_t private_count = 0;
      private_view->foreach([&private_count](const auto& k, const auto& v) {
        REQUIRE(private_count++ == 0);
        return true;
      });
    }
  }

  {
    INFO("Cache doesn't throw when given junk");
    REQUIRE(cache.get_store_at(unsigned_index) == nullptr);
    bool result;
    // Empty, garbage, and mismatched (wrong-index) entries are all rejected
    REQUIRE_NOTHROW(result = cache.handle_ledger_entry(unsigned_index, {}));
    REQUIRE(!result);
    REQUIRE_NOTHROW(
      result = cache.handle_ledger_entry(unsigned_index, {0x1, 0x2, 0x3}));
    REQUIRE(!result);
    REQUIRE_NOTHROW(
      result = cache.handle_ledger_entry(unsigned_index, ledger[low_index]));
    REQUIRE(!result);
    REQUIRE_NOTHROW(
      result = cache.handle_ledger_entry(
        unsigned_index, ledger[high_signature_transaction]));
    REQUIRE(!result);
  }
}

Просмотреть файл

@ -209,6 +209,78 @@ def test_raw_text(network, args):
return network
@reqs.description("Read historical state")
@reqs.supports_methods("LOG_record", "LOG_get", "LOG_get_historical")
def test_historical_query(network, args):
    """Write a log entry, overwrite it, then poll LOG_get_historical until the
    node can serve the original (overwritten) value at its view/seqno.

    Polls for up to 15s, honouring the 'retry-after' header on 202 responses.
    Raises ValueError on malformed/unexpected responses and TimeoutError if
    the historical value never becomes available. Returns the network.
    """
    if args.consensus == "pbft":
        LOG.warning("Skipping historical queries in PBFT")
        return network

    # Only the C++ logging app implements LOG_get_historical
    if args.package == "liblogging":
        primary, _ = network.find_primary()

        with primary.node_client() as nc:
            check_commit = infra.checker.Checker(nc)
            check = infra.checker.Checker()

            with primary.user_client() as c:
                log_id = 10
                msg = "This tests historical queries"
                record_response = c.rpc("LOG_record", {"id": log_id, "msg": msg})
                check_commit(record_response, result=True)
                # Remember where the first write was committed, so we can ask
                # for it again after it has been overwritten
                view = record_response.view
                seqno = record_response.seqno

                msg2 = "This overwrites the original message"
                check_commit(
                    c.rpc("LOG_record", {"id": log_id, "msg": msg2}), result=True
                )
                # Current state now holds msg2; msg only exists historically
                check(c.get("LOG_get", {"id": log_id}), result={"msg": msg2})

                timeout = 15
                found = False
                params = {"view": view, "seqno": seqno, "id": log_id}
                end_time = time.time() + timeout
                while time.time() < end_time:
                    get_response = c.rpc("LOG_get_historical", params)
                    if get_response.status == http.HTTPStatus.ACCEPTED:
                        # Node is still fetching/verifying - wait as instructed
                        retry_after = get_response.headers.get("retry-after")
                        if retry_after is None:
                            raise ValueError(
                                f"Response with status {get_response.status} is missing 'retry-after' header"
                            )
                        retry_after = int(retry_after)
                        LOG.warning(f"Sleeping for {retry_after}s")
                        time.sleep(retry_after)
                    elif get_response.status == http.HTTPStatus.OK:
                        # Historical state served - must be the original msg
                        assert (
                            get_response.result == msg
                        ), f"{get_response.body} != {msg}"
                        found = True
                        break
                    elif get_response.status == http.HTTPStatus.NO_CONTENT:
                        raise ValueError(
                            f"Historical query response claims there was no write to {log_id} at {view}.{seqno}"
                        )
                    else:
                        raise ValueError(
                            f"Unexpected response status {get_response.status}: {get_response}"
                        )

                if not found:
                    raise TimeoutError(
                        f"Unable to handle historical query after {timeout}s"
                    )
    else:
        LOG.warning(
            f"Skipping {inspect.currentframe().f_code.co_name} as application is not C++"
        )

    return network
@reqs.description("Testing forwarding on member and node frontends")
@reqs.supports_methods("mkSign")
@reqs.at_least_n_nodes(2)
@ -455,6 +527,7 @@ def run(args):
network = test_cert_prefix(network, args)
network = test_anonymous_caller(network, args)
network = test_raw_text(network, args)
network = test_historical_query(network, args)
network = test_view_history(network, args)

Просмотреть файл

@ -62,13 +62,14 @@ class FakeSocket:
class Response:
def __init__(self, status, result, error, seqno, view, global_commit):
def __init__(self, status, result, error, seqno, view, global_commit, headers):
self.status = status
self.result = result
self.error = error
self.seqno = seqno
self.view = view
self.global_commit = global_commit
self.headers = headers
def to_dict(self):
d = {
@ -101,6 +102,7 @@ class Response:
seqno=int_or_none(rr.headers.get(CCF_TX_SEQNO_HEADER)),
view=int_or_none(rr.headers.get(CCF_TX_VIEW_HEADER)),
global_commit=int_or_none(rr.headers.get(CCF_GLOBAL_COMMIT_HEADER)),
headers=rr.headers,
)
@staticmethod
@ -128,6 +130,7 @@ class Response:
seqno=int_or_none(response.getheader(CCF_TX_SEQNO_HEADER)),
view=int_or_none(response.getheader(CCF_TX_VIEW_HEADER)),
global_commit=int_or_none(response.getheader(CCF_GLOBAL_COMMIT_HEADER)),
headers=response.headers,
)
@ -415,7 +418,7 @@ class WSClient:
else:
result = None
error = payload.decode()
return Response(status, result, error, seqno, view, global_commit)
return Response(status, result, error, seqno, view, global_commit, headers={})
class CCFClient: