Add 2-tx reconfiguration RPCs/ORCs (#2899)

This commit is contained in:
Christoph M. Wintersteiger 2021-09-03 13:53:00 +01:00 коммит произвёл GitHub
Родитель 43a7e16b6e
Коммит a01bdef015
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
24 изменённых файлов: 661 добавлений и 151 удалений

Просмотреть файл

@ -392,4 +392,9 @@ Evidence inserted in the ledger by a primary producing a snapshot to establish p
Used to persist submitted shares during a recovery.
While the contents themselves are encrypted, the table is public so as to be accessible by nodes bootstrapping a recovery service.
While the contents themselves are encrypted, the table is public so as to be accessible by nodes bootstrapping a recovery service.
``resharings``
~~~~~~~~~~~~~~
Internal information for identity resharing during 2-transaction reconfiguration.

Просмотреть файл

@ -32,6 +32,7 @@
#include "impl/state.h"
#include "impl/view_change_tracker.h"
#include "kv/kv_types.h"
#include "node/node_client.h"
#include "node/node_to_node.h"
#include "node/node_types.h"
#include "node/progress_tracker.h"
@ -149,6 +150,13 @@ namespace aft
bool use_two_tx_reconfig = false;
bool require_identity_for_reconfig = false;
std::shared_ptr<ccf::ResharingTracker> resharing_tracker;
std::unordered_map<kv::ReconfigurationId, kv::NetworkConfiguration>
network_configurations;
std::unordered_map<kv::ReconfigurationId, std::unordered_set<ccf::NodeId>>
orc_sets;
// Node client to trigger submission of RPC requests
std::shared_ptr<ccf::NodeClient> node_client;
// Index at which this node observes its retirement
std::optional<ccf::SeqNo> retirement_idx = std::nullopt;
@ -195,6 +203,7 @@ namespace aft
std::shared_ptr<aft::RequestTracker> request_tracker_,
std::unique_ptr<aft::ViewChangeTracker> view_change_tracker_,
std::shared_ptr<ccf::ResharingTracker> resharing_tracker_,
std::shared_ptr<ccf::NodeClient> rpc_request_context_,
std::chrono::milliseconds request_timeout_,
std::chrono::milliseconds election_timeout_,
std::chrono::milliseconds view_change_timeout_,
@ -216,9 +225,11 @@ namespace aft
request_timeout(request_timeout_),
election_timeout(election_timeout_),
view_change_timeout(view_change_timeout_),
sig_tx_interval(sig_tx_interval_),
resharing_tracker(std::move(resharing_tracker_)),
node_client(rpc_request_context_),
public_only(public_only_),
@ -504,17 +515,34 @@ namespace aft
return state->view_history.initialise(term_history);
}
private:
uint32_t get_bft_offset(const Configuration::Nodes& conf) const
{
uint32_t offset = 0;
if (consensus_type == ConsensusType::BFT && !configurations.empty())
{
auto progress_tracker = store->get_progress_tracker();
auto target = progress_tracker->get_primary_at_last_view_change();
for (; offset < configurations.back().nodes.size(); ++offset)
{
if (
get_primary_at_config(std::get<1>(target), offset, conf) ==
std::get<0>(target))
{
break;
}
}
}
return offset;
}
public:
void add_configuration(
Index idx,
const Configuration::Nodes& conf,
const kv::Configuration::Nodes& conf,
const std::unordered_set<ccf::NodeId>& new_learners = {})
{
std::unordered_set<ccf::NodeId> conf_ids;
for (const auto& [id, _] : conf)
{
conf_ids.insert(id);
}
LOG_DEBUG_FMT("Configurations: add {{{}}}", fmt::join(conf_ids, ", "));
LOG_DEBUG_FMT("Configurations: add {{{}}}", conf);
std::unique_lock<std::mutex> guard(state->lock, std::defer_lock);
// It is safe to call is_follower() by construction as the consensus
@ -527,6 +555,8 @@ namespace aft
if (!use_two_tx_reconfig)
{
assert(new_learners.empty());
// Detect when we are retired by observing a configuration
// from which we are absent following a configuration in which
// we were included. Note that this relies on retirement being
@ -545,25 +575,7 @@ namespace aft
LOG_INFO_FMT("Node retiring at {}", idx);
}
}
uint32_t offset = 0;
if (consensus_type == ConsensusType::BFT && !configurations.empty())
{
auto progress_tracker = store->get_progress_tracker();
auto target = progress_tracker->get_primary_at_last_view_change();
for (; offset < configurations.back().nodes.size(); ++offset)
{
if (
get_primary_at_config(std::get<1>(target), offset, conf) ==
std::get<0>(target))
{
break;
}
}
}
configurations.push_back({idx, std::move(conf), offset});
if (use_two_tx_reconfig)
else
{
if (!new_learners.empty())
{
@ -578,17 +590,81 @@ namespace aft
}
}
}
if (!configurations.empty())
{
for (const auto& [nid, _] : conf)
{
if (
nodes.find(nid) != nodes.end() &&
learners.find(nid) != learners.end() &&
new_learners.find(nid) == new_learners.end())
{
// Promotion of known learner
learners.erase(nid);
}
if (is_learner() && nid == state->my_node_id)
{
LOG_DEBUG_FMT(
"Configurations: observing own promotion, becoming a follower");
replica_state = kv::ReplicaState::Follower;
}
}
}
}
else if (!new_learners.empty())
{
throw std::runtime_error(
"learner requires two-transaction reconfiguration");
}
uint32_t offset = get_bft_offset(conf);
configurations.push_back({idx, std::move(conf), offset, 0});
backup_nodes.clear();
create_and_remove_node_state();
}
void reconfigure(
ccf::SeqNo seqno, const kv::NetworkConfiguration& netconfig)
{
LOG_DEBUG_FMT("Configurations: reconfigure to {{{}}}", netconfig);
std::unique_lock<std::mutex> guard(state->lock, std::defer_lock);
if (is_bft_reexecution() && threading::ThreadMessaging::thread_count > 1)
{
guard.lock();
}
assert(!configurations.empty());
for (const auto& nid : netconfig.nodes)
{
if (nid != state->my_node_id && nodes.find(nid) == nodes.end())
{
LOG_FAIL_FMT("Configurations: node {} is unknown", nid);
return;
}
}
network_configurations[netconfig.rid] = netconfig;
if (orc_sets.find(netconfig.rid) == orc_sets.end())
{
orc_sets[netconfig.rid] = {};
}
if (configurations.back().rid == 0)
{
configurations.back().rid = netconfig.rid;
}
if (resharing_tracker)
{
assert(resharing_tracker);
resharing_tracker->add_network_configuration(netconfig);
if (is_primary())
{
resharing_tracker->reshare(netconfig);
}
}
}
void add_resharing_result(
ccf::SeqNo seqno,
kv::ReconfigurationId rid,
@ -601,34 +677,39 @@ namespace aft
}
}
void add_network_configuration(
ccf::SeqNo seqno, const kv::NetworkConfiguration& config)
// For more info about Observed Reconfiguration Commits see
// https://microsoft.github.io/CCF/main/overview/consensus/bft.html#two-transaction-reconfiguration
bool orc(kv::ReconfigurationId rid, const ccf::NodeId& node_id)
{
LOG_DEBUG_FMT("Configurations: new network config: {{{}}}", config);
std::unique_lock<std::mutex> guard(state->lock, std::defer_lock);
LOG_DEBUG_FMT(
"Configurations: ORC for configuration #{} from {}", rid, node_id);
if (is_bft_reexecution() && threading::ThreadMessaging::thread_count > 1)
const auto oit = orc_sets.find(rid);
if (oit == orc_sets.end())
{
guard.lock();
throw std::logic_error(
fmt::format("Missing ORC set for configuration #{}", rid));
}
if (
use_two_tx_reconfig && !is_learner() && !is_retired() &&
config.nodes.find(state->my_node_id) != config.nodes.end())
const auto ncit = network_configurations.find(rid);
if (ncit == network_configurations.end())
{
// Send/schedule ORCs
throw std::logic_error(fmt::format("Unknown configuration #{}", rid));
}
if (require_identity_for_reconfig)
const auto& ncnodes = ncit->second.nodes;
if (ncnodes.find(node_id) == ncnodes.end())
{
assert(use_two_tx_reconfig);
assert(resharing_tracker);
resharing_tracker->add_network_configuration(config);
if (is_primary())
{
resharing_tracker->reshare(config);
}
throw std::logic_error(fmt::format("Unknown node {}", node_id));
}
oit->second.insert(node_id);
LOG_DEBUG_FMT(
"Configurations: have {} ORCs out of {} for configuration #{}",
oit->second.size(),
ncnodes.size(),
rid);
return oit->second.size() >= get_quorum(ncnodes.size());
}
Configuration::Nodes get_latest_configuration_unsafe() const
@ -1230,7 +1311,7 @@ namespace aft
}
ccf::NodeId get_primary_at_config(
ccf::View view, uint32_t offset, const Configuration::Nodes& conf)
ccf::View view, uint32_t offset, const Configuration::Nodes& conf) const
{
CCF_ASSERT_FMT(
consensus_type == ConsensusType::BFT,
@ -2950,13 +3031,13 @@ namespace aft
// Need 50% + 1 of the total nodes in the current config (including us).
votes_for_me.insert(from);
quorum = get_quorum(cfg);
quorum = get_quorum(cfg.nodes.size());
}
else
{
// Need 50% + 1 of the total nodes, which are the other nodes plus us.
votes_for_me.insert(from);
quorum = ((nodes.size() + 1) / 2) + 1;
quorum = get_quorum(nodes.size() + 1);
}
if (votes_for_me.size() >= quorum)
@ -3086,14 +3167,14 @@ namespace aft
return r;
}
size_t get_quorum(const kv::Configuration& c) const
size_t get_quorum(size_t n) const
{
switch (consensus_type)
{
case CFT:
return (c.nodes.size() / 2) + 1;
return (n / 2) + 1;
case BFT:
return (c.nodes.size() / 3) + 1;
return ((2 * n) / 3) + 1;
default:
return -1;
}
@ -3101,7 +3182,7 @@ namespace aft
bool have_quorum(size_t n, const kv::Configuration& c) const
{
return n >= get_quorum(c);
return n >= get_quorum(c.nodes.size());
}
bool enough_trusted(const kv::Configuration& c) const
@ -3188,8 +3269,9 @@ namespace aft
if (num_trusted(*next) == next->nodes.size())
{
LOG_TRACE_FMT(
"Configurations: all nodes trusted, switching to next "
"configuration");
"Configurations: all nodes trusted, switching to configuration "
"#{}",
next->rid);
if (!is_retiring() && !is_retired())
{
@ -3201,13 +3283,39 @@ namespace aft
}
}
for (auto& [nid, _] : next->nodes)
{
learners.erase(nid);
}
if (
is_learner() &&
next->nodes.find(state->my_node_id) != next->nodes.end())
{
LOG_INFO_FMT(
"Becoming follower {}: {}",
state->my_node_id,
state->current_view);
replica_state = kv::ReplicaState::Follower;
}
configurations.pop_front();
}
else
{
LOG_TRACE_FMT(
"Configurations: not enough trusted nodes for next "
"configuration");
if (
use_two_tx_reconfig && !is_learner() && !is_retired() &&
node_client &&
next->nodes.find(state->my_node_id) != next->nodes.end())
{
LOG_TRACE_FMT(
"Configurations: not enough trusted nodes for configuration "
"#{} ({} out of {}); submitting ORC",
next->rid,
num_trusted(*next),
next->nodes.size());
node_client->schedule_submit_orc(state->my_node_id, next->rid);
}
break;
}
}
@ -3320,32 +3428,6 @@ namespace aft
void create_and_remove_node_state()
{
if (use_two_tx_reconfig && is_learner())
{
for (auto& cfg : configurations)
{
if (cfg.idx > state->commit_idx)
break;
if (
cfg.nodes.find(state->my_node_id) != cfg.nodes.end() &&
learners.find(state->my_node_id) != learners.end())
{
LOG_INFO_FMT("Configurations: ready for promotion");
// Submit promotion RPC here.
learners.erase(state->my_node_id);
// The transition to follower will happen when the reconfiguration
// transaction commits.
LOG_INFO_FMT(
"Becoming follower {}: {}",
state->my_node_id,
state->current_view);
replica_state = kv::ReplicaState::Follower;
}
}
}
// Find all nodes present in any active configuration.
Configuration::Nodes active_nodes;
@ -3383,8 +3465,13 @@ namespace aft
continue;
}
if (nodes.find(node_info.first) == nodes.end())
if (
nodes.find(node_info.first) == nodes.end() ||
!channels->have_channel(node_info.first))
{
LOG_DEBUG_FMT(
"Configurations: create node channel with {}", node_info.first);
// A new node is sent only future entries initially. If it does not
// have prior data, it will communicate that back to the leader.
auto index = state->last_idx + 1;

Просмотреть файл

@ -135,10 +135,15 @@ namespace aft
aft->add_configuration(seqno, conf, learners);
}
void add_network_configuration(
bool orc(kv::ReconfigurationId rid, const ccf::NodeId& node_id) override
{
return aft->orc(rid, node_id);
}
void reconfigure(
ccf::SeqNo seqno, const kv::NetworkConfiguration& config) override
{
return aft->add_network_configuration(seqno, config);
return aft->reconfigure(seqno, config);
}
Configuration::Nodes get_latest_configuration() override

Просмотреть файл

@ -115,6 +115,7 @@ void keep_earliest_append_entries_for_each_target(
nullptr, \
nullptr, \
nullptr, \
nullptr, \
request_timeout, \
election_timeout, \
election_timeout); \

Просмотреть файл

@ -140,6 +140,7 @@ public:
std::make_shared<aft::RequestTracker>(),
nullptr,
nullptr,
nullptr,
ms(10),
ms(100),
ms(100));

