diff --git a/CMakeLists.txt b/CMakeLists.txt index 848c828791..9c1a7c264c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -948,36 +948,8 @@ if(BUILD_TESTS) PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/piccolo_driver.py CONSENSUS cft CLIENT_BIN ./submit - ADDITIONAL_ARGS - -p - "samples/apps/logging/liblogging" - -m - 1000 - --cert - "user1_cert.pem" - --key - "user1_privk.pem" - --cacert - "service_cert.pem" - ) - - add_perf_test( - NAME pi_ls_jwt - PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/piccolo_driver.py - CONSENSUS cft - CLIENT_BIN ./submit - ADDITIONAL_ARGS - -p - "samples/apps/logging/liblogging" - -m - 1000 - --cert - "user1_cert.pem" - --key - "user1_privk.pem" - --cacert - "service_cert.pem" - --use-jwt + ADDITIONAL_ARGS --package "samples/apps/logging/liblogging" + --max-writes-ahead 1000 --repetitions 10000 ) add_perf_test( @@ -999,6 +971,21 @@ if(BUILD_TESTS) msgpack ) + add_perf_test( + NAME pi_ls_jwt + PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/piccolo_driver.py + CONSENSUS cft + CLIENT_BIN ./submit + ADDITIONAL_ARGS + --package + "samples/apps/logging/liblogging" + --max-writes-ahead + 1000 + --repetitions + 1000 + --use-jwt + ) + add_perf_test( NAME ls_js PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/perfclient.py diff --git a/doc/architecture/performance/generator.rst b/doc/architecture/performance/generator.rst index e8c10e9ad5..725806cb5b 100644 --- a/doc/architecture/performance/generator.rst +++ b/doc/architecture/performance/generator.rst @@ -9,7 +9,7 @@ that could be submitted to the server. The user can declare the requests by leve the functions inside the library in :ccf_repo:`tests/perf-system/generator/generator.py`. 
The user can generate requests from the library by either calling the command line tool in :ccf_repo:`tests/perf-system/generator/generate_packages.py` or by creating a script -calling the functions of the library, such as the :ccf_repo:`tests/perf-system/generator/loggin_generator.py` +calling the functions of the library, such as the :ccf_repo:`tests/perf-system/generator/logging_generator.py` which contains a sample generation of requests for the logging CCF application. Prior running any of these files you first need to install the requirements @@ -70,7 +70,7 @@ can run your script as you would run any python file: .. code-block:: bash - $ python3 loggin_generator.py + $ python3 logging_generator.py Parquet files are an easy and well-compressed way of storing requests generated from this component diff --git a/doc/architecture/performance/submitter.rst b/doc/architecture/performance/submitter.rst index e2404ff51f..79dc522d56 100644 --- a/doc/architecture/performance/submitter.rst +++ b/doc/architecture/performance/submitter.rst @@ -36,7 +36,7 @@ the following arguments -s,--send-filepath TEXT REQUIRED Path to parquet file to store the submitted requests. -r,--response-filepath TEXT REQUIRED Path to parquet file to store the responses from the submitted requests. -g,--generator-filepath TEXT REQUIRED Path to parquet file with the generated requests to be submitted. - -m,--max-inflight-requests INT=0 Specifies the number of outstanding requests sent to the server while waiting for response. When this options is set to 0 there will be no pipelining. Any other value will enable pipelining. A positive value will specify a window of outstanding requests on the server while waiting for a response. -1 or a negative value will set the window of outstanding requests to maximum i.e. submit requests without waiting for a response + -m,--max-writes-ahead INT=0 Specifies the number of outstanding requests sent to the server while waiting for response. 
When this options is set to 0 there will be no pipelining. Any other value will enable pipelining. A positive value will specify a window of outstanding requests on the server while waiting for a response. -1 or a negative value will set the window of outstanding requests to maximum i.e. submit requests without waiting for a response Once the component finishes submitting and receiving responses for all the requests it will then store the results into two ``.parquet`` files. Hence, the path to file with the diff --git a/tests/infra/network.py b/tests/infra/network.py index 092a05dad0..dc3468aec4 100644 --- a/tests/infra/network.py +++ b/tests/infra/network.py @@ -93,6 +93,7 @@ class UserInfo: local_id: int service_id: str cert_path: str + key_path: str class Network: @@ -860,7 +861,8 @@ class Network: cert_path = os.path.join(self.common_dir, f"{local_user_id}_cert.pem") with open(cert_path, encoding="utf-8") as c: service_user_id = infra.crypto.compute_cert_der_hash_hex_from_pem(c.read()) - new_user = UserInfo(local_user_id, service_user_id, cert_path) + key_path = os.path.join(self.common_dir, f"{local_user_id}_privk.pem") + new_user = UserInfo(local_user_id, service_user_id, cert_path, key_path) if record: self.users.append(new_user) diff --git a/tests/infra/piccolo_driver.py b/tests/infra/piccolo_driver.py index 42ad97c2ad..e0e3e5f2b7 100644 --- a/tests/infra/piccolo_driver.py +++ b/tests/infra/piccolo_driver.py @@ -13,6 +13,8 @@ import cimetrics.upload import time import http import sys +import hashlib +import json sys.path.insert(0, "../tests/perf-system/generator") import generator @@ -21,8 +23,16 @@ sys.path.insert(0, "../tests/perf-system/analyzer") import analyzer -def get_command_args(args, get_command): - command_args = [] +def get_command_args(args, network, get_command): + client_ident = network.users[0] + command_args = [ + "--cert", + client_ident.cert_path, + "--key", + client_ident.key_path, + "--cacert", + network.cert_path, + ] return 
get_command(*command_args) @@ -92,48 +102,48 @@ def run(get_command, args): primary, backups = network.find_nodes() - command_args = get_command_args(args, get_command) + command_args = get_command_args(args, network, get_command) - jwt_header = "" + additional_headers = {} if args.use_jwt: jwt_issuer = infra.jwt_issuer.JwtIssuer("https://example.issuer") jwt_issuer.register(network) jwt = jwt_issuer.issue_jwt() - jwt_header = "Authorization: Bearer " + jwt + additional_headers["Authorization"] = f"Bearer {jwt}" - logging_filename = "piccolo_logging_100ktxs" - LOG.info("Starting parquet requests generation") + LOG.info(f"Generating {args.repetitions} parquet requests") msgs = generator.Messages() - for i in range(100000): + for i in range(args.repetitions): + body = { + "id": i % 100, + "msg": f"Unique message: {hashlib.md5(str(i).encode()).hexdigest()}", + } msgs.append( - "127.0.0.1:8000", "/app/log/private", "POST", - additional_headers=jwt_header, - data='{"id": ' - + str(i % 100) - + ', "msg": "Unique message: 93b885adfe0da089cdf634904fd59f7' - + str(i) - + '"}', + additional_headers=additional_headers, + body=json.dumps(body), ) - path_to_generator_file = os.path.join( - network.common_dir, f"{logging_filename}.parquet" + filename_prefix = "piccolo_driver" + path_to_requests_file = os.path.join( + network.common_dir, f"{filename_prefix}_requests.parquet" ) - msgs.to_parquet_file(path_to_generator_file) + LOG.info(f"Writing generated requests to {path_to_requests_file}") + msgs.to_parquet_file(path_to_requests_file) path_to_send_file = os.path.join( - network.common_dir, f"{logging_filename}_send.parquet" + network.common_dir, f"{filename_prefix}_send.parquet" ) path_to_response_file = os.path.join( - network.common_dir, f"{logging_filename}_response.parquet" + network.common_dir, f"{filename_prefix}_response.parquet" ) # Add filepaths in commands - command_args += ["-s", path_to_send_file] - command_args += ["-r", path_to_response_file] - command_args += 
["--generator-filepath", path_to_generator_file] + command_args += ["--send-filepath", path_to_send_file] + command_args += ["--response-filepath", path_to_response_file] + command_args += ["--generator-filepath", path_to_requests_file] nodes_to_send_to = filter_nodes(primary, backups, args.send_tx_to) clients = [] @@ -198,6 +208,9 @@ def run(get_command, args): for remote_client in clients: analysis = analyzer.Analyze() + LOG.info( + f"Analyzing results from {path_to_send_file} and {path_to_response_file}" + ) df_sends = analyzer.get_df_from_parquet_file(path_to_send_file) df_responses = analyzer.get_df_from_parquet_file( path_to_response_file @@ -237,8 +250,8 @@ def run(get_command, args): for remote_client in clients: remote_client.stop() - except Exception: - LOG.error("Stopping clients due to exception") + except Exception as e: + LOG.error(f"Stopping clients due to exception: {e}") for remote_client in clients: remote_client.stop() raise @@ -293,6 +306,17 @@ def cli_args(add=lambda x: None, accept_unknown=False): help="Use JWT with a temporary issuer as authentication method.", action="store_true", ) + parser.add_argument( + "--repetitions", + help="Number of requests to send", + type=int, + default=100, + ) + parser.add_argument( + "--write-tx-times", + help="Unused, swallowed for compatibility with old args", + action="store_true", + ) parser.add_argument("--config", help="Path to config for client binary", default="") return infra.e2e_args.cli_args( @@ -312,11 +336,7 @@ if __name__ == "__main__": unknown_args = [term for arg in unknown_args for term in arg.split(" ")] - write_tx_index = unknown_args.index("--write-tx-times") - def get_command(*args): - return ( - [*args] + unknown_args[:write_tx_index] + unknown_args[write_tx_index + 1 :] - ) + return [*args] + unknown_args run(get_command, args) diff --git a/tests/perf-system/analyzer/analyzer.py b/tests/perf-system/analyzer/analyzer.py index d0f17e38cb..83b9c77763 100644 --- 
a/tests/perf-system/analyzer/analyzer.py +++ b/tests/perf-system/analyzer/analyzer.py @@ -316,7 +316,7 @@ class Analyze: def get_df_from_parquet_file(input_file: str): - return pd.read_parquet(input_file, engine="fastparquet") + return pd.read_parquet(input_file) def default_analysis(send_file, response_file): diff --git a/tests/perf-system/generator/generate_packages.py b/tests/perf-system/generator/generate_packages.py index 62dced80fa..5a67e35cb8 100644 --- a/tests/perf-system/generator/generate_packages.py +++ b/tests/perf-system/generator/generate_packages.py @@ -74,7 +74,6 @@ def main(): msg = Messages() msg.append( - args.host, args.path, args.verb, args.request_type, diff --git a/tests/perf-system/generator/generator.py b/tests/perf-system/generator/generator.py index 0272951ab4..1a2a90c6d2 100644 --- a/tests/perf-system/generator/generator.py +++ b/tests/perf-system/generator/generator.py @@ -12,59 +12,46 @@ import fastparquet as fp # type: ignore class Messages: def __init__(self): - self.df = pd.DataFrame(columns=["messageID", "request"]) + self.requests = [] def append( self, - host, path, verb, - request_type="HTTP/1.1", + http_version="HTTP/1.1", content_type="application/json", - additional_headers="", - data="", - iterations=1, + additional_headers=None, + body=bytes(), ): """ - Create a new df with the contents specified by the arguments, - append it to self.df and return the new df + Serialise HTTP request specified by the arguments, and + append it to self.requests """ - batch_df = pd.DataFrame(columns=["messageID", "request"]) - data_headers = b"\r\n" - if len(additional_headers) > 0: - additional_headers += "\r\n" - if len(data) > 0: - if isinstance(data, str): - data = data.encode("ascii") - data_headers = (f"content-length: {len(data)}\r\n\r\n").encode( - "ascii" - ) + data - df_size = len(self.df.index) + headers = {} + if additional_headers is not None: + headers.update({k.lower(): v for k, v in additional_headers.items()}) - for ind in 
range(iterations): - batch_df.loc[ind] = [ - str(ind + df_size), - ( - verb.upper() - + " " - + path - + " " - + request_type - + "\r\n" - + "host: " - + host - + "\r\n" - + additional_headers - + "content-type: " - + content_type.lower() - + "\r\n" - ).encode("ascii") - + data_headers, - ] + # Insert content-length, and content-type headers, if they're not already present + if "content-length" not in headers: + headers["content-length"] = str(len(body.encode("utf-8")) if isinstance(body, str) else len(body)) + if "content-type" not in headers and content_type is not None: + headers["content-type"] = content_type - self.df = pd.concat([self.df, batch_df]) - return batch_df + # Convert body to bytes if we were given a string + if type(body) == str: + body = body.encode("utf-8") + + request_line = f"{verb.upper()} {path} {http_version}" + headers_string = "\r\n".join(f"{k}: {v}" for k, v in headers.items()) + serialised_request = ( + f"{request_line}\r\n{headers_string}\r\n\r\n".encode("ascii") + body + ) + + self.requests.append( + {"messageID": str(len(self.requests)), "request": serialised_request} + ) def to_parquet_file(self, path): - fp.write(path, self.df) + df = pd.DataFrame(self.requests) + fp.write(path, df, write_index=True) diff --git a/tests/perf-system/generator/loggin_generator.py b/tests/perf-system/generator/loggin_generator.py deleted file mode 100644 index f70958b8e5..0000000000 --- a/tests/perf-system/generator/loggin_generator.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the Apache 2.0 License. 
-from generator import Messages - -HOST = "127.0.0.1:8000" -REQUEST_CONTENT_TYPE = "content-type: application/json" - - -msgs = Messages() - -inputs = msgs.append(HOST, "/app/log/private/count", "GET") - -for i in range(14): - msgs.append( - HOST, - "/app/log/private", - "POST", - data='{"id": ' + str(i) + ', "msg": "Logged ' + str(i) + ' to private table"}', - ) -inputs = msgs.append(HOST, "/app/log/private/count", "GET") - -for i in range(14): - msgs.append(HOST, "/app/log/private?id=" + str(i), "GET") -inputs = msgs.append(HOST, "/app/log/private/count", "GET") - -for i in range(14): - msgs.append(HOST, "/app/log/private?id=" + str(i), "DELETE") -inputs = msgs.append(HOST, "/app/log/private/count", "GET") - - -msgs.to_parquet_file("new_raw.parquet") diff --git a/tests/perf-system/generator/logging_generator.py b/tests/perf-system/generator/logging_generator.py new file mode 100644 index 0000000000..0844c79d5d --- /dev/null +++ b/tests/perf-system/generator/logging_generator.py @@ -0,0 +1,30 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the Apache 2.0 License. 
+from generator import Messages + + +common_headers = {"host": "127.0.0.1:8000"} +msgs = Messages() + +msgs.append("/app/log/private/count", "GET", additional_headers=common_headers) + +msg_count = 14 + +for i in range(msg_count): + msgs.append( + "/app/log/private", + "POST", + additional_headers=common_headers, + body=f'{{"id": {i}, "msg": "Logged {i} to private table"}}', + ) +msgs.append("/app/log/private/count", "GET", additional_headers=common_headers) + +for i in range(msg_count): + msgs.append(f"/app/log/private?id={i}", "GET", additional_headers=common_headers) +msgs.append("/app/log/private/count", "GET", additional_headers=common_headers) + +for i in range(msg_count): + msgs.append(f"/app/log/private?id={i}", "DELETE", additional_headers=common_headers) +msgs.append("/app/log/private/count", "GET", additional_headers=common_headers) + +msgs.to_parquet_file("new_raw.parquet") diff --git a/tests/perf-system/submitter/handle_arguments.h b/tests/perf-system/submitter/handle_arguments.h index 08bd921a44..1124eb2594 100644 --- a/tests/perf-system/submitter/handle_arguments.h +++ b/tests/perf-system/submitter/handle_arguments.h @@ -75,7 +75,7 @@ public: ->required(); app .add_option( - "-m,--max-inflight-requests", + "-m,--max-writes-ahead", max_inflight_requests, "Specifies the number of outstanding requests sent to the server while " "waiting for response. 
When this options is set to 0 there will be no " diff --git a/tests/perf-system/submitter/submit.cpp b/tests/perf-system/submitter/submit.cpp index 766a24ff6a..8939c66e44 100644 --- a/tests/perf-system/submitter/submit.cpp +++ b/tests/perf-system/submitter/submit.cpp @@ -12,9 +12,9 @@ #include #include +#include #include #include -#include #include #include #include @@ -40,8 +40,11 @@ void read_parquet_file(string generator_filepath, ParquetData& data_handler) st = parquet::arrow::OpenFile(input, pool, &arrow_reader); if (!st.ok()) { - LOG_FAIL_FMT("Couldn't find generator file"); - exit(2); + LOG_FAIL_FMT( + "Couldn't find generator file ({}): {}", + generator_filepath, + st.ToString()); + exit(1); } else { @@ -49,35 +52,90 @@ void read_parquet_file(string generator_filepath, ParquetData& data_handler) } // Read entire file as a single Arrow table - auto selected_columns = {0, 1}; - std::shared_ptr table; - st = arrow_reader->ReadTable(selected_columns, &table); - if (!st.ok()) + std::shared_ptr table = nullptr; + st = arrow_reader->ReadTable(&table); + if (!st.ok() || table == nullptr) { - LOG_FAIL_FMT("Couldn't open generator file"); - exit(2); + LOG_FAIL_FMT( + "Couldn't open generator file ({}): {}", + generator_filepath, + st.ToString()); + exit(1); } else { LOG_INFO_FMT("Opened generator file"); } - std::shared_ptr<::arrow::ChunkedArray> column; + const auto& schema = table->schema(); - ::arrow::Status column1Status = arrow_reader->ReadColumn(1, &column); - std::shared_ptr col1Vals = - std::dynamic_pointer_cast(column->chunk( - 0)); // ASSIGN there is only one chunk with col->num_chunks(); + std::vector column_names = {"messageID", "request"}; - ::arrow::Status column2Status = arrow_reader->ReadColumn(2, &column); - std::shared_ptr col2Vals = - std::dynamic_pointer_cast(column->chunk( - 0)); // ASSIGN there is only one chunk with col->num_chunks(); - for (int row = 0; row < col1Vals->length(); row++) + st = schema->CanReferenceFieldsByNames(column_names); + 
if (!st.ok()) { - data_handler.ids.push_back(col1Vals->GetString(row)); - data_handler.request.push_back({col2Vals->Value(row).begin(), - col2Vals->Value(row).end()}); + LOG_FAIL_FMT( + "Input file does not contain unambiguous field names - cannot lookup " + "desired columns: {}", + st.ToString()); + exit(1); + } + + const auto message_id_idx = schema->GetFieldIndex("messageID"); + if (message_id_idx == -1) + { + LOG_FAIL_FMT("No messageID field found in file"); + exit(1); + } + + std::shared_ptr<::arrow::ChunkedArray> message_id_column = + table->column(message_id_idx); + if (message_id_column->num_chunks() != 1) + { + LOG_FAIL_FMT( + "Expected a single chunk, found {}", message_id_column->num_chunks()); + exit(1); + } + + auto message_id_values = + std::dynamic_pointer_cast(message_id_column->chunk(0)); + if (message_id_values == nullptr) + { + LOG_FAIL_FMT( + "The messageID column of input file could not be read as string array"); + exit(1); + } + + const auto request_idx = schema->GetFieldIndex("request"); + if (request_idx == -1) + { + LOG_FAIL_FMT("No request field found in file"); + exit(1); + } + + std::shared_ptr<::arrow::ChunkedArray> request_column = + table->column(request_idx); + if (request_column->num_chunks() != 1) + { + LOG_FAIL_FMT( + "Expected a single chunk, found {}", request_column->num_chunks()); + exit(1); + } + + auto request_values = + std::dynamic_pointer_cast(request_column->chunk(0)); + if (request_values == nullptr) + { + LOG_FAIL_FMT( + "The request column of input file could not be read as binary array"); + exit(1); + } + + for (int row = 0; row < table->num_rows(); row++) + { + data_handler.ids.push_back(message_id_values->GetString(row)); + const auto request = request_values->Value(row); + data_handler.request.push_back({request.begin(), request.end()}); } } @@ -127,17 +185,21 @@ void store_parquet_results(ArgumentParser args, ParquetData data_handler) PARQUET_THROW_NOT_OK(message_id_builder.AppendValues(data_handler.ids)); 
arrow::NumericBuilder send_time_builder; - PARQUET_THROW_NOT_OK(send_time_builder.AppendValues(data_handler.send_time)); + PARQUET_THROW_NOT_OK( + send_time_builder.AppendValues(data_handler.send_time)); auto table = arrow::Table::Make( - arrow::schema({arrow::field("messageID", arrow::utf8()), - arrow::field("sendTime", arrow::float64())}), - {message_id_builder.Finish().ValueOrDie(), send_time_builder.Finish().ValueOrDie()}); + arrow::schema( + {arrow::field("messageID", arrow::utf8()), + arrow::field("sendTime", arrow::float64())}), + {message_id_builder.Finish().ValueOrDie(), + send_time_builder.Finish().ValueOrDie()}); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(args.send_filepath)); - PARQUET_THROW_NOT_OK( - parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 1)); + PARQUET_ASSIGN_OR_THROW( + outfile, arrow::io::FileOutputStream::Open(args.send_filepath)); + PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable( + *table, arrow::default_memory_pool(), outfile, 1)); } // Write Response Parquet @@ -146,7 +208,8 @@ void store_parquet_results(ArgumentParser args, ParquetData data_handler) PARQUET_THROW_NOT_OK(message_id_builder.AppendValues(data_handler.ids)); arrow::NumericBuilder receive_time_builder; - PARQUET_THROW_NOT_OK(receive_time_builder.AppendValues(data_handler.response_time)); + PARQUET_THROW_NOT_OK( + receive_time_builder.AppendValues(data_handler.response_time)); arrow::BinaryBuilder raw_response_builder; for (auto& raw_response : data_handler.raw_response) @@ -156,17 +219,20 @@ void store_parquet_results(ArgumentParser args, ParquetData data_handler) } auto table = arrow::Table::Make( - arrow::schema({arrow::field("messageID", arrow::utf8()), - arrow::field("receiveTime", arrow::float64()), - arrow::field("rawResponse", arrow::binary()), - }), - {message_id_builder.Finish().ValueOrDie(), receive_time_builder.Finish().ValueOrDie(), + arrow::schema({ + arrow::field("messageID", 
arrow::utf8()), + arrow::field("receiveTime", arrow::float64()), + arrow::field("rawResponse", arrow::binary()), + }), + {message_id_builder.Finish().ValueOrDie(), + receive_time_builder.Finish().ValueOrDie(), raw_response_builder.Finish().ValueOrDie()}); std::shared_ptr outfile; - PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(args.response_filepath)); - PARQUET_THROW_NOT_OK( - parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 1)); + PARQUET_ASSIGN_OR_THROW( + outfile, arrow::io::FileOutputStream::Open(args.response_filepath)); + PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable( + *table, arrow::default_memory_pool(), outfile, 1)); } LOG_INFO_FMT("Finished storing results");