Host fairness with load monitor (#1480)

This commit is contained in:
Eddy Ashton 2020-08-12 14:30:17 +01:00 коммит произвёл GitHub
Родитель 05e0805db3
Коммит b915ec1926
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
10 изменённых файлов: 391 добавлений и 28 удалений

Просмотреть файл

@ -97,6 +97,7 @@ function(sign_app_library name app_oe_conf_path enclave_sign_key_path)
OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/lib${name}.so.debuggable
COMMAND
cp ${app_oe_conf_path} ${DEBUG_CONF_NAME} && (grep
-q
"Debug\=.*"
${DEBUG_CONF_NAME}
&&
@ -129,6 +130,7 @@ function(sign_app_library name app_oe_conf_path enclave_sign_key_path)
OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/lib${name}.so.signed
COMMAND
cp ${app_oe_conf_path} ${SIGNED_CONF_NAME} && (grep
-q
"Debug\=.*"
${SIGNED_CONF_NAME}
&&

Просмотреть файл

@ -25,36 +25,54 @@ namespace messaging
using logic_error::logic_error;
};
/// Running totals kept for a single message type.
/// Both fields start at zero, so a default-constructed Counts is always
/// well-defined (the struct remains an aggregate: `Counts{1, 2}` still works).
struct Counts
{
  /// Number of messages seen
  size_t messages = 0;
  /// Sum of the payload sizes of those messages, in bytes
  size_t bytes = 0;
};
template <typename MessageType>
using MessageCounts = std::unordered_map<MessageType, Counts>;
template <typename MessageType>
class Dispatcher
{
public:
using MessageCounts = MessageCounts<MessageType>;
private:
// Store a name to distinguish error messages
char const* const name;
std::map<MessageType, Handler> handlers;
std::map<MessageType, char const*> message_labels;
MessageCounts message_counts;
std::string get_error_prefix()
{
return std::string("[") + std::string(name) + std::string("] ");
}
static std::string build_message_name(
MessageType m, char const* s = nullptr)
{
return std::string("<") + (s == nullptr ? "unknown" : s) + ":" +
std::to_string(m) + ">";
}
std::string get_message_name(MessageType m)
char const* get_message_name(MessageType m)
{
const auto it = message_labels.find(m);
if (it == message_labels.end())
{
return build_message_name(m);
return "unknown";
}
return build_message_name(m, it->second);
return it->second;
}
static std::string decorate_message_name(MessageType m, char const* s)
{
return fmt::format("<{}:{}>", s, m);
}
std::string get_decorated_message_name(MessageType m)
{
return decorate_message_name(m, get_message_name(m));
}
public:
@ -81,8 +99,9 @@ namespace messaging
{
throw already_handled(
get_error_prefix() + "MessageType " + std::to_string(m) +
" already handled by " + get_message_name(m) +
", cannot set handler for " + build_message_name(m, message_label));
" already handled by " + get_decorated_message_name(m) +
", cannot set handler for " +
decorate_message_name(m, message_label));
}
LOG_DEBUG_FMT("Setting handler for {} ({})", message_label, m);
@ -109,7 +128,7 @@ namespace messaging
throw no_handler(
get_error_prefix() +
"Can't remove non-existent handler for this message: " +
get_message_name(m));
get_decorated_message_name(m));
}
handlers.erase(it);
@ -139,11 +158,33 @@ namespace messaging
{
throw no_handler(
get_error_prefix() +
"No handler for this message: " + get_message_name(m));
"No handler for this message: " + get_decorated_message_name(m));
}
// Handlers may register or remove handlers, so iterator is invalidated
it->second(data, size);
auto& counts = message_counts[m];
counts.messages++;
counts.bytes += size;
}
/// Hand the counts accumulated so far over to the caller, leaving this
/// dispatcher's own tally empty.
MessageCounts retrieve_message_counts()
{
  MessageCounts drained;
  message_counts.swap(drained);
  return drained;
}
/// Render a set of counts as a JSON object. Each key is the label returned
/// by get_message_name for that message type, and each value is an object
/// with "count" (number of messages) and "bytes" (total size) fields.
nlohmann::json convert_message_counts(const MessageCounts& mc)
{
  auto result = nlohmann::json::object();
  for (const auto& [type, tally] : mc)
  {
    auto& entry = result[get_message_name(type)];
    entry["count"] = tally.messages;
    entry["bytes"] = tally.bytes;
  }
  return result;
}
};

