This commit is contained in:
Amaury Chamayou 2023-06-27 18:11:43 +01:00 committed by GitHub
Parent f59535a302
Commit 02ac1baedd
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
2 changed files: 169 additions and 65 deletions

View file

@@ -1443,7 +1443,7 @@ if(BUILD_TESTS)
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/basicperf.py
CLIENT_BIN ./submit
ADDITIONAL_ARGS --package "samples/apps/basic/libbasic" --repetitions
10000
100000
)
if(WORKER_THREADS)

View file

@@ -12,6 +12,9 @@ import http
import hashlib
from piccolo import generator
import polars as pl
from typing import Dict
import random
import string
def minimum_number_of_local_nodes(args):
@@ -58,17 +61,111 @@ def configure_remote_client(args, client_id, client_host, common_dir):
raise
def run(args):
def write_to_random_keys(
repetitions: int, msgs: generator.Messages, additional_headers: Dict[str, str]
):
"""
Write fixed-size messages to a range of keys; this is the usual logging
workload that CCF has been running in various forms since early on. Each
transaction produces a ledger entry and causes replication to the backups.
"""
batch_size = 100
LOG.info(f"Workload: {repetitions} writes to a range of {batch_size} keys")
for i in range(repetitions):
key = f"{i % batch_size}"
msgs.append(
f"/records/{key}",
"PUT",
additional_headers=additional_headers,
body=f"{hashlib.md5(str(i).encode()).hexdigest()}",
content_type="text/plain",
)
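For illustration, a minimal standalone use of this generator might look as follows; this is a sketch using only names from this diff, with a placeholder bearer token:

```python
# Sketch: build a request file of 1000 PUTs, as run() does below.
from piccolo import generator

msgs = generator.Messages()
write_to_random_keys(1000, msgs, {"Authorization": "Bearer <jwt>"})
msgs.to_parquet_file("pi_client0_requests.parquet")
```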
class RWMix:
"""
Similar to write_to_random_keys, but with the addition of reads back from the same keys.
Reads do not produce ledger entries, but because they are interleaved with writes on
the same session, they are not offloaded to backups.
The first pass always writes to all the keys, to make sure they are initialised.
"""
def __init__(self, batch_size: int, write_fraction: float, msg_len=20):
self.batch_size = batch_size
assert 0 <= write_fraction <= 1
self.write_fraction = write_fraction
self.msg_len = msg_len
def __call__(
self,
repetitions: int,
msgs: generator.Messages,
additional_headers: Dict[str, str],
):
assert repetitions % self.batch_size == 0
LOG.info(
f"Workload: {repetitions} operations to a range of {self.batch_size} keys, with a write fraction of {self.write_fraction}"
)
for batch in range(repetitions // self.batch_size):
# Randomly select a subset of the batch to be writes
writes = set(
random.sample(
range(self.batch_size), int(self.batch_size * self.write_fraction)
)
)
# Randomly shuffle the keys to be written/read
keys = list(range(self.batch_size))
random.shuffle(keys)
for key in keys:
# The first batch always writes to all keys, to make sure they are initialised
if (batch == 0) or (key in writes):
msgs.append(
f"/records/{key}",
"PUT",
additional_headers=additional_headers,
body="".join(
random.choices(string.ascii_letters, k=self.msg_len)
),
content_type="text/plain",
)
else:
msgs.append(
f"/records/{key}",
"GET",
additional_headers=additional_headers,
)
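As a usage sketch (names taken from this diff): with batch_size=100 and write_fraction=0.25, every batch after the first issues 25 PUTs and 75 GETs, while the first batch writes all 100 keys:

```python
# Sketch: 10000 operations in 100 batches over 100 keys, 25% writes.
# repetitions must be a multiple of batch_size, as asserted in __call__.
mix = RWMix(batch_size=100, write_fraction=0.25)
msgs = generator.Messages()
mix(10000, msgs, {})
```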
def configure_client_hosts(args, backups):
client_hosts = []
if args.one_client_per_backup:
assert backups, "--one-client-per-backup was set but no backup was found"
client_hosts = ["localhost"] * len(backups)
else:
if args.client_nodes:
client_hosts.extend(args.client_nodes)
if args.num_localhost_clients:
client_hosts.extend(["localhost"] * int(args.num_localhost_clients))
if not client_hosts:
client_hosts = ["localhost"]
return client_hosts
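The precedence is: one localhost client per backup if requested; otherwise any explicit client nodes plus the requested number of localhost clients; otherwise a single localhost client. A minimal sketch, with the argparse fields assumed from this script's CLI:

```python
# Sketch only: no --one-client-per-backup, no client nodes, two local clients.
from argparse import Namespace

args = Namespace(one_client_per_backup=False, client_nodes=None, num_localhost_clients=2)
assert configure_client_hosts(args, backups=[]) == ["localhost", "localhost"]
```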
def run(args, append_messages):
hosts = args.nodes
if not hosts:
hosts = ["local://localhost"] * minimum_number_of_local_nodes(args)
args.initial_user_count = 3
# Cap signature emission at 100 ms to bound commit latency: a signature is
# emitted after at most 10000 transactions or 100 ms, whichever comes first.
args.sig_ms_interval = 100
args.sig_tx_interval = 10000
args.ledger_chunk_bytes = "5MB" # Set to cchost default value
LOG.info("Starting nodes on {}".format(hosts))
with infra.network.network(
hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
) as network:
@@ -82,41 +179,23 @@ def run(args):
jwt = jwt_issuer.issue_jwt()
additional_headers["Authorization"] = f"Bearer {jwt}"
LOG.info(f"Generating {args.repetitions} parquet requests")
msgs = generator.Messages()
for i in range(args.repetitions):
key = f"{i % 100}"
msgs.append(
f"/records/{key}",
"PUT",
additional_headers=additional_headers,
body=f"{hashlib.md5(str(i).encode()).hexdigest()}",
content_type="text/plain",
client_hosts = configure_client_hosts(args, backups)
requests_file_paths = []
for client_idx in range(len(client_hosts)):
LOG.info(f"Generating {args.repetitions} requests for client_{client_idx}")
msgs = generator.Messages()
append_messages(args.repetitions, msgs, additional_headers)
path_to_requests_file = os.path.join(
network.common_dir, f"pi_client{client_idx}_requests.parquet"
)
LOG.info(f"Writing generated requests to {path_to_requests_file}")
msgs.to_parquet_file(path_to_requests_file)
requests_file_paths.append(path_to_requests_file)
filename_prefix = "piccolo_driver"
path_to_requests_file = os.path.join(
network.common_dir, f"{filename_prefix}_requests.parquet"
)
LOG.info(f"Writing generated requests to {path_to_requests_file}")
msgs.to_parquet_file(path_to_requests_file)
nodes_to_send_to = filter_nodes(primary, backups, args.send_tx_to)
clients = []
client_hosts = []
if args.one_client_per_backup:
assert backups, "--one-client-per-backup was set but no backup was found"
client_hosts = ["localhost"] * len(backups)
else:
if args.client_nodes:
client_hosts.extend(args.client_nodes)
if args.num_localhost_clients:
client_hosts.extend(["localhost"] * int(args.num_localhost_clients))
if not client_hosts:
client_hosts = ["localhost"]
nodes_to_send_to = filter_nodes(primary, backups, args.send_tx_to)
for client_id, client_host in enumerate(client_hosts):
node = nodes_to_send_to[client_id % len(nodes_to_send_to)]
@@ -127,24 +206,20 @@ def run(args):
[
args.client,
"--cert",
os.path.join(remote_client.remote.root, "user1_cert.pem"),
"user1_cert.pem",
"--key",
os.path.join(remote_client.remote.root, "user1_privk.pem"),
"user1_privk.pem",
"--cacert",
network.cert_path,
os.path.basename(network.cert_path),
f"--server-address={node.get_public_rpc_host()}:{node.get_public_rpc_port()}",
"--max-writes-ahead",
"1000",
"--send-filepath",
os.path.join(
remote_client.remote.root, "piccolo_driver_requests.parquet"
),
"pi_requests.parquet",
"--response-filepath",
os.path.join(
remote_client.remote.root, "piccolo_driver_response.parquet"
),
"pi_response.parquet",
"--generator-filepath",
path_to_requests_file,
os.path.abspath(requests_file_paths[client_id]),
"--pid-file-path",
"cmd.pid",
]
@@ -160,8 +235,7 @@ def run(args):
for remote_client in clients:
remote_client.start()
hard_stop_timeout = 90
format_width = len(str(hard_stop_timeout)) + 3
format_width = len(str(args.client_timeout_s)) + 3
try:
with cimetrics.upload.metrics(complete=False) as metrics:
@@ -172,55 +246,73 @@ def run(args):
done = remote_client.check_done()
# all the clients need to be done
LOG.info(
f"Client {i} has {'completed' if done else 'not completed'} running ({time.time() - start_time:>{format_width}.2f}s / {hard_stop_timeout}s)"
f"Client {i} has {'completed' if done else 'not completed'} running ({time.time() - start_time:>{format_width}.2f}s / {args.client_timeout_s}s)"
)
stop_waiting = stop_waiting and done
if stop_waiting:
break
if time.time() > start_time + hard_stop_timeout:
if time.time() > start_time + args.client_timeout_s:
raise TimeoutError(
f"Client still running after {hard_stop_timeout}s"
f"Client still running after {args.client_timeout_s}s"
)
time.sleep(5)
agg = []
for remote_client in clients:
# TODO: get from the remote properly
for client_id, remote_client in enumerate(clients):
# Note: this assumes clients are run locally, but saves a copy
send_file = os.path.join(
remote_client.remote.root, "piccolo_driver_requests.parquet"
remote_client.remote.root, "pi_requests.parquet"
)
response_file = os.path.join(
remote_client.remote.root, "piccolo_driver_response.parquet"
remote_client.remote.root, "pi_response.parquet"
)
LOG.info(
f"Analyzing results from {send_file} and {response_file}"
)
def table():
payloads = pl.read_parquet(requests_file_paths[client_id])
sent = pl.read_parquet(send_file)
rcvd = pl.read_parquet(response_file)
all = rcvd.join(sent, on="messageID")
all = all.with_columns(
pl.lit(remote_client.name).alias("client")
overall = payloads.join(sent, on="messageID")
overall = rcvd.join(overall, on="messageID")
overall = overall.with_columns(
client=pl.lit(remote_client.name),
requestSize=pl.col("request").apply(len),
responseSize=pl.col("rawResponse").apply(len),
)
print(
all.with_columns(
overall.with_columns(
pl.col("receiveTime").alias("latency")
- pl.col("sendTime")
).sort("latency")
)
agg.append(all)
agg.append(overall)
table()
agg = pl.concat(agg, rechunk=True)
print(agg)
agg_path = os.path.join(
network.common_dir, "aggregated_basicperf_output.parquet"
)
with open(agg_path, "wb") as f:
agg.write_parquet(f)
print(f"Aggregated results written to {agg_path}")
start_send = agg["sendTime"].sort()[0]
end_recv = agg["receiveTime"].sort()[-1]
throughput = len(agg) / (end_recv - start_send)
print(f"Average throughput: {throughput} tx/s")
print(f"Average throughput: {throughput:.2f} tx/s")
byte_input = (
agg["requestSize"].sum() / (end_recv - start_send)
) / (1024 * 1024)
print(f"Average request input: {byte_input:.2f} Mbytes/s")
byte_output = (
agg["responseSize"].sum() / (end_recv - start_send)
) / (1024 * 1024)
print(f"Average request output: {byte_output:.2f} Mbytes/s")
sent = agg["sendTime"].sort()
sent_per_sec = (
@@ -250,10 +342,8 @@ def run(args):
)
print(per_sec)
per_sec = per_sec.with_columns(
pl.col("sent").alias("sent_rate") / per_sec["sent"].max()
)
per_sec = per_sec.with_columns(
pl.col("rcvd").alias("rcvd_rate") / per_sec["rcvd"].max()
sent_rate=pl.col("sent") / per_sec["sent"].max(),
rcvd_rate=pl.col("rcvd") / per_sec["rcvd"].max(),
)
for row in per_sec.iter_rows(named=True):
s = "S" * int(row["sent_rate"] * 20)
@@ -341,10 +431,24 @@ def cli_args():
help="Unused, swallowed for compatibility with old args",
action="store_true",
)
parser.add_argument(
"--client-timeout-s",
help="Number of seconds after which unresponsive clients are shut down",
default=90,
type=float,
)
parser.add_argument(
"--rw-mix",
help="Run a batched, fractional read/write mix instead of pure writes",
type=float,
)
return infra.e2e_args.cli_args(parser=parser, accept_unknown=False)
if __name__ == "__main__":
args = cli_args()
run(args)
if args.rw_mix:
run(args, RWMix(1000, 0.5))
else:
run(args, write_to_random_keys)
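Note that --rw-mix is consumed only as an on/off switch here: the write fraction is hard-coded to 0.5, and a value of 0 (a pure-read mix) is indistinguishable from omitting the flag. A sketch of a variant that honours the requested fraction, not part of this commit:

```python
# Sketch only, assuming args.rw_mix defaults to None when the flag is absent.
if args.rw_mix is not None:
    run(args, RWMix(1000, args.rw_mix))
else:
    run(args, write_to_random_keys)
```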