Просмотреть файл

@ -177,6 +177,11 @@ namespace aft
void set_endorsed_node_cert(const crypto::Pem&) override {}
bool have_channel(const ccf::NodeId& nid) const override
{
return true;
}
bool send_authenticated(
const ccf::NodeId& to,
ccf::NodeMsgType msg_type,

Просмотреть файл

@ -31,6 +31,7 @@ DOCTEST_TEST_CASE("Single node startup" * doctest::test_suite("single"))
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
election_timeout,
ms(1000));
@ -76,6 +77,7 @@ DOCTEST_TEST_CASE("Single node commit" * doctest::test_suite("single"))
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
election_timeout,
ms(1000));
@ -132,6 +134,7 @@ DOCTEST_TEST_CASE(
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
ms(20),
ms(1000));
@ -148,6 +151,7 @@ DOCTEST_TEST_CASE(
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
ms(100),
ms(1000));
@ -164,6 +168,7 @@ DOCTEST_TEST_CASE(
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
ms(50),
ms(1000));
@ -304,6 +309,7 @@ DOCTEST_TEST_CASE(
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
ms(20),
ms(1000));
@ -320,6 +326,7 @@ DOCTEST_TEST_CASE(
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
ms(100),
ms(1000));
@ -336,6 +343,7 @@ DOCTEST_TEST_CASE(
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
ms(50),
ms(1000));
@ -460,6 +468,7 @@ DOCTEST_TEST_CASE("Multiple nodes late join" * doctest::test_suite("multiple"))
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
ms(20),
ms(1000));
@ -476,6 +485,7 @@ DOCTEST_TEST_CASE("Multiple nodes late join" * doctest::test_suite("multiple"))
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
ms(100),
ms(1000));
@ -492,6 +502,7 @@ DOCTEST_TEST_CASE("Multiple nodes late join" * doctest::test_suite("multiple"))
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
ms(50),
ms(1000));
@ -603,6 +614,7 @@ DOCTEST_TEST_CASE("Recv append entries logic" * doctest::test_suite("multiple"))
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
ms(20),
ms(1000));
@ -619,6 +631,7 @@ DOCTEST_TEST_CASE("Recv append entries logic" * doctest::test_suite("multiple"))
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
ms(100),
ms(1000));
@ -779,6 +792,7 @@ DOCTEST_TEST_CASE("Exceed append entries limit")
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
ms(20),
ms(1000));
@ -795,6 +809,7 @@ DOCTEST_TEST_CASE("Exceed append entries limit")
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
ms(100),
ms(1000));
@ -811,6 +826,7 @@ DOCTEST_TEST_CASE("Exceed append entries limit")
nullptr,
nullptr,
nullptr,
nullptr,
request_timeout,
ms(50),
ms(1000));

