From ba4ef14df2e5a0fb66d018dffff2f6520ccf793d Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Fri, 22 May 2020 17:48:34 +0100 Subject: [PATCH] Websockets perf tests (#1211) --- CMakeLists.txt | 19 +++- .../smallbank/clients/small_bank_client.cpp | 6 +- samples/apps/smallbank/smallbank.cmake | 17 ++++ samples/perf_client/perf_client.h | 21 +++-- src/clients/rpc_tls_client.h | 68 ++++++++++++++- src/clients/tls_client.h | 20 +++++ src/http/ws_builder.h | 86 +++++++++++++++++++ src/http/ws_rpc_context.h | 82 +----------------- 8 files changed, 228 insertions(+), 91 deletions(-) create mode 100644 src/http/ws_builder.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 141c4cfe49..ec3fe51f52 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -660,7 +660,6 @@ if(BUILD_TESTS) endforeach() - # Logging scenario perf test add_perf_test( NAME logging_scenario_perf_test PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/perfclient.py @@ -677,4 +676,22 @@ if(BUILD_TESTS) --repetitions 1000 ) + + add_perf_test( + NAME logging_scenario_ws_perf_test + PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/perfclient.py + CONSENSUS raft + CLIENT_BIN ./scenario_perf_client + LABEL log_scenario_ws + ADDITIONAL_ARGS + --package + liblogging + --scenario-file + ${CMAKE_CURRENT_LIST_DIR}/tests/perf_logging_scenario_100txs.json + --max-writes-ahead + 1000 + --repetitions + 10000 + --use-websockets + ) endif() diff --git a/samples/apps/smallbank/clients/small_bank_client.cpp b/samples/apps/smallbank/clients/small_bank_client.cpp index 2565c75d89..9911b10cb7 100644 --- a/samples/apps/smallbank/clients/small_bank_client.cpp +++ b/samples/apps/smallbank/clients/small_bank_client.cpp @@ -46,7 +46,7 @@ private: LOG_INFO_FMT(header); } - auto conn = get_connection(); + auto conn = create_connection(true, false); nlohmann::json accs = nlohmann::json::array(); @@ -225,7 +225,7 @@ private: } // Create new connection to read balances - auto conn = get_connection(); + auto conn = create_connection(true, false); for (const auto& entry : expected) { @@ -256,6 +256,8 @@ private: std::to_string(actual_balance)); } } + + LOG_INFO_FMT("VERIFIED {}", prefix); } void verify_initial_state(const nlohmann::json& expected) override diff --git a/samples/apps/smallbank/smallbank.cmake b/samples/apps/smallbank/smallbank.cmake index 44df48f920..b92d304ff2 100644 --- a/samples/apps/smallbank/smallbank.cmake +++ b/samples/apps/smallbank/smallbank.cmake @@ -89,6 +89,23 @@ if(BUILD_TESTS) --metrics-file small_bank_${CONSENSUS}_metrics.json ) + add_perf_test( + NAME small_bank_client_ws_test_${CONSENSUS} + PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/small_bank_client.py + CLIENT_BIN ./small_bank_client + VERIFICATION_FILE ${SMALL_BANK_VERIFICATION_FILE} + LABEL SB_WS + CONSENSUS ${CONSENSUS} + ADDITIONAL_ARGS + --transactions + ${SMALL_BANK_ITERATIONS} + --max-writes-ahead + 1000 + --metrics-file + small_bank_${CONSENSUS}_metrics.json + --use-websockets + ) + add_perf_test( NAME small_bank_sigs_client_test_${CONSENSUS} PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/small_bank_client.py diff --git a/samples/perf_client/perf_client.h b/samples/perf_client/perf_client.h index d1bcd353ae..6e8f51cd26 100644 --- a/samples/perf_client/perf_client.h +++ b/samples/perf_client/perf_client.h @@ -72,6 +72,7 @@ namespace client bool randomise = false; bool check_responses = false; bool relax_commit_target = false; + bool websockets = false; ///@} PerfOptions( @@ -182,6 +183,10 @@ namespace client check_responses, "Check every JSON response for errors. Potentially slow") ->capture_default_str(); + app + .add_flag( + "--use-websockets", websockets, "Use websockets to send transactions") + ->capture_default_str(); } }; @@ -298,7 +303,8 @@ namespace client std::chrono::high_resolution_clock::time_point last_write_time; std::chrono::nanoseconds write_delay_ns = std::chrono::nanoseconds::zero(); - std::shared_ptr create_connection(bool force_unsigned = false) + std::shared_ptr create_connection( + bool force_unsigned = false, bool upgrade = false) { // Create a cert if this is our first rpc_connection const bool is_first = get_cert(); @@ -323,6 +329,9 @@ namespace client "Connected to server via TLS ({})", conn->get_ciphersuite_name()); } + if (upgrade) + conn->upgrade_to_ws(); + return conn; } @@ -416,8 +425,8 @@ namespace client } } - const auto global_commit_response = - wait_for_global_commit(trigger_signature(connection)); + const auto global_commit_response = wait_for_global_commit( + trigger_signature(create_connection(true, false))); size_t last_commit = 0; if (!options.no_wait) { @@ -614,7 +623,7 @@ namespace client rand_generator(), // timing gets its own new connection for any requests it wants to send - // these are never signed - response_times(create_connection(true)) + response_times(create_connection(true, false)) {} void init_connection() @@ -622,7 +631,7 @@ namespace client // Make sure the connection we're about to use has been initialised if (!rpc_connection) { - rpc_connection = create_connection(); + rpc_connection = create_connection(false, options.websockets); } } @@ -644,7 +653,7 @@ namespace client { // Ensure creation transactions are globally committed before // proceeding - wait_for_global_commit(trigger_signature(get_connection())); + wait_for_global_commit(trigger_signature(create_connection(true))); } } catch (std::exception& e) diff --git a/src/clients/rpc_tls_client.h b/src/clients/rpc_tls_client.h index 86ed6550bd..333cef6676 100644 --- a/src/clients/rpc_tls_client.h +++ b/src/clients/rpc_tls_client.h @@ -5,6 +5,8 @@ #include "http/http_builder.h" #include "http/http_consts.h" #include "http/http_parser.h" +#include "http/ws_builder.h" +#include "http/ws_parser.h" #include "node/rpc/json_rpc.h" #include "tls_client.h" @@ -35,13 +37,26 @@ public: protected: http::ResponseParser parser; + ws::ResponseParser ws_parser; std::optional prefix; tls::KeyPairPtr key_pair = nullptr; + bool is_ws = false; size_t next_send_id = 0; size_t next_recv_id = 0; - std::vector gen_request_internal( + std::vector gen_ws_upgrade_request() + { + auto r = http::Request("/", HTTP_GET); + r.set_header("Upgrade", "websocket"); + r.set_header("Connection", "Upgrade"); + r.set_header("Sec-WebSocket-Key", "iT9AbE3Q96TfyWZ+3gQdfg=="); + r.set_header("Sec-WebSocket-Version", "13"); + + return r.build_request(); + } + + std::vector gen_http_request_internal( const std::string& method, const CBuffer params, const std::string& content_type, @@ -65,6 +80,30 @@ protected: return r.build_request(); } + std::vector gen_ws_request_internal( + const std::string& method, const CBuffer params) + { + auto path = method; + if (prefix.has_value()) + { + path = fmt::format("/{}/{}", prefix.value(), path); + } + std::vector body(params.p, params.p + params.n); + return ws::make_in_frame(path, body); + } + + std::vector gen_request_internal( + const std::string& method, + const CBuffer params, + const std::string& content_type, + http_method verb) + { + if (is_ws) + return gen_ws_request_internal(method, params); + else + return gen_http_request_internal(method, params, content_type, verb); + } + Response call_raw(const std::vector& raw) { CBuffer b(raw); @@ -88,9 +127,19 @@ public: std::shared_ptr node_ca = nullptr, std::shared_ptr cert = nullptr) : TlsClient(host, port, node_ca, cert), - parser(*this) + parser(*this), + ws_parser(*this) {} + void upgrade_to_ws() + { + auto upgrade = gen_ws_upgrade_request(); + auto response = call_raw(upgrade); + if (response.headers.find("sec-websocket-accept") == response.headers.end()) + throw std::logic_error("Failed to upgrade to websockets"); + is_ws = true; + } + void create_key_pair(const tls::Pem priv_key) { key_pair = tls::make_key_pair(priv_key); @@ -188,8 +237,19 @@ public: while (!last_response.has_value()) { - const auto next = read_all(); - parser.execute(next.data(), next.size()); + if (is_ws) + { + auto buf = read(ws::INITIAL_READ); + size_t n = ws_parser.consume(buf); + buf = read(n); + n = ws_parser.consume(buf); + assert(n == ws::INITIAL_READ); + } + else + { + const auto next = read_all(); + parser.execute(next.data(), next.size()); + } } return std::move(last_response.value()); diff --git a/src/clients/tls_client.h b/src/clients/tls_client.h index a00674f538..9dc5b79c77 100644 --- a/src/clients/tls_client.h +++ b/src/clients/tls_client.h @@ -140,6 +140,26 @@ public: } } + std::vector read(size_t read_size) + { + std::vector buf(read_size); + auto ret = mbedtls_ssl_read(&ssl, buf.data(), buf.size()); + if (ret > 0) + { + buf.resize(ret); + } + else if (ret == 0) + { + throw std::logic_error("Underlying transport closed"); + } + else + { + throw std::logic_error(tls::error_string(ret)); + } + + return buf; + } + void read(Buffer b) { for (size_t read = 0; read < b.n;) diff --git a/src/http/ws_builder.h b/src/http/ws_builder.h new file mode 100644 index 0000000000..7f2a355a6c --- /dev/null +++ b/src/http/ws_builder.h @@ -0,0 +1,86 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. +#pragma once + +#include "../ds/serialized.h" +#include "../kv/kv_types.h" +#include "ws_parser.h" + +#include + +namespace ws +{ + static std::vector make_frame(size_t frame_size) + { + size_t sz_size = 0; + if (frame_size > 125) + { + sz_size = frame_size > std::numeric_limits::max() ? 8 : 2; + } + + size_t ws_h_size = ws::INITIAL_READ + sz_size; + std::vector msg(ws_h_size + frame_size); + msg[0] = 0x82; + switch (sz_size) + { + case 0: + { + msg[1] = frame_size; + break; + } + case 2: + { + msg[1] = 0x7e; + *((uint16_t*)&msg[2]) = htons(frame_size); + break; + } + case 8: + { + msg[1] = 0x7f; + *((uint64_t*)&msg[2]) = htobe64(frame_size); + break; + } + default: + throw std::logic_error(fmt::format("Invalid sz_size: {}", sz_size)); + } + + return msg; + }; + + static std::vector make_in_frame( + const std::string& path, const std::vector& body) + { + size_t in_frame_size = ws::in_header_size(path) + body.size(); + auto frame = make_frame(in_frame_size); + size_t ws_h_size = frame.size() - in_frame_size; + + uint8_t* p = frame.data() + ws_h_size; + size_t s = frame.size() - ws_h_size; + serialized::write_lps(p, s, path); + assert(s == body.size()); + ::memcpy(p, body.data(), s); + return frame; + } + + static std::vector make_out_frame( + size_t code, + kv::Version seqno, + kv::Consensus::View view, + kv::Version global_commit, + const std::vector& body) + { + size_t out_frame_size = ws::OUT_CCF_HEADER_SIZE + body.size(); + auto frame = make_frame(out_frame_size); + size_t ws_h_size = frame.size() - out_frame_size; + + uint8_t* p = frame.data() + ws_h_size; + size_t s = frame.size() - ws_h_size; + serialized::write(p, s, code); + serialized::write(p, s, seqno); + serialized::write(p, s, view); + serialized::write(p, s, global_commit); + assert(s == body.size()); + ::memcpy(p, body.data(), s); + return frame; + } +} \ No newline at end of file diff --git a/src/http/ws_rpc_context.h b/src/http/ws_rpc_context.h index 2606e81871..7efe7ebf79 100644 --- a/src/http/ws_rpc_context.h +++ b/src/http/ws_rpc_context.h @@ -6,6 +6,7 @@ #include "enclave/rpc_context.h" #include "http_parser.h" #include "http_sig.h" +#include "ws_builder.h" namespace ws { @@ -16,47 +17,7 @@ namespace ws kv::Consensus::View term = 0, kv::Version global_commit = 0) { - size_t frame_size = ws::OUT_CCF_HEADER_SIZE + body.size(); - size_t sz_size = 0; - if (frame_size > 125) - { - sz_size = frame_size > std::numeric_limits::max() ? 8 : 2; - } - - size_t ws_h_size = ws::INITIAL_READ + sz_size; - std::vector msg(ws_h_size + frame_size); - msg[0] = 0x82; - switch (sz_size) - { - case 0: - { - msg[1] = frame_size; - break; - } - case 2: - { - msg[1] = 0x7e; - *((uint16_t*)&msg[2]) = htons(frame_size); - break; - } - case 8: - { - msg[1] = 0x7f; - *((uint64_t*)&msg[2]) = htobe64(frame_size); - break; - } - default: - throw std::logic_error(fmt::format("Invalid sz_size: {}", sz_size)); - } - uint8_t* p = msg.data() + ws_h_size; - size_t s = msg.size() - ws_h_size; - serialized::write(p, s, code); - serialized::write(p, s, commit); - serialized::write(p, s, term); - serialized::write(p, s, global_commit); - assert(s == body.size()); - ::memcpy(p, body.data(), s); - return msg; + return make_out_frame(code, commit, term, global_commit, body); }; static std::vector error(size_t code, const std::string& msg) @@ -134,43 +95,8 @@ namespace ws { if (serialised_request.empty()) { - size_t frame_size = ws::in_header_size(path) + request_body.size(); - size_t sz_size = 0; - if (frame_size > 125) - { - sz_size = frame_size > std::numeric_limits::max() ? 8 : 2; - } - - size_t ws_h_size = ws::INITIAL_READ + sz_size; - serialised_request.resize(ws_h_size + frame_size); - serialised_request[0] = 0x82; - switch (sz_size) - { - case 0: - { - serialised_request[1] = frame_size; - break; - } - case 2: - { - serialised_request[1] = 0x7e; - *((uint16_t*)&serialised_request[2]) = htons(frame_size); - break; - } - case 8: - { - serialised_request[1] = 0x7f; - *((uint64_t*)&serialised_request[2]) = htobe64(frame_size); - break; - } - default: - throw std::logic_error(fmt::format("Invalid sz_size: {}", sz_size)); - } - uint8_t* p = serialised_request.data() + ws_h_size; - size_t s = serialised_request.size() - ws_h_size; - serialized::write_lps(p, s, path); - assert(s == request_body.size()); - ::memcpy(p, request_body.data(), s); + auto sr = make_in_frame(path, request_body); + serialised_request.swap(sr); } return serialised_request; }