Perf test session consistency - wait for global commit (#1039)

This commit is contained in:
Eddy Ashton 2020-04-07 10:32:40 +01:00 коммит произвёл GitHub
Родитель ce3d71b638
Коммит 716ae897cc
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 575 добавлений и 517 удалений

Просмотреть файл

@ -5,7 +5,18 @@
using namespace std;
using namespace nlohmann;
using Base = client::PerfBase;
/// Command-line options for the SmallBank perf client: everything provided by
/// client::PerfOptions, plus the number of accounts to operate on.
struct SmallBankClientOptions : public client::PerfOptions
{
  /// Number of accounts used by the client, set by --accounts
  size_t total_accounts = 10;

  /// Registers the common perf options (with the default label
  /// "Small_Bank_ClientCpp") and --accounts on the given app.
  /// Declared explicit so that a CLI::App is never silently converted into an
  /// options object.
  explicit SmallBankClientOptions(CLI::App& app) :
    client::PerfOptions("Small_Bank_ClientCpp", app)
  {
    app.add_option("--accounts", total_accounts)->capture_default_str();
  }
};
using Base = client::PerfBase<SmallBankClientOptions>;
class SmallBankClient : public Base
{
@ -27,21 +38,18 @@ private:
"SmallBank_deposit_checking",
"SmallBank_balance"};
size_t total_accounts = 10;
void print_accounts(const string& header = {})
{
if (!header.empty())
{
cout << header << endl;
LOG_INFO_FMT(header);
}
// Create new connection to read balances
auto conn = create_connection();
auto conn = get_connection();
nlohmann::json accs = nlohmann::json::array();
for (auto i = 0ul; i < total_accounts; i++)
for (auto i = 0ul; i < options.total_accounts; i++)
{
json j;
j["name"] = to_string(i);
@ -52,31 +60,35 @@ private:
accs.push_back({{"account", i}, {"balance", result}});
}
std::cout << accs.dump(4) << std::endl;
LOG_INFO_FMT("Accounts:\n{}", accs.dump(4));
}
void send_creation_transactions(
const std::shared_ptr<RpcTlsClient>& connection) override
std::optional<RpcTlsClient::Response> send_creation_transactions() override
{
const auto from = 0;
const auto to = total_accounts;
const auto to = options.total_accounts;
cout << "Creating accounts: from " << from << " to " << to << endl;
auto connection = get_connection();
LOG_INFO_FMT("Creating accounts from {} to {}", from, to);
json j;
j["from"] = from;
j["to"] = to;
j["checking_amt"] = 1000;
j["savings_amt"] = 1000;
connection->call("SmallBank_create_batch", j);
const auto response = connection->call("SmallBank_create_batch", j);
check_response(response);
return response;
}
void prepare_transactions() override
{
// Reserve space for transfer transactions
prepared_txs.resize(num_transactions);
prepared_txs.resize(options.num_transactions);
for (decltype(num_transactions) i = 0; i < num_transactions; i++)
for (decltype(options.num_transactions) i = 0; i < options.num_transactions;
i++)
{
uint8_t operation =
rand_range((uint8_t)TransactionTypes::NumberTransactions);
@ -86,16 +98,16 @@ private:
switch ((TransactionTypes)operation)
{
case TransactionTypes::TransactSavings:
j["name"] = to_string(rand_range(total_accounts));
j["name"] = to_string(rand_range(options.total_accounts));
j["value"] = rand_range<int>(-50, 50);
break;
case TransactionTypes::Amalgamate:
{
unsigned int src_account = rand_range(total_accounts);
unsigned int src_account = rand_range(options.total_accounts);
j["name_src"] = to_string(src_account);
unsigned int dest_account = rand_range(total_accounts - 1);
unsigned int dest_account = rand_range(options.total_accounts - 1);
if (dest_account >= src_account)
dest_account += 1;
@ -104,17 +116,17 @@ private:
break;
case TransactionTypes::WriteCheck:
j["name"] = to_string(rand_range(total_accounts));
j["name"] = to_string(rand_range(options.total_accounts));
j["value"] = rand_range<int>(50);
break;
case TransactionTypes::DepositChecking:
j["name"] = to_string(rand_range(total_accounts));
j["name"] = to_string(rand_range(options.total_accounts));
j["value"] = rand_range<int>(50) + 1;
break;
case TransactionTypes::GetBalance:
j["name"] = to_string(rand_range(total_accounts));
j["name"] = to_string(rand_range(options.total_accounts));
break;
default:
@ -134,7 +146,10 @@ private:
if (r.status != HTTP_STATUS_OK)
{
const std::string error_msg(r.body.begin(), r.body.end());
if (error_msg.find("Not enough money in savings account") == string::npos)
if (
error_msg.find("Not enough money in savings account") == string::npos &&
error_msg.find("Account already exists in accounts table") ==
string::npos)
{
throw logic_error(error_msg);
return false;
@ -146,26 +161,17 @@ private:
void pre_creation_hook() override
{
if (verbosity >= 1)
{
cout << "Creating " << total_accounts << " accounts..." << endl;
}
LOG_DEBUG_FMT("Creating {} accounts", options.total_accounts);
}
void post_creation_hook() override
{
if (verbosity >= 2)
{
print_accounts("Initial accounts:");
}
LOG_TRACE_FMT("Initial accounts:");
}
void post_timing_body_hook() override
{
if (verbosity >= 2)
{
print_accounts("Final accounts:");
}
LOG_TRACE_FMT("Final accounts:");
}
void verify_params(const nlohmann::json& expected) override
@ -176,13 +182,15 @@ private:
const auto it = expected.find("accounts");
if (it != expected.end())
{
const auto expected_accounts = it->get<decltype(total_accounts)>();
if (expected_accounts != total_accounts)
const auto expected_accounts =
it->get<decltype(options.total_accounts)>();
if (expected_accounts != options.total_accounts)
{
throw std::runtime_error(
"Verification file is only applicable for " +
std::to_string(expected_accounts) +
" accounts, but currently have " + std::to_string(total_accounts));
" accounts, but currently have " +
std::to_string(options.total_accounts));
}
}
}
@ -207,7 +215,7 @@ private:
}
// Create new connection to read balances
auto conn = create_connection();
auto conn = get_connection();
for (const auto& entry : expected)
{
@ -252,23 +260,16 @@ private:
}
public:
SmallBankClient() : Base("Small_Bank_ClientCpp") {}
void setup_parser(CLI::App& app) override
{
Base::setup_parser(app);
app.add_option("--accounts", total_accounts);
}
SmallBankClient(const SmallBankClientOptions& o) : Base(o) {}
};
int main(int argc, char** argv)
{
SmallBankClient client;
CLI::App cli_app{"Small Bank Client"};
client.setup_parser(cli_app);
SmallBankClientOptions options(cli_app);
CLI11_PARSE(cli_app, argc, argv);
SmallBankClient client(options);
client.run();
return 0;

Просмотреть файл

@ -10,11 +10,11 @@
#include "clients/sig_rpc_tls_client.h"
#include "ds/cli_helper.h"
#include "ds/files.h"
#include "ds/logger.h"
// STL/3rdparty
#include <CLI11/CLI11.hpp>
#include <fstream>
#include <iostream>
#include <nlohmann/json.hpp>
#include <random>
#include <thread>
@ -28,27 +28,148 @@ namespace client
int threads = std::thread::hardware_concurrency();
if (core_id > threads || core_id < 0)
{
std::cerr << "Invalid core id: " << core_id << std::endl;
LOG_FATAL_FMT("Invalid core id: {}", core_id);
return false;
}
cpu_set_t set;
std::cout << "Pinning to core:" << core_id << std::endl;
LOG_INFO_FMT("Pinning to core: {}", core_id);
CPU_ZERO(&set);
CPU_SET(core_id, &set);
if (sched_setaffinity(0, sizeof(cpu_set_t), &set) < 0)
{
std::cerr << "Unable to set affinity" << std::endl;
LOG_FATAL_FMT("Unable to set affinity");
return false;
}
return true;
}
/** Command-line configuration shared by the perf-testing clients.
 *
 * The constructor registers every option on the supplied CLI::App, each one
 * bound to the member of the same purpose. Client-specific option structs
 * derive from this and register their own extra options.
 */
struct PerfOptions
{
  /// Options set from command line
  ///@{
  std::string label; ///< Default set in constructor
  cli::ParsedAddress server_address;
  std::string cert_file, key_file, ca_file, verification_file;

  size_t num_transactions = 10000;
  size_t thread_count = 1;
  size_t session_count = 1;
  size_t max_writes_ahead = 0;
  size_t latency_rounds = 1;
  size_t generator_seed = 42u;

  bool sign = false;
  bool no_create = false;
  bool no_wait = false;
  bool write_tx_times = false;
  bool randomise = false;
  bool check_responses = false;
  // NOTE(review): no flag is registered for this member in the constructor
  // below, so it keeps its default unless set elsewhere - confirm intent
  bool relax_commit_target = false;
  ///@}

  /** Registers all common perf-client options on app.
   *
   * @param default_label Initial value of label, used unless --label is given
   * @param app CLI app on which the options and flags are registered
   */
  PerfOptions(const std::string& default_label, CLI::App& app) :
    label(default_label)
  {
    // Enable config from file
    app.set_config("--config");

    app
      .add_option(
        "--label",
        label,
        fmt::format(
          "Identifier for this client, written to {}", perf_summary))
      ->capture_default_str();

    // Connection details
    cli::add_address_option(
      app,
      server_address,
      "--rpc-address",
      "Remote node JSON RPC address to which requests should be sent")
      ->required(true);

    app.add_option("--cert", cert_file)
      ->required(true)
      ->check(CLI::ExistingFile);
    app.add_option("--pk", key_file)
      ->required(true)
      ->check(CLI::ExistingFile);
    app.add_option("--ca", ca_file)->required(true)->check(CLI::ExistingFile);

    app
      .add_option(
        "--verify",
        verification_file,
        "Verify results against expectation, specified in file")
      ->required(false)
      ->check(CLI::ExistingFile);

    // Described and default-captured like the other numeric options, so the
    // default seed is visible in --help
    app
      .add_option(
        "--generator-seed",
        generator_seed,
        "Seed for the random generator which determines transaction "
        "contents (replaced by a random value when --randomise is set)")
      ->capture_default_str();

    // Transaction counts and batching details
    app
      .add_option(
        "--transactions",
        num_transactions,
        "The basic number of transactions to send (will actually send this "
        "many for each thread, in each session)")
      ->capture_default_str();
    app.add_option("-t,--threads", thread_count)->capture_default_str();
    app.add_option("-s,--sessions", session_count)->capture_default_str();
    app
      .add_option(
        "--max-writes-ahead",
        max_writes_ahead,
        "How many transactions the client should send without waiting for "
        "responses. 0 will send all transactions before blocking for any "
        "responses, 1 will minimise latency by serially waiting for each "
        "transaction's response, other values may provide a balance between "
        "throughput and latency")
      ->capture_default_str();
    app.add_option("--latency-rounds", latency_rounds)->capture_default_str();

    // Boolean flags
    app.add_flag("--sign", sign, "Send client-signed transactions")
      ->capture_default_str();
    app
      .add_flag("--no-create", no_create, "Skip creation/setup transactions")
      ->capture_default_str();
    app
      .add_flag(
        "--no-wait",
        no_wait,
        "Don't wait for transactions to be globally committed")
      ->capture_default_str();
    app
      .add_flag(
        "--write-tx-times",
        write_tx_times,
        "Write tx sent and received times to csv")
      ->capture_default_str();
    app
      .add_flag(
        "--randomise",
        randomise,
        "Use non-deterministically random transaction contents each run")
      ->capture_default_str();
    app
      .add_flag(
        "--check-responses",
        check_responses,
        "Check every JSON response for errors. Potentially slow")
      ->capture_default_str();
  }
};
/** Base class for perf-testing clients. Provides hooks to set initial state,
* prepare a batch of transactions, and then measure the latency and
* throughput of processing those batched transactions */
template <typename TOptions>
class PerfBase
{
private:
@ -60,9 +181,9 @@ namespace client
{
if (tls_cert == nullptr)
{
const auto raw_cert = files::slurp(cert_file);
const auto raw_key = files::slurp(key_file);
const auto ca = files::slurp(ca_file);
const auto raw_cert = files::slurp(options.cert_file);
const auto raw_key = files::slurp(options.key_file);
const auto ca = files::slurp(options.ca_file);
key = tls::Pem(raw_key);
@ -79,7 +200,7 @@ namespace client
// check_response for derived-overridable validation
void process_reply(const RpcTlsClient::Response& reply)
{
if (check_responses)
if (options.check_responses)
{
if (!check_response(reply))
{
@ -87,35 +208,40 @@ namespace client
}
}
std::optional<timing::CommitIDs> commits = std::nullopt;
if (timing.has_value())
if (response_times.is_timing_active() && reply.status == HTTP_STATUS_OK)
{
const auto commit_it = reply.headers.find(http::headers::CCF_COMMIT);
const auto global_it =
reply.headers.find(http::headers::CCF_GLOBAL_COMMIT);
const auto term_it = reply.headers.find(http::headers::CCF_TERM);
// If any of these are missing, we'll write no commits and consider
// this a failed request
if (
commit_it != reply.headers.end() &&
global_it != reply.headers.end() && term_it != reply.headers.end())
{
const size_t commit = std::atoi(commit_it->second.c_str());
const size_t global = std::atoi(global_it->second.c_str());
const size_t term = std::atoi(term_it->second.c_str());
commits.emplace(timing::CommitIDs{commit, global, term});
highest_local_commit = std::max<size_t>(highest_local_commit, commit);
}
const auto commits = timing::parse_commit_ids(reply);
// Record time of received responses
timing->record_receive(reply.id, commits);
response_times.record_receive(reply.id, commits);
if (commits.term < last_response_commit.term)
{
throw std::logic_error(fmt::format(
"Term went backwards (expected {}, saw {})!",
last_response_commit.term,
commits.term));
}
else if (
commits.term > last_response_commit.term &&
commits.local <= last_response_commit.index)
{
throw std::logic_error(fmt::format(
"There has been an election and transactions have "
"been lost! (saw {}.{}, currently at {}.{})",
last_response_commit.term,
last_response_commit.index,
commits.term,
commits.local));
}
last_response_commit = {commits.term, commits.local};
}
}
protected:
TOptions options;
std::mt19937 rand_generator;
nlohmann::json verification_target;
@ -132,30 +258,33 @@ namespace client
std::shared_ptr<RpcTlsClient> rpc_connection;
PreparedTxs prepared_txs;
std::optional<timing::ResponseTimes> timing;
size_t highest_local_commit = 0;
timing::ResponseTimes response_times;
timing::CommitPoint last_response_commit = {0, 0};
std::shared_ptr<RpcTlsClient> create_connection(bool force_unsigned = false)
{
// Create a cert if this is our first rpc_connection
const bool is_first = get_cert();
const auto conn = (sign && !force_unsigned) ?
const auto conn = (options.sign && !force_unsigned) ?
std::make_shared<SigRpcTlsClient>(
key,
server_address.hostname,
server_address.port,
options.server_address.hostname,
options.server_address.port,
nullptr,
tls_cert) :
std::make_shared<RpcTlsClient>(
server_address.hostname, server_address.port, nullptr, tls_cert);
options.server_address.hostname,
options.server_address.port,
nullptr,
tls_cert);
conn->set_prefix("users");
// Report ciphersuite of first client (assume it is the same for each)
if (verbosity >= 1 && is_first)
if (is_first)
{
std::cout << "Connected to server via TLS ("
<< conn->get_ciphersuite_name() << ")" << std::endl;
LOG_DEBUG_FMT(
"Connected to server via TLS ({})", conn->get_ciphersuite_name());
}
return conn;
@ -189,37 +318,14 @@ namespace client
});
}
/// Options set from command line
///@{
std::string label; //< Default set in constructor
cli::ParsedAddress server_address;
std::string cert_file, key_file, ca_file, verification_file;
size_t num_transactions = 10000;
size_t thread_count = 1;
size_t session_count = 1;
size_t max_writes_ahead = 0;
size_t latency_rounds = 1;
size_t verbosity = 0;
size_t generator_seed = 42u;
bool sign = false;
bool no_create = false;
bool no_wait = false;
bool write_tx_times = false;
bool randomise = false;
bool check_responses = false;
bool relax_commit_target = false;
///@}
// Everything else has empty stubs and can optionally be overridden. This
// must be provided by derived class
virtual void prepare_transactions() = 0;
virtual void send_creation_transactions(
const std::shared_ptr<RpcTlsClient>& connection)
{}
virtual std::optional<RpcTlsClient::Response> send_creation_transactions()
{
return std::nullopt;
}
virtual bool check_response(const RpcTlsClient::Response& r)
{
@ -240,10 +346,9 @@ namespace client
size_t written;
kick_off_timing();
std::optional<size_t> end_highest_local_commit;
// Repeat for each session
for (size_t session = 1; session <= session_count; ++session)
for (size_t session = 1; session <= options.session_count; ++session)
{
read = 0;
written = 0;
@ -255,24 +360,23 @@ namespace client
blocking_read(read, written, connection);
// Reconnect for each session (except the last)
if (session != session_count)
if (session != options.session_count)
{
reconnect(connection);
}
}
force_global_commit(connection);
wait_for_global_commit();
auto timing_results = end_timing(end_highest_local_commit);
std::cout << timing::timestamp() << "Timing ended" << std::endl;
wait_for_global_commit(trigger_signature(connection));
auto timing_results = end_timing(last_response_commit.index);
LOG_INFO_FMT("Timing ended");
return timing_results;
}
void kick_off_timing()
{
std::cout << timing::timestamp() << "About to begin timing" << std::endl;
LOG_INFO_FMT("About to begin timing");
begin_timing();
std::cout << timing::timestamp() << "Began timing" << std::endl;
LOG_INFO_FMT("Began timing");
}
inline void write(
@ -282,8 +386,10 @@ namespace client
const std::shared_ptr<RpcTlsClient>& connection)
{
// Record time of sent requests
if (timing.has_value())
timing->record_send(tx.method, tx.rpc.id, tx.expects_commit);
if (response_times.is_timing_active())
{
response_times.record_send(tx.method, tx.rpc.id, tx.expects_commit);
}
connection->write(tx.rpc.encoded);
++written;
@ -303,10 +409,10 @@ namespace client
}
// Do blocking reads if we're beyond our write-ahead limit
if (max_writes_ahead > 0) // 0 is a special value allowing unlimited
// write-ahead
if (options.max_writes_ahead > 0) // 0 is a special value allowing
// unlimited write-ahead
{
while (written - read >= max_writes_ahead)
while (written - read >= options.max_writes_ahead)
{
process_reply(connection->read_response());
++read;
@ -333,19 +439,27 @@ namespace client
connection->connect();
}
void force_global_commit(const std::shared_ptr<RpcTlsClient>& connection)
RpcTlsClient::Response trigger_signature(
const std::shared_ptr<RpcTlsClient>& connection)
{
// End with a mkSign RPC to force a final global commit
// Send a mkSign RPC to trigger next global commit
const auto method = "mkSign";
const auto mk_sign = connection->gen_request(method);
if (timing.has_value())
if (response_times.is_timing_active())
{
timing->record_send(method, mk_sign.id, true);
response_times.record_send(method, mk_sign.id, true);
}
connection->write(mk_sign.encoded);
// Do a blocking read for this final response
process_reply(connection->read_response());
const auto response = connection->read_response();
process_reply(response);
const auto commit_ids = timing::parse_commit_ids(response);
LOG_INFO_FMT(
"Triggered signature at {}.{}", commit_ids.term, commit_ids.local);
return response;
}
virtual void verify_params(const nlohmann::json& expected)
@ -357,13 +471,14 @@ namespace client
const auto it = expected.find("seed");
if (it != expected.end())
{
const auto expected_seed = it->get<decltype(generator_seed)>();
if (expected_seed != generator_seed)
const auto expected_seed =
it->get<decltype(options.generator_seed)>();
if (expected_seed != options.generator_seed)
{
throw std::runtime_error(
"Verification file expects seed " +
std::to_string(expected_seed) + ", but currently using " +
std::to_string(generator_seed));
throw std::runtime_error(fmt::format(
"Verification file expects seed {}, but currently using {}",
expected_seed,
options.generator_seed));
}
}
}
@ -372,14 +487,15 @@ namespace client
const auto it = expected.find("transactions");
if (it != expected.end())
{
const auto expected_txs = it->get<decltype(num_transactions)>();
if (expected_txs != num_transactions)
const auto expected_txs =
it->get<decltype(options.num_transactions)>();
if (expected_txs != options.num_transactions)
{
throw std::runtime_error(
"Verification file is only applicable for " +
std::to_string(expected_txs) +
" transactions, but currently running " +
std::to_string(num_transactions));
throw std::runtime_error(fmt::format(
"Verification file is only applicable for {} transactions, but "
"currently running {}",
expected_txs,
options.num_transactions));
}
}
}
@ -388,14 +504,15 @@ namespace client
const auto it = expected.find("sessions");
if (it != expected.end())
{
const auto expected_sessions = it->get<decltype(session_count)>();
if (expected_sessions != session_count)
const auto expected_sessions =
it->get<decltype(options.session_count)>();
if (expected_sessions != options.session_count)
{
throw std::runtime_error(
"Verification file is only applicable for " +
std::to_string(expected_sessions) +
" sessions, but currently running " +
std::to_string(session_count));
throw std::runtime_error(fmt::format(
"Verification file is only applicable for {} sessions, but "
"currently running {}",
expected_sessions,
options.session_count));
}
}
}
@ -408,13 +525,13 @@ namespace client
expected_randomise = it->get<bool>();
}
if (expected_randomise != randomise)
if (expected_randomise != options.randomise)
{
throw std::runtime_error(
"Verification file is only applicable when randomisation is " +
std::string(expected_randomise ? "ON" : "OFF") +
", but this option is currently " +
std::string(randomise ? "ON" : "OFF"));
throw std::runtime_error(fmt::format(
"Verification file is only applicable when randomisation is {}, "
"but this option is currently {}",
expected_randomise ? "ON" : "OFF",
options.randomise ? "ON" : "OFF"));
}
}
}
@ -422,88 +539,14 @@ namespace client
virtual void verify_final_state(const nlohmann::json& expected) {}
public:
PerfBase(const std::string& default_label) :
label(default_label),
rand_generator()
PerfBase(const TOptions& o) :
options(o),
rand_generator(),
// timing gets its own new connection for any requests it wants to send -
// these are never signed
response_times(create_connection())
{}
virtual void setup_parser(CLI::App& app)
{
// Enable config from file
app.set_config("--config");
app.add_option(
"--label",
label,
"Identifier for this client, written to " + std::string(perf_summary));
// Connection details
cli::add_address_option(
app,
server_address,
"--rpc-address",
"Remote node JSON RPC address to which requests should be sent")
->required(true);
app.add_option("--cert", cert_file)
->required(true)
->check(CLI::ExistingFile);
app.add_option("--pk", key_file)
->required(true)
->check(CLI::ExistingFile);
app.add_option("--ca", ca_file)->required(true)->check(CLI::ExistingFile);
app
.add_option(
"--verify",
verification_file,
"Verify results against expectation, specified in file")
->required(false)
->check(CLI::ExistingFile);
app.add_option("--generator-seed", generator_seed);
// Transaction counts and batching details
app.add_option(
"--transactions",
num_transactions,
"The basic number of transactions to send (will actually send this "
"many for each thread, in each session)");
app.add_option("-t,--threads", thread_count);
app.add_option("-s,--sessions", session_count);
app.add_option(
"--max-writes-ahead",
max_writes_ahead,
"How many transactions the client should send without waiting for "
"responses. 0 will send all transactions before blocking for any "
"responses, 1 will minimise latency by serially waiting for each "
"transaction's response, other values may provide a balance between "
"throughput and latency");
app.add_option("--latency-rounds", latency_rounds);
app.add_flag("-v,-V,--verbose", verbosity);
// Boolean flags
app.add_flag("--sign", sign, "Send client-signed transactions");
app.add_flag(
"--no-create", no_create, "Skip creation/setup transactions");
app.add_flag(
"--no-wait",
no_wait,
"Don't wait for transactions to be globally committed");
app.add_flag(
"--write-tx-times",
write_tx_times,
"Write tx sent and received times to csv");
app.add_flag(
"--randomise",
randomise,
"Use non-deterministically random transaction contents each run");
app.add_flag(
"--check-responses",
check_responses,
"Check every JSON response for errors. Potentially slow");
}
void init_connection()
{
// Make sure the connection we're about to use has been initialised
@ -513,20 +556,30 @@ namespace client
}
}
/// Returns the client's shared rpc_connection, calling init_connection()
/// first so that the connection has been initialised before it is handed out.
/// Every call returns the same rpc_connection member rather than a new
/// connection.
std::shared_ptr<RpcTlsClient> get_connection()
{
init_connection();
return rpc_connection;
}
void send_all_creation_transactions()
{
if (!no_create)
if (!options.no_create)
{
try
{
// Create a new connection for these, rather than a connection which
// will be reused for bulk transactions later
send_creation_transactions(create_connection());
const auto last_response = send_creation_transactions();
if (last_response.has_value())
{
// Ensure creation transactions are globally committed before
// proceeding
wait_for_global_commit(trigger_signature(get_connection()));
}
}
catch (std::exception& e)
{
std::cout << "Exception during creation steps: " << e.what()
<< std::endl;
LOG_FAIL_FMT("Exception during creation steps: {}", e.what());
throw e;
}
}
@ -541,7 +594,7 @@ namespace client
}
catch (std::exception& e)
{
std::cout << "Preparation exception: " << e.what() << std::endl;
LOG_FAIL_FMT("Preparation exception: {}", e.what());
throw e;
}
}
@ -549,7 +602,6 @@ namespace client
timing::Results send_all_prepared_transactions()
{
init_connection();
try
{
// ...send any transactions which were previously prepared
@ -557,62 +609,59 @@ namespace client
}
catch (std::exception& e)
{
std::cout << "Transaction exception: " << e.what() << std::endl;
LOG_FAIL_FMT("Transaction exception: {}", e.what());
throw e;
}
}
void wait_for_global_commit()
void wait_for_global_commit(const timing::CommitPoint& target)
{
if (!no_wait)
if (!options.no_wait)
{
if (!timing.has_value())
{
throw std::logic_error("Unexpected call to wait_for_global_commit");
}
auto commit = timing->wait_for_global_commit({highest_local_commit});
if (verbosity >= 1)
{
std::cout << timing::timestamp() << "Reached stable global commit at "
<< commit << std::endl;
}
response_times.wait_for_global_commit(target);
}
}
void wait_for_global_commit(const RpcTlsClient::Response& response)
{
check_response(response);
const auto response_commit_ids = timing::parse_commit_ids(response);
const timing::CommitPoint cp{response_commit_ids.term,
response_commit_ids.local};
wait_for_global_commit(cp);
}
void begin_timing()
{
if (timing.has_value())
if (response_times.is_timing_active())
{
throw std::logic_error(
"timing is already set - has begin_timing been called multiple "
"times?");
}
// timing gets its own new connection for any requests it wants to send -
// these are never signed
timing.emplace(create_connection(true));
timing->reset_start_time();
response_times.start_timing();
}
timing::Results end_timing(std::optional<size_t> end_highest_local_commit)
timing::Results end_timing(size_t end_highest_local_commit)
{
if (!timing.has_value())
if (!response_times.is_timing_active())
{
throw std::logic_error(
"timing is not set - has begin_timing not been called?");
}
auto results = timing->produce_results(
no_wait, end_highest_local_commit, latency_rounds);
auto results = response_times.produce_results(
options.no_wait, end_highest_local_commit, options.latency_rounds);
if (write_tx_times)
if (options.write_tx_times)
{
timing->write_to_file(label);
response_times.write_to_file(options.label);
}
timing.reset();
response_times.stop_timing();
return results;
}
@ -629,39 +678,36 @@ namespace client
const auto duration = dur_ms / 1000.0;
const auto tx_per_sec = total_txs / duration;
cout << total_txs << " transactions took " << dur_ms << "ms." << endl;
cout << "\t=> " << tx_per_sec << "tx/s" << endl;
LOG_INFO_FMT(
"{} transactions took {}ms.\n"
"=> {}tx/s\n", //< This is grepped for by _print_upload_perf in Python
total_txs,
dur_ms,
tx_per_sec);
// Write latency information, depending on verbosity
if (verbosity >= 1)
LOG_DEBUG_FMT(
" Sends: {}\n"
" Receives: {}\n"
" All txs (local_commit): {}\n"
" Global commit: {}\n",
timing_results.total_sends,
timing_results.total_receives,
timing_results.total_local_commit,
timing_results.total_global_commit);
for (size_t round = 0; round < timing_results.per_round.size(); ++round)
{
const auto indent_1 = " ";
const auto& round_info = timing_results.per_round[round];
cout << "Sends: " << timing_results.total_sends << endl;
cout << "Receives: " << timing_results.total_receives << endl;
cout << indent_1
<< "All txs (local commit): " << timing_results.total_local_commit
<< endl;
cout << indent_1
<< "Global commit: " << timing_results.total_global_commit << endl;
if (verbosity >= 2 && !timing_results.per_round.empty())
{
const auto indent_2 = " ";
for (size_t round = 0; round < timing_results.per_round.size();
++round)
{
const auto& round_info = timing_results.per_round[round];
cout << indent_1 << "Round " << round << " (req ids #"
<< round_info.begin_rpc_id << " to #" << round_info.end_rpc_id
<< ")" << endl;
cout << indent_2 << "Local: " << round_info.local_commit << endl;
cout << indent_2 << "Global: " << round_info.global_commit << endl;
}
}
LOG_TRACE_FMT(
" Round {} (req ids #{} to #{})\n"
" Local: {}\n"
" Global: {}\n",
round,
round_info.begin_rpc_id,
round_info.end_rpc_id,
round_info.local_commit,
round_info.global_commit);
}
// Write perf summary to csv
@ -671,18 +717,19 @@ namespace client
{
// Total number of bytes sent is:
// sessions * sum-per-tx of tx-bytes)
const auto total_bytes = session_count * total_byte_size(prepared_txs);
const auto total_bytes =
options.session_count * total_byte_size(prepared_txs);
perf_summary_csv << duration_cast<milliseconds>(
timing_results.start_time.time_since_epoch())
.count(); // timeStamp
perf_summary_csv << "," << dur_ms; // elapsed
perf_summary_csv << ","
<< (server_address.hostname.find("127.") == 0 ?
label :
label + string("_distributed")); // label
<< (options.server_address.hostname.find("127.") == 0 ?
options.label :
options.label + string("_distributed")); // label
perf_summary_csv << "," << total_bytes; // bytes
perf_summary_csv << "," << thread_count; // allThreads
perf_summary_csv << "," << options.thread_count; // allThreads
perf_summary_csv << "," << (double)dur_ms / total_txs; // latency
perf_summary_csv << "," << total_txs; // SampleCount
@ -700,28 +747,28 @@ namespace client
virtual void run()
{
if (randomise)
if (options.randomise)
{
generator_seed = std::random_device()();
options.generator_seed = std::random_device()();
}
std::cout << "Random choices determined by seed: " << generator_seed
<< std::endl;
rand_generator.seed(generator_seed);
LOG_INFO_FMT(
"Random choices determined by seed: {}", options.generator_seed);
rand_generator.seed(options.generator_seed);
/*
const auto target_core = 0;
if (!pin_to_core(target_core))
{
std::cout << "Failed to pin to core: " << target_core << std::endl;
LOG_FAIL_FMT("Failed to pin to core: {}", target_core);
}
*/
const bool verifying = !verification_file.empty();
const bool verifying = !options.verification_file.empty();
if (verifying)
{
verification_target = files::slurp_json(verification_file);
verification_target = files::slurp_json(options.verification_file);
verify_params(verification_target["params"]);
}
@ -739,20 +786,15 @@ namespace client
pre_timing_body_hook();
if (verbosity >= 1)
{
std::cout << std::endl
<< "Sending " << num_transactions << " transactions from "
<< thread_count << " clients " << session_count << " times..."
<< std::endl;
}
LOG_TRACE_FMT(
"Sending {} transactions from {} clients {} times...",
options.num_transactions,
options.thread_count,
options.session_count);
auto timing_results = send_all_prepared_transactions();
if (verbosity >= 1)
{
std::cout << "Done" << std::endl;
}
LOG_INFO_FMT("Done");
post_timing_body_hook();

Просмотреть файл

@ -6,20 +6,35 @@
#include <nlohmann/json.hpp>
#include <string>
using Base = client::PerfBase;
/// Command-line options for the scenario perf client: everything provided by
/// client::PerfOptions, plus the scenario file and how often to repeat it.
struct ScenarioPerfClientOptions : public client::PerfOptions
{
  /// Number of times the scenario's transactions are prepared, set by
  /// --repetitions
  size_t repetitions = 1;

  /// Path of the JSON scenario file, set by --scenario-file (required, must
  /// be an existing file)
  std::string scenario_file;

  /// Registers the common perf options (with the default label
  /// "scenario_perf"), --repetitions and --scenario-file on the given app.
  /// Declared explicit so that a CLI::App is never silently converted into an
  /// options object.
  explicit ScenarioPerfClientOptions(CLI::App& app) :
    client::PerfOptions("scenario_perf", app)
  {
    app.add_option("--repetitions", repetitions)->capture_default_str();
    app.add_option("--scenario-file", scenario_file)
      ->required(true)
      ->check(CLI::ExistingFile);
  }
};
using Base = client::PerfBase<ScenarioPerfClientOptions>;
class ScenarioPerfClient : public Base
{
private:
size_t repetitions = 1;
std::string scenario_file;
nlohmann::json scenario_json;
void send_verbose_transactions(
RpcTlsClient::Response send_verbose_transactions(
const std::shared_ptr<RpcTlsClient>& connection, char const* element_name)
{
const auto it = scenario_json.find(element_name);
RpcTlsClient::Response response;
if (it != scenario_json.end())
{
const auto transactions = *it;
@ -31,44 +46,36 @@ private:
element_name));
}
std::cout << fmt::format(
"Sending {} {} transactions",
transactions.size(),
element_name)
<< std::endl;
LOG_INFO_FMT(
"Sending {} {} transactions", transactions.size(), element_name);
for (const auto& transaction : transactions)
{
const auto method = transaction["method"];
const auto params = transaction["params"];
std::cout << fmt::format("Sending {}: {}", method, params.dump(2))
<< std::endl;
const auto response = connection->call(method, params);
LOG_INFO_FMT("Sending {}: {}", method, params.dump(2));
response = connection->call(method, params);
const auto response_body = connection->unpack_body(response);
std::cout << fmt::format(
"Response: {} {}",
response.status,
response_body.dump(2))
<< std::endl;
LOG_INFO_FMT("Response: {} {}", response.status, response_body.dump(2));
}
}
return response;
}
void pre_creation_hook() override
{
scenario_json = files::slurp_json(scenario_file);
scenario_json = files::slurp_json(options.scenario_file);
}
void send_creation_transactions(
const std::shared_ptr<RpcTlsClient>& connection) override
std::optional<RpcTlsClient::Response> send_creation_transactions() override
{
send_verbose_transactions(connection, "setup");
return send_verbose_transactions(get_connection(), "setup");
}
void post_timing_body_hook() override
{
const auto connection = create_connection();
send_verbose_transactions(connection, "cleanup");
send_verbose_transactions(get_connection(), "cleanup");
}
void prepare_transactions() override
@ -85,9 +92,9 @@ private:
}
// Reserve space for transactions
prepared_txs.reserve(transactions.size() * repetitions);
prepared_txs.reserve(transactions.size() * options.repetitions);
for (size_t r = 0; r < repetitions; ++r)
for (size_t r = 0; r < options.repetitions; ++r)
{
for (size_t i = 0; i < transactions.size(); ++i)
{
@ -100,26 +107,16 @@ private:
}
public:
ScenarioPerfClient() : Base("scenario_perf") {}
void setup_parser(CLI::App& app) override
{
Base::setup_parser(app);
app.add_option("--repetitions", repetitions);
app.add_option("--scenario-file", scenario_file)
->required(true)
->check(CLI::ExistingFile);
}
ScenarioPerfClient(const ScenarioPerfClientOptions& o) : Base(o) {}
};
int main(int argc, char** argv)
{
ScenarioPerfClient client;
CLI::App cli_app{"Scenario Perf Client"};
client.setup_parser(cli_app);
ScenarioPerfClientOptions options(cli_app);
CLI11_PARSE(cli_app, argc, argv);
ScenarioPerfClient client(options);
client.run();
return 0;

Просмотреть файл

@ -49,6 +49,12 @@ namespace timing
double variance;
};
// Identifies a commit by the pair (term, index). Both members default to 0 so
// that a default-constructed CommitPoint never carries indeterminate values;
// brace initialisation such as CommitPoint{term, index} keeps working.
struct CommitPoint
{
  // Term component of the commit identifier.
  size_t term = 0;