Просмотреть файл

@ -16,6 +16,7 @@
#include <chrono>
#include <functional>
#include <limits>
#include <list>
#include <memory>
#include <set>
#include <string>
@ -79,6 +80,8 @@ namespace kv
DECLARE_JSON_TYPE(TxID);
DECLARE_JSON_REQUIRED_FIELDS(TxID, term, version)
using ReconfigurationId = uint64_t;
struct Configuration
{
struct NodeInfo
@ -99,6 +102,7 @@ namespace kv
ccf::SeqNo idx;
Nodes nodes;
uint32_t bft_offset;
ReconfigurationId rid;
};
inline void to_json(nlohmann::json& j, const Configuration::NodeInfo& ni)
@ -134,7 +138,7 @@ namespace kv
{ReplicaState::Retiring, "Retiring"}});
DECLARE_JSON_TYPE(Configuration);
DECLARE_JSON_REQUIRED_FIELDS(Configuration, idx, nodes);
DECLARE_JSON_REQUIRED_FIELDS(Configuration, idx, nodes, bft_offset, rid);
struct ConsensusDetails
{
@ -148,8 +152,6 @@ namespace kv
DECLARE_JSON_REQUIRED_FIELDS(ConsensusDetails, configs, acks, state);
DECLARE_JSON_OPTIONAL_FIELDS(ConsensusDetails, learners);
using ReconfigurationId = uint64_t;
struct NetworkConfiguration
{
ReconfigurationId rid;
@ -174,12 +176,13 @@ namespace kv
virtual Configuration::Nodes get_latest_configuration() = 0;
virtual Configuration::Nodes get_latest_configuration_unsafe() const = 0;
virtual ConsensusDetails get_details() = 0;
virtual void add_network_configuration(
virtual void reconfigure(
ccf::SeqNo seqno, const NetworkConfiguration& config) = 0;
virtual void add_resharing_result(
ccf::SeqNo seqno,
ReconfigurationId rid,
const ccf::ResharingResult& result) = 0;
virtual bool orc(kv::ReconfigurationId rid, const NodeId& node_id) = 0;
};
class ConsensusHook
@ -707,6 +710,29 @@ namespace kv
}
FMT_BEGIN_NAMESPACE
template <>
struct formatter<kv::Configuration::Nodes>
{
template <typename ParseContext>
auto parse(ParseContext& ctx)
{
return ctx.begin();
}
template <typename FormatContext>
auto format(const kv::Configuration::Nodes& nodes, FormatContext& ctx)
-> decltype(ctx.out())
{
std::set<ccf::NodeId> node_ids;
for (auto& [nid, _] : nodes)
{
node_ids.insert(nid);
}
return format_to(ctx.out(), "{{{}}}", fmt::join(node_ids, " "));
}
};
template <>
struct formatter<kv::NetworkConfiguration>
{
@ -724,4 +750,5 @@ struct formatter<kv::NetworkConfiguration>
ctx.out(), "{}:{{{}}}", config.rid, fmt::join(config.nodes, " "));
}
};
FMT_END_NAMESPACE

