This commit is contained in:
Amaury Chamayou 2020-05-22 17:48:34 +01:00 коммит произвёл GitHub
Родитель e432c813cf
Коммит ba4ef14df2
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 228 добавлений и 91 удалений

Просмотреть файл

@ -660,7 +660,6 @@ if(BUILD_TESTS)
endforeach()
# Logging scenario perf test
add_perf_test(
NAME logging_scenario_perf_test
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/perfclient.py
@ -677,4 +676,22 @@ if(BUILD_TESTS)
--repetitions
1000
)
add_perf_test(
NAME logging_scenario_ws_perf_test
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/perfclient.py
CONSENSUS raft
CLIENT_BIN ./scenario_perf_client
LABEL log_scenario_ws
ADDITIONAL_ARGS
--package
liblogging
--scenario-file
${CMAKE_CURRENT_LIST_DIR}/tests/perf_logging_scenario_100txs.json
--max-writes-ahead
1000
--repetitions
10000
--use-websockets
)
endif()

Просмотреть файл

@ -46,7 +46,7 @@ private:
LOG_INFO_FMT(header);
}
auto conn = get_connection();
auto conn = create_connection(true, false);
nlohmann::json accs = nlohmann::json::array();
@ -225,7 +225,7 @@ private:
}
// Create new connection to read balances
auto conn = get_connection();
auto conn = create_connection(true, false);
for (const auto& entry : expected)
{
@ -256,6 +256,8 @@ private:
std::to_string(actual_balance));
}
}
LOG_INFO_FMT("VERIFIED {}", prefix);
}
void verify_initial_state(const nlohmann::json& expected) override

Просмотреть файл

@ -89,6 +89,23 @@ if(BUILD_TESTS)
--metrics-file small_bank_${CONSENSUS}_metrics.json
)
add_perf_test(
NAME small_bank_client_ws_test_${CONSENSUS}
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/small_bank_client.py
CLIENT_BIN ./small_bank_client
VERIFICATION_FILE ${SMALL_BANK_VERIFICATION_FILE}
LABEL SB_WS
CONSENSUS ${CONSENSUS}
ADDITIONAL_ARGS
--transactions
${SMALL_BANK_ITERATIONS}
--max-writes-ahead
1000
--metrics-file
small_bank_${CONSENSUS}_metrics.json
--use-websockets
)
add_perf_test(
NAME small_bank_sigs_client_test_${CONSENSUS}
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/small_bank_client.py

Просмотреть файл

@ -72,6 +72,7 @@ namespace client
bool randomise = false;
bool check_responses = false;
bool relax_commit_target = false;
bool websockets = false;
///@}
PerfOptions(
@ -182,6 +183,10 @@ namespace client
check_responses,
"Check every JSON response for errors. Potentially slow")
->capture_default_str();
app
.add_flag(
"--use-websockets", websockets, "Use websockets to send transactions")
->capture_default_str();
}
};
@ -298,7 +303,8 @@ namespace client
std::chrono::high_resolution_clock::time_point last_write_time;
std::chrono::nanoseconds write_delay_ns = std::chrono::nanoseconds::zero();
std::shared_ptr<RpcTlsClient> create_connection(bool force_unsigned = false)
std::shared_ptr<RpcTlsClient> create_connection(
bool force_unsigned = false, bool upgrade = false)
{
// Create a cert if this is our first rpc_connection
const bool is_first = get_cert();
@ -323,6 +329,9 @@ namespace client
"Connected to server via TLS ({})", conn->get_ciphersuite_name());
}
if (upgrade)
conn->upgrade_to_ws();
return conn;
}
@ -416,8 +425,8 @@ namespace client
}
}
const auto global_commit_response =
wait_for_global_commit(trigger_signature(connection));
const auto global_commit_response = wait_for_global_commit(
trigger_signature(create_connection(true, false)));
size_t last_commit = 0;
if (!options.no_wait)
{
@ -614,7 +623,7 @@ namespace client
rand_generator(),
// timing gets its own new connection for any requests it wants to send -
// these are never signed
response_times(create_connection(true))
response_times(create_connection(true, false))
{}
void init_connection()
@ -622,7 +631,7 @@ namespace client
// Make sure the connection we're about to use has been initialised
if (!rpc_connection)
{
rpc_connection = create_connection();
rpc_connection = create_connection(false, options.websockets);
}
}
@ -644,7 +653,7 @@ namespace client
{
// Ensure creation transactions are globally committed before
// proceeding
wait_for_global_commit(trigger_signature(get_connection()));
wait_for_global_commit(trigger_signature(create_connection(true)));
}
}
catch (std::exception& e)