Просмотреть файл

@ -194,12 +194,21 @@ TEST_CASE("Basic message loop" * doctest::test_suite("messaging"))
{
test_filler.write(echo_out);
REQUIRE_THROWS_AS(bp.run(loop_src), messaging::no_handler);
const auto counts = bp.get_dispatcher().retrieve_message_counts();
REQUIRE(counts.empty());
}
SUBCASE("Message handlers can finish the loop")
{
test_filler.write(finish);
REQUIRE(bp.run(loop_src) == 1);
const auto counts = bp.get_dispatcher().retrieve_message_counts();
REQUIRE(counts.size() == 1);
REQUIRE(counts.find(finish) != counts.end());
REQUIRE(counts.at(finish).messages == 1);
REQUIRE(counts.at(finish).bytes == 0);
}
SUBCASE("Message handlers can affect external state")
@ -209,6 +218,15 @@ TEST_CASE("Basic message loop" * doctest::test_suite("messaging"))
test_filler.write(finish);
REQUIRE(bp.run(loop_src) == 2);
REQUIRE(x == new_x);
const auto counts = bp.get_dispatcher().retrieve_message_counts();
REQUIRE(counts.size() == 2);
REQUIRE(counts.find(set_x) != counts.end());
REQUIRE(counts.at(set_x).messages == 1);
REQUIRE(counts.at(set_x).bytes == sizeof(new_x));
REQUIRE(counts.find(finish) != counts.end());
REQUIRE(counts.at(finish).messages == 1);
REQUIRE(counts.at(finish).bytes == 0);
}
SUBCASE("Message handlers can communicate through the writer")
@ -228,6 +246,15 @@ TEST_CASE("Basic message loop" * doctest::test_suite("messaging"))
REQUIRE(data[i] == actual[i]);
}
}) == 1);
const auto counts = bp.get_dispatcher().retrieve_message_counts();
REQUIRE(counts.size() == 2);
REQUIRE(counts.find(echo) != counts.end());
REQUIRE(counts.at(echo).messages == 1);
REQUIRE(counts.at(echo).bytes == actual.size());
REQUIRE(counts.find(finish) != counts.end());
REQUIRE(counts.at(finish).messages == 1);
REQUIRE(counts.at(finish).bytes == 0);
}
SUBCASE("Dispatcher can be accessed directly")
@ -249,6 +276,12 @@ TEST_CASE("Basic message loop" * doctest::test_suite("messaging"))
dispatcher.remove_message_handler(set_x), messaging::no_handler);
REQUIRE_THROWS_AS(
dispatcher.dispatch(set_x, nullptr, 0), messaging::no_handler);
const auto counts = bp.get_dispatcher().retrieve_message_counts();
REQUIRE(counts.size() == 1);
REQUIRE(counts.find(set_x) != counts.end());
REQUIRE(counts.at(set_x).messages == 1);
REQUIRE(counts.at(set_x).bytes == 0);
}
}

Просмотреть файл