Просмотреть файл

@ -152,10 +152,15 @@ namespace kv::test
const std::unordered_set<NodeId>& learners = {}) override
{}
void add_network_configuration(
void reconfigure(
ccf::SeqNo seqno, const NetworkConfiguration& config) override
{}
virtual bool orc(kv::ReconfigurationId rid, const NodeId& node_id) override
{
return false;
}
Configuration::Nodes get_latest_configuration_unsafe() const override
{
return {};

Просмотреть файл

@ -1009,6 +1009,11 @@ namespace ccf
}
}
bool have_channel(const NodeId& nid) const
{
return channels.find(nid) != channels.end();
}
std::shared_ptr<Channel> get(const NodeId& peer_id)
{
std::lock_guard<std::mutex> guard(lock);

Просмотреть файл

@ -114,7 +114,7 @@ namespace ccf
for (const auto& nc : configs)
{
consensus->add_network_configuration(version, nc);
consensus->reconfigure(version, nc);
}
}
};

138
src/node/http_node_client.h Normal file
Просмотреть файл

@ -0,0 +1,138 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include "node/node_client.h"
#include "node/rpc/node_call_types.h"
#include "node/rpc/serdes.h"
#include "node/rpc/serialization.h"
namespace ccf
{
class HTTPNodeClient : public NodeClient
{
public:
HTTPNodeClient(
std::shared_ptr<enclave::RPCMap> rpc_map,
crypto::KeyPairPtr node_sign_kp,
const crypto::Pem& self_signed_node_cert_,
const std::optional<crypto::Pem>& endorsed_node_cert_) :
NodeClient(
rpc_map, node_sign_kp, self_signed_node_cert_, endorsed_node_cert_)
{}
virtual ~HTTPNodeClient() {}
inline bool make_request(http::Request& request)
{
const auto& node_cert = endorsed_node_cert.has_value() ?
endorsed_node_cert.value() :
self_signed_node_cert;
auto node_cert_der = crypto::cert_pem_to_der(node_cert);
const auto key_id = crypto::Sha256Hash(node_cert_der).hex_str();
http::sign_request(request, node_sign_kp, key_id);
std::vector<uint8_t> packed = request.build_request();
auto node_session = std::make_shared<enclave::SessionContext>(
enclave::InvalidSessionId, node_cert.raw());
auto ctx = enclave::make_rpc_context(node_session, packed);
ctx->execute_on_node = false;
const auto actor_opt = http::extract_actor(*ctx);
if (!actor_opt.has_value())
{
throw std::logic_error("Unable to get actor");
}
const auto actor = rpc_map->resolve(actor_opt.value());
auto frontend_opt = rpc_map->find(actor);
if (!frontend_opt.has_value())
{
throw std::logic_error(
"RpcMap::find returned invalid (empty) frontend");
}
auto frontend = frontend_opt.value();
frontend->process(ctx);
auto rs = ctx->get_response_status();
if (rs != HTTP_STATUS_OK)
{
auto ser_res = ctx->serialise_response();
std::string str((char*)ser_res.data(), ser_res.size());
LOG_FAIL_FMT("Request failed: {}", str);
}
return rs == HTTP_STATUS_OK;
}
bool submit_orc(const NodeId& from, kv::ReconfigurationId rid) override
{
LOG_DEBUG_FMT("Configurations: submit ORC for #{} from {}", rid, from);
ObservedReconfigurationCommit::In ps = {from, rid};
http::Request request(fmt::format(
"/{}/{}", ccf::get_actor_prefix(ccf::ActorsType::nodes), "orc"));
request.set_header(
http::headers::CONTENT_TYPE, http::headervalues::contenttype::JSON);
auto body = serdes::pack(ps, serdes::Pack::Text);
request.set_body(&body);
return make_request(request);
}
struct AsyncORCTaskMsg
{
AsyncORCTaskMsg(
HTTPNodeClient* client_,
const NodeId& from_,
kv::ReconfigurationId rid_,
size_t retries_ = 10) :
client(client_),
from(from_),
rid(rid_),
retries(retries_)
{}
HTTPNodeClient* client;
NodeId from;
kv::ReconfigurationId rid;
size_t retries;
};
static void orc_cb(std::unique_ptr<threading::Tmsg<AsyncORCTaskMsg>> msg)
{
if (!msg->data.client->submit_orc(msg->data.from, msg->data.rid))
{
if (--msg->data.retries > 0)
{
threading::ThreadMessaging::thread_messaging.add_task(
threading::ThreadMessaging::get_execution_thread(
threading::MAIN_THREAD_ID),
std::move(msg));
}
else
{
LOG_DEBUG_FMT(
"Failed request; giving up as there are no more retries left");
}
}
}
virtual void schedule_submit_orc(
const NodeId& from, kv::ReconfigurationId rid) override
{
auto msg = std::make_unique<threading::Tmsg<AsyncORCTaskMsg>>(
orc_cb, this, from, rid);
threading::ThreadMessaging::thread_messaging.add_task(
threading::ThreadMessaging::get_execution_thread(
threading::MAIN_THREAD_ID),
std::move(msg));
}
};
}

