This commit is contained in:
olgavrou 2019-05-21 09:27:22 +01:00 коммит произвёл GitHub
Родитель b10b931e7b
Коммит 3d0eb8fc36
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
14 изменённых файлов: 226 добавлений и 116 удалений

Просмотреть файл

@ -1,7 +1,8 @@
steps:
- script: |
gzip -vk *_sent.csv *_recv.csv
displayName: Compress tx time csvs
gzip -vk *_metrics.json
displayName: Compress tx time metrics
workingDirectory: build
- task: AzureCLI@1
@ -14,4 +15,5 @@ steps:
az storage blob upload -f perf_summary.csv -c perf -n $AGENT_NAME/${BUILD_SOURCEBRANCHNAME}_${BUILD_BUILDNUMBER}.csv --account-name ccfperf --subscription $(subscription)
az storage blob upload-batch -d tx-times/$AGENT_NAME/${BUILD_SOURCEBRANCHNAME}_${BUILD_BUILDNUMBER} -s. --pattern "*_sent.csv.gz" --account-name ccfperf --subscription $(subscription)
az storage blob upload-batch -d tx-times/$AGENT_NAME/${BUILD_SOURCEBRANCHNAME}_${BUILD_BUILDNUMBER} -s. --pattern "*_recv.csv.gz" --account-name ccfperf --subscription $(subscription)
az storage blob upload-batch -d metrics/$AGENT_NAME/${BUILD_SOURCEBRANCHNAME}_${BUILD_BUILDNUMBER} -s. --pattern "*_metrics.json.gz" --account-name ccfperf --subscription $(subscription)
workingDirectory: build

Просмотреть файл

@ -85,7 +85,6 @@ if(DEBUG_CONFIG)
add_definitions(-DDEBUG_CONFIG)
endif()
option(USE_NLJSON_KV_SERIALISER "Use nlohmann JSON as the KV serialiser" OFF)
if (USE_NLJSON_KV_SERIALISER)
add_definitions(-DUSE_NLJSON_KV_SERIALISER)

Просмотреть файл

@ -22,6 +22,7 @@ if(BUILD_TESTS)
VERIFICATION_FILE ${CMAKE_CURRENT_LIST_DIR}/tests/verify_small_bank.json
ADDITIONAL_ARGS
--max-writes-ahead 1000
--metrics-file small_bank_metrics.json
)
add_perf_test(
@ -33,6 +34,7 @@ if(BUILD_TESTS)
ADDITIONAL_ARGS
--label Small_Bank_Client_Sigs
--max-writes-ahead 1000 --sign
--metrics-file small_bank_sigs_metrics.json
)
add_perf_test(
@ -43,5 +45,6 @@ if(BUILD_TESTS)
ADDITIONAL_ARGS
--label Small_Bank_WarmupCooldown
--max-writes-ahead 1 --warmup 1000 --cooldown 1000
--metrics-file small_bank_wc_metrics.json
)
endif()

Просмотреть файл

@ -71,11 +71,11 @@ namespace ccf
};
};
struct GetTxHist
struct GetMetrics
{
struct Out
{
nlohmann::json tx_hist;
nlohmann::json metrics;
};
};
}

Просмотреть файл

@ -17,7 +17,7 @@ namespace ccf
struct GeneralProcs
{
static constexpr auto GET_COMMIT = "getCommit";
static constexpr auto GET_TX_HIST = "getTxHist";
static constexpr auto GET_METRICS = "getMetrics";
static constexpr auto MK_SIGN = "mkSign";
static constexpr auto GET_LEADER_INFO = "getLeaderInfo";
};

Просмотреть файл