@ -35,6 +35,7 @@ namespace enclave
std::shared_ptr<RPCSessions> rpcsessions;
std::unique_ptr<ccf::NodeState> node;
std::shared_ptr<ccf::Forwarder<ccf::NodeToNode>> cmd_forwarder;
ringbuffer::WriterPtr to_host = nullptr;
CCFConfig ccf_config;
StartType start_type;
@ -84,6 +85,8 @@ namespace enclave
logger::config::msg() = AdminMessage::log_msg;
logger::config::writer() = writer_factory.create_writer_to_outside();
to_host = writer_factory.create_writer_to_outside();
node = std::make_unique<ccf::NodeState>(
writer_factory,
network,
@ -204,12 +207,21 @@ namespace enclave
});
DISPATCHER_SET_MESSAGE_HANDLER(
bp, AdminMessage::tick, [this](const uint8_t* data, size_t size) {
bp,
AdminMessage::tick,
[this, &bp](const uint8_t* data, size_t size) {
auto [ms_count] =
ringbuffer::read_message<AdminMessage::tick>(data, size);
if (ms_count > 0)
{
const auto message_counts =
bp.get_dispatcher().retrieve_message_counts();
const auto j =
bp.get_dispatcher().convert_message_counts(message_counts);
RINGBUFFER_WRITE_MESSAGE(
AdminMessage::work_stats, to_host, j.dump());
std::chrono::milliseconds elapsed_ms(ms_count);
logger::config::tick(elapsed_ms);
node->tick(elapsed_ms);
@ -351,18 +363,16 @@ namespace enclave
}
});
auto w = writer_factory.create_writer_to_outside();
LOG_INFO_FMT("Enclave stopped successfully. Stopping host...");
RINGBUFFER_WRITE_MESSAGE(AdminMessage::stopped, w);
RINGBUFFER_WRITE_MESSAGE(AdminMessage::stopped, to_host);
return true;
}
#ifndef VIRTUAL_ENCLAVE
catch (const std::exception& e)
{
auto w = writer_factory.create_writer_to_outside();
RINGBUFFER_WRITE_MESSAGE(
AdminMessage::fatal_error_msg, w, std::string(e.what()));
AdminMessage::fatal_error_msg, to_host, std::string(e.what()));
return false;
}
#endif
@ -395,9 +405,8 @@ namespace enclave
#ifndef VIRTUAL_ENCLAVE
catch (const std::exception& e)
{
auto w = writer_factory.create_writer_to_outside();
RINGBUFFER_WRITE_MESSAGE(
AdminMessage::fatal_error_msg, w, std::string(e.what()));
AdminMessage::fatal_error_msg, to_host, std::string(e.what()));
return false;
}
#endif

Просмотреть файл

@ -93,7 +93,10 @@ enum AdminMessage : ringbuffer::Message
DEFINE_RINGBUFFER_MSG_TYPE(notification),
/// Periodically update based on current time. Host -> Enclave
DEFINE_RINGBUFFER_MSG_TYPE(tick)
DEFINE_RINGBUFFER_MSG_TYPE(tick),
/// Notify the host of work done since last message. Enclave -> Host
DEFINE_RINGBUFFER_MSG_TYPE(work_stats)
};
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
@ -110,3 +113,4 @@ DECLARE_RINGBUFFER_MESSAGE_NO_PAYLOAD(AdminMessage::stopped);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
AdminMessage::notification, std::vector<uint8_t>);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(AdminMessage::tick, size_t);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(AdminMessage::work_stats, std::string);

Просмотреть файл