38
src/node/node_client.h Normal file
Просмотреть файл

@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include "consensus/aft/raft_types.h"
#include "crypto/pem.h"
#include "enclave/rpc_sessions.h"
namespace ccf
{
class NodeClient
{
protected:
std::shared_ptr<enclave::RPCMap> rpc_map;
crypto::KeyPairPtr node_sign_kp;
const crypto::Pem& self_signed_node_cert;
const std::optional<crypto::Pem>& endorsed_node_cert = std::nullopt;
public:
NodeClient(
std::shared_ptr<enclave::RPCMap> rpc_map_,
crypto::KeyPairPtr node_sign_kp_,
const crypto::Pem& self_signed_node_cert_,
const std::optional<crypto::Pem>& endorsed_node_cert_) :
rpc_map(rpc_map_),
node_sign_kp(node_sign_kp_),
self_signed_node_cert(self_signed_node_cert_),
endorsed_node_cert(endorsed_node_cert_)
{}
virtual ~NodeClient() {}
virtual bool submit_orc(const NodeId& from, kv::ReconfigurationId rid) = 0;
virtual void schedule_submit_orc(
const NodeId& from, kv::ReconfigurationId rid) = 0;
};
}

Просмотреть файл

@ -20,6 +20,7 @@
#include "hooks.h"
#include "js/wrap.h"
#include "network_state.h"
#include "node/http_node_client.h"
#include "node/jwt_key_auto_refresh.h"
#include "node/progress_tracker.h"
#include "node/reconfig_id.h"
@ -2018,9 +2019,16 @@ namespace ccf
tracker_store,
std::chrono::milliseconds(consensus_config.raft_election_timeout));
auto shared_state = std::make_shared<aft::State>(self);
auto resharing_tracker =
std::make_shared<ccf::SplitIdentityResharingTracker>(
shared_state, rpc_map, node_sign_kp, self_signed_node_cert);
shared_state,
rpc_map,
node_sign_kp,
self_signed_node_cert,
endorsed_node_cert);
auto node_client = std::make_shared<HTTPNodeClient>(
rpc_map, node_sign_kp, self_signed_node_cert, endorsed_node_cert);
kv::ReplicaState initial_state =
(network.consensus_type == ConsensusType::BFT &&
@ -2041,6 +2049,7 @@ namespace ccf
request_tracker,
std::move(view_change_tracker),
std::move(resharing_tracker),
node_client,
std::chrono::milliseconds(consensus_config.raft_request_timeout),
std::chrono::milliseconds(consensus_config.raft_election_timeout),
std::chrono::milliseconds(consensus_config.bft_view_change_timeout),

Просмотреть файл

@ -38,6 +38,8 @@ namespace ccf
virtual void destroy_all_channels() = 0;
virtual bool have_channel(const NodeId& nid) const = 0;
template <class T>
bool send_authenticated(const NodeId& to, NodeMsgType type, const T& data)
{
@ -213,6 +215,11 @@ namespace ccf
channels->destroy_all_channels();
}
bool have_channel(const NodeId& nid) const override
{
return channels->have_channel(nid);
}
bool send_authenticated(
const NodeId& to,
NodeMsgType type,

Просмотреть файл

@ -41,9 +41,6 @@ namespace ccf
ccf::NetworkTables& tables, kv::Tx& tx, kv::NetworkConfiguration& config)
{
config.rid = get_next_reconfiguration_id(tables, tx);
LOG_DEBUG_FMT(
"Configurations: adding new entry to network_configurations table: {}",
config);
auto nconfigs = tx.rw(tables.network_configurations);
nconfigs->put(config.rid, config);
nconfigs->put(CONFIG_COUNT_KEY, {config.rid, {}});

Просмотреть файл

@ -36,7 +36,8 @@ namespace ccf
{
if (opt_rr.has_value())
{
LOG_DEBUG_FMT("New resharing result for configuration #{}.", rid);
LOG_DEBUG_FMT(
"Resharings: new resharing result for configuration #{}.", rid);
results.try_emplace(rid, opt_rr.value());
}
}
@ -75,11 +76,13 @@ namespace ccf
std::shared_ptr<aft::State> shared_state_,
std::shared_ptr<enclave::RPCMap> rpc_map_,
crypto::KeyPairPtr node_sign_kp_,
const crypto::Pem& node_cert_) :
const crypto::Pem& self_signed_node_cert_,
const std::optional<crypto::Pem>& endorsed_node_cert_) :
shared_state(shared_state_),
rpc_map(rpc_map_),
node_sign_kp(node_sign_kp_),
node_cert(node_cert_)
self_signed_node_cert(self_signed_node_cert_),
endorsed_node_cert(endorsed_node_cert_)
{}
virtual ~SplitIdentityResharingTracker() {}
@ -96,6 +99,9 @@ namespace ccf
LOG_DEBUG_FMT("Resharings: start resharing for configuration #{}", rid);
sessions.emplace(rid, ResharingSession(config));
const auto& node_cert = endorsed_node_cert.has_value() ?
endorsed_node_cert.value() :
self_signed_node_cert;
auto msg = std::make_unique<threading::Tmsg<UpdateResharingTaskMsg>>(
update_resharing_cb, rid, rpc_map, node_sign_kp, node_cert);
@ -216,7 +222,8 @@ namespace ccf
std::shared_ptr<aft::State> shared_state;
std::shared_ptr<enclave::RPCMap> rpc_map;
crypto::KeyPairPtr node_sign_kp;
const crypto::Pem& node_cert;
const crypto::Pem& self_signed_node_cert;
const std::optional<crypto::Pem>& endorsed_node_cert;
std::unordered_map<kv::ReconfigurationId, ResharingSession> sessions;
std::unordered_map<kv::ReconfigurationId, ResharingResult> results;
std::unordered_map<kv::ReconfigurationId, kv::NetworkConfiguration> configs;