  // Index component of the commit identifier.
  size_t index = 0;
};
std::string timestamp()
{
std::stringstream ss;
@ -125,6 +131,36 @@ namespace timing
vector<PerRound> per_round;
};
// Reads the commit, global-commit and term headers of `response` and returns
// them as a CommitIDs built from {local, global, term}.
//
// Throws std::logic_error naming the first missing header if any of the three
// is absent. All three lookups happen before any value is parsed.
//
// Values are parsed as base-10 unsigned integers. Base 0 would auto-detect the
// radix and misread a zero-padded or "0x"-prefixed value as octal/hex.
// NOTE(review): text that contains no digits still parses as 0 rather than
// raising an error - confirm whether that should be rejected.
static CommitIDs parse_commit_ids(const RpcTlsClient::Response& response)
{
  const auto& headers = response.headers;

  // Finds a mandatory header, throwing with `missing_message` when absent.
  const auto require_header =
    [&headers](const auto& name, const char* missing_message) {
      const auto found = headers.find(name);
      if (found == headers.end())
      {
        throw std::logic_error(missing_message);
      }
      return found;
    };

  const auto local_commit_it = require_header(
    http::headers::CCF_COMMIT, "Missing commit response header");
  const auto global_commit_it = require_header(
    http::headers::CCF_GLOBAL_COMMIT, "Missing global commit response header");
  const auto term_it =
    require_header(http::headers::CCF_TERM, "Missing term response header");

  // Header values are decimal counters, so the radix is fixed at 10.
  const auto parse_decimal = [](const auto& text) {
    return std::strtoul(text.c_str(), nullptr, 10);
  };

  const auto local = parse_decimal(local_commit_it->second);
  const auto global = parse_decimal(global_commit_it->second);
  const auto term = parse_decimal(term_it->second);

  return {local, global, term};
}
class ResponseTimes
{
const shared_ptr<RpcTlsClient> net_client;
@ -133,51 +169,7 @@ namespace timing
vector<SentRequest> sends;
vector<ReceivedReply> receives;
bool try_get_commit(
const shared_ptr<RpcTlsClient>& client,
size_t& local,
size_t& global,
size_t& term,
bool record = false)
{
const auto r = client->call("getCommit", nlohmann::json::object());
if (record)
{
record_send("getCommit", r.id, false);
}
if (r.status != HTTP_STATUS_OK)
{
const auto body = client->unpack_body(r);
throw runtime_error("getCommit failed with error: " + body.dump());
}
const auto& h = r.headers;
const auto local_commit_it = h.find(http::headers::CCF_COMMIT);
if (local_commit_it == h.end())
return false;
const auto global_commit_it = h.find(http::headers::CCF_GLOBAL_COMMIT);
if (global_commit_it == h.end())
return false;
const auto term_it = h.find(http::headers::CCF_TERM);
if (term_it == h.end())
return false;
local = std::atoi(local_commit_it->second.c_str());
global = std::atoi(global_commit_it->second.c_str());
term = std::atoi(term_it->second.c_str());
if (record)
{
record_receive(r.id, {{local, global, term}});
}
return true;
}
bool active = false;
public:
ResponseTimes(const shared_ptr<RpcTlsClient>& client) :
@ -187,11 +179,22 @@ namespace timing
ResponseTimes(const ResponseTimes& other) = default;
void reset_start_time()
void start_timing()
{
active = true;
start_time = Clock::now();
}
// Returns the current value of the `active` flag, which start_timing() sets
// to true and stop_timing() sets to false.
bool is_timing_active()
{
return active;
}
// Clears the `active` flag so that is_timing_active() reports false. The
// recorded start_time is left unchanged.
void stop_timing()
{
active = false;
}
auto get_start_time() const
{
return start_time;
@ -209,35 +212,90 @@ namespace timing
receives.push_back({Clock::now() - start_time, rpc_id, commit});
}
// Repeatedly calls getCommit RPC until local and global_commit match, then
// returns that commit. Calls received_response for each response.
size_t wait_for_global_commit(
std::optional<size_t> at_least = {}, bool record = true)
// Repeatedly calls getCommit RPC until the target index has been committed
// (or will never be committed), checks it was in expected term, returns
// first confirming response. Calls received_response for each response, if
// record is true. Throws on errors, or if target is rolled back
RpcTlsClient::Response wait_for_global_commit(
const CommitPoint& target, bool record = true)
{
size_t local = 0u;
size_t global = 0u;
size_t term = 0u;
auto params = nlohmann::json::object();
params["commit"] = target.index;
bool success = try_get_commit(net_client, local, global, term, true);
auto target = at_least.has_value() ? max(*at_least, local) : local;
constexpr auto get_commit = "getCommit";
using LastPrinted = std::pair<decltype(target), decltype(global)>;
std::optional<LastPrinted> last_printed = std::nullopt;
LOG_INFO_FMT(
"Waiting for global commit {}.{}", target.term, target.index);
while (!success || (global < target))
while (true)
{
auto current = std::make_pair(target, global);
if (!last_printed.has_value() || *last_printed != current)
{
std::cout << timestamp() << "Waiting for " << target << ", at "
<< global << std::endl;
last_printed = current;
}
this_thread::sleep_for(10us);
success = try_get_commit(net_client, local, global, term, record);
}
const auto response = net_client->call(get_commit, params);
return global;
if (record)
{
record_send(get_commit, response.id, false);
}
const auto body = net_client->unpack_body(response);
if (response.status != HTTP_STATUS_OK)
{
throw runtime_error(fmt::format(
"getCommit failed with status {}: {}",
http_status_str(response.status),
body.dump()));
}
const auto commit_ids = parse_commit_ids(response);
if (record)
{
record_receive(response.id, commit_ids);
}
const auto response_term = body["term"].get<size_t>();
if (response_term == 0)
{
// Commit is pending, poll again
this_thread::sleep_for(10us);
continue;
}
else if (response_term < target.term)
{
throw std::logic_error(fmt::format(
"Unexpected situation - {} was committed in old term {} (expected "
"{})",
target.index,
response_term,
target.term));
}
else if (response_term == target.term)
{
// Good, this target commit was committed in the expected term
if (commit_ids.global >= target.index)
{
// ...and this commit has been globally committed
LOG_INFO_FMT(
"Found global commit {}.{}", target.term, target.index);
LOG_INFO_FMT(
" (headers term: {}, local: {}, global: {}",
commit_ids.term,
commit_ids.local,
commit_ids.global);
return response;
}
// else global commit is still pending
continue;
}
else
{
throw std::logic_error(fmt::format(
"Pending transaction was dropped! Looking for {}.{}, but term has "
"advanced to {}",
target.term,
target.index,
response_term));
}
}
}
Results produce_results(
@ -271,16 +329,16 @@ namespace timing
{
if (receive.commit->global >= highest_local_commit.value())
{
std::cout << "global commit match: " << receive.commit->global
<< " for highest local commit: "
<< highest_local_commit.value() << std::endl;
LOG_INFO_FMT(
"Global commit match {} for highest local commit {}",
receive.commit->global,
highest_local_commit.value());
auto was =
duration_cast<milliseconds>(end_time_delta).count() / 1000.0;
auto is =
duration_cast<milliseconds>(receive.receive_time).count() /
1000.0;
std::cout << "duration changing from: " << was << " s to: " << is
<< " s" << std::endl;
LOG_INFO_FMT("Duration changing from {}s to {}s", was, is);
end_time_delta = receive.receive_time;
break;
}
@ -345,9 +403,11 @@ namespace timing
if (tx_latency < 0)
{
std::cerr << "Calculated a negative latency (" << tx_latency
<< ") for RPC " << receive.rpc_id
<< " - duplicate ID causing mismatch?" << std::endl;
LOG_FAIL_FMT(
"Calculated a negative latency ({}) for RPC {} - duplicate "
"ID causing mismatch?",
tx_latency,
receive.rpc_id);
continue;
}
@ -424,11 +484,12 @@ namespace timing
if (!pending_global_commits.empty())
{
const auto& first = pending_global_commits[0];
throw runtime_error(
"Still waiting for " + to_string(pending_global_commits.size()) +
" global commits. First expected is " +
to_string(first.target_commit) + " for a transaction sent at " +
to_string(first.send_time.count()));
throw runtime_error(fmt::format(
"Still waiting for {} global commits. First expected is {} for "
"a transaction sent at {}",
pending_global_commits.size(),
first.target_commit,
first.send_time.count()));
}
}
@ -436,10 +497,10 @@ namespace timing
const auto actual_local_samples = round_local_commit.size();
if (actual_local_samples != expected_local_samples)
{
throw runtime_error(
"Measured " + to_string(actual_local_samples) +
" response times, yet sent " + to_string(expected_local_samples) +
" requests");
throw runtime_error(fmt::format(
"Measured {} response times, yet sent {} requests",
actual_local_samples,
expected_local_samples));
}
}
@ -455,7 +516,7 @@ namespace timing
void write_to_file(const string& filename)
{
std::cout << "Writing timing data to file" << std::endl;
LOG_INFO_FMT("Writing timing data to files");
const auto sent_path = filename + "_sent.csv";
ofstream sent_csv(sent_path, ofstream::out);
@ -467,8 +528,7 @@ namespace timing
sent_csv << sent.send_time.count() << "," << sent.rpc_id << ","
<< sent.method << "," << sent.expects_commit << endl;
}
std::cout << "Wrote " << sends.size() << " entries to " << sent_path
<< std::endl;
LOG_INFO_FMT("Wrote {} entries to {}", sends.size(), sent_path);
}
const auto recv_path = filename + "_recv.csv";
@ -497,8 +557,7 @@ namespace timing
}
recv_csv << endl;
}
std::cout << "Wrote " << receives.size() << " entries to " << recv_path
<< std::endl;
LOG_INFO_FMT("Wrote {} entries to {}", receives.size(), recv_path);
}
}
};

