зеркало из https://github.com/microsoft/CCF.git
Add a builtin endpoint describing installed indexing strategies (#5061)
This commit is contained in:
Родитель
06888e47c1
Коммит
8168ce9282
|
@ -17,6 +17,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
|
|||
- Upgraded OpenEnclave to [0.18.5](https://github.com/openenclave/openenclave/releases/tag/v0.18.5).
|
||||
- Upgraded t_cose from [v1.1 to v1.1.1](https://github.com/laurencelundblade/t_cose/compare/v1.1...v1.1.1). v1.1.1 can optionally allow unknown critical header parameters in COSE_Sign1 envelopes which is desirable for CCF C++ applications.
|
||||
|
||||
### Added
|
||||
|
||||
- New `/node/index/strategies` endpoint, which will list all indexing strategies currently installed alongside a description of how far each has progressed.
|
||||
|
||||
## [4.0.0-dev4]
|
||||
|
||||
[4.0.0-dev4]: https://github.com/microsoft/CCF/releases/tag/ccf-4.0.0-dev4
|
||||
|
|
|
@ -348,7 +348,7 @@
|
|||
"info": {
|
||||
"description": "This CCF sample app implements a simple logging application, securely recording messages at client-specified IDs. It demonstrates most of the features available to CCF apps.",
|
||||
"title": "CCF Sample Logging App",
|
||||
"version": "1.18.0"
|
||||
"version": "1.19.0"
|
||||
},
|
||||
"openapi": "3.0.0",
|
||||
"paths": {
|
||||
|
@ -1009,6 +1009,21 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
"/app/log/private/uninstall_committed_index": {
|
||||
"post": {
|
||||
"responses": {
|
||||
"204": {
|
||||
"description": "Default response description"
|
||||
},
|
||||
"default": {
|
||||
"$ref": "#/components/responses/default"
|
||||
}
|
||||
},
|
||||
"x-ccf-forwarding": {
|
||||
"$ref": "#/components/x-ccf-forwarding/sometimes"
|
||||
}
|
||||
}
|
||||
},
|
||||
"/app/log/public": {
|
||||
"delete": {
|
||||
"parameters": [
|
||||
|
|
|
@ -939,7 +939,7 @@
|
|||
"info": {
|
||||
"description": "This API provides public, uncredentialed access to service and node state.",
|
||||
"title": "CCF Public Node API",
|
||||
"version": "2.38.0"
|
||||
"version": "2.39.0"
|
||||
},
|
||||
"openapi": "3.0.0",
|
||||
"paths": {
|
||||
|
@ -1100,6 +1100,28 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
"/node/index/strategies": {
|
||||
"get": {
|
||||
"responses": {
|
||||
"200": {
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": {
|
||||
"$ref": "#/components/schemas/json"
|
||||
}
|
||||
}
|
||||
},
|
||||
"description": "Default response description"
|
||||
},
|
||||
"default": {
|
||||
"$ref": "#/components/responses/default"
|
||||
}
|
||||
},
|
||||
"x-ccf-forwarding": {
|
||||
"$ref": "#/components/x-ccf-forwarding/never"
|
||||
}
|
||||
}
|
||||
},
|
||||
"/node/js_metrics": {
|
||||
"get": {
|
||||
"responses": {
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
#define FMT_HEADER_ONLY
|
||||
#include <fmt/format.h>
|
||||
#include <nlohmann/json.hpp>
|
||||
#include <set>
|
||||
|
||||
namespace ds
|
||||
{
|
||||
|
@ -112,6 +113,10 @@ namespace ds
|
|||
return fmt::format("{}_array", schema_name<typename T::value_type>());
|
||||
}
|
||||
}
|
||||
else if constexpr (nonstd::is_specialization<T, std::set>::value)
|
||||
{
|
||||
return fmt::format("{}_set", schema_name<typename T::value_type>());
|
||||
}
|
||||
else if constexpr (
|
||||
nonstd::is_specialization<T, std::map>::value ||
|
||||
nonstd::is_specialization<T, std::unordered_map>::value)
|
||||
|
@ -197,7 +202,9 @@ namespace ds
|
|||
{
|
||||
fill_schema<typename T::value_type>(schema);
|
||||
}
|
||||
else if constexpr (nonstd::is_specialization<T, std::vector>::value)
|
||||
else if constexpr (
|
||||
nonstd::is_specialization<T, std::vector>::value ||
|
||||
nonstd::is_specialization<T, std::set>::value)
|
||||
{
|
||||
if constexpr (std::is_same<T, std::vector<uint8_t>>::value)
|
||||
{
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
#include <llhttp/llhttp.h>
|
||||
#include <nlohmann/json.hpp>
|
||||
#include <regex>
|
||||
#include <set>
|
||||
#include <string_view>
|
||||
|
||||
namespace ds
|
||||
|
@ -278,7 +279,9 @@ namespace ds
|
|||
{
|
||||
return add_schema_component<typename T::value_type>();
|
||||
}
|
||||
else if constexpr (nonstd::is_specialization<T, std::vector>::value)
|
||||
else if constexpr (
|
||||
nonstd::is_specialization<T, std::vector>::value ||
|
||||
nonstd::is_specialization<T, std::set>::value)
|
||||
{
|
||||
if constexpr (std::is_same<T, std::vector<uint8_t>>::value)
|
||||
{
|
||||
|
|
|
@ -48,5 +48,17 @@ namespace ccf::indexing
|
|||
|
||||
strategies.erase(strategy);
|
||||
}
|
||||
|
||||
nlohmann::json describe() const
|
||||
{
|
||||
auto j = nlohmann::json::array();
|
||||
|
||||
for (const auto& strategy : strategies)
|
||||
{
|
||||
j.push_back(strategy->describe());
|
||||
}
|
||||
|
||||
return j;
|
||||
}
|
||||
};
|
||||
}
|
|
@ -22,6 +22,8 @@ namespace ccf::indexing::strategies
|
|||
const ccf::ByteVector& k,
|
||||
const ccf::ByteVector& v) override;
|
||||
|
||||
nlohmann::json describe() override;
|
||||
|
||||
std::optional<SeqNoCollection> get_write_txs_impl(
|
||||
const ccf::ByteVector& serialised_key, ccf::SeqNo from, ccf::SeqNo to);
|
||||
|
||||
|
|
|
@ -32,6 +32,8 @@ namespace ccf::indexing::strategies
|
|||
const ccf::TxID& tx_id, const kv::ReadOnlyStorePtr& store) override;
|
||||
std::optional<ccf::SeqNo> next_requested() override;
|
||||
|
||||
nlohmann::json describe() override;
|
||||
|
||||
ccf::TxID get_indexed_watermark() const;
|
||||
};
|
||||
}
|
||||
|
|
|
@ -46,6 +46,20 @@ namespace ccf::indexing
|
|||
* nullopt if it wants none. Allows indexes to be populated
|
||||
* lazily on-demand, or out-of-order, or reset */
|
||||
virtual std::optional<ccf::SeqNo> next_requested() = 0;
|
||||
|
||||
virtual nlohmann::json describe()
|
||||
{
|
||||
auto j = nlohmann::json::object();
|
||||
j["name"] = get_name();
|
||||
|
||||
const auto nr = next_requested();
|
||||
if (nr.has_value())
|
||||
{
|
||||
j["next_requested_seqno"] = *nr;
|
||||
}
|
||||
|
||||
return j;
|
||||
}
|
||||
};
|
||||
|
||||
using StrategyPtr = std::shared_ptr<Strategy>;
|
||||
|
|
|
@ -132,19 +132,25 @@ namespace loggingapp
|
|||
class CommittedRecords : public ccf::indexing::Strategy
|
||||
{
|
||||
private:
|
||||
std::string map_name;
|
||||
std::map<size_t, std::string> records;
|
||||
std::mutex txid_lock;
|
||||
ccf::TxID current_txid = {};
|
||||
|
||||
public:
|
||||
CommittedRecords(const std::string& name) : ccf::indexing::Strategy(name) {}
|
||||
CommittedRecords(
|
||||
const std::string& map_name_, const ccf::TxID& initial_txid = {}) :
|
||||
ccf::indexing::Strategy(fmt::format("CommittedRecords {}", map_name_)),
|
||||
map_name(map_name_),
|
||||
current_txid(initial_txid)
|
||||
{}
|
||||
|
||||
void handle_committed_transaction(
|
||||
const ccf::TxID& tx_id, const kv::ReadOnlyStorePtr& store)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(txid_lock);
|
||||
auto tx_diff = store->create_tx_diff();
|
||||
auto m = tx_diff.template diff<RecordsMap>(PRIVATE_RECORDS);
|
||||
auto m = tx_diff.template diff<RecordsMap>(map_name);
|
||||
m->foreach([this](const size_t& k, std::optional<std::string> v) -> bool {
|
||||
if (v.has_value())
|
||||
{
|
||||
|
@ -306,14 +312,12 @@ namespace loggingapp
|
|||
"recording messages at client-specified IDs. It demonstrates most of "
|
||||
"the features available to CCF apps.";
|
||||
|
||||
openapi_info.document_version = "1.18.0";
|
||||
openapi_info.document_version = "1.19.0";
|
||||
|
||||
index_per_public_key = std::make_shared<RecordsIndexingStrategy>(
|
||||
PUBLIC_RECORDS, context, 10000, 20);
|
||||
context.get_indexing_strategies().install_strategy(index_per_public_key);
|
||||
|
||||
committed_records = std::make_shared<CommittedRecords>(PRIVATE_RECORDS);
|
||||
|
||||
const ccf::AuthnPolicies auth_policies = {
|
||||
ccf::jwt_auth_policy, ccf::user_cert_auth_policy};
|
||||
|
||||
|
@ -455,10 +459,34 @@ namespace loggingapp
|
|||
// track of deleted keys too, so that the index can observe the deleted
|
||||
// keys.
|
||||
auto install_committed_index = [this, &context](auto& ctx) {
|
||||
if (committed_records != nullptr)
|
||||
{
|
||||
ctx.rpc_ctx->set_response_status(HTTP_STATUS_PRECONDITION_FAILED);
|
||||
ctx.rpc_ctx->set_response_body("Already installed");
|
||||
return;
|
||||
}
|
||||
|
||||
ccf::View view;
|
||||
ccf::SeqNo seqno;
|
||||
auto result = get_last_committed_txid_v1(view, seqno);
|
||||
if (result != ccf::ApiResult::OK)
|
||||
{
|
||||
ctx.rpc_ctx->set_response_status(HTTP_STATUS_INTERNAL_SERVER_ERROR);
|
||||
ctx.rpc_ctx->set_response_body(fmt::format(
|
||||
"Failed to retrieve current committed TxID: {}", result));
|
||||
return;
|
||||
}
|
||||
|
||||
// tracking committed records also wants to track deletes so enable that
|
||||
// in the historical queries too
|
||||
context.get_historical_state().track_deletes_on_missing_keys(true);
|
||||
|
||||
// Indexing from the start of time may be expensive. Since this is a
|
||||
// locally-targetted sample, we only index from the _currently_
|
||||
// committed TxID
|
||||
committed_records = std::make_shared<CommittedRecords>(
|
||||
PRIVATE_RECORDS, ccf::TxID{view, seqno});
|
||||
|
||||
context.get_indexing_strategies().install_strategy(committed_records);
|
||||
};
|
||||
|
||||
|
@ -470,6 +498,26 @@ namespace loggingapp
|
|||
.set_auto_schema<void, void>()
|
||||
.install();
|
||||
|
||||
auto uninstall_committed_index = [this, &context](auto& ctx) {
|
||||
if (committed_records == nullptr)
|
||||
{
|
||||
ctx.rpc_ctx->set_response_status(HTTP_STATUS_PRECONDITION_FAILED);
|
||||
ctx.rpc_ctx->set_response_body("Not currently installed");
|
||||
return;
|
||||
}
|
||||
|
||||
context.get_indexing_strategies().uninstall_strategy(committed_records);
|
||||
committed_records = nullptr;
|
||||
};
|
||||
|
||||
make_command_endpoint(
|
||||
"/log/private/uninstall_committed_index",
|
||||
HTTP_POST,
|
||||
uninstall_committed_index,
|
||||
ccf::no_auth_required)
|
||||
.set_auto_schema<void, void>()
|
||||
.install();
|
||||
|
||||
auto get_committed = [this](auto& ctx) {
|
||||
// Parse id from query
|
||||
const auto parsed_query =
|
||||
|
|
|
@ -75,10 +75,9 @@ namespace ccf::indexing
|
|||
const auto next_requested = strategy->next_requested();
|
||||
if (!next_requested.has_value())
|
||||
{
|
||||
// If this strategy does not want any more Txs, don't consider
|
||||
// advancing it any further If this strategy has an upper-bound on Txs
|
||||
// it cares about, and we've already provided that, don't consider
|
||||
// advancing it any further
|
||||
// If this strategy has an upper-bound on Txs it cares about, and
|
||||
// we've already provided that, don't consider advancing it any
|
||||
// further
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -398,6 +398,19 @@ namespace ccf::indexing::strategies
|
|||
current_seqnos.insert(tx_id.seqno);
|
||||
}
|
||||
|
||||
nlohmann::json SeqnosByKey_Bucketed_Untyped::describe()
|
||||
{
|
||||
auto j = VisitEachEntryInMap::describe();
|
||||
{
|
||||
std::lock_guard<ccf::pal::Mutex> guard(impl->results_access);
|
||||
j["seqnos_per_bucket"] = impl->seqnos_per_bucket;
|
||||
j["old_results_max_size"] = impl->old_results.get_max_size();
|
||||
j["old_results_current_size"] = impl->old_results.size();
|
||||
j["current_results_size"] = impl->current_results.size();
|
||||
}
|
||||
return j;
|
||||
}
|
||||
|
||||
std::optional<SeqNoCollection> SeqnosByKey_Bucketed_Untyped::
|
||||
get_write_txs_impl(
|
||||
const ccf::ByteVector& serialised_key, ccf::SeqNo from, ccf::SeqNo to)
|
||||
|
|
|
@ -33,6 +33,14 @@ namespace ccf::indexing::strategies
|
|||
return current_txid.seqno + 1;
|
||||
}
|
||||
|
||||
nlohmann::json VisitEachEntryInMap::describe()
|
||||
{
|
||||
auto j = Strategy::describe();
|
||||
j["target_map"] = map_name;
|
||||
j["indexed_watermark"] = get_indexed_watermark();
|
||||
return j;
|
||||
}
|
||||
|
||||
ccf::TxID VisitEachEntryInMap::get_indexed_watermark() const
|
||||
{
|
||||
return current_txid;
|
||||
|
|
|
@ -390,7 +390,7 @@ namespace ccf
|
|||
openapi_info.description =
|
||||
"This API provides public, uncredentialed access to service and node "
|
||||
"state.";
|
||||
openapi_info.document_version = "2.38.0";
|
||||
openapi_info.document_version = "2.39.0";
|
||||
}
|
||||
|
||||
void init_handlers() override
|
||||
|
@ -1790,6 +1790,21 @@ namespace ccf
|
|||
.set_forwarding_required(endpoints::ForwardingRequired::Never)
|
||||
.set_auto_schema<void, ServiceConfiguration>()
|
||||
.install();
|
||||
|
||||
auto list_indexing_strategies = [this](
|
||||
auto& args,
|
||||
const nlohmann::json& params) {
|
||||
return make_success(this->context.get_indexing_strategies().describe());
|
||||
};
|
||||
|
||||
make_endpoint(
|
||||
"/index/strategies",
|
||||
HTTP_GET,
|
||||
json_adapter(list_indexing_strategies),
|
||||
no_auth_required)
|
||||
.set_forwarding_required(endpoints::ForwardingRequired::Never)
|
||||
.set_auto_schema<void, nlohmann::json>()
|
||||
.install();
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -1602,11 +1602,26 @@ def test_post_local_commit_failure(network, args):
|
|||
)
|
||||
@reqs.supports_methods("/app/log/private/committed", "/app/log/private")
|
||||
def test_committed_index(network, args, timeout=5):
|
||||
def get_strategies(client):
|
||||
# Also test /node/index/strategies here, since this test already adds and
|
||||
# removes indexing strategies
|
||||
res = client.get("/node/index/strategies")
|
||||
assert res.status_code == http.HTTPStatus.OK
|
||||
# Dictify here for easy lookup
|
||||
return {o["name"]: o for o in res.body.json()}
|
||||
|
||||
remote_node, _ = network.find_primary()
|
||||
strategy_name = "CommittedRecords records"
|
||||
with remote_node.client() as c:
|
||||
strategies = get_strategies(c)
|
||||
assert strategy_name not in strategies
|
||||
|
||||
res = c.post("/app/log/private/install_committed_index")
|
||||
assert res.status_code == http.HTTPStatus.OK
|
||||
|
||||
strategies = get_strategies(c)
|
||||
assert strategy_name in strategies
|
||||
|
||||
txid = network.txs.issue(network, number_txs=1, send_public=False)
|
||||
|
||||
_, log_id = network.txs.get_log_id(txid)
|
||||
|
@ -1642,6 +1657,14 @@ def test_committed_index(network, args, timeout=5):
|
|||
assert r.body.json()["error"]["message"] == f"No such record: {log_id}."
|
||||
assert r.body.json()["error"]["code"] == "ResourceNotFound"
|
||||
|
||||
# Uninstall index before proceeding
|
||||
with remote_node.client() as c:
|
||||
res = c.post("/app/log/private/uninstall_committed_index")
|
||||
assert res.status_code == http.HTTPStatus.OK
|
||||
|
||||
strategies = get_strategies(c)
|
||||
assert strategy_name not in strategies
|
||||
|
||||
|
||||
@reqs.description(
|
||||
"Check BasicConstraints are set correctly on network and node certificates"
|
||||
|
|
Загрузка…
Ссылка в новой задаче