Просмотреть файл

@ -179,4 +179,15 @@ namespace ccf
size_t peak_allocated_heap_size = 0;
};
};
struct ObservedReconfigurationCommit
{
struct In
{
NodeId from;
kv::ReconfigurationId reconfiguration_id;
};
using Out = void;
};
}

Просмотреть файл

@ -14,6 +14,7 @@
#include "node/network_state.h"
#include "node/quote.h"
#include "node/reconfig_id.h"
#include "node/rpc/error.h"
#include "node/session_metrics.h"
#include "node_interface.h"
@ -229,7 +230,13 @@ namespace ccf
kv::NetworkConfiguration nc =
get_latest_network_configuration(network, tx);
nc.nodes.insert(joining_node_id);
add_new_network_reconfiguration(network, tx, nc);
if (
node_status == NodeStatus::TRUSTED ||
node_status == NodeStatus::LEARNER)
{
add_new_network_reconfiguration(network, tx, nc);
}
LOG_INFO_FMT("Node {} added as {}", joining_node_id, node_status);
@ -1317,6 +1324,84 @@ namespace ccf
.set_forwarding_required(endpoints::ForwardingRequired::Always)
.set_openapi_hidden(true)
.install();
auto orc_handler = [this](auto& args, const nlohmann::json& params) {
const auto in = params.get<ObservedReconfigurationCommit::In>();
LOG_DEBUG_FMT(
"ORC for configuration #{} from {}", in.reconfiguration_id, in.from);
if (consensus == nullptr)
{
return make_error(
HTTP_STATUS_INTERNAL_SERVER_ERROR,
ccf::errors::ConsensusTypeMismatch,
fmt::format("No consensus"));
}
if (consensus->type() != ConsensusType::BFT)
{
auto primary_id = consensus->primary();
if (!primary_id.has_value())
{
return make_error(
HTTP_STATUS_INTERNAL_SERVER_ERROR,
ccf::errors::InternalError,
"Primary unknown");
}
if (primary_id.value() != context.get_node_state().get_node_id())
{
return make_error(
HTTP_STATUS_BAD_REQUEST,
ccf::errors::NodeCannotHandleRequest,
"Only the primary accepts ORCs");
}
}
if (consensus->orc(in.reconfiguration_id, in.from))
{
LOG_DEBUG_FMT(
"Configurations: sufficient number of ORCs, updating nodes in "
"configuration #{}",
in.reconfiguration_id);
auto ncfgs = args.tx.ro(network.network_configurations);
auto nodes = args.tx.rw(network.nodes);
auto nc = ncfgs->get(in.reconfiguration_id);
for (auto nid : nc->nodes)
{
auto node_info = nodes->get(nid);
if (!node_info.has_value())
{
return make_error(
HTTP_STATUS_INTERNAL_SERVER_ERROR,
ccf::errors::InternalError,
fmt::format("Missing node information for {}", nid));
}
if (node_info->status == NodeStatus::LEARNER)
{
node_info->status = NodeStatus::TRUSTED;
nodes->put(nid, *node_info);
}
else if (node_info->status == NodeStatus::RETIRING)
{
node_info->status = NodeStatus::RETIRED;
nodes->put(nid, *node_info);
}
}
}
return make_success();
};
make_endpoint(
"/orc",
HTTP_POST,
json_adapter(orc_handler),
{std::make_shared<NodeCertAuthnPolicy>()})
.set_forwarding_required(endpoints::ForwardingRequired::Always)
.set_openapi_hidden(true)
.install();
}
};