@ -3,10 +3,10 @@
#pragma once
#include "consts.h"
#include "ds/buffer.h"
#include "ds/histogram.h"
#include "enclave/rpchandler.h"
#include "forwarder.h"
#include "jsonrpc.h"
#include "metrics.h"
#include "node/certs.h"
#include "node/clientsignatures.h"
#include "node/consensus.h"
@ -18,10 +18,6 @@
#include <utility>
#include <vector>
#define HIST_MAX (1 << 17)
#define HIST_MIN 1
#define HIST_BUCKET_GRANULARITY 5
namespace ccf
{
class RpcFrontend : public enclave::RpcHandler, public ForwardedRpcHandler
@ -74,15 +70,10 @@ namespace ccf
kv::TxHistory* history;
size_t sig_max_tx = 1000;
size_t tx_count = 0;
using Hist =
histogram::Histogram<int, HIST_MIN, HIST_MAX, HIST_BUCKET_GRANULARITY>;
histogram::Global<Hist> global =
histogram::Global<Hist>("histogram", __FILE__, __LINE__);
Hist histogram = Hist(global);
std::chrono::milliseconds sig_max_ms = std::chrono::milliseconds(1000);
std::chrono::milliseconds ms_to_sig = std::chrono::milliseconds(1000);
bool request_storing_disabled = false;
metrics::Metrics metrics;
void update_raft()
{
@ -207,20 +198,9 @@ namespace ccf
"Failed to get commit info from Raft");
};
auto get_tx_hist = [this](Store::Tx& tx, const nlohmann::json& params) {
nlohmann::json result;
nlohmann::json hist;
result["low"] = histogram.get_low();
result["high"] = histogram.get_high();
result["overflow"] = histogram.get_overflow();
result["underflow"] = histogram.get_underflow();
auto range_counts = histogram.get_range_count();
for (auto const& [range, count] : range_counts)
{
hist[range] = count;
}
result["histogram"] = hist;
return jsonrpc::success(GetTxHist::Out{result});
auto get_metrics = [this](Store::Tx& tx, const nlohmann::json& params) {
auto result = metrics.get_metrics();
return jsonrpc::success(GetMetrics::Out{result});
};
auto make_signature =
@ -261,7 +241,7 @@ namespace ccf
};
install(GeneralProcs::GET_COMMIT, get_commit, Read);
install(GeneralProcs::GET_TX_HIST, get_tx_hist, Read);
install(GeneralProcs::GET_METRICS, get_metrics, Read);
install(GeneralProcs::MK_SIGN, make_signature, Write);
install(GeneralProcs::GET_LEADER_INFO, get_leader_info, Read);
}
@ -649,13 +629,9 @@ namespace ccf
void tick(std::chrono::milliseconds elapsed) override
{
// calculate how many tx/sec we have processed in this tick
auto duration = elapsed.count() / 1000.0;
auto tx_rate = tx_count / duration;
metrics.track_tx_rates(elapsed, tx_count);
// reset tx_counter for next tick interval
tx_count = 0;
histogram.record(tx_rate);
// TODO(#refactoring): move this to NodeState::tick
if ((raft != nullptr) && raft->is_leader())
{
@ -664,6 +640,7 @@ namespace ccf
ms_to_sig -= elapsed;
return;
}
ms_to_sig = sig_max_ms;
if (history && tables.commit_gap() > 0)
history->emit_signature();

93
src/node/rpc/metrics.h Normal file
Просмотреть файл

@ -0,0 +1,93 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once

#include "ds/histogram.h"
#include "ds/logger.h"

#include <chrono>
#include <nlohmann/json.hpp>

#define HIST_MAX (1 << 17)
#define HIST_MIN 1
#define HIST_BUCKET_GRANULARITY 5
#define TX_RATE_BUCKETS_LEN 4000

namespace metrics
{
  // Collects transaction-rate metrics for a node: a histogram of the
  // tx/sec values observed per tick, plus a bounded time series of
  // (rate, elapsed-time) samples. Results are exposed as JSON via
  // get_metrics() for the getMetrics RPC.
  class Metrics
  {
  private:
    // Number of ticks in which a non-zero tx rate was observed. May
    // exceed TX_RATE_BUCKETS_LEN; samples past the end are dropped.
    size_t tick_count = 0;
    // Seconds elapsed since collection began, at the time each sample
    // was recorded (parallel to tx_rates).
    double tx_time_passed[TX_RATE_BUCKETS_LEN] = {};
    // Observed tx/sec for each recorded tick, truncated to an integer.
    size_t tx_rates[TX_RATE_BUCKETS_LEN] = {};
    // Total time elapsed across all ticks seen so far.
    std::chrono::milliseconds rate_time_elapsed = std::chrono::milliseconds(0);

    using Hist =
      histogram::Histogram<int, HIST_MIN, HIST_MAX, HIST_BUCKET_GRANULARITY>;
    histogram::Global<Hist> global =
      histogram::Global<Hist>("histogram", __FILE__, __LINE__);
    Hist histogram = Hist(global);

    // Serialise the histogram into JSON: bounds, overflow/underflow
    // counts, and only the non-empty buckets (keyed by range string).
    nlohmann::json get_histogram_results()
    {
      nlohmann::json result;
      nlohmann::json hist;
      result["low"] = histogram.get_low();
      result["high"] = histogram.get_high();
      result["overflow"] = histogram.get_overflow();
      result["underflow"] = histogram.get_underflow();
      auto range_counts = histogram.get_range_count();
      for (auto const& [range, count] : range_counts)
      {
        // Skip empty buckets to keep the payload small.
        if (count > 0)
        {
          hist[range] = count;
        }
      }
      result["buckets"] = hist;
      return result;
    }

    // Serialise the recorded rate samples into JSON, keyed by tick
    // index; zero-rate slots are omitted.
    nlohmann::json get_tx_rates()
    {
      nlohmann::json result;
      for (size_t i = 0; i < TX_RATE_BUCKETS_LEN; ++i)
      {
        if (tx_rates[i] > 0)
        {
          result[std::to_string(i)]["rate"] = tx_rates[i];
          result[std::to_string(i)]["duration"] = tx_time_passed[i];
        }
      }
      return result;
    }

  public:
    // Build the full metrics payload: {"histogram": ..., "tx_rates": ...}.
    nlohmann::json get_metrics()
    {
      nlohmann::json result;
      result["histogram"] = get_histogram_results();
      result["tx_rates"] = get_tx_rates();
      return result;
    }

    // Record the tx rate for one tick: tx_count transactions were
    // processed over the elapsed interval.
    void track_tx_rates(
      const std::chrono::milliseconds& elapsed, size_t tx_count)
    {
      // A zero-length tick carries no rate information and would divide
      // by zero below, feeding inf/NaN into the histogram.
      if (elapsed.count() == 0)
      {
        return;
      }

      // calculate how many tx/sec we have processed in this tick
      auto duration = elapsed.count() / 1000.0;
      auto tx_rate = tx_count / duration;
      histogram.record(tx_rate);

      // keep time since beginning
      rate_time_elapsed += elapsed;
      if (tx_rate > 0)
      {
        if (tick_count < TX_RATE_BUCKETS_LEN)
        {
          auto rate_duration = rate_time_elapsed.count() / 1000.0;
          // Rates are reported as whole tx/sec; truncation is intended.
          tx_rates[tick_count] = static_cast<size_t>(tx_rate);
          tx_time_passed[tick_count] = rate_duration;
        }
        // Keep counting past the buffer so callers can detect dropped
        // samples if needed; only storage is bounded.
        tick_count++;
      }
    }
  };
}

Просмотреть файл

@ -13,5 +13,5 @@ namespace ccf
ADD_JSON_TRANSLATORS(JoinNetworkNodeToNode::In, raw_fresh_key)
ADD_JSON_TRANSLATORS(JoinNetworkNodeToNode::Out, id, network_secrets, version)
ADD_JSON_TRANSLATORS(GetCommit::Out, term, commit)
ADD_JSON_TRANSLATORS(GetTxHist::Out, tx_hist)
ADD_JSON_TRANSLATORS(GetMetrics::Out, metrics)
}