@ -20,7 +20,9 @@ namespace asynchost
class HandleRingbufferImpl
{
private:
static constexpr size_t max_messages = 128;
// Maximum number of outbound ringbuffer messages which will be processed in
// a single iteration
static constexpr size_t max_messages = 256;
messaging::BufferProcessor& bp;
ringbuffer::Reader& r;
@ -67,11 +69,8 @@ namespace asynchost
{
// On each uv loop iteration...
// ...read (and process) all outbound ringbuffer messages...
while (bp.read_n(max_messages, r) > 0)
{
continue;
}
// ...read (and process) some outbound ringbuffer messages...
bp.read_n(max_messages, r);
// ...flush any pending inbound messages...
nbwf.flush_all_inbound();

110
src/host/load_monitor.h Normal file
Просмотреть файл

@ -0,0 +1,110 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include "ds/messaging.h"
#include "timer.h"
namespace asynchost
{
class LoadMonitorImpl
{
using TClock = std::chrono::system_clock;
std::chrono::milliseconds last_update;
messaging::Dispatcher<ringbuffer::Message>& dispatcher;
std::fstream host_output_file;
std::fstream enclave_output_file;
nlohmann::json enclave_counts;
public:
LoadMonitorImpl(messaging::BufferProcessor& bp) :
dispatcher(bp.get_dispatcher())
{
dispatcher.retrieve_message_counts();
last_update = std::chrono::duration_cast<std::chrono::milliseconds>(
TClock::now().time_since_epoch());
host_output_file.open("host_load.log", std::fstream::out);
enclave_output_file.open("enclave_load.log", std::fstream::out);
enclave_counts = nlohmann::json::object();
// Register message handler for work_stats message from enclave
DISPATCHER_SET_MESSAGE_HANDLER(
bp, AdminMessage::work_stats, [this](const uint8_t* data, size_t size) {
auto [dumped_json] =
ringbuffer::read_message<AdminMessage::work_stats>(data, size);
nlohmann::json j;
try
{
j = nlohmann::json::parse(dumped_json);
}
catch (const nlohmann::json::parse_error& e)
{
LOG_FAIL_FMT("Received unparseable work_stats from enclave");
return;
}
for (const auto& [outer_key, outer_value] : j.items())
{
for (const auto& [inner_key, inner_value] : outer_value.items())
{
auto& outer_obj = enclave_counts[outer_key];
auto it = outer_obj.find(inner_key);
if (it == outer_obj.end())
{
outer_obj[inner_key] = inner_value;
}
else
{
const auto prev = it.value().get<size_t>();
outer_obj[inner_key] = prev + inner_value.get<size_t>();
}
}
}
});
}
void on_timer()
{
const auto message_counts = dispatcher.retrieve_message_counts();
const auto time_now =
std::chrono::duration_cast<std::chrono::milliseconds>(
TClock::now().time_since_epoch());
if (!message_counts.empty())
{
auto j = nlohmann::json::object();
j["start_time_ms"] = last_update.count();
j["end_time_ms"] = time_now.count();
{
j["ringbuffer_messages"] =
dispatcher.convert_message_counts(message_counts);
const auto line = j.dump();
host_output_file.write(line.data(), line.size());
host_output_file << std::endl;
}
{
j["ringbuffer_messages"] = enclave_counts;
enclave_counts = nlohmann::json::object();
const auto line = j.dump();
enclave_output_file.write(line.data(), line.size());
enclave_output_file << std::endl;
}
last_update = time_now;
}
}
};
using LoadMonitor = proxy_ptr<Timer<LoadMonitorImpl>>;
}

Просмотреть файл

@ -9,6 +9,7 @@
#include "ds/stacktrace_utils.h"
#include "enclave.h"
#include "handle_ring_buffer.h"
#include "load_monitor.h"
#include "node_connections.h"
#include "notify_connections.h"
#include "rpc_connections.h"
@ -32,6 +33,8 @@ using namespace std::chrono_literals;
::timespec logger::config::start{0, 0};
size_t asynchost::TCPImpl::remaining_read_quota;
void print_version(size_t)
{
std::cout << "CCF host: " << ccf::ccf_version << std::endl;
@ -512,9 +515,15 @@ int main(int argc, char** argv)
logger::config::set_start(s);
});
// reset the inbound-TCP processing quota each iteration
asynchost::ResetTCPReadQuota reset_tcp_quota;
// regularly update the time given to the enclave
asynchost::TimeUpdater time_updater(1);
// regularly record some load statistics
asynchost::LoadMonitor load_monitor(500, bp);
// handle outbound messages from the enclave
asynchost::HandleRingbuffer handle_ringbuffer(
bp, circuit.read_from_inside(), non_blocking_factory);

Просмотреть файл

@ -3,6 +3,7 @@
#pragma once
#include "../ds/logger.h"
#include "before_io.h"
#include "dns.h"
#include "proxy.h"
@ -52,6 +53,10 @@ namespace asynchost
static constexpr int backlog = 128;
static constexpr size_t max_read_size = 16384;
// Each uv iteration, read only a capped amount from all sockets.
static constexpr auto max_read_quota = max_read_size * 4;
static size_t remaining_read_quota;
enum Status
{
FRESH,
@ -129,6 +134,11 @@ namespace asynchost
}
public:
static void reset_read_quota()
{
remaining_read_quota = max_read_quota;
}
void set_behaviour(std::unique_ptr<TCPBehaviour> b)
{
behaviour = std::move(b);
@ -571,6 +581,14 @@ namespace asynchost
/// Buffer-allocation callback for TCP reads.
///
/// The buffer is capped both by max_read_size and by whatever is left of the
/// shared remaining_read_quota, and the quota is reduced by the amount handed
/// out.
///
/// @param suggested_size Size requested by the caller of this callback
/// @param buf Buffer descriptor to fill in
void on_alloc(size_t suggested_size, uv_buf_t* buf)
{
  auto alloc_size = std::min(suggested_size, max_read_size);
  alloc_size = std::min(alloc_size, remaining_read_quota);
  remaining_read_quota -= alloc_size;
  LOG_TRACE_FMT(
    "Allocating {} bytes for TCP read ({} of quota remaining)",
    alloc_size,
    remaining_read_quota);

  // When the quota is exhausted alloc_size is 0. libuv reports UV_ENOBUFS to
  // the read callback for a null base or a zero length, so return a null base
  // rather than a zero-length heap array: no allocation is made, hence none
  // can be leaked by a read callback that returns early on UV_ENOBUFS.
  buf->base = alloc_size > 0 ? new char[alloc_size] : nullptr;
  buf->len = alloc_size;
}
@ -594,6 +612,12 @@ namespace asynchost
return;
}
if (sz == UV_ENOBUFS)
{
LOG_DEBUG_FMT("TCP on_read reached allocation quota");
return;
}
if (sz < 0)
{
assert_status(CONNECTED, DISCONNECTED);
@ -647,4 +671,17 @@ namespace asynchost
connect_resolved();
}
};
/// Behaviour whose before_io hook restores the shared TCP read quota to its
/// maximum. It is wrapped in BeforeIO below.
class ResetTCPReadQuotaImpl
{
public:
  ResetTCPReadQuotaImpl() = default;

  /// Refill the read quota shared by all TCPImpl instances.
  void before_io()
  {
    TCPImpl::reset_read_quota();
  }
};

using ResetTCPReadQuota = proxy_ptr<BeforeIO<ResetTCPReadQuotaImpl>>;
}

119
tests/plot_node_load.py Normal file
Просмотреть файл

@ -0,0 +1,119 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import argparse
import datetime
import json
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import os
# For consistency between plots we want a function from label (name) to colour.
# We could do this programmatically from the hashes to handle general values, but
# this is sufficient and makes it simple to group similar messages with similar colours
LABELS_TO_COLOURS = {
# Processed on Host
"AdminMessage::log_msg": "dimgray",
"AdminMessage::notification": "silver",
"AdminMessage::work_stats": "gainsboro",
"ccf::add_node": "lime",
"ccf::node_outbound": "darkgreen",
"consensus::ledger_append": "red",
"consensus::ledger_get": "indianred",
"consensus::ledger_commit": "maroon",
"consensus::ledger_truncate": "rosybrown",
"tls::tls_closed": "darkkhaki",
"tls::tls_connect": "khaki",
"tls::tls_outbound": "gold",
"tls::tls_stop": "goldenrod",
# Processed in enclave
"AdminMessage::tick": "dimgray",
"ccf::node_inbound": "darkgreen",
"consensus::ledger_entry": "red",
"tls::tls_close": "darkkhaki",
"tls::tls_inbound": "gold",
"tls::tls_start": "goldenrod",
# Processed in both
"OversizedMessage::fragment": "slategray",
}
def num_to_bytes_formatter(n, _=None):
    """Format a byte count as a human-readable string, e.g. 1536 -> "1.50 KB".

    The value is divided by 1024 until its magnitude drops below 1024 or the
    largest suffix (GB) is reached, then printed with two decimals and
    thousands separators.

    :param n: Number of bytes (int or float; negative values are scaled by
        magnitude and keep their sign).
    :param _: Unused. Present so the function can be used as a two-argument
        tick formatter; defaults to None so it can also be called directly.
    :return: Formatted string such as "0.00 B" or "1,024.00 GB".
    """
    suffixes = ("B", "KB", "MB", "GB")
    i = 0
    # Compare on magnitude so that negative values are scaled too
    while abs(n) >= 1024 and i < len(suffixes) - 1:
        n /= 1024.0
        i += 1
    return f"{n:,.2f} {suffixes[i]}"