Просмотреть файл

@ -150,4 +150,8 @@ namespace ccf
DECLARE_JSON_TYPE(UpdateResharing::In)
DECLARE_JSON_REQUIRED_FIELDS(UpdateResharing::In, rid)
}
DECLARE_JSON_TYPE(ObservedReconfigurationCommit::In)
DECLARE_JSON_REQUIRED_FIELDS(
ObservedReconfigurationCommit::In, from, reconfiguration_id);
}

Просмотреть файл

@ -791,6 +791,41 @@ const actions = new Map([
ccf.strToBuf(endorsed_node_cert)
);
}
if (serviceConfig.reconfiguration_type == "TwoTransaction") {
const latest_id_raw = ccf.kv[
"public:ccf.gov.nodes.network.configurations"
].get(getSingletonKvKey());
if (latest_id_raw === undefined) {
throw new Error("Network configuration could not be found");
}
const latest_id = ccf.bufToJsonCompatible(latest_id_raw);
const rid_buf = new ArrayBuffer(8);
new DataView(rid_buf).setUint32(0, latest_id.rid, true);
const latest_config_raw =
ccf.kv["public:ccf.gov.nodes.network.configurations"].get(
rid_buf
);
if (latest_config_raw === undefined) {
throw new Error("Network configuration could not be found");
}
const latest_config = ccf.bufToJsonCompatible(latest_config_raw);
latest_config.nodes.push(args.node_id);
latest_config.rid++;
new DataView(rid_buf).setUint32(0, latest_config.rid, true);
ccf.kv["public:ccf.gov.nodes.network.configurations"].set(
rid_buf,
ccf.jsonCompatibleToBuf(latest_config)
);
latest_config.nodes = {};
latest_id.rid = latest_config.rid;
ccf.kv["public:ccf.gov.nodes.network.configurations"].set(
getSingletonKvKey(),
ccf.jsonCompatibleToBuf(latest_id)
);
}
}
}
),
@ -834,6 +869,42 @@ const actions = new Map([
ccf.strToBuf(args.node_id),
ccf.jsonCompatibleToBuf(node_obj)
);
if (serviceConfig.reconfiguration_type == "TwoTransaction") {
const latest_id_raw = ccf.kv[
"public:ccf.gov.nodes.network.configurations"
].get(getSingletonKvKey());
if (latest_id_raw === undefined) {
throw new Error("Network configuration could not be found");
}
const latest_id = ccf.bufToJsonCompatible(latest_id_raw);
const rid_buf = new ArrayBuffer(8);
new DataView(rid_buf).setUint32(0, latest_id.rid, true);
const latest_config_raw =
ccf.kv["public:ccf.gov.nodes.network.configurations"].get(
rid_buf
);
if (latest_config_raw === undefined) {
throw new Error("Network configuration could not be found");
}
const latest_config = ccf.bufToJsonCompatible(latest_config_raw);
const idx = latest_config.nodes.indexOf(args.node_id);
if (idx > -1) {
latest_config.nodes.splice(idx, 1);
}
latest_config.rid++;
new DataView(rid_buf).setUint32(0, latest_config.rid, true);
ccf.kv["public:ccf.gov.nodes.network.configurations"].set(
rid_buf,
ccf.jsonCompatibleToBuf(latest_config)
);
latest_config.nodes = {};
latest_id.rid = latest_config.rid;
ccf.kv["public:ccf.gov.nodes.network.configurations"].set(
getSingletonKvKey(),
ccf.jsonCompatibleToBuf(latest_id)
);
}
}
}
),