Просмотреть файл

@ -5,6 +5,8 @@
#include "http/http_builder.h"
#include "http/http_consts.h"
#include "http/http_parser.h"
#include "http/ws_builder.h"
#include "http/ws_parser.h"
#include "node/rpc/json_rpc.h"
#include "tls_client.h"
@ -35,13 +37,26 @@ public:
protected:
http::ResponseParser parser;
ws::ResponseParser ws_parser;
std::optional<std::string> prefix;
tls::KeyPairPtr key_pair = nullptr;
bool is_ws = false;
size_t next_send_id = 0;
size_t next_recv_id = 0;
std::vector<uint8_t> gen_request_internal(
std::vector<uint8_t> gen_ws_upgrade_request()
{
auto r = http::Request("/", HTTP_GET);
r.set_header("Upgrade", "websocket");
r.set_header("Connection", "Upgrade");
r.set_header("Sec-WebSocket-Key", "iT9AbE3Q96TfyWZ+3gQdfg==");
r.set_header("Sec-WebSocket-Version", "13");
return r.build_request();
}
std::vector<uint8_t> gen_http_request_internal(
const std::string& method,
const CBuffer params,
const std::string& content_type,
@ -65,6 +80,30 @@ protected:
return r.build_request();
}
std::vector<uint8_t> gen_ws_request_internal(
const std::string& method, const CBuffer params)
{
auto path = method;
if (prefix.has_value())
{
path = fmt::format("/{}/{}", prefix.value(), path);
}
std::vector<uint8_t> body(params.p, params.p + params.n);
return ws::make_in_frame(path, body);
}
std::vector<uint8_t> gen_request_internal(
const std::string& method,
const CBuffer params,
const std::string& content_type,
http_method verb)
{
if (is_ws)
return gen_ws_request_internal(method, params);
else
return gen_http_request_internal(method, params, content_type, verb);
}
Response call_raw(const std::vector<uint8_t>& raw)
{
CBuffer b(raw);
@ -88,9 +127,19 @@ public:
std::shared_ptr<tls::CA> node_ca = nullptr,
std::shared_ptr<tls::Cert> cert = nullptr) :
TlsClient(host, port, node_ca, cert),
parser(*this)
parser(*this),
ws_parser(*this)
{}
void upgrade_to_ws()
{
auto upgrade = gen_ws_upgrade_request();
auto response = call_raw(upgrade);
if (response.headers.find("sec-websocket-accept") == response.headers.end())
throw std::logic_error("Failed to upgrade to websockets");
is_ws = true;
}
void create_key_pair(const tls::Pem priv_key)
{
key_pair = tls::make_key_pair(priv_key);
@ -188,8 +237,19 @@ public:
while (!last_response.has_value())
{
const auto next = read_all();
parser.execute(next.data(), next.size());
if (is_ws)
{
auto buf = read(ws::INITIAL_READ);
size_t n = ws_parser.consume(buf);
buf = read(n);
n = ws_parser.consume(buf);
assert(n == ws::INITIAL_READ);
}
else
{
const auto next = read_all();
parser.execute(next.data(), next.size());
}
}
return std::move(last_response.value());

Просмотреть файл

@ -140,6 +140,26 @@ public:
}
}
std::vector<uint8_t> read(size_t read_size)
{
std::vector<uint8_t> buf(read_size);
auto ret = mbedtls_ssl_read(&ssl, buf.data(), buf.size());
if (ret > 0)
{
buf.resize(ret);
}
else if (ret == 0)
{
throw std::logic_error("Underlying transport closed");
}
else
{
throw std::logic_error(tls::error_string(ret));
}
return buf;
}
void read(Buffer b)
{
for (size_t read = 0; read < b.n;)

86
src/http/ws_builder.h Normal file
Просмотреть файл

@ -0,0 +1,86 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include "../ds/serialized.h"
#include "../kv/kv_types.h"
#include "ws_parser.h"
#include <arpa/inet.h>
namespace ws
{
static std::vector<uint8_t> make_frame(size_t frame_size)
{
size_t sz_size = 0;
if (frame_size > 125)
{
sz_size = frame_size > std::numeric_limits<uint16_t>::max() ? 8 : 2;
}
size_t ws_h_size = ws::INITIAL_READ + sz_size;
std::vector<uint8_t> msg(ws_h_size + frame_size);
msg[0] = 0x82;
switch (sz_size)
{
case 0:
{
msg[1] = frame_size;
break;
}
case 2:
{
msg[1] = 0x7e;
*((uint16_t*)&msg[2]) = htons(frame_size);
break;
}
case 8:
{
msg[1] = 0x7f;
*((uint64_t*)&msg[2]) = htobe64(frame_size);
break;
}
default:
throw std::logic_error(fmt::format("Invalid sz_size: {}", sz_size));
}
return msg;
};
static std::vector<uint8_t> make_in_frame(
const std::string& path, const std::vector<uint8_t>& body)
{
size_t in_frame_size = ws::in_header_size(path) + body.size();
auto frame = make_frame(in_frame_size);
size_t ws_h_size = frame.size() - in_frame_size;
uint8_t* p = frame.data() + ws_h_size;
size_t s = frame.size() - ws_h_size;
serialized::write_lps(p, s, path);
assert(s == body.size());
::memcpy(p, body.data(), s);
return frame;
}
static std::vector<uint8_t> make_out_frame(
size_t code,
kv::Version seqno,
kv::Consensus::View view,
kv::Version global_commit,
const std::vector<uint8_t>& body)
{
size_t out_frame_size = ws::OUT_CCF_HEADER_SIZE + body.size();
auto frame = make_frame(out_frame_size);
size_t ws_h_size = frame.size() - out_frame_size;
uint8_t* p = frame.data() + ws_h_size;
size_t s = frame.size() - ws_h_size;
serialized::write<uint16_t>(p, s, code);
serialized::write<size_t>(p, s, seqno);
serialized::write<size_t>(p, s, view);
serialized::write<size_t>(p, s, global_commit);
assert(s == body.size());
::memcpy(p, body.data(), s);
return frame;
}
}

Просмотреть файл

@ -6,6 +6,7 @@
#include "enclave/rpc_context.h"
#include "http_parser.h"
#include "http_sig.h"
#include "ws_builder.h"
namespace ws
{
@ -16,47 +17,7 @@ namespace ws
kv::Consensus::View term = 0,
kv::Version global_commit = 0)
{
size_t frame_size = ws::OUT_CCF_HEADER_SIZE + body.size();
size_t sz_size = 0;
if (frame_size > 125)
{
sz_size = frame_size > std::numeric_limits<uint16_t>::max() ? 8 : 2;
}
size_t ws_h_size = ws::INITIAL_READ + sz_size;
std::vector<uint8_t> msg(ws_h_size + frame_size);
msg[0] = 0x82;
switch (sz_size)
{
case 0:
{
msg[1] = frame_size;
break;
}
case 2:
{
msg[1] = 0x7e;
*((uint16_t*)&msg[2]) = htons(frame_size);
break;
}
case 8:
{
msg[1] = 0x7f;
*((uint64_t*)&msg[2]) = htobe64(frame_size);
break;
}
default:
throw std::logic_error(fmt::format("Invalid sz_size: {}", sz_size));
}
uint8_t* p = msg.data() + ws_h_size;
size_t s = msg.size() - ws_h_size;
serialized::write<uint16_t>(p, s, code);
serialized::write<size_t>(p, s, commit);
serialized::write<size_t>(p, s, term);
serialized::write<size_t>(p, s, global_commit);
assert(s == body.size());
::memcpy(p, body.data(), s);
return msg;
return make_out_frame(code, commit, term, global_commit, body);
};
static std::vector<uint8_t> error(size_t code, const std::string& msg)
@ -134,43 +95,8 @@ namespace ws
{
if (serialised_request.empty())
{
size_t frame_size = ws::in_header_size(path) + request_body.size();
size_t sz_size = 0;
if (frame_size > 125)
{
sz_size = frame_size > std::numeric_limits<uint16_t>::max() ? 8 : 2;
}
size_t ws_h_size = ws::INITIAL_READ + sz_size;
serialised_request.resize(ws_h_size + frame_size);
serialised_request[0] = 0x82;
switch (sz_size)
{
case 0:
{
serialised_request[1] = frame_size;
break;
}
case 2:
{
serialised_request[1] = 0x7e;
*((uint16_t*)&serialised_request[2]) = htons(frame_size);
break;
}
case 8:
{
serialised_request[1] = 0x7f;
*((uint64_t*)&serialised_request[2]) = htobe64(frame_size);
break;
}
default:
throw std::logic_error(fmt::format("Invalid sz_size: {}", sz_size));
}
uint8_t* p = serialised_request.data() + ws_h_size;
size_t s = serialised_request.size() - ws_h_size;
serialized::write_lps(p, s, path);
assert(s == request_body.size());
::memcpy(p, request_body.data(), s);
auto sr = make_in_frame(path, request_body);
serialised_request.swap(sr);
}
return serialised_request;
}