def plot_stacked(jsons, key, output_stem=None):
    """Draw a stacked area plot of one per-message statistic over time and
    save it as a PNG.

    :param jsons: Parsed log entries. Each must have an "end_time_ms" value and
        a "ringbuffer_messages" mapping of label -> {statistic name: value}.
    :param key: Statistic to plot for every label (the script uses "count" and
        "bytes"). Labels or statistics missing from an entry are plotted as 0.
    :param output_stem: Output path without extension; the plot is written to
        "<output_stem>_<key>.png". When omitted, the stem is derived from the
        module-level ``args.load_file`` set by the command-line entry point.
    """
    # Every label seen in any entry, in a stable (sorted) order
    labels = sorted({label for j in jsons for label in j["ringbuffer_messages"]})

    colours = []
    default_colour = "black"
    for label in labels:
        try:
            colours.append(LABELS_TO_COLOURS[label])
        except KeyError:
            print(f"No colour for '{label}', defaulting to {default_colour}")
            colours.append(default_colour)

    # One x value per entry, and one series of y values per label
    xs = []
    ys = [[] for _ in labels]
    for j in jsons:
        xs.append(j["end_time_ms"])
        messages = j["ringbuffer_messages"]
        for i, label in enumerate(labels):
            try:
                count = messages[label][key]
            except KeyError:
                count = 0
            ys[i].append(count)

    def ms_to_date_formatter(ms, _):
        # x values are milliseconds since the epoch; show them as local time
        s = ms / 1000.0
        return datetime.datetime.fromtimestamp(s).strftime("%H:%M:%S")

    _, ax = plt.subplots()
    plt.title(f"Ringbuffer messages - {key}")
    plt.ylabel(f"{key}")
    plt.ticklabel_format(useOffset=False)

    ax.xaxis.set_major_formatter(ms_to_date_formatter)
    ax.locator_params(axis="x", nbins=5)
    ax.xaxis.set_minor_locator(ticker.MultipleLocator(1000))

    if key == "bytes":
        ax.yaxis.set_major_formatter(num_to_bytes_formatter)

    ax.stackplot(xs, ys, colors=colours, labels=labels)
    ax.legend(prop={"size": 8})

    if output_stem is None:
        # Fall back to the file named on the command line
        output_stem, _ = os.path.splitext(args.load_file.name)
    output_path = f"{output_stem}_{key}.png"
    print(f"Saving plot to {output_path}")
    plt.savefig(output_path, bbox_inches="tight")
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "load_file",
        type=argparse.FileType("r"),
        help="Path to load log file to be parsed",
    )
    # NB: kept at module level, as plot_stacked reads `args` to derive its
    # output path
    args = parser.parse_args()

    # The log holds one JSON object per line. Skip blank lines, which would
    # otherwise make json.loads raise
    jsons = [json.loads(line) for line in args.load_file if line.strip()]

    plot_stacked(jsons, "count")
    plot_stacked(jsons, "bytes")