From 2ea708dad3c981802e2d4f2eebfe5df5dc8e3aeb Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 23 Sep 2019 16:30:12 +0100 Subject: [PATCH] Only send notifications from leader (#388) * Notifier only sends notifications from primary * Only create notification server when it is used * Check commit notifications * Format * One notification-per-commit * Formatting * Avoid nullptr access * Format --- src/enclave/enclave.h | 2 +- src/node/nodestate.h | 12 ++++++-- src/node/notifier.h | 24 ++++++++++++++- tests/e2e_logging.py | 5 ++- tests/e2e_logging_pbft.py | 65 ++++++++++++++++----------------------- tests/governance.py | 6 ---- tests/infra/ccf.py | 11 +++++-- 7 files changed, 71 insertions(+), 54 deletions(-) diff --git a/src/enclave/enclave.h b/src/enclave/enclave.h index 6aebe92df..86c2b2be3 100644 --- a/src/enclave/enclave.h +++ b/src/enclave/enclave.h @@ -45,7 +45,7 @@ namespace enclave writer_factory(circuit, enclave_config->writer_config), rpcsessions(writer_factory), n2n_channels(std::make_shared(writer_factory)), - node(writer_factory, network, rpcsessions), + node(writer_factory, network, rpcsessions, notifier), notifier(writer_factory), cmd_forwarder( std::make_shared(rpcsessions, n2n_channels)), diff --git a/src/node/nodestate.h b/src/node/nodestate.h index 61352e09a..68f3e884a 100644 --- a/src/node/nodestate.h +++ b/src/node/nodestate.h @@ -13,6 +13,7 @@ #include "history.h" #include "networkstate.h" #include "nodetonode.h" +#include "notifier.h" #include "rpc/consts.h" #include "rpc/frontend.h" #include "rpc/serialization.h" @@ -120,6 +121,7 @@ namespace ccf std::shared_ptr rpc_map; std::shared_ptr n2n_channels; enclave::RPCSessions& rpcsessions; + ccf::Notifier& notifier; std::shared_ptr history; std::shared_ptr encryptor; @@ -147,14 +149,16 @@ namespace ccf NodeState( ringbuffer::AbstractWriterFactory& writer_factory, NetworkState& network, - enclave::RPCSessions& rpcsessions) : + enclave::RPCSessions& rpcsessions, + ccf::Notifier& notifier) : sm(State::uninitialized), self(INVALID_ID), node_kp(tls::make_key_pair()), writer_factory(writer_factory), to_host(writer_factory.create_writer_to_outside()), network(network), - rpcsessions(rpcsessions) + rpcsessions(rpcsessions), + notifier(notifier) { ::EverCrypt_AutoConfig2_init(); } @@ -1133,6 +1137,8 @@ namespace ccf network.tables->set_consensus(consensus); + notifier.set_consensus(consensus); + // When a node is added, even locally, inform the host so that it can // map the node id to a hostname and service and inform raft so that it // can add a new active configuration. @@ -1256,6 +1262,8 @@ namespace ccf network.tables->set_consensus(consensus); + notifier.set_consensus(consensus); + // When a node is added, even locally, inform the host so that it can // map the node id to a hostname and service and inform pbft network.nodes.set_local_hook( diff --git a/src/node/notifier.h b/src/node/notifier.h index 0a68c8e7a..daa0cfb0d 100644 --- a/src/node/notifier.h +++ b/src/node/notifier.h @@ -2,8 +2,10 @@ // Licensed under the Apache 2.0 License. #pragma once +#include "ds/logger.h" #include "ds/ringbuffer_types.h" #include "enclave/interface.h" +#include "kv/kvtypes.h" #include "rpc/frontend.h" namespace ccf @@ -12,6 +14,7 @@ namespace ccf { private: std::unique_ptr to_host; + std::shared_ptr consensus = nullptr; public: Notifier(ringbuffer::AbstractWriterFactory& writer_factory_) : @@ -20,7 +23,26 @@ namespace ccf void notify(const std::vector& data) override { - RINGBUFFER_WRITE_MESSAGE(AdminMessage::notification, to_host, data); + if (consensus == nullptr) + { + LOG_FAIL_FMT("Unable to send notification - no consensus has been set"); + return; + } + + if (consensus->is_primary()) + { + LOG_DEBUG_FMT("Sending notification"); + RINGBUFFER_WRITE_MESSAGE(AdminMessage::notification, to_host, data); + } + else + { + LOG_DEBUG_FMT("Ignoring notification - not leader"); + } + } + + void set_consensus(const std::shared_ptr& c) + { + consensus = c; } }; } diff --git a/tests/e2e_logging.py b/tests/e2e_logging.py index bd49c616f..21a83630e 100644 --- a/tests/e2e_logging.py +++ b/tests/e2e_logging.py @@ -28,9 +28,8 @@ def run(args): primary, (backup,) = network.start_and_join(args) with primary.management_client() as mc: - check_commit = infra.ccf.Checker(mc) + check_commit = infra.ccf.Checker(mc, notifications.get_queue()) check = infra.ccf.Checker() - check_notification = infra.ccf.Checker(None, notifications.get_queue()) msg = "Hello world" msg2 = "Hello there" @@ -41,7 +40,7 @@ def run(args): check_commit( c.rpc("LOG_record", {"id": 42, "msg": msg}), result=True ) - check_notification( + check_commit( c.rpc("LOG_record", {"id": 43, "msg": msg2}), result=True ) check(c.rpc("LOG_get", {"id": 42}), result={"msg": msg}) diff --git a/tests/e2e_logging_pbft.py b/tests/e2e_logging_pbft.py index 68453974e..15c755891 100644 --- a/tests/e2e_logging_pbft.py +++ b/tests/e2e_logging_pbft.py @@ -20,55 +20,42 @@ from loguru import logger as LOG def run(args): hosts = ["localhost"] - with infra.notification.notification_server(args.notify_server) as notifications: + with infra.ccf.network( + hosts, args.build_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb + ) as network: + primary, _ = network.start_and_join(args) - with infra.ccf.network( - hosts, args.build_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb - ) as network: - primary, _ = network.start_and_join(args) + with primary.management_client() as mc: + check_commit = infra.ccf.Checker(mc) + check = infra.ccf.Checker() - with primary.management_client() as mc: - check_commit = infra.ccf.Checker(mc) - check = infra.ccf.Checker() + msg = "Hello world" + msg2 = "Hello there" - msg = "Hello world" - msg2 = "Hello there" + LOG.debug("Write/Read on primary") + with primary.user_client(format="json") as c: + check_commit(c.rpc("LOG_record", {"id": 42, "msg": msg}), result=True) + check_commit(c.rpc("LOG_record", {"id": 43, "msg": msg2}), result=True) + check(c.rpc("LOG_get", {"id": 42}), result={"msg": msg}) + check(c.rpc("LOG_get", {"id": 43}), result={"msg": msg2}) - LOG.debug("Write/Read on primary") - with primary.user_client(format="json") as c: + LOG.debug("Write/Read large messages on primary") + with primary.user_client(format="json") as c: + id = 44 + # For larger values of p, PBFT crashes since the size of the + # request is bigger than the max size supported by PBFT + # (Max_message_size) + for p in range(10, 13): + long_msg = "X" * (2 ** p) check_commit( - c.rpc("LOG_record", {"id": 42, "msg": msg}), result=True + c.rpc("LOG_record", {"id": id, "msg": long_msg}), result=True ) - check_commit( - c.rpc("LOG_record", {"id": 43, "msg": msg2}), result=True - ) - check(c.rpc("LOG_get", {"id": 42}), result={"msg": msg}) - check(c.rpc("LOG_get", {"id": 43}), result={"msg": msg2}) - - LOG.debug("Write/Read large messages on primary") - with primary.user_client(format="json") as c: - id = 44 - # For larger values of p, PBFT crashes since the size of the - # request is bigger than the max size supported by PBFT - # (Max_message_size) - for p in range(10, 13): - long_msg = "X" * (2 ** p) - check_commit( - c.rpc("LOG_record", {"id": id, "msg": long_msg}), - result=True, - ) - check(c.rpc("LOG_get", {"id": id}), result={"msg": long_msg}) - id += 1 + check(c.rpc("LOG_get", {"id": id}), result={"msg": long_msg}) + id += 1 if __name__ == "__main__": args = e2e_args.cli_args() args.package = "libloggingenc" - notify_server_host = "localhost" - args.notify_server = ( - notify_server_host - + ":" - + str(infra.net.probably_free_local_port(notify_server_host)) - ) run(args) diff --git a/tests/governance.py b/tests/governance.py index 55ad20b4f..60801d728 100644 --- a/tests/governance.py +++ b/tests/governance.py @@ -65,10 +65,4 @@ if __name__ == "__main__": sys.exit(0) args.package = "libloggingenc" - notify_server_host = "localhost" - args.notify_server = ( - notify_server_host - + ":" - + str(infra.net.probably_free_local_port(notify_server_host)) - ) run(args) diff --git a/tests/infra/ccf.py b/tests/infra/ccf.py index de3ff1edc..24c7efce2 100644 --- a/tests/infra/ccf.py +++ b/tests/infra/ccf.py @@ -522,6 +522,7 @@ class Checker: def __init__(self, management_client=None, notification_queue=None): self.management_client = management_client self.notification_queue = notification_queue + self.notified_commit = 0 def __call__(self, rpc_result, result=None, error=None, timeout=2): if error is not None: @@ -548,8 +549,14 @@ class Checker: if self.notification_queue: for i in range(timeout * 10): - for q in list(self.notification_queue.queue): - if json.loads(q)["commit"] >= rpc_result.commit: + while self.notification_queue.not_empty: + notification = self.notification_queue.get() + n = json.loads(notification)["commit"] + assert ( + n > self.notified_commit + ), f"Received notification of commit {n} after commit {self.notified_commit}" + self.notified_commit = n + if n >= rpc_result.commit: return time.sleep(0.5) raise TimeoutError("Timed out waiting for notification")