Modify sample historical range query endpoint to better handle subranges (#2556)

This commit is contained in:
Eddy Ashton 2021-05-07 11:20:23 +01:00 коммит произвёл GitHub
Родитель 45db0a3b27
Коммит 90648533ea
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 165 добавлений и 46 удалений

Просмотреть файл

@ -30,12 +30,18 @@ The Logging application simply has:
:lines: 1
:dedent:
Table creation happens in the app's constructor:
These tables are then accessed by type and name:
.. literalinclude:: ../../samples/apps/logging/logging.cpp
:language: cpp
:start-after: SNIPPET_START: constructor
:end-before: SNIPPET_END: constructor
:start-after: SNIPPET: public_table_access
:lines: 1
:dedent:
.. literalinclude:: ../../samples/apps/logging/logging.cpp
:language: cpp
:start-after: SNIPPET: private_table_access
:lines: 1
:dedent:
RPC Handler

Просмотреть файл

@ -629,7 +629,7 @@
{
"in": "query",
"name": "from_seqno",
"required": true,
"required": false,
"schema": {
"$ref": "#/components/schemas/uint64"
}
@ -637,7 +637,7 @@
{
"in": "query",
"name": "to_seqno",
"required": true,
"required": false,
"schema": {
"$ref": "#/components/schemas/uint64"
}

Просмотреть файл

@ -23,7 +23,11 @@ using namespace nlohmann;
namespace loggingapp
{
// SNIPPET: table_definition
using Table = kv::Map<size_t, string>;
using RecordsTable = kv::Map<size_t, string>;
// Stores the index at which each key was first written to. Must be written by
// the _next_ write transaction to that key.
using FirstWritesTable = kv::Map<size_t, ccf::SeqNo>;
// SNIPPET_START: custom_identity
struct CustomIdentity : public ccf::AuthnIdentity
@ -118,9 +122,6 @@ namespace loggingapp
class LoggerHandlers : public ccf::UserEndpointRegistry
{
private:
Table records;
Table public_records;
const nlohmann::json record_public_params_schema;
const nlohmann::json record_public_result_schema;
@ -129,13 +130,24 @@ namespace loggingapp
metrics::Tracker metrics_tracker;
void update_first_write(kv::Tx& tx, size_t id)
{
auto first_writes = tx.rw<FirstWritesTable>("first_write_version");
if (!first_writes->has(id))
{
auto private_records = tx.ro<RecordsTable>("records");
const auto prev_version =
private_records->get_version_of_previous_write(id);
if (prev_version.has_value())
{
first_writes->put(id, prev_version.value());
}
}
}
public:
// SNIPPET_START: constructor
LoggerHandlers(ccfapp::AbstractNodeContext& context) :
ccf::UserEndpointRegistry(context),
records("records"),
public_records("public:records"),
// SNIPPET_END: constructor
record_public_params_schema(nlohmann::json::parse(j_record_public_in)),
record_public_result_schema(nlohmann::json::parse(j_record_public_out)),
get_public_params_schema(nlohmann::json::parse(j_get_public_in)),
@ -158,8 +170,10 @@ namespace loggingapp
"Cannot record an empty log message.");
}
auto records_handle = ctx.tx.rw(records);
// SNIPPET: private_table_access
auto records_handle = ctx.tx.template rw<RecordsTable>("records");
records_handle->put(in.id, in.msg);
update_first_write(ctx.tx, in.id);
return ccf::make_success(true);
};
// SNIPPET_END: record
@ -195,7 +209,7 @@ namespace loggingapp
std::move(error_reason));
}
auto records_handle = args.tx.ro(records);
auto records_handle = args.tx.template ro<RecordsTable>("records");
auto record = records_handle->get(id);
if (record.has_value())
@ -236,8 +250,9 @@ namespace loggingapp
std::move(error_reason));
}
auto records_handle = ctx.tx.rw(records);
auto records_handle = ctx.tx.template rw<RecordsTable>("records");
auto removed = records_handle->remove(id);
update_first_write(ctx.tx, id);
return ccf::make_success(LoggingRemove::Out{removed});
};
@ -259,7 +274,9 @@ namespace loggingapp
"Cannot record an empty log message.");
}
auto records_handle = ctx.tx.rw(public_records);
// SNIPPET: public_table_access
auto records_handle =
ctx.tx.template rw<RecordsTable>("public:records");
records_handle->put(params["id"], in.msg);
return ccf::make_success(true);
};
@ -288,7 +305,8 @@ namespace loggingapp
std::move(error_reason));
}
auto public_records_handle = args.tx.ro(public_records);
auto public_records_handle =
args.tx.template ro<RecordsTable>("public:records");
auto record = public_records_handle->get(id);
if (record.has_value())
@ -324,7 +342,8 @@ namespace loggingapp
std::move(error_reason));
}
auto records_handle = ctx.tx.rw(public_records);
auto records_handle =
ctx.tx.template rw<RecordsTable>("public:records");
auto removed = records_handle->remove(id);
return ccf::make_success(LoggingRemove::Out{removed});
@ -368,8 +387,9 @@ namespace loggingapp
}
const auto log_line = fmt::format("{}: {}", cert->subject, in.msg);
auto records_handle = args.tx.rw(records);
auto records_handle = args.tx.template rw<RecordsTable>("records");
records_handle->put(in.id, log_line);
update_first_write(args.tx, in.id);
args.rpc_ctx->set_response_status(HTTP_STATUS_OK);
args.rpc_ctx->set_response_header(
@ -396,8 +416,9 @@ namespace loggingapp
}
const auto log_line = fmt::format("Anonymous: {}", in.msg);
auto records_handle = args.tx.rw(records);
auto records_handle = args.tx.template rw<RecordsTable>("records");
records_handle->put(in.id, log_line);
update_first_write(args.tx, in.id);
return ccf::make_success(true);
};
make_endpoint(
@ -620,8 +641,9 @@ namespace loggingapp
const std::vector<uint8_t>& content = args.rpc_ctx->get_request_body();
const std::string log_line(content.begin(), content.end());
auto records_handle = args.tx.rw(records);
auto records_handle = args.tx.template rw<RecordsTable>("records");
records_handle->put(id, log_line);
update_first_write(args.tx, id);
args.rpc_ctx->set_response_status(HTTP_STATUS_OK);
};
@ -652,7 +674,8 @@ namespace loggingapp
}
auto historical_tx = historical_state->store->create_read_only_tx();
auto records_handle = historical_tx.ro(records);
auto records_handle =
historical_tx.template ro<RecordsTable>("records");
const auto v = records_handle->get(id);
if (v.has_value())
@ -708,7 +731,8 @@ namespace loggingapp
}
auto historical_tx = historical_state->store->create_read_only_tx();
auto records_handle = historical_tx.ro(records);
auto records_handle =
historical_tx.template ro<RecordsTable>("records");
const auto v = records_handle->get(id);
if (v.has_value())
@ -747,15 +771,8 @@ namespace loggingapp
std::string error_reason;
size_t from_seqno;
size_t to_seqno;
size_t id;
if (
!http::get_query_value(
parsed_query, "from_seqno", from_seqno, error_reason) ||
!http::get_query_value(
parsed_query, "to_seqno", to_seqno, error_reason) ||
!http::get_query_value(parsed_query, "id", id, error_reason))
if (!http::get_query_value(parsed_query, "id", id, error_reason))
{
args.rpc_ctx->set_error(
HTTP_STATUS_BAD_REQUEST,
@ -764,6 +781,81 @@ namespace loggingapp
return;
}
size_t from_seqno;
if (!http::get_query_value(
parsed_query, "from_seqno", from_seqno, error_reason))
{
// If no start point is specified, use the first time this ID was
// written to
auto first_writes =
args.tx.ro<FirstWritesTable>("first_write_version");
const auto first_write_version = first_writes->get(id);
if (first_write_version.has_value())
{
from_seqno = first_write_version.value();
}
else
{
// It's possible there's been a single write but no subsequent
// transaction to write this to the FirstWritesTable - check version
// of previous write
auto records = args.tx.ro<RecordsTable>("records");
const auto last_written_version =
records->get_version_of_previous_write(id);
if (last_written_version.has_value())
{
from_seqno = last_written_version.value();
}
else
{
// This key has never been written to. Return the empty response
// now
LoggingGetHistoricalRange::Out response;
nlohmann::json j_response = response;
args.rpc_ctx->set_response_status(HTTP_STATUS_OK);
args.rpc_ctx->set_response_header(
http::headers::CONTENT_TYPE,
http::headervalues::contenttype::JSON);
args.rpc_ctx->set_response_body(j_response.dump());
return;
}
}
}
size_t to_seqno;
if (!http::get_query_value(
parsed_query, "to_seqno", to_seqno, error_reason))
{
// If no end point is specified, use the last time this ID was
// written to
auto records = args.tx.ro<RecordsTable>("records");
const auto last_written_version =
records->get_version_of_previous_write(id);
if (last_written_version.has_value())
{
to_seqno = last_written_version.value();
}
else
{
// If there's no last written version, it may have never been
// written but may simply be currently deleted. Use current commit
// index as end point to ensure we include any deleted entries.
ccf::View view;
ccf::SeqNo seqno;
const auto result = get_last_committed_txid_v1(view, seqno);
if (result != ccf::ApiResult::OK)
{
args.rpc_ctx->set_error(
HTTP_STATUS_INTERNAL_SERVER_ERROR,
ccf::errors::InternalError,
fmt::format(
"Failed to get committed transaction: {}",
ccf::api_result_to_str(result)));
}
to_seqno = seqno;
}
}
// Range must be in order
if (to_seqno < from_seqno)
{
@ -803,8 +895,7 @@ namespace loggingapp
ccf::errors::InvalidInput,
fmt::format(
"Only committed transactions can be queried. Transaction {}.{} "
"is "
"{}",
"is {}",
view_of_final_seqno,
to_seqno,
ccf::tx_status_to_str(tx_status)));
@ -862,7 +953,8 @@ namespace loggingapp
auto& store = stores[i];
auto historical_tx = store->create_read_only_tx();
auto records_handle = historical_tx.ro(records);
auto records_handle =
historical_tx.template ro<RecordsTable>("records");
const auto v = records_handle->get(id);
if (v.has_value())
@ -906,7 +998,7 @@ namespace loggingapp
nlohmann::json j_response = response;
args.rpc_ctx->set_response_status(HTTP_STATUS_OK);
args.rpc_ctx->set_response_header(
http::headers::CONTENT_TYPE, http::headervalues::contenttype::TEXT);
http::headers::CONTENT_TYPE, http::headervalues::contenttype::JSON);
args.rpc_ctx->set_response_body(j_response.dump());
// ALSO: Assume this response makes it all the way to the client, and
@ -920,8 +1012,10 @@ namespace loggingapp
get_historical_range,
auth_policies)
.set_auto_schema<void, LoggingGetHistoricalRange::Out>()
.add_query_parameter<size_t>("from_seqno")
.add_query_parameter<size_t>("to_seqno")
.add_query_parameter<size_t>(
"from_seqno", ccf::endpoints::QueryParamPresence::OptionalParameter)
.add_query_parameter<size_t>(
"to_seqno", ccf::endpoints::QueryParamPresence::OptionalParameter)
.add_query_parameter<size_t>("id")
.set_forwarding_required(ccf::endpoints::ForwardingRequired::Never)
.install();
@ -970,8 +1064,9 @@ namespace loggingapp
"Cannot record an empty log message.");
}
auto view = ctx.tx.rw(records);
auto view = ctx.tx.template rw<RecordsTable>("records");
view->put(in.id, in.msg);
update_first_write(ctx.tx, in.id);
return ccf::make_success(true);
};
make_endpoint(

Просмотреть файл

@ -523,22 +523,26 @@ def test_historical_query_range(network, args):
primary, _ = network.find_primary()
id_a = 100
id_b = 101
id_a = 142
id_b = 143
first_seqno = None
last_seqno = None
def get_all_entries(target_id):
def get_all_entries(target_id, from_seqno=None, to_seqno=None):
LOG.info(
f"Getting historical entries from {first_seqno} to {last_seqno} for id {target_id}"
f"Getting historical entries{f' from {from_seqno}' if from_seqno is not None else ''}{f' to {last_seqno}' if to_seqno is not None else ''} for id {target_id}"
)
logs = []
with primary.client("user0") as c:
timeout = 5
end_time = time.time() + timeout
entries = []
path = f"/app/log/private/historical/range?from_seqno={first_seqno}&to_seqno={last_seqno}&id={target_id}"
path = f"/app/log/private/historical/range?id={target_id}"
if from_seqno is not None:
path += f"&from_seqno={first_seqno}"
if to_seqno is not None:
path += f"&to_seqno={to_seqno}"
while time.time() < end_time:
r = c.get(path, log_capture=logs)
if r.status_code == http.HTTPStatus.OK:
@ -573,7 +577,9 @@ def test_historical_query_range(network, args):
n_entries = 100
for i in range(n_entries):
idx = id_b if i % 3 == 0 else id_a
network.txs.issue(network, repeat=True, idx=idx, wait_for_sync=False)
network.txs.issue(
network, repeat=True, idx=idx, wait_for_sync=False, log_capture=[]
)
_, tx = network.txs.get_last_tx(idx=idx)
msg = tx["msg"]
seqno = tx["seqno"]
@ -589,6 +595,15 @@ def test_historical_query_range(network, args):
entries_a = get_all_entries(id_a)
entries_b = get_all_entries(id_b)
# Confirm that we can retrieve these with more specific queries, and we end up with the same result
alt_a = get_all_entries(id_a, from_seqno=first_seqno)
assert alt_a == entries_a
alt_a = get_all_entries(id_a, to_seqno=last_seqno)
assert alt_a == entries_a
alt_a = get_all_entries(id_a, from_seqno=first_seqno, to_seqno=last_seqno)
assert alt_a == entries_a
actual_len = len(entries_a) + len(entries_b)
assert (
n_entries == actual_len

Просмотреть файл

@ -60,9 +60,10 @@ class LoggingTxs:
repeat=False,
idx=None,
wait_for_sync=True,
log_capture=None,
):
self.network = network
remote_node, _ = network.find_primary()
remote_node, _ = network.find_primary(log_capture=log_capture)
if on_backup:
remote_node = network.find_any_backup()
@ -86,6 +87,7 @@ class LoggingTxs:
"id": target_idx,
"msg": priv_msg,
},
log_capture=log_capture,
)
self.priv[target_idx].append(
{"msg": priv_msg, "seqno": rep_priv.seqno, "view": rep_priv.view}
@ -100,6 +102,7 @@ class LoggingTxs:
"id": target_idx,
"msg": pub_msg,
},
log_capture=log_capture,
)
self.pub[target_idx].append(
{"msg": pub_msg, "seqno": rep_pub.seqno, "view": rep_pub.view}