Просмотреть файл

@ -38,7 +38,7 @@ def build_bin_path(bin_name, enclave_type=None, binary_dir="."):
if enclave_type == "virtual":
bin_name = f"{bin_name}.virtual"
return os.path.join(binary_dir, bin_name)
return os.path.join(binary_dir, os.path.normpath(bin_name))
def default_workspace():

Просмотреть файл

@ -315,7 +315,7 @@ class SSHRemote(CmdMixin):
def _cmd(self):
env = " ".join(f"{key}={value}" for key, value in self.env.items())
cmd = " ".join(self.cmd)
return f"cd {self.root} && {env} {cmd} 1>{self.out} 2>{self.err} 0</dev/null"
return f"cd {self.root} && {env} {cmd} 1> {self.out} 2> {self.err} 0< /dev/null"
def _dbg(self):
cmd = " ".join(self.cmd)
@ -327,33 +327,18 @@ class SSHRemote(CmdMixin):
client.connect(self.hostname)
return client
def wait_for_stdout_line(self, line, timeout):
def check_done(self):
    """Report whether the remote process has finished.

    Opens a fresh SSH connection, runs ``ps -p <pid>`` on the remote host and
    inspects the command's exit status. The connection is always closed,
    whether or not the command succeeds.

    Returns:
        True when ``ps`` no longer finds the process, False while it is
        still running.
    """
    client = self._connect_new()
    try:
        _, stdout, _ = client.exec_command(f"ps -p {self.pid()}")
        # `ps -p` exits with 0 while the PID exists and non-zero once it is
        # gone, so "done" is the non-zero case.
        return stdout.channel.recv_exit_status() != 0
    finally:
        client.close()
