Only send notifications from leader (#388)

* Notifier only sends notifications from primary

* Only create notification server when it is used

* Check commit notifications

* Format

* One notification-per-commit

* Formatting

* Avoid nullptr access

* Format
This commit is contained in:
Eddy Ashton 2019-09-23 16:30:12 +01:00 коммит произвёл GitHub
Родитель c7568876f6
Коммит 2ea708dad3
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 71 добавлений и 54 удалений

Просмотреть файл

@ -45,7 +45,7 @@ namespace enclave
writer_factory(circuit, enclave_config->writer_config),
rpcsessions(writer_factory),
n2n_channels(std::make_shared<ccf::NodeToNode>(writer_factory)),
node(writer_factory, network, rpcsessions),
node(writer_factory, network, rpcsessions, notifier),
notifier(writer_factory),
cmd_forwarder(
std::make_shared<ccf::Forwarder>(rpcsessions, n2n_channels)),

Просмотреть файл

@ -13,6 +13,7 @@
#include "history.h"
#include "networkstate.h"
#include "nodetonode.h"
#include "notifier.h"
#include "rpc/consts.h"
#include "rpc/frontend.h"
#include "rpc/serialization.h"
@ -120,6 +121,7 @@ namespace ccf
std::shared_ptr<enclave::RpcMap> rpc_map;
std::shared_ptr<NodeToNode> n2n_channels;
enclave::RPCSessions& rpcsessions;
ccf::Notifier& notifier;
std::shared_ptr<kv::TxHistory> history;
std::shared_ptr<kv::AbstractTxEncryptor> encryptor;
@ -147,14 +149,16 @@ namespace ccf
NodeState(
ringbuffer::AbstractWriterFactory& writer_factory,
NetworkState& network,
enclave::RPCSessions& rpcsessions) :
enclave::RPCSessions& rpcsessions,
ccf::Notifier& notifier) :
sm(State::uninitialized),
self(INVALID_ID),
node_kp(tls::make_key_pair()),
writer_factory(writer_factory),
to_host(writer_factory.create_writer_to_outside()),
network(network),
rpcsessions(rpcsessions)
rpcsessions(rpcsessions),
notifier(notifier)
{
::EverCrypt_AutoConfig2_init();
}
@ -1133,6 +1137,8 @@ namespace ccf
network.tables->set_consensus(consensus);
notifier.set_consensus(consensus);
// When a node is added, even locally, inform the host so that it can
// map the node id to a hostname and service and inform raft so that it
// can add a new active configuration.
@ -1256,6 +1262,8 @@ namespace ccf
network.tables->set_consensus(consensus);
notifier.set_consensus(consensus);
// When a node is added, even locally, inform the host so that it can
// map the node id to a hostname and service and inform pbft
network.nodes.set_local_hook(

Просмотреть файл

@ -2,8 +2,10 @@
// Licensed under the Apache 2.0 License.
#pragma once
#include "ds/logger.h"
#include "ds/ringbuffer_types.h"
#include "enclave/interface.h"
#include "kv/kvtypes.h"
#include "rpc/frontend.h"
namespace ccf
@ -12,6 +14,7 @@ namespace ccf
{
private:
std::unique_ptr<ringbuffer::AbstractWriter> to_host;
std::shared_ptr<kv::Consensus> consensus = nullptr;
public:
Notifier(ringbuffer::AbstractWriterFactory& writer_factory_) :
@ -20,7 +23,26 @@ namespace ccf
void notify(const std::vector<uint8_t>& data) override
{
RINGBUFFER_WRITE_MESSAGE(AdminMessage::notification, to_host, data);
if (consensus == nullptr)
{
LOG_FAIL_FMT("Unable to send notification - no consensus has been set");
return;
}
if (consensus->is_primary())
{
LOG_DEBUG_FMT("Sending notification");
RINGBUFFER_WRITE_MESSAGE(AdminMessage::notification, to_host, data);
}
else
{
LOG_DEBUG_FMT("Ignoring notification - not leader");
}
}
void set_consensus(const std::shared_ptr<kv::Consensus>& c)
{
consensus = c;
}
};
}

Просмотреть файл