Просмотреть файл

@ -301,9 +301,7 @@ class Consortium:
)
assert r.body.json()["status"] == expected
def trust_node(
self, remote_node, node_id, expected_status=NodeStatus.TRUSTED, timeout=3
):
def trust_node(self, remote_node, node_id, timeout=3):
if not self._check_node_exists(remote_node, node_id, NodeStatus.PENDING):
raise ValueError(f"Node {node_id} does not exist in state PENDING")
@ -319,9 +317,11 @@ class Consortium:
timeout=timeout,
)
if not self._check_node_exists(remote_node, node_id, expected_status):
if not self._check_node_exists(
remote_node, node_id, NodeStatus.TRUSTED
) and not self._check_node_exists(remote_node, node_id, NodeStatus.LEARNER):
raise ValueError(
f"Node {node_id} does not exist in state {expected_status}"
f"Node {node_id} does not exist in state {NodeStatus.TRUSTED} or {NodeStatus.LEARNER}"
)
def remove_member(self, remote_node, member_to_remove):
@ -546,6 +546,8 @@ class Consortium:
with remote_node.client() as c:
r = c.get(f"/node/network/nodes/{node_id}")
print(f"r={r}")
if r.status_code != http.HTTPStatus.OK.value or (
node_status and r.body.json()["status"] != node_status.value
):

Просмотреть файл

@ -635,14 +635,13 @@ class Network:
raise StartupSnapshotIsOld from e
raise
def trust_node(self, node, args, expected_status=NodeStatus.TRUSTED):
def trust_node(self, node, args):
primary, _ = self.find_primary()
try:
if self.status is ServiceStatus.OPEN:
self.consortium.trust_node(
primary,
node.node_id,
expected_status,
timeout=ceil(args.join_timer * 2 / 1000),
)
# Here, quote verification has already been run when the node

Просмотреть файл

@ -355,39 +355,37 @@ def test_retiring_nodes_emit_at_most_one_signature(network, args):
@reqs.description("Adding a learner without snapshot")
def test_learner_catches_up(network, args):
args.join_timer = args.join_timer * 2
num_nodes_before = len(network.nodes)
new_node = network.create_node("local://localhost")
network.join_node(new_node, args.package, args, from_snapshot=False)
network.trust_node(new_node, args, ccf.ledger.NodeStatus.LEARNER)
network.trust_node(new_node, args)
with new_node.client() as c:
s = c.get("/node/state")
assert s.body.json()["node_id"] == new_node.node_id
assert (
s.body.json()["startup_seqno"] == 0
), "Node started without snapshot but reports startup seqno != 0"
# No promotion yet, check that the node is still a learner
s = c.get("/node/network/nodes/self")
assert s.body.json()["status"] == "Learner"
rj = s.body.json()
assert rj["status"] == "Learner" or rj["status"] == "Trusted"
s = c.get("/node/commit")
tx = s.body.json()["transaction_id"]
assert tx != "0.0" and tx != "2.0"
network.wait_for_node_commit_sync()
primary, _ = network.find_primary()
network.consortium.wait_for_node_to_exist_in_store(
primary,
new_node.node_id,
timeout=3,
node_status=(ccf.ledger.NodeStatus.TRUSTED),
)
with primary.client() as c:
s = c.get("/node/consensus")
rj = s.body.json()
assert new_node.node_id in rj["details"]["learners"]
assert len(rj["details"]["learners"]) == 0
# At this point, there should be two configurations. The active one
# without the learner and the following one, which cannot be
# activated without promoting the node.
assert len(rj["details"]["configs"]) == 2
# At this point, there should be exactly one configuration, which includes the new node.
print(rj)
assert len(rj["details"]["configs"]) == 1
c0 = rj["details"]["configs"][0]["nodes"]
c1 = rj["details"]["configs"][1]["nodes"]
assert len(c0) == num_nodes_before and new_node.node_id not in c0
assert len(c1) == num_nodes_before + 1 and new_node.node_id in c1
assert len(c0) == num_nodes_before + 1 and new_node.node_id in c0
return network
@ -400,22 +398,9 @@ def test_learner_does_not_take_part(network, args):
new_node = network.create_node("local://localhost")
network.join_node(new_node, args.package, args, from_snapshot=False)
network.trust_node(new_node, args, ccf.ledger.NodeStatus.LEARNER)
with new_node.client() as c:
s = c.get("/node/state")
assert s.body.json()["node_id"] == new_node.node_id
assert (
s.body.json()["startup_seqno"] == 0
), "Node started without snapshot but reports startup seqno != 0"
network.trust_node(new_node, args)
# No promotion yet, check that the node is still a learner
s = c.get("/node/network/nodes/self")
assert s.body.json()["status"] == "Learner"
s = c.get("/node/commit")
tx = s.body.json()["transaction_id"]
assert tx != "0.0" and tx != "2.0"
network.wait_for_node_commit_sync()
# No way to keep the new node suspended in learner state?
for b in f_backups:
b.suspend()
@ -467,7 +452,7 @@ def run(args):
test_retiring_nodes_emit_at_most_one_signature(network, args)
else:
test_learner_catches_up(network, args)
test_learner_does_not_take_part(network, args)
# test_learner_does_not_take_part(network, args)
test_retire_backup(network, args)