From 02ac1baeddb4ce469163248720d46e0df3e508b5 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Tue, 27 Jun 2023 18:11:43 +0100 Subject: [PATCH] Improvements to basic perf (#5394) --- CMakeLists.txt | 2 +- tests/infra/basicperf.py | 232 ++++++++++++++++++++++++++++----------- 2 files changed, 169 insertions(+), 65 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 15e096755..cd9250b1d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1443,7 +1443,7 @@ if(BUILD_TESTS) PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/basicperf.py CLIENT_BIN ./submit ADDITIONAL_ARGS --package "samples/apps/basic/libbasic" --repetitions - 10000 + 100000 ) if(WORKER_THREADS) diff --git a/tests/infra/basicperf.py b/tests/infra/basicperf.py index 7473b7089..57c7552c5 100644 --- a/tests/infra/basicperf.py +++ b/tests/infra/basicperf.py @@ -12,6 +12,9 @@ import http import hashlib from piccolo import generator import polars as pl +from typing import Dict +import random +import string def minimum_number_of_local_nodes(args): @@ -58,17 +61,111 @@ def configure_remote_client(args, client_id, client_host, common_dir): raise -def run(args): +def write_to_random_keys( + repetitions: int, msgs: generator.Messages, additional_headers: Dict[str, str] +): + """ + Write fixed-size messages to a range of keys, this is the usual logging workload + CCF has been running in various forms since early on. Each transaction produces a + ledger entry and causes replication to backups. + """ + batch_size = 100 + LOG.info(f"Workload: {repetitions} writes to a range of {batch_size} keys") + for i in range(repetitions): + key = f"{i % batch_size}" + msgs.append( + f"/records/{key}", + "PUT", + additional_headers=additional_headers, + body=f"{hashlib.md5(str(i).encode()).hexdigest()}", + content_type="text/plain", + ) + + +class RWMix: + """ + Similar to write_to_random_keys, but with the additions of reads back from the keys. + Reads do not produce ledger entries, but because they are interleaved with writes on + the same session, they are not offloaded to backups. + + The first pass always writes to all the keys, to make sure they are initialised. + """ + + def __init__(self, batch_size: int, write_fraction: float, msg_len=20): + self.batch_size = batch_size + assert write_fraction >= 0 and write_fraction <= 1 + self.write_fraction = write_fraction + self.msg_len = msg_len + + def __call__( + self, + repetitions: int, + msgs: generator.Messages, + additional_headers: Dict[str, str], + ): + assert repetitions % self.batch_size == 0 + LOG.info( + f"Workload: {repetitions} operations to a range of {self.batch_size} keys, with a write fraction of {self.write_fraction}" + ) + for batch in range(repetitions // self.batch_size): + # Randomly select a subset of the batch to be writes + writes = set( + random.sample( + range(self.batch_size), int(self.batch_size * self.write_fraction) + ) + ) + # Randomly shuffle the keys to be written/read + keys = list(range(self.batch_size)) + random.shuffle(keys) + for key in keys: + # The first batch always writes to all keys, to make sure they are initialised + if (batch == 0) or (key in writes): + msgs.append( + f"/records/{key}", + "PUT", + additional_headers=additional_headers, + body="".join( + random.choices(string.ascii_letters, k=self.msg_len) + ), + content_type="text/plain", + ) + else: + msgs.append( + f"/records/{key}", + "GET", + additional_headers=additional_headers, + ) + + +def configure_client_hosts(args, backups): + client_hosts = [] + if args.one_client_per_backup: + assert backups, "--one-client-per-backup was set but no backup was found" + client_hosts = ["localhost"] * len(backups) + else: + if args.client_nodes: + client_hosts.extend(args.client_nodes) + + if args.num_localhost_clients: + client_hosts.extend(["localhost"] * int(args.num_localhost_clients)) + + if not client_hosts: + client_hosts = ["localhost"] + return client_hosts + + +def run(args, append_messages): hosts = args.nodes if not hosts: hosts = ["local://localhost"] * minimum_number_of_local_nodes(args) args.initial_user_count = 3 + # Cap the signature emission at 100 ms, to bound commit latency. args.sig_ms_interval = 100 + args.sig_tx_interval = 10000 args.ledger_chunk_bytes = "5MB" # Set to cchost default value LOG.info("Starting nodes on {}".format(hosts)) - with infra.network.network( hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb ) as network: @@ -82,41 +179,23 @@ def run(args): jwt = jwt_issuer.issue_jwt() additional_headers["Authorization"] = f"Bearer {jwt}" - LOG.info(f"Generating {args.repetitions} parquet requests") - msgs = generator.Messages() - for i in range(args.repetitions): - key = f"{i % 100}" - msgs.append( - f"/records/{key}", - "PUT", - additional_headers=additional_headers, - body=f"{hashlib.md5(str(i).encode()).hexdigest()}", - content_type="text/plain", + client_hosts = configure_client_hosts(args, backups) + requests_file_paths = [] + + for client_idx in range(len(client_hosts)): + LOG.info(f"Generating {args.repetitions} requests for client_{client_idx}") + msgs = generator.Messages() + append_messages(args.repetitions, msgs, additional_headers) + + path_to_requests_file = os.path.join( + network.common_dir, f"pi_client{client_idx}_requests.parquet" ) + LOG.info(f"Writing generated requests to {path_to_requests_file}") + msgs.to_parquet_file(path_to_requests_file) + requests_file_paths.append(path_to_requests_file) - filename_prefix = "piccolo_driver" - path_to_requests_file = os.path.join( - network.common_dir, f"{filename_prefix}_requests.parquet" - ) - LOG.info(f"Writing generated requests to {path_to_requests_file}") - msgs.to_parquet_file(path_to_requests_file) - - nodes_to_send_to = filter_nodes(primary, backups, args.send_tx_to) clients = [] - client_hosts = [] - if args.one_client_per_backup: - assert backups, "--one-client-per-backup was set but no backup was found" - client_hosts = ["localhost"] * len(backups) - else: - if args.client_nodes: - client_hosts.extend(args.client_nodes) - - if args.num_localhost_clients: - client_hosts.extend(["localhost"] * int(args.num_localhost_clients)) - - if not client_hosts: - client_hosts = ["localhost"] - + nodes_to_send_to = filter_nodes(primary, backups, args.send_tx_to) for client_id, client_host in enumerate(client_hosts): node = nodes_to_send_to[client_id % len(nodes_to_send_to)] @@ -127,24 +206,20 @@ def run(args): [ args.client, "--cert", - os.path.join(remote_client.remote.root, "user1_cert.pem"), + "user1_cert.pem", "--key", - os.path.join(remote_client.remote.root, "user1_privk.pem"), + "user1_privk.pem", "--cacert", - network.cert_path, + os.path.basename(network.cert_path), f"--server-address={node.get_public_rpc_host()}:{node.get_public_rpc_port()}", "--max-writes-ahead", "1000", "--send-filepath", - os.path.join( - remote_client.remote.root, "piccolo_driver_requests.parquet" - ), + "pi_requests.parquet", "--response-filepath", - os.path.join( - remote_client.remote.root, "piccolo_driver_response.parquet" - ), + "pi_response.parquet", "--generator-filepath", - path_to_requests_file, + os.path.abspath(requests_file_paths[client_id]), "--pid-file-path", "cmd.pid", ] @@ -160,8 +235,7 @@ def run(args): for remote_client in clients: remote_client.start() - hard_stop_timeout = 90 - format_width = len(str(hard_stop_timeout)) + 3 + format_width = len(str(args.client_timeout_s)) + 3 try: with cimetrics.upload.metrics(complete=False) as metrics: @@ -172,55 +246,73 @@ def run(args): done = remote_client.check_done() # all the clients need to be done LOG.info( - f"Client {i} has {'completed' if done else 'not completed'} running ({time.time() - start_time:>{format_width}.2f}s / {hard_stop_timeout}s)" + f"Client {i} has {'completed' if done else 'not completed'} running ({time.time() - start_time:>{format_width}.2f}s / {args.client_timeout_s}s)" ) stop_waiting = stop_waiting and done if stop_waiting: break - if time.time() > start_time + hard_stop_timeout: + if time.time() > start_time + args.client_timeout_s: raise TimeoutError( - f"Client still running after {hard_stop_timeout}s" + f"Client still running after {args.client_timeout_s}s" ) time.sleep(5) agg = [] - for remote_client in clients: - # TODO: get from the remote properly + for client_id, remote_client in enumerate(clients): + # Note: this assumes client are run locally, but saves a copy send_file = os.path.join( - remote_client.remote.root, "piccolo_driver_requests.parquet" + remote_client.remote.root, "pi_requests.parquet" ) response_file = os.path.join( - remote_client.remote.root, "piccolo_driver_response.parquet" + remote_client.remote.root, "pi_response.parquet" ) LOG.info( f"Analyzing results from {send_file} and {response_file}" ) def table(): + payloads = pl.read_parquet(requests_file_paths[client_id]) sent = pl.read_parquet(send_file) rcvd = pl.read_parquet(response_file) - all = rcvd.join(sent, on="messageID") - all = all.with_columns( - pl.lit(remote_client.name).alias("client") + overall = payloads.join(sent, on="messageID") + overall = rcvd.join(overall, on="messageID") + overall = overall.with_columns( + client=pl.lit(remote_client.name), + requestSize=pl.col("request").apply(len), + responseSize=pl.col("rawResponse").apply(len), ) print( - all.with_columns( + overall.with_columns( pl.col("receiveTime").alias("latency") - pl.col("sendTime") ).sort("latency") ) - agg.append(all) + agg.append(overall) table() agg = pl.concat(agg, rechunk=True) print(agg) + agg_path = os.path.join( + network.common_dir, "aggregated_basicperf_output.parquet" + ) + with open(agg_path, "wb") as f: + agg.write_parquet(f) + print(f"Aggregated results written to {agg_path}") start_send = agg["sendTime"].sort()[0] end_recv = agg["receiveTime"].sort()[-1] throughput = len(agg) / (end_recv - start_send) - print(f"Average throughput: {throughput} tx/s") + print(f"Average throughput: {throughput:.2f} tx/s") + byte_input = ( + agg["requestSize"].sum() / (end_recv - start_send) + ) / (1024 * 1024) + print(f"Average request input: {byte_input:.2f} Mbytes/s") + byte_output = ( + agg["responseSize"].sum() / (end_recv - start_send) + ) / (1024 * 1024) + print(f"Average request output: {byte_output:.2f} Mbytes/s") sent = agg["sendTime"].sort() sent_per_sec = ( @@ -250,10 +342,8 @@ def run(args): ) print(per_sec) per_sec = per_sec.with_columns( - pl.col("sent").alias("sent_rate") / per_sec["sent"].max() - ) - per_sec = per_sec.with_columns( - pl.col("rcvd").alias("rcvd_rate") / per_sec["rcvd"].max() + sent_rate=pl.col("sent") / per_sec["sent"].max(), + rcvd_rate=pl.col("rcvd") / per_sec["rcvd"].max(), ) for row in per_sec.iter_rows(named=True): s = "S" * int(row["sent_rate"] * 20) @@ -341,10 +431,24 @@ def cli_args(): help="Unused, swallowed for compatibility with old args", action="store_true", ) + parser.add_argument( + "--client-timeout-s", + help="Number of seconds after which unresponsive clients are shut down", + default=90, + type=float, + ) + parser.add_argument( + "--rw-mix", + help="Run a batched, fractional read/write mix instead of pure writes", + type=float, + ) return infra.e2e_args.cli_args(parser=parser, accept_unknown=False) if __name__ == "__main__": args = cli_args() - run(args) + if args.rw_mix: + run(args, RWMix(1000, 0.5)) + else: + run(args, write_to_random_keys)