Add perf test of historical query fetching (#3032)

Eddy Ashton 2021-09-30 13:30:26 +01:00 committed by GitHub
Parent bf6da80fe9
Commit 74970ad420
No known key found for this signature
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 214 additions and 85 deletions

View File

@@ -819,6 +819,13 @@ if(BUILD_TESTS)
--msg-ser-fmt
text
)
add_e2e_test(
NAME historical_query_perf_test
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/historical_query_perf.py
CONSENSUS cft
LABEL perf
)
endif()
# Generate and install CMake export file for consumers using CMake
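Usage note: the test above is registered with LABEL perf, so once the build tree is configured with BUILD_TESTS it should be runnable on its own via ctest's name filter (command illustrative, assuming the usual CCF build workflow):

    ctest -R historical_query_perf_test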

View File

@@ -394,6 +394,7 @@ class RequestClient:
ca: str,
session_auth: Optional[Identity] = None,
signing_auth: Optional[Identity] = None,
common_headers: Optional[dict] = None,
**kwargs,
):
self.host = host
@@ -401,6 +402,7 @@
self.ca = ca
self.session_auth = session_auth
self.signing_auth = signing_auth
self.common_headers = common_headers
self.key_id = None
cert = None
if self.session_auth:
@@ -422,6 +424,9 @@
timeout: int = DEFAULT_REQUEST_TIMEOUT_SEC,
):
extra_headers = {}
if self.common_headers is not None:
extra_headers.update(self.common_headers)
extra_headers.update(request.headers)
auth = None
@@ -503,6 +508,7 @@ class CCFClient:
:param Identity signing_auth: Path to private key and certificate to be used to sign requests for the session (optional).
:param int connection_timeout: Maximum time to wait for successful connection establishment before giving up.
:param str description: Message to print on each request emitted with this client.
:param dict common_headers: Headers which should be added to every request.
:param dict kwargs: Keyword args to be forwarded to the client implementation.
A :py:exc:`CCFConnectionException` exception is raised if the connection is not established successfully within ``connection_timeout`` seconds.
@@ -520,6 +526,7 @@
connection_timeout: int = DEFAULT_CONNECTION_TIMEOUT_SEC,
description: Optional[str] = None,
curl: bool = False,
common_headers: Optional[dict] = None,
**kwargs,
):
self.connection_timeout = connection_timeout
@@ -533,7 +540,7 @@
self.client_impl = CurlClient(host, port, ca, session_auth, signing_auth)
else:
self.client_impl = RequestClient(
host, port, ca, session_auth, signing_auth, **kwargs
host, port, ca, session_auth, signing_auth, common_headers, **kwargs
)
def _response(self, response: Response) -> Response:
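The common_headers plumbing above can be exercised as in this minimal sketch (host, port, CA path and token are illustrative). Headers given at construction are merged into every request, and per-request headers win on conflict, since extra_headers is updated with request.headers after common_headers:

    import ccf.clients

    # Illustrative connection details; the token is attached to every request
    client = ccf.clients.CCFClient(
        host="127.0.0.1",
        port=8000,
        ca="service_cert.pem",
        common_headers={"authorization": "Bearer <token>"},
    )
    r = client.get("/app/multi_auth")  # no per-request auth header needed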

View File

@@ -953,7 +953,7 @@ namespace loggingapp
}
// Set a maximum range, paginate larger requests
static constexpr size_t max_seqno_per_page = 20;
static constexpr size_t max_seqno_per_page = 2000;
const auto range_begin = from_seqno;
const auto range_end =
std::min(to_seqno, range_begin + max_seqno_per_page);
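To make the new page size concrete, here is a minimal sketch of the windowing arithmetic above (Python, values illustrative): a query spanning more than 2000 seqnos is served 2000 seqnos at a time, with the remainder reachable through @nextLink continuation pages.

    max_seqno_per_page = 2000
    from_seqno, to_seqno = 100, 5000
    range_begin = from_seqno
    range_end = min(to_seqno, range_begin + max_seqno_per_page)  # 2100
    # seqnos 100..2100 fill the first page; 2101..5000 arrive via @nextLink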

View File

@@ -15,8 +15,6 @@ import socket
import os
from collections import defaultdict
import time
import tempfile
import base64
import json
import hashlib
import ccf.clients
@@ -432,29 +430,23 @@ def test_multi_auth(network, args):
require_new_response(r)
LOG.info("Authenticate via JWT token")
jwt_key_priv_pem, _ = infra.crypto.generate_rsa_keypair(2048)
jwt_cert_pem = infra.crypto.generate_cert(jwt_key_priv_pem)
jwt_kid = "my_key_id"
jwt_issuer = "https://example.issuer"
# Add JWT issuer
with tempfile.NamedTemporaryFile(prefix="ccf", mode="w+") as metadata_fp:
jwt_cert_der = infra.crypto.cert_pem_to_der(jwt_cert_pem)
der_b64 = base64.b64encode(jwt_cert_der).decode("ascii")
data = {
"issuer": jwt_issuer,
"jwks": {
"keys": [{"kty": "RSA", "kid": jwt_kid, "x5c": [der_b64]}]
},
}
json.dump(data, metadata_fp)
metadata_fp.flush()
network.consortium.set_jwt_issuer(primary, metadata_fp.name)
jwt_issuer = infra.jwt_issuer.JwtIssuer()
jwt_issuer.register(network)
jwt = jwt_issuer.issue_jwt(claims={"user": "Alice"})
with primary.client() as c:
jwt = infra.crypto.create_jwt({}, jwt_key_priv_pem, jwt_kid)
r = c.get("/app/multi_auth", headers={"authorization": "Bearer " + jwt})
require_new_response(r)
LOG.info("Authenticate via second JWT token")
jwt2 = jwt_issuer.issue_jwt(claims={"user": "Bob"})
with primary.client(
common_headers={"authorization": "Bearer " + jwt2}
) as c:
r = c.get("/app/multi_auth")
require_new_response(r)
else:
LOG.warning(
f"Skipping {inspect.currentframe().f_code.co_name} as application does not implement '/multi_auth'"
@@ -674,6 +666,49 @@ def test_historical_receipts(network, args):
return network
def get_all_entries(client, target_id, from_seqno=None, to_seqno=None, timeout=5):
LOG.info(
f"Getting historical entries{f' from {from_seqno}' if from_seqno is not None else ''}{f' to {to_seqno}' if to_seqno is not None else ''} for id {target_id}"
)
logs = []
start_time = time.time()
end_time = start_time + timeout
entries = []
path = f"/app/log/private/historical/range?id={target_id}"
if from_seqno is not None:
path += f"&from_seqno={from_seqno}"
if to_seqno is not None:
path += f"&to_seqno={to_seqno}"
while time.time() < end_time:
r = client.get(path, log_capture=logs)
if r.status_code == http.HTTPStatus.OK:
j_body = r.body.json()
entries += j_body["entries"]
if "@nextLink" in j_body:
path = j_body["@nextLink"]
continue
else:
# No @nextLink means we've reached end of range
duration = time.time() - start_time
LOG.info(f"Done! Fetched {len(entries)} entries in {duration:0.2f}s")
return entries, duration
elif r.status_code == http.HTTPStatus.ACCEPTED:
# Ignore retry-after header, retry soon
time.sleep(0.1)
continue
else:
LOG.error("Printing historical/range logs on unexpected status")
flush_info(logs, None)
raise ValueError(
f"Unexpected status code from historical range query: {r.status_code}"
)
LOG.error("Printing historical/range logs on timeout")
flush_info(logs, None)
raise TimeoutError(f"Historical range not available after {timeout}s")
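A usage sketch for the helper above (client setup and id are illustrative): it follows @nextLink pages and polls through 202 ACCEPTED responses, so callers receive only the complete entry list and the elapsed time.

    with primary.client("user0") as c:
        entries, duration = get_all_entries(c, 42, timeout=10)
        LOG.info(f"Fetched {len(entries)} entries in {duration:0.2f}s")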
@reqs.description("Read range of historical state")
@reqs.supports_methods("log/private", "log/private/historical/range")
def test_historical_query_range(network, args):
@@ -683,8 +718,6 @@ def test_historical_query_range(network, args):
)
return network
primary, _ = network.find_primary()
id_a = 142
id_b = 143
id_c = 144
@@ -692,56 +725,12 @@ def test_historical_query_range(network, args):
first_seqno = None
last_seqno = None
def get_all_entries(target_id, from_seqno=None, to_seqno=None):
LOG.info(
f"Getting historical entries{f' from {from_seqno}' if from_seqno is not None else ''}{f' to {last_seqno}' if to_seqno is not None else ''} for id {target_id}"
)
logs = []
with primary.client("user0") as c:
timeout = 5
start_time = time.time()
end_time = start_time + timeout
entries = []
path = f"/app/log/private/historical/range?id={target_id}"
if from_seqno is not None:
path += f"&from_seqno={first_seqno}"
if to_seqno is not None:
path += f"&to_seqno={to_seqno}"
while time.time() < end_time:
r = c.get(path, log_capture=logs)
if r.status_code == http.HTTPStatus.OK:
j_body = r.body.json()
entries += j_body["entries"]
if "@nextLink" in j_body:
path = j_body["@nextLink"]
continue
else:
# No @nextLink means we've reached end of range
duration = time.time() - start_time
LOG.info(
f"Done! Fetched {len(entries)} entries in {duration:0.2f}s"
)
return entries, duration
elif r.status_code == http.HTTPStatus.ACCEPTED:
# Ignore retry-after header, retry soon
time.sleep(0.1)
continue
else:
LOG.error("Printing historical/range logs on unexpected status")
flush_info(logs, None)
raise ValueError(
f"Unexpected status code from historical range query: {r.status_code}"
)
LOG.error("Printing historical/range logs on timeout")
flush_info(logs, None)
raise TimeoutError(f"Historical range not available after {timeout}s")
primary, _ = network.find_primary()
with primary.client("user0") as c:
# Submit many transactions, overwriting the same IDs
# Need to submit through network.txs so these can be verified at shutdown, but also need to submit one at a
# time to retrieve the submitted transactions
msgs = dict()
msgs = {}
n_entries = 100
def id_for(i):
@@ -769,20 +758,21 @@ def test_historical_query_range(network, args):
ccf.commit.wait_for_commit(c, seqno=last_seqno, view=view, timeout=3)
entries_a, duration_a = get_all_entries(id_a)
entries_b, duration_b = get_all_entries(id_b)
entries_c, duration_c = get_all_entries(id_c)
entries_a, _ = get_all_entries(c, id_a)
entries_b, _ = get_all_entries(c, id_b)
entries_c, _ = get_all_entries(c, id_c)
# Fetching A and B should take a similar amount of time, C (which was only written to in a brief window in the history) should be much faster
assert duration_c < duration_a
assert duration_c < duration_b
# NB: With larger page size, this is not necessarily true! Small range means _all_ responses fit in a single response page
# assert duration_c < duration_a
# assert duration_c < duration_b
# Confirm that we can retrieve these with more specific queries, and we end up with the same result
alt_a, _ = get_all_entries(id_a, from_seqno=first_seqno)
alt_a, _ = get_all_entries(c, id_a, from_seqno=first_seqno)
assert alt_a == entries_a
alt_a, _ = get_all_entries(id_a, to_seqno=last_seqno)
alt_a, _ = get_all_entries(c, id_a, to_seqno=last_seqno)
assert alt_a == entries_a
alt_a, _ = get_all_entries(id_a, from_seqno=first_seqno, to_seqno=last_seqno)
alt_a, _ = get_all_entries(c, id_a, from_seqno=first_seqno, to_seqno=last_seqno)
assert alt_a == entries_a
actual_len = len(entries_a) + len(entries_b) + len(entries_c)
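For reference, a sketch of the paginated response body that get_all_entries consumes (values invented; the per-entry schema is a guess at the logging app's format, and only the entries list and optional @nextLink field are relied on above):

    {
        "entries": [{"seqno": 55, "id": 142, "msg": "Unique message 0"}],
        "@nextLink": "/app/log/private/historical/range?..."
    }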

View File

@@ -0,0 +1,123 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import infra.e2e_args
import infra.network
import infra.proc
import infra.jwt_issuer
import ccf.commit
import http
from e2e_logging import get_all_entries
import cimetrics.upload
from loguru import logger as LOG
def test_historical_query_range(network, args):
first_seqno = None
last_seqno = None
id_single = 1
id_a = 2
id_b = 3
id_c = 4
id_pattern = [id_a, id_a, id_a, id_b, id_b, id_c]
n_entries = 3001
jwt_issuer = infra.jwt_issuer.JwtIssuer()
jwt_issuer.register(network)
jwt = jwt_issuer.issue_jwt()
primary, _ = network.find_primary()
with primary.client("user0") as c:
# Submit many transactions, overwriting the same IDs
msgs = {}
def id_for(i):
# id_single is used for a single entry, in the middle of the range
if i == n_entries // 2:
return id_single
else:
return id_pattern[i % len(id_pattern)]
LOG.info(f"Submitting {n_entries} entries")
for i in range(n_entries):
idx = id_for(i)
msg = f"Unique message {i}"
r = c.post(
"/app/log/private",
{
"id": idx,
"msg": msg,
},
# Print logs for every 1000th submission, to show progress
log_capture=None if i % 1000 == 0 else [],
)
assert r.status_code == http.HTTPStatus.OK
seqno = r.seqno
view = r.view
msgs[seqno] = msg
if first_seqno is None:
first_seqno = seqno
last_seqno = seqno
ccf.commit.wait_for_commit(c, seqno=last_seqno, view=view, timeout=3)
LOG.info(
f"Total ledger contains {last_seqno} entries, of which we expect our transactions to be spread over a range of ~{last_seqno - first_seqno} transactions"
)
    # Total fetch time depends on the number of entries. We expect to be much faster
    # than this, but to set a safe timeout, allow for a rate as low as 100 fetches per second
timeout = n_entries / 100
# Ensure all nodes have reached committed state before querying a backup for historical state
network.wait_for_all_nodes_to_commit(primary=primary)
entries = {}
node = network.find_node_by_role(role=infra.network.NodeRole.BACKUP, log_capture=[])
with node.client(common_headers={"authorization": f"Bearer {jwt}"}) as c:
entries[id_a], duration_a = get_all_entries(c, id_a, timeout=timeout)
entries[id_b], duration_b = get_all_entries(c, id_b, timeout=timeout)
entries[id_c], duration_c = get_all_entries(c, id_c, timeout=timeout)
id_a_fetch_rate = len(entries[id_a]) / duration_a
id_b_fetch_rate = len(entries[id_b]) / duration_b
id_c_fetch_rate = len(entries[id_c]) / duration_c
average_fetch_rate = (id_a_fetch_rate + id_b_fetch_rate + id_c_fetch_rate) / 3
with cimetrics.upload.metrics(complete=False) as metrics:
upload_name = "Historical query (/s)"
LOG.debug(f"Uploading metric: {upload_name} = {average_fetch_rate}")
metrics.put(upload_name, average_fetch_rate)
# NB: The similar test in e2e_logging checks correctness, so we make no duplicate
# assertions here
return network
def run(args):
with infra.network.network(
args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
) as network:
network.start_and_join(args)
network = test_historical_query_range(network, args)
if __name__ == "__main__":
def add(parser):
pass
args = infra.e2e_args.cli_args(add=add)
args.package = "samples/apps/logging/liblogging"
args.nodes = infra.e2e_args.max_nodes(args, f=0)
args.initial_member_count = 1
run(args)
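Since the file reuses the standard infra harness, it can presumably be launched like the other e2e tests (flag illustrative, assuming infra.e2e_args' usual binary-dir option):

    python tests/historical_query_perf.py -b ./build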

View File

@@ -753,22 +753,24 @@ class Network:
return (self._get_node_by_service_id(primary_id), view)
def find_backups(self, primary=None, timeout=3):
def find_backups(self, primary=None, timeout=3, log_capture=None):
if primary is None:
primary, _ = self.find_primary(timeout=timeout)
primary, _ = self.find_primary(timeout=timeout, log_capture=log_capture)
return [n for n in self.get_joined_nodes() if n != primary]
def find_any_backup(self, primary=None, timeout=3):
return random.choice(self.find_backups(primary=primary, timeout=timeout))
def find_any_backup(self, primary=None, timeout=3, log_capture=None):
return random.choice(
self.find_backups(primary=primary, timeout=timeout, log_capture=log_capture)
)
def find_node_by_role(self, role=NodeRole.ANY):
def find_node_by_role(self, role=NodeRole.ANY, log_capture=None):
        role_ = (
            random.choice([NodeRole.PRIMARY, NodeRole.BACKUP])
            if role == NodeRole.ANY
            else role
        )
if role_ == NodeRole.PRIMARY:
return self.find_primary()[0]
return self.find_primary(log_capture=log_capture)[0]
else:
return self.find_any_backup()
return self.find_any_backup(log_capture=log_capture)
def find_random_node(self):
return random.choice(self.get_joined_nodes())