def check_for_stdout_line(self, line, timeout):
    """Poll the remote stdout file for a fixed string.

    Runs ``grep -F '<line>' <self.out>`` over a fresh SSH connection every
    0.1 seconds until grep exits with 0 or ``timeout`` seconds have elapsed.
    The connection is closed on every exit path.

    Args:
        line: Text to search for, matched literally by ``grep -F``.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        True if the text was found before the deadline, False otherwise.
    """
    # NOTE(review): `line` is placed between single quotes in the shell
    # command unescaped; a value containing a quote would break the command.
    client = self._connect_new()
    try:
        end_time = time.time() + timeout
        while time.time() < end_time:
            _, stdout, _ = client.exec_command(f"grep -F '{line}' {self.out}")
            if stdout.channel.recv_exit_status() == 0:
                return True
            time.sleep(0.1)
        return False
    finally:
        # Release the SSH connection whether or not the line was found.
        client.close()
def print_and_upload_result(self, name, metrics, lines):
def print_and_upload_result(self, name, metrics, line_count):
client = self._connect_new()
try:
_, stdout, _ = client.exec_command(f"tail -{lines} {self.out}")
_, stdout, _ = client.exec_command(f"tail -{line_count} {self.out}")
if stdout.channel.recv_exit_status() == 0:
LOG.success(f"Result for {self.name}, uploaded under {name}:")
self._print_upload_perf(name, metrics, stdout.read().splitlines())
@ -487,38 +472,19 @@ class LocalRemote(CmdMixin):
def _cmd(self):
cmd = " ".join(self.cmd)
return f"cd {self.root} && {cmd} 1>{self.out} 2>{self.err}"
return f"cd {self.root} && {cmd} 1> {self.out} 2> {self.err}"
# Builds the shell command string that changes into self.root and then runs
# this remote's command line as `<DBG> --args <command>`.
# NOTE(review): DBG is defined elsewhere in the file; presumably the name of a
# debugger executable - confirm.
def _dbg(self):
# Flatten the argument list into one space-separated string.
cmd = " ".join(self.cmd)
return f"cd {self.root} && {DBG} --args {cmd}"
def wait_for_stdout_line(self, line, timeout):
    """Block until ``line`` shows up in the local stdout file.

    Re-reads ``self.out`` from the start every 0.1 seconds and returns as
    soon as any decoded line contains ``line`` as a substring.

    Args:
        line: Substring to look for.
        timeout: Maximum number of seconds to keep polling.

    Raises:
        ValueError: If the substring has not appeared before the deadline.
    """
    deadline = time.time() + timeout
    while True:
        if not time.time() < deadline:
            break
        with open(self.out, "rb") as stream:
            if any(line in raw.decode() for raw in stream):
                return
        time.sleep(0.1)
    raise ValueError("{} not found in stdout after {} seconds".format(line, timeout))