Просмотреть файл

@ -31,6 +31,11 @@ def cli_args(add=lambda x: None, accept_unknown=False):
default="all",
help="Send client requests only to primary, only to followers, or to all nodes",
)
parser.add_argument(
"--metrics-file",
default="metrics.json",
help="Path to json file where the transaction rate metrics will be saved to",
)
parser.add_argument("-t", "--threads", help="Number of client threads", default=1)
parser.add_argument(
"-f",

Просмотреть файл

@ -220,13 +220,15 @@ class RPCLogger:
def log_response(self, response):
LOG.debug(
"#{} {}".format(
response.id,
{
k: v
for k, v in (response.__dict__ or {}).items()
if not k.startswith("_")
},
truncate(
"#{} {}".format(
response.id,
{
k: v
for k, v in (response.__dict__ or {}).items()
if not k.startswith("_")
},
)
)
)

Просмотреть файл

@ -2,6 +2,8 @@
# Licensed under the Apache 2.0 License.
import json
import infra.proc
import collections
from statistics import mean, harmonic_mean, median, pstdev
from loguru import logger as LOG
@ -13,73 +15,91 @@ class TxRates:
self.get_histogram = False
self.primary = primary
self.same_commit_count = 0
self.data = {}
self.histogram_data = {}
self.tx_rates_data = []
self.all_metrics = {}
self.commit = 0
with open("getTxHist.json", "w") as gtxrf:
gtxrf.write('{"id":1,"jsonrpc":"2.0","method":"getTxHist","params":{}}\n')
with open("getCommit.json", "w") as gcf:
gcf.write('{"id":1,"jsonrpc":"2.0","method":"getCommit","params":{}}\n')
def __str__(self):
out_list = ["----------- tx rates -----------"]
out_list.append("----- mean ----: " + str(mean(self.tx_rates_data)))
out_list.append(
"----- harmonic mean ----: " + str(harmonic_mean(self.tx_rates_data))
)
out_list.append(
"---- standard deviation ----: " + str(pstdev(self.tx_rates_data))
)
out_list.append("----- median ----: " + str(median(self.tx_rates_data)))
out_list.append("---- max ----: " + str(max(self.tx_rates_data)))
out_list.append("---- min ----: " + str(min(self.tx_rates_data)))
out_list.append("----------- tx rates histogram -----------")
out_list.append(json.dumps(self.histogram_data, indent=4))
return "\n".join(out_list)
def save_results(self, output_file):
with open(output_file, "w") as mfile:
json.dump(self.all_metrics, mfile)
def process_next(self):
rv = infra.proc.ccall(
"./client",
"--host={}".format(self.primary.host),
"--port={}".format(self.primary.tls_port),
"--ca=networkcert.pem",
"userrpc",
"--cert=user1_cert.pem",
"--pk=user1_privk.pem",
"--req=getCommit.json",
log_output=False,
)
print(rv.stdout.decode())
result = rv.stdout.decode().split("\n")[1]
result = json.loads(result)
next_commit = result["result"]["commit"]
if self.commit == next_commit:
self.same_commit_count += 1
else:
self.same_commit_count = 0
with self.primary.user_client(format="json") as client:
rv = client.rpc("getCommit", {})
result = rv.to_dict()
next_commit = result["result"]["commit"]
if self.commit == next_commit:
self.same_commit_count += 1
else:
self.same_commit_count = 0
self.commit = next_commit
self.commit = next_commit
if self.same_commit_count > COMMIT_COUNT_CUTTOF:
self._get_hist()
self._get_metrics()
return False
return True
def print_results(self):
for key in sorted(self.data.keys()):
print(key + " : " + str(self.data[key]))
def _get_metrics(self):
with self.primary.user_client(format="json") as client:
rv = client.rpc("getMetrics", {})
result = rv.to_dict()
result = result["result"]["metrics"]
self.all_metrics = result
def save_results(self):
with open("tx_rates.txt", "w") as file:
for key in sorted(self.data.keys()):
file.write(key + " : " + str(self.data[key]))
file.write("\n")
all_rates = []
all_durations = []
rates = result.get("tx_rates")
if rates is None:
LOG.info("No tx rate metrics found...")
else:
for key in rates:
all_rates.append(rates[key]["rate"])
all_durations.append(float(rates[key]["duration"]))
self.tx_rates_data = all_rates
def _get_hist(self):
rv = infra.proc.ccall(
"./client",
"--host={}".format(self.primary.host),
"--port={}".format(self.primary.tls_port),
"--ca=networkcert.pem",
"userrpc",
"--cert=user1_cert.pem",
"--pk=user1_privk.pem",
"--req=getTxHist.json",
log_output=False,
)
histogram = result.get("histogram")
if histogram is None:
LOG.info("No histogram metrics found...")
else:
histogram_buckets = histogram["buckets"]
result = rv.stdout.decode().split("\n")[1]
result = json.loads(result)
histogram = result["result"]["tx_hist"]["histogram"]
LOG.info("Filtering histogram results...")
for key in histogram:
if histogram[key] > 0:
self.data[key] = histogram[key]
self.data["low"] = result["result"]["tx_hist"]["low"]
self.data["high"] = result["result"]["tx_hist"]["high"]
self.data["underflow"] = result["result"]["tx_hist"]["underflow"]
self.data["overflow"] = result["result"]["tx_hist"]["overflow"]
LOG.info("Filtering histogram results...")
hist_data = {}
for key in histogram_buckets:
if histogram_buckets[key] > 0:
range_1, range_2 = key.split("..")
hist_data[int(range_1)] = (range_2, histogram_buckets[key])
self.histogram_data["histogram"] = {}
buckets = []
rates = []
for key, value_tuple in sorted(hist_data.items(), key=lambda x: x[0]):
self.histogram_data["histogram"][
str(key) + ".." + value_tuple[0]
] = value_tuple[1]
buckets.append(str(key) + ".." + value_tuple[0])
rates.append(value_tuple[1])
self.histogram_data["low"] = histogram["low"]
self.histogram_data["high"] = histogram["high"]
self.histogram_data["underflow"] = histogram["underflow"]
self.histogram_data["overflow"] = histogram["overflow"]

Просмотреть файл

@ -97,6 +97,7 @@ class SSHRemote(CmdMixin):
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.root = os.path.join("/tmp", tmpdir_name(name))
self.name = name
def _rc(self, cmd):
LOG.info("[{}] {}".format(self.hostname, cmd))
@ -166,7 +167,9 @@ class SSHRemote(CmdMixin):
for filename in ("err", "out"):
try:
filepath = os.path.join(self.root, filename)
local_filepath = "{}_{}".format(self.hostname, filename)
local_filepath = "{}_{}_{}".format(
self.hostname, filename, self.name
)
session.get(filepath, local_filepath)
LOG.info("Downloaded {}".format(local_filepath))
except Exception:
@ -191,7 +194,10 @@ class SSHRemote(CmdMixin):
"""
LOG.info("[{}] closing".format(self.hostname))
self.get_logs()
log_errors("{}_out".format(self.hostname), "{}_err".format(self.hostname))
log_errors(
"{}_out_{}".format(self.hostname, self.name),
"{}_err_{}".format(self.hostname, self.name),
)
self.client.close()
def restart(self):
@ -336,7 +342,7 @@ class LocalRemote(CmdMixin):
for _ in range(timeout):
with open(os.path.join(self.root, "out"), "rb") as out:
for out_line in out:
if out_line.strip().decode() == line.strip():
if line.strip() in out_line.strip().decode():
return
time.sleep(1)
raise ValueError(

Просмотреть файл

@ -80,3 +80,9 @@ class CCFRemoteClient(object):
self.remote.stop()
except Exception:
LOG.exception("Failed to shut down {} cleanly".format(self.name))
def wait(self):
try:
self.remote.wait_for_stdout_line(line="Global commit", timeout=5)
except Exception:
LOG.exception("Failed to wait on client {}".format(self.name))

Просмотреть файл

@ -84,12 +84,10 @@ def run_client(args, primary, command_args):
"--config={}".format(args.config),
]
command += command_args
if args.network_only:
LOG.info("Client can be run with {}".format(" ".join(command)))
while True:
time.sleep(60)
else:
infra.proc.ccall(*command).check_returncode()
LOG.info("Client can be run with {}".format(" ".join(command)))
while True:
time.sleep(60)
def run(build_directory, get_command, args):
@ -109,13 +107,12 @@ def run(build_directory, get_command, args):
command_args = get_command_args(args, get_command)
client_hosts = args.client_nodes
if args.network_only or client_hosts is None:
if args.network_only:
run_client(args, primary, command_args)
else:
nodes = filter_nodes(primary, followers, args.send_tx_to)
clients = []
client_hosts = args.client_nodes or ["localhost"]
for client_id, client_host in enumerate(client_hosts):
node = nodes[client_id % len(nodes)]
remote = configure_remote_client(
@ -132,13 +129,13 @@ def run(build_directory, get_command, args):
continue_processing = tx_rates.process_next()
time.sleep(1)
if not continue_processing:
time.sleep(15)
for remote in clients:
remote.wait()
remote.stop()
break
tx_rates.print_results()
tx_rates.save_results()
LOG.info(f"Rates: {tx_rates}")
tx_rates.save_results(args.metrics_file)
except KeyboardInterrupt:
for remote in clients: