[release/2.x] Cherry pick: Improve test coverage for historical range queries (#4964) (#4990)

This commit is contained in:
Julien Maffre 2023-02-13 10:10:25 +00:00 коммит произвёл GitHub
Родитель 60b3f8eba2
Коммит 53eaeb2ae2
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
10 изменённых файлов: 213 добавлений и 202 удалений

Просмотреть файл

@ -45,7 +45,7 @@ jobs:
suffix: "Instrumented"
artifact_name: "NoSGX_Instrumented"
ctest_filter: '-LE "benchmark|perf"'
ctest_timeout: "800"
ctest_timeout: "1600"
- template: common.yml
parameters:

Просмотреть файл

@ -1,4 +1,4 @@
___ ___
(- *) (O o) | Y ^ O
( V ) < V > O +---'---'
(- *) (o o) | Y & +
( V ) z v z O +---'---'
/--x-m- /--m-m---xXx--/--yy---

Просмотреть файл

@ -40,20 +40,6 @@ function delete_record(map, id) {
return { body: true };
}
function update_first_write(id, is_private = false, scope) {
const first_writes =
ccf.kv[is_private ? "first_write_version" : "public:first_write_version"];
if (!first_writes.has(id)) {
const records = is_private
? private_records(ccf.kv, scope)
: public_records(ccf.kv, scope);
const prev_version = records.getVersionOfPreviousWrite(id);
if (prev_version) {
first_writes.set(id, ccf.jsonCompatibleToBuf(prev_version));
}
}
}
export function get_private(request, scope) {
const parsedQuery = parse_request_query(request);
const id = get_id_from_query(parsedQuery);
@ -91,17 +77,6 @@ export function get_historical_public_with_receipt(request) {
return result;
}
function get_first_write_version(id, is_private = true) {
let version =
ccf.kv[
is_private ? "first_write_version" : "public:first_write_version"
].get(id);
if (version !== undefined) {
version = ccf.bufToJsonCompatible(version);
}
return version;
}
function get_last_write_version(id, is_private = true, scope) {
const records = is_private
? private_records(ccf.kv, scope)
@ -119,27 +94,9 @@ function get_historical_range_impl(request, isPrivate, nextLinkPrefix) {
throw new Error("from_seqno is not an integer");
}
} else {
// If no start point is specified, use the first time this ID was
// written to
const firstWriteVersion = get_first_write_version(id, isPrivate);
if (firstWriteVersion !== undefined) {
from_seqno = firstWriteVersion;
} else {
// It's possible there's been a single write but no subsequent
// transaction to write this to the FirstWritesMap - check version
// of previous write
const lastWrittenVersion = get_last_write_version(id, isPrivate);
if (lastWrittenVersion !== undefined) {
from_seqno = lastWrittenVersion;
} else {
// This key has never been written to. Return the empty response now
return {
body: {
entries: [],
},
};
}
}
// If no from_seqno is specified, defaults to very first transaction
// in ledger
from_seqno = 1;
}
if (to_seqno !== undefined) {
@ -295,7 +252,6 @@ export function post_private(request) {
let params = request.body.json();
const id = ccf.strToBuf(params.id.toString());
private_records(ccf.kv, parsedQuery.scope).set(id, ccf.strToBuf(params.msg));
update_first_write(id);
return { body: true };
}
@ -304,7 +260,6 @@ export function post_public(request) {
let params = request.body.json();
const id = ccf.strToBuf(params.id.toString());
public_records(ccf.kv, parsedQuery.scope).set(id, ccf.strToBuf(params.msg));
update_first_write(id, false);
if (params.record_claim) {
const claims_digest = ccf.digest("SHA-256", ccf.strToBuf(params.msg));
ccf.rpc.setClaimsDigest(claims_digest);
@ -315,23 +270,18 @@ export function post_public(request) {
export function delete_private(request) {
const parsedQuery = parse_request_query(request);
const id = get_id_from_query(parsedQuery);
update_first_write(id);
return delete_record(private_records(ccf.kv, parsedQuery.scope), id);
}
export function delete_public(request) {
const parsedQuery = parse_request_query(request);
const id = get_id_from_query(parsedQuery);
update_first_write(id, false);
return delete_record(public_records(ccf.kv, parsedQuery.scope), id);
}
export function clear_private(request) {
const parsedQuery = parse_request_query(request);
const records = private_records(ccf.kv, parsedQuery.scope);
records.forEach((_, id) => {
update_first_write(id);
});
records.clear();
return { body: true };
}
@ -339,9 +289,6 @@ export function clear_private(request) {
export function clear_public(request) {
const parsedQuery = parse_request_query(request);
const records = public_records(ccf.kv, parsedQuery.scope);
records.forEach((_, id) => {
update_first_write(id, false);
});
records.clear();
return { body: true };
}

Просмотреть файл

@ -29,12 +29,7 @@ namespace loggingapp
static constexpr auto PUBLIC_RECORDS = "public:records";
static constexpr auto PRIVATE_RECORDS = "records";
// Stores the index at which each key was first written to. Must be written by
// the _next_ write transaction to that key.
using FirstWritesMap = kv::Map<size_t, ccf::SeqNo>;
static constexpr auto PUBLIC_FIRST_WRITES = "public:first_write_version";
static constexpr auto FIRST_WRITES = "first_write_version";
// SNIPPET_START: indexing_strategy_definition
using RecordsIndexingStrategy = ccf::indexing::LazyStrategy<
ccf::indexing::strategies::SeqnosByKey_Bucketed<RecordsMap>>;
@ -139,26 +134,6 @@ namespace loggingapp
std::shared_ptr<RecordsIndexingStrategy> index_per_public_key = nullptr;
static void update_first_write(
kv::Tx& tx,
size_t id,
bool is_private = true,
const optional<std::string>& scope = std::nullopt)
{
auto first_writes =
tx.rw<FirstWritesMap>(is_private ? FIRST_WRITES : PUBLIC_FIRST_WRITES);
if (!first_writes->has(id))
{
auto records = tx.ro<RecordsMap>(
is_private ? private_records(scope) : public_records(scope));
const auto prev_version = records->get_version_of_previous_write(id);
if (prev_version.has_value())
{
first_writes->put(id, prev_version.value());
}
}
}
std::optional<ccf::TxStatus> get_tx_status(ccf::SeqNo seqno)
{
ccf::ApiResult result;
@ -267,7 +242,6 @@ namespace loggingapp
auto records_handle =
ctx.tx.template rw<RecordsMap>(private_records(ctx));
records_handle->put(in.id, in.msg);
update_first_write(ctx.tx, in.id, true, get_scope(ctx));
return ccf::make_success(true);
};
// SNIPPET_END: record
@ -340,7 +314,7 @@ namespace loggingapp
auto records_handle =
ctx.tx.template rw<RecordsMap>(private_records(ctx));
auto removed = records_handle->remove(id);
update_first_write(ctx.tx, id, true, get_scope(ctx));
records_handle->remove(id);
return ccf::make_success(LoggingRemove::Out{removed});
};
@ -353,10 +327,6 @@ namespace loggingapp
auto clear = [this](auto& ctx, nlohmann::json&&) {
auto records_handle =
ctx.tx.template rw<RecordsMap>(private_records(ctx));
records_handle->foreach([&ctx](const auto& id, const auto&) {
update_first_write(ctx.tx, id, true, get_scope(ctx));
return true;
});
records_handle->clear();
return ccf::make_success(true);
};
@ -395,7 +365,6 @@ namespace loggingapp
ctx.tx.template rw<RecordsMap>(public_records(ctx));
const auto id = params["id"].get<size_t>();
records_handle->put(id, in.msg);
update_first_write(ctx.tx, in.id, false, get_scope(ctx));
// SNIPPET_START: set_claims_digest
if (in.record_claim)
{
@ -474,7 +443,7 @@ namespace loggingapp
auto records_handle =
ctx.tx.template rw<RecordsMap>(public_records(ctx));
auto removed = records_handle->remove(id);
update_first_write(ctx.tx, id, false, get_scope(ctx));
records_handle->remove(id);
return ccf::make_success(LoggingRemove::Out{removed});
};
@ -490,10 +459,6 @@ namespace loggingapp
auto clear_public = [this](auto& ctx, nlohmann::json&&) {
auto public_records_handle =
ctx.tx.template rw<RecordsMap>(public_records(ctx));
public_records_handle->foreach([&ctx](const auto& id, const auto&) {
update_first_write(ctx.tx, id, false, get_scope(ctx));
return true;
});
public_records_handle->clear();
return ccf::make_success(true);
};
@ -554,7 +519,6 @@ namespace loggingapp
auto records_handle =
ctx.tx.template rw<RecordsMap>(private_records(ctx));
records_handle->put(in.id, log_line);
update_first_write(ctx.tx, in.id, true, get_scope(ctx));
ctx.rpc_ctx->set_response_status(HTTP_STATUS_OK);
ctx.rpc_ctx->set_response_header(
@ -584,7 +548,6 @@ namespace loggingapp
auto records_handle =
ctx.tx.template rw<RecordsMap>(private_records(ctx));
records_handle->put(in.id, log_line);
update_first_write(ctx.tx, in.id, true, get_scope(ctx));
return ccf::make_success(true);
};
make_endpoint(
@ -810,7 +773,6 @@ namespace loggingapp
auto records_handle =
ctx.tx.template rw<RecordsMap>(private_records(ctx));
records_handle->put(id, log_line);
update_first_write(ctx.tx, id, true, get_scope(ctx));
ctx.rpc_ctx->set_response_status(HTTP_STATUS_OK);
};
@ -1008,41 +970,9 @@ namespace loggingapp
if (!http::get_query_value(
parsed_query, "from_seqno", from_seqno, error_reason))
{
// If no start point is specified, use the first time this ID was
// written to
auto first_writes =
ctx.tx.ro<FirstWritesMap>("public:first_write_version");
const auto first_write_version = first_writes->get(id);
if (first_write_version.has_value())
{
from_seqno = first_write_version.value();
}
else
{
// It's possible there's been a single write but no subsequent
// transaction to write this to the FirstWritesMap - check version
// of previous write
auto records = ctx.tx.ro<RecordsMap>(public_records(ctx));
const auto last_written_version =
records->get_version_of_previous_write(id);
if (last_written_version.has_value())
{
from_seqno = last_written_version.value();
}
else
{
// This key has never been written to. Return the empty response
// now
LoggingGetHistoricalRange::Out response;
nlohmann::json j_response = response;
ctx.rpc_ctx->set_response_status(HTTP_STATUS_OK);
ctx.rpc_ctx->set_response_header(
http::headers::CONTENT_TYPE,
http::headervalues::contenttype::JSON);
ctx.rpc_ctx->set_response_body(j_response.dump());
return;
}
}
// If no from_seqno is specified, defaults to very first transaction
// in ledger
from_seqno = 1;
}
size_t to_seqno;
@ -1207,7 +1137,7 @@ namespace loggingapp
return;
}
}
// else the index authoritatvely tells us there are _no_ interesting
// else the index authoritatively tells us there are _no_ interesting
// seqnos in this range, so we have no stores to process, but can return
// a complete result
@ -1506,7 +1436,6 @@ namespace loggingapp
auto view = ctx.tx.template rw<RecordsMap>(private_records(ctx));
view->put(in.id, in.msg);
update_first_write(ctx.tx, in.id, true, get_scope(ctx));
return ccf::make_success(true);
};
make_endpoint(

Просмотреть файл

@ -833,51 +833,6 @@ def test_historical_receipts_with_claims(network, args):
return network
def get_all_entries(
client, target_id, from_seqno=None, to_seqno=None, timeout=5, log_on_success=False
):
LOG.info(
f"Getting historical entries{f' from {from_seqno}' if from_seqno is not None else ''}{f' to {to_seqno}' if to_seqno is not None else ''} for id {target_id}"
)
logs = None if log_on_success else []
start_time = time.time()
end_time = start_time + timeout
entries = []
path = f"/app/log/public/historical/range?id={target_id}"
if from_seqno is not None:
path += f"&from_seqno={from_seqno}"
if to_seqno is not None:
path += f"&to_seqno={to_seqno}"
while time.time() < end_time:
r = client.get(path, log_capture=logs)
if r.status_code == http.HTTPStatus.OK:
j_body = r.body.json()
entries += j_body["entries"]
if "@nextLink" in j_body:
path = j_body["@nextLink"]
continue
else:
# No @nextLink means we've reached end of range
duration = time.time() - start_time
LOG.info(f"Done! Fetched {len(entries)} entries in {duration:0.2f}s")
return entries, duration
elif r.status_code == http.HTTPStatus.ACCEPTED:
# Ignore retry-after header, retry soon
time.sleep(0.1)
continue
else:
LOG.error("Printing historical/range logs on unexpected status")
flush_info(logs, None)
raise ValueError(
f"Unexpected status code from historical range query: {r.status_code}"
)
LOG.error("Printing historical/range logs on timeout")
flush_info(logs, None)
raise TimeoutError(f"Historical range not available after {timeout}s")
@reqs.description("Read range of historical state")
@reqs.supports_methods("/app/log/public", "/app/log/public/historical/range")
def test_historical_query_range(network, args):
@ -921,9 +876,9 @@ def test_historical_query_range(network, args):
infra.commit.wait_for_commit(c, seqno=last_seqno, view=view, timeout=3)
entries_a, _ = get_all_entries(c, id_a)
entries_b, _ = get_all_entries(c, id_b)
entries_c, _ = get_all_entries(c, id_c)
entries_a, _ = network.txs.verify_range_for_idx(id_a, node=primary)
entries_b, _ = network.txs.verify_range_for_idx(id_b, node=primary)
entries_c, _ = network.txs.verify_range_for_idx(id_c, node=primary)
# Fetching A and B should take a similar amount of time, C (which was only written to in a brief window in the history) should be much faster
# NB: With larger page size, this is not necessarily true! Small range means _all_ responses fit in a single response page
@ -931,11 +886,17 @@ def test_historical_query_range(network, args):
# assert duration_c < duration_b
# Confirm that we can retrieve these with more specific queries, and we end up with the same result
alt_a, _ = get_all_entries(c, id_a, from_seqno=first_seqno)
alt_a, _ = network.txs.verify_range_for_idx(
id_a, node=primary, from_seqno=first_seqno
)
assert alt_a == entries_a
alt_a, _ = get_all_entries(c, id_a, to_seqno=last_seqno)
alt_a, _ = network.txs.verify_range_for_idx(
id_a, node=primary, to_seqno=last_seqno
)
assert alt_a == entries_a
alt_a, _ = get_all_entries(c, id_a, from_seqno=first_seqno, to_seqno=last_seqno)
alt_a, _ = network.txs.verify_range_for_idx(
id_a, node=primary, from_seqno=first_seqno, to_seqno=last_seqno
)
assert alt_a == entries_a
actual_len = len(entries_a) + len(entries_b) + len(entries_c)
@ -1370,7 +1331,12 @@ def test_receipts(network, args):
@reqs.supports_methods("/app/receipt", "/app/log/private")
@reqs.at_least_n_nodes(2)
def test_random_receipts(
network, args, lts=True, additional_seqnos=MappingProxyType({}), node=None
network,
args,
lts=True,
additional_seqnos=MappingProxyType({}),
node=None,
log_capture=None,
):
if node is None:
node, _ = network.find_primary_and_any_backup()
@ -1409,7 +1375,9 @@ def test_random_receipts(
):
start_time = time.time()
while time.time() < (start_time + 3.0):
rc = c.get(f"/app/receipt?transaction_id={view}.{s}")
rc = c.get(
f"/app/receipt?transaction_id={view}.{s}", log_capture=log_capture
)
if rc.status_code == http.HTTPStatus.OK:
receipt = rc.body.json()
if lts and not receipt.get("cert"):
@ -1426,7 +1394,7 @@ def test_random_receipts(
assert receipt["proof"] == [], receipt
break
elif rc.status_code == http.HTTPStatus.ACCEPTED:
time.sleep(0.5)
time.sleep(0.1)
else:
view += 1
if view > max_view:

Просмотреть файл

@ -237,7 +237,8 @@ def run_file_operations(args):
test_forced_snapshot(network, args)
primary, _ = network.find_primary()
network.stop_all_nodes()
# Scoped transactions are not handled by historical range queries
network.stop_all_nodes(skip_verification=True)
test_split_ledger_on_stopped_network(primary, args)

Просмотреть файл

@ -5,12 +5,16 @@ import infra.network
import infra.proc
import infra.commit
import http
from e2e_logging import get_all_entries
import cimetrics.upload
from concurrent import futures
from infra.log_capture import flush_info
import infra.jwt_issuer
import time
from loguru import logger as LOG
DEFAULT_TIMEOUT_S = 10
def submit_range(primary, id_pattern, start, end, format_width):
LOG.info(f"Starting submission of {start:>{format_width}} to {end:>{format_width}}")
@ -49,6 +53,61 @@ def submit_range(primary, id_pattern, start, end, format_width):
return (first_seqno, view, last_seqno)
def get_all_entries(
client,
target_id,
from_seqno=None,
to_seqno=None,
timeout=DEFAULT_TIMEOUT_S,
log_on_success=False,
headers=None,
):
LOG.info(
f"Getting historical entries{f' from {from_seqno}' if from_seqno is not None else ''}{f' to {to_seqno}' if to_seqno is not None else ''} for id {target_id}"
)
logs = None if log_on_success else []
start_time = time.time()
end_time = start_time + timeout
entries = []
path = f"/app/log/public/historical/range?id={target_id}"
if from_seqno is not None:
path += f"&from_seqno={from_seqno}"
if to_seqno is not None:
path += f"&to_seqno={to_seqno}"
while time.time() < end_time:
r = client.get(path, headers=headers or {}) # , log_capture=logs)
if r.status_code == http.HTTPStatus.OK:
j_body = r.body.json()
entries += j_body["entries"]
if "@nextLink" in j_body:
path = j_body["@nextLink"]
continue
else:
# No @nextLink means we've reached end of range
duration = time.time() - start_time
LOG.info(f"Done! Fetched {len(entries)} entries in {duration:0.2f}s")
return entries, duration
elif r.status_code == http.HTTPStatus.ACCEPTED:
# Ignore retry-after header, retry soon
time.sleep(0.1)
continue
else:
LOG.error("Printing historical/range logs on unexpected status")
flush_info(logs, None)
raise ValueError(
f"""
Unexpected status code from historical range query: {r.status_code}
{r.body}
"""
)
LOG.error("Printing historical/range logs on timeout")
flush_info(logs, None)
raise TimeoutError(f"Historical range not available after {timeout}s")
def test_historical_query_range(network, args):
id_a = 2
id_b = 3

Просмотреть файл

@ -201,6 +201,98 @@ class LoggingTxs:
)
return TxID(wait_point.view, wait_point.seqno)
def verify_range_for_idx(
self,
idx,
node=None,
timeout=5,
log_capture=None,
from_seqno=None,
to_seqno=None,
):
node = node or self.network.find_primary()[0]
headers = self._get_headers_base()
start_time = time.time()
end_time = start_time + timeout
entries = []
path = f"/app/log/public/historical/range?id={idx}"
if from_seqno is not None:
path += f"&from_seqno={from_seqno}"
if to_seqno is not None:
path += f"&to_seqno={to_seqno}"
while time.time() < end_time:
with node.client(self.user) as c:
r = c.get(path, headers=headers, log_capture=log_capture)
if r.status_code == http.HTTPStatus.OK:
j_body = r.body.json()
entries += j_body["entries"]
if "@nextLink" in j_body:
path = j_body["@nextLink"]
continue
else:
# No @nextLink means we've reached end of range
duration = time.time() - start_time
# Check that all recoded entries have been returned
# Ignore scoped entries as they are recorded in a separate table
# that is not indexed
stored_entries = [
{"msg": e["msg"], "seqno": e["seqno"]}
for e in self.pub[idx]
if e["scope"] is None
]
returned_entries = [
{"msg": e["msg"], "seqno": e["seqno"]} for e in entries
]
diff = [e for e in stored_entries if e not in returned_entries]
if diff:
raise ValueError(
f"These recorded public entries were not returned by historical range endpoint for idx {idx}: {diff}"
)
return entries, duration
elif r.status_code == http.HTTPStatus.ACCEPTED:
# Ignore retry-after header, retry soon
time.sleep(0.1)
continue
else:
LOG.error("Printing historical/range logs on unexpected status")
raise ValueError(
f"""
Unexpected status code from historical range query: {r.status_code}
{r.body}
"""
)
raise TimeoutError(
f"Historical range for idx {idx} not available after {timeout}s"
)
def verify_range(
self,
node=None,
timeout=5,
log_capture=None,
from_seqno=None,
to_seqno=None,
):
LOG.info(
f"Verifying historical range for all entries (from: {from_seqno}, to: {to_seqno})"
)
entries_count = 0
for idx in self.pub.keys():
entries, _ = self.verify_range_for_idx(
idx, node, timeout, log_capture, from_seqno, to_seqno
)
entries_count += len(entries)
LOG.info(
f"Successfully verified {entries_count} recorded public entries with historical range endpoint for {len(self.pub)} indices"
)
def verify(
self,
network=None,

Просмотреть файл

@ -677,13 +677,13 @@ class Network:
accept_ledger_diff=False,
**kwargs,
):
if not skip_verification:
if self.txs is not None:
LOG.info("Verifying that all committed txs can be read before shutdown")
log_capture = []
self.txs.verify(self, log_capture=log_capture)
if verbose_verification:
flush_info(log_capture, None)
if not skip_verification and self.txs is not None:
LOG.info("Verifying that all committed txs can be read before shutdown")
log_capture = []
self.txs.verify(log_capture=log_capture)
self.txs.verify_range(log_capture=log_capture)
if verbose_verification:
flush_info(log_capture, None)
fatal_error_found = False

Просмотреть файл

@ -95,6 +95,14 @@ def test_new_service(
# Note: Changes to constitution between versions should be tested here
LOG.info("Update JS app")
js_app_directory = (
"../samples/apps/logging/js"
if install_path == LOCAL_CHECKOUT_DIRECTORY
else os.path.join(install_path, "samples/logging/js")
)
network.consortium.set_js_app_from_dir(primary, js_app_directory)
LOG.info(f"Add node to new service [cycle nodes: {cycle_existing_nodes}]")
nodes_to_cycle = network.get_joined_nodes() if cycle_existing_nodes else []
nodes_to_add_count = len(nodes_to_cycle) if cycle_existing_nodes else 1
@ -151,7 +159,10 @@ def test_new_service(
LOG.info("Apply transactions to new nodes only")
issue_activity_on_live_service(network, args)
test_random_receipts(network, args, lts=True)
test_random_receipts(network, args, lts=True, log_capture=[])
# Setting from_seqno=1 as open ranges do not work with older ledgers
# that did not record the now-deprecated "public:first_write_version" table
network.txs.verify_range(log_capture=[], from_seqno=1)
# Local build and install bin/ and lib/ directories differ
@ -165,7 +176,7 @@ def get_bin_and_lib_dirs_for_install_path(install_path):
def set_js_args(args, from_install_path, to_install_path=None):
# Use from_version's app and constitution as new JS features may not be available
# on older versions, but upgrade to the new constitution once the new network is ready
# on older versions, but upgrade to the new constitution and JS app once the new network is ready
js_app_directory = (
"../samples/apps/logging/js"
if from_install_path == LOCAL_CHECKOUT_DIRECTORY
@ -319,6 +330,7 @@ def run_code_upgrade_from(
args,
lts=True,
additional_seqnos={txid.seqno: claims.encode()},
log_capture=[],
)
# Also check receipts on an old node
if index + 1 < len(old_nodes):
@ -329,6 +341,7 @@ def run_code_upgrade_from(
lts=True,
additional_seqnos={txid.seqno: None},
node=next_node,
log_capture=[],
)
node.stop()
@ -520,6 +533,8 @@ def run_ledger_compatibility_since_first(args, local_branch, use_snapshot):
else:
time.sleep(3)
issue_activity_on_live_service(network, args)
if idx > 0:
test_new_service(
network,