def check_done(self):
    """Tell whether the local process has exited.

    Returns:
        True once ``self.proc.poll()`` yields anything other than None,
        False while it still yields None.
    """
    exit_code = self.proc.poll()
    return exit_code is not None
def check_for_stdout_line(self, line, timeout):
    """Poll the local stdout file for a substring.

    Re-reads ``self.out`` from the start every 0.1 seconds until a decoded
    line contains ``line`` or ``timeout`` seconds have elapsed.

    Args:
        line: Substring to look for.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        True if the substring was found before the deadline, False otherwise.
    """
    deadline = time.time() + timeout
    while True:
        if not time.time() < deadline:
            return False
        with open(self.out, "rb") as stream:
            if any(line in raw.decode() for raw in stream):
                return True
        time.sleep(0.1)
def print_and_upload_result(self, name, metrics, line):
def print_and_upload_result(self, name, metrics, line_count):
with open(self.out, "rb") as out:
lines = out.read().splitlines()
result = lines[-line:]
result = lines[-line_count:]
LOG.success(f"Result for {self.name}, uploaded under {name}:")
self._print_upload_perf(name, metrics, result)
@ -734,11 +700,11 @@ class CCFRemote(object):
LOG.info(f"Could not retrieve {self.profraw}")
return errors, fatal_errors
# Forwards to the underlying remote's wait_for_stdout_line and returns its
# result; `timeout` falls back to 5 when the caller omits it.
def wait_for_stdout_line(self, line, timeout=5):
return self.remote.wait_for_stdout_line(line, timeout)
# Forwards to the underlying remote's check_done and returns its result.
def check_done(self):
return self.remote.check_done()
def print_and_upload_result(self, name, metrics, lines):
self.remote.print_and_upload_result(name, metrics, lines)
def print_and_upload_result(self, name, metrics, line_count):
self.remote.print_and_upload_result(name, metrics, line_count)
# Forwards to the underlying remote's set_perf; nothing is returned.
def set_perf(self):
self.remote.set_perf()