@ -28,9 +28,8 @@ def run(args):
primary, (backup,) = network.start_and_join(args)
with primary.management_client() as mc:
check_commit = infra.ccf.Checker(mc)
check_commit = infra.ccf.Checker(mc, notifications.get_queue())
check = infra.ccf.Checker()
check_notification = infra.ccf.Checker(None, notifications.get_queue())
msg = "Hello world"
msg2 = "Hello there"
@ -41,7 +40,7 @@ def run(args):
check_commit(
c.rpc("LOG_record", {"id": 42, "msg": msg}), result=True
)
check_notification(
check_commit(
c.rpc("LOG_record", {"id": 43, "msg": msg2}), result=True
)
check(c.rpc("LOG_get", {"id": 42}), result={"msg": msg})

Просмотреть файл

@ -20,55 +20,42 @@ from loguru import logger as LOG
def run(args):
hosts = ["localhost"]
with infra.notification.notification_server(args.notify_server) as notifications:
with infra.ccf.network(
hosts, args.build_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
) as network:
primary, _ = network.start_and_join(args)
with infra.ccf.network(
hosts, args.build_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
) as network:
primary, _ = network.start_and_join(args)
with primary.management_client() as mc:
check_commit = infra.ccf.Checker(mc)
check = infra.ccf.Checker()
with primary.management_client() as mc:
check_commit = infra.ccf.Checker(mc)
check = infra.ccf.Checker()
msg = "Hello world"
msg2 = "Hello there"
msg = "Hello world"
msg2 = "Hello there"
LOG.debug("Write/Read on primary")
with primary.user_client(format="json") as c:
check_commit(c.rpc("LOG_record", {"id": 42, "msg": msg}), result=True)
check_commit(c.rpc("LOG_record", {"id": 43, "msg": msg2}), result=True)
check(c.rpc("LOG_get", {"id": 42}), result={"msg": msg})
check(c.rpc("LOG_get", {"id": 43}), result={"msg": msg2})
LOG.debug("Write/Read on primary")
with primary.user_client(format="json") as c:
LOG.debug("Write/Read large messages on primary")
with primary.user_client(format="json") as c:
id = 44
# For larger values of p, PBFT crashes since the size of the
# request is bigger than the max size supported by PBFT
# (Max_message_size)
for p in range(10, 13):
long_msg = "X" * (2 ** p)
check_commit(
c.rpc("LOG_record", {"id": 42, "msg": msg}), result=True
c.rpc("LOG_record", {"id": id, "msg": long_msg}), result=True
)
check_commit(
c.rpc("LOG_record", {"id": 43, "msg": msg2}), result=True
)
check(c.rpc("LOG_get", {"id": 42}), result={"msg": msg})
check(c.rpc("LOG_get", {"id": 43}), result={"msg": msg2})
LOG.debug("Write/Read large messages on primary")
with primary.user_client(format="json") as c:
id = 44
# For larger values of p, PBFT crashes since the size of the
# request is bigger than the max size supported by PBFT
# (Max_message_size)
for p in range(10, 13):
long_msg = "X" * (2 ** p)
check_commit(
c.rpc("LOG_record", {"id": id, "msg": long_msg}),
result=True,
)
check(c.rpc("LOG_get", {"id": id}), result={"msg": long_msg})
id += 1
check(c.rpc("LOG_get", {"id": id}), result={"msg": long_msg})
id += 1
if __name__ == "__main__":
args = e2e_args.cli_args()
args.package = "libloggingenc"
notify_server_host = "localhost"
args.notify_server = (
notify_server_host
+ ":"
+ str(infra.net.probably_free_local_port(notify_server_host))
)
run(args)

Просмотреть файл

@ -65,10 +65,4 @@ if __name__ == "__main__":
sys.exit(0)
args.package = "libloggingenc"
notify_server_host = "localhost"
args.notify_server = (
notify_server_host
+ ":"
+ str(infra.net.probably_free_local_port(notify_server_host))
)
run(args)

Просмотреть файл

@ -522,6 +522,7 @@ class Checker:
def __init__(self, management_client=None, notification_queue=None):
self.management_client = management_client
self.notification_queue = notification_queue
self.notified_commit = 0
def __call__(self, rpc_result, result=None, error=None, timeout=2):
if error is not None:
@ -548,8 +549,14 @@ class Checker:
if self.notification_queue:
for i in range(timeout * 10):
for q in list(self.notification_queue.queue):
if json.loads(q)["commit"] >= rpc_result.commit:
while self.notification_queue.not_empty:
notification = self.notification_queue.get()
n = json.loads(notification)["commit"]
assert (
n > self.notified_commit
), f"Received notification of commit {n} after commit {self.notified_commit}"
self.notified_commit = n
if n >= rpc_result.commit:
return
time.sleep(0.5)
raise TimeoutError("Timed out waiting for notification")