Просмотреть файл

@ -97,15 +97,8 @@ class CCFRemoteClient(object):
except Exception:
LOG.exception("Failed to shut down {} cleanly".format(self.name))
# Waits for the text "Global commit" to appear in the remote's stdout, giving
# up after 5 seconds. Any exception is logged together with this client's
# name and then re-raised to the caller.
def wait(self):
try:
self.remote.wait_for_stdout_line(line="Global commit", timeout=5)
except Exception:
# Record the failure with its traceback before propagating it.
LOG.exception("Failed to wait on client {}".format(self.name))
raise
def check_done(self):
return self.remote.check_for_stdout_line(line="Global commit", timeout=5)
return self.remote.check_done()
# Forwards to the underlying remote's print_and_upload_result, passing
# self.LINES_RESULT_FROM_END as the third argument.
def print_and_upload_result(self, name, metrics):
self.remote.print_and_upload_result(name, metrics, self.LINES_RESULT_FROM_END)

Просмотреть файл

@ -133,7 +133,7 @@ def run(get_command, args):
done = remote_client.check_done()
# all the clients need to be done
LOG.info(
f"Client {i} has {'completed' if done else 'not completed'} running"
f"Client {i} has {'completed' if done else 'not completed'} running ({time.time() - start_time:.2f}s / {hard_stop_timeout}s)"
)
stop_waiting = stop_waiting and done
if stop_waiting:
@ -143,7 +143,7 @@ def run(get_command, args):
f"Client still running after {hard_stop_timeout}s"
)
time.sleep(0.1)
time.sleep(5)
tx_rates.get_metrics()
for remote_client in clients: