AFT: add an option to execute requests on the backup (#1540)

* When running with BFT mode we now execute requests on the backups
* Start the renaming process from Raft to AFT
* Change the network type from a template to an abstract base class that has normal and test implementation
This commit is contained in:
Alex 2020-09-04 08:49:02 +01:00 коммит произвёл GitHub
Родитель af0533fd08
Коммит 9bd7c0b2bc
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
28 изменённых файлов: 1924 добавлений и 865 удалений

Просмотреть файл

@ -29,3 +29,4 @@ resources:
jobs:
- template: .azure-pipelines-templates/daily-matrix.yml

Просмотреть файл

@ -38,6 +38,7 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/cmake/preproject.cmake
)
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/common.cmake)
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/aft.cmake)
configure_file(
${CCF_DIR}/src/host/version.h.in ${CCF_GENERATED_DIR}/version.h @ONLY
@ -89,6 +90,7 @@ if("sgx" IN_LIST COMPILE_TARGETS)
http_parser.enclave
lua.enclave
secp256k1.enclave
aft.enclave
sss.enclave
)
@ -142,6 +144,7 @@ if("virtual" IN_LIST COMPILE_TARGETS)
http_parser.host
lua.host
secp256k1.host
aft.virtual
sss.host
# TODO: replace with openenclave::oehostverify once OE 0.11 is
# released (OE#3312)
@ -284,14 +287,14 @@ if(BUILD_TESTS)
)
add_unit_test(
raft_test ${CMAKE_CURRENT_SOURCE_DIR}/src/consensus/raft/test/main.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/consensus/raft/test/term_history.cpp
raft_test ${CMAKE_CURRENT_SOURCE_DIR}/src/consensus/aft/test/main.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/consensus/aft/test/term_history.cpp
)
target_link_libraries(raft_test PRIVATE ${CRYPTO_LIBRARY})
add_unit_test(
raft_enclave_test
${CMAKE_CURRENT_SOURCE_DIR}/src/consensus/raft/test/enclave.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/consensus/aft/test/enclave.cpp
)
target_include_directories(raft_enclave_test PRIVATE ${CCFCRYPTO_INC})
target_link_libraries(
@ -443,11 +446,10 @@ if(BUILD_TESTS)
# Raft driver and scenario test
add_executable(
raft_driver
${CMAKE_CURRENT_SOURCE_DIR}/src/consensus/raft/test/driver.cpp
raft_driver ${CMAKE_CURRENT_SOURCE_DIR}/src/consensus/aft/test/driver.cpp
)
use_client_mbedtls(raft_driver)
target_include_directories(raft_driver PRIVATE src/raft)
target_include_directories(raft_driver PRIVATE src/aft)
add_test(
NAME raft_scenario_test

45
cmake/aft.cmake Normal file
Просмотреть файл

@ -0,0 +1,45 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
# AFT
set(AFT_SRC ${CMAKE_CURRENT_SOURCE_DIR}/src/consensus/aft/impl/execution.cpp)
if("sgx" IN_LIST COMPILE_TARGETS)
add_library(aft.enclave STATIC ${AFT_SRC})
target_compile_options(aft.enclave PRIVATE -nostdinc)
target_compile_definitions(
aft.enclave PRIVATE INSIDE_ENCLAVE _LIBCPP_HAS_THREAD_API_PTHREAD
__USE_SYSTEM_ENDIAN_H__
)
set_property(TARGET aft.enclave PROPERTY POSITION_INDEPENDENT_CODE ON)
target_include_directories(
aft.enclave PRIVATE ${CCF_DIR}/src/ds ${OE_TARGET_LIBC}
${PARSED_ARGS_INCLUDE_DIRS} ${EVERCRYPT_INC}
)
use_oe_mbedtls(aft.enclave)
install(
TARGETS aft.enclave
EXPORT ccf
DESTINATION lib
)
endif()
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
if("virtual" IN_LIST COMPILE_TARGETS)
add_library(aft.virtual STATIC ${AFT_SRC})
target_compile_options(aft.virtual PRIVATE -stdlib=libc++)
target_compile_definitions(
aft.virtual PUBLIC INSIDE_ENCLAVE VIRTUAL_ENCLAVE
_LIBCPP_HAS_THREAD_API_PTHREAD
)
set_property(TARGET aft.virtual PROPERTY POSITION_INDEPENDENT_CODE ON)
target_include_directories(aft.virtual PRIVATE SYSTEM ${EVERCRYPT_INC})
use_client_mbedtls(aft.virtual)
install(
TARGETS aft.virtual
EXPORT ccf
DESTINATION lib
)
endif()

Просмотреть файл

@ -0,0 +1,118 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#include "execution.h"
#include "consensus/aft/request.h"
#include "enclave/rpc_map.h"
#include "enclave/rpc_sessions.h"
#include "http/http_rpc_context.h"
#include "kv/tx.h"
#include "request_message.h"
namespace aft
{
std::unique_ptr<RequestCtx> ExecutorImpl::create_request_ctx(
uint8_t* req_start, size_t req_size)
{
pbft::Request request;
request.deserialise(req_start, req_size);
return create_request_ctx(request);
}
std::unique_ptr<RequestCtx> ExecutorImpl::create_request_ctx(
pbft::Request& request)
{
auto r_ctx = std::make_unique<RequestCtx>();
auto session = std::make_shared<enclave::SessionContext>(
enclave::InvalidSessionId, request.caller_id, request.caller_cert);
r_ctx->ctx = enclave::make_fwd_rpc_context(
session, request.raw, (enclave::FrameFormat)request.frame_format);
const auto actor_opt = http::extract_actor(*r_ctx->ctx);
if (!actor_opt.has_value())
{
throw std::logic_error(fmt::format(
"Failed to extract actor from PBFT request. Method is '{}'",
r_ctx->ctx->get_method()));
}
const auto& actor_s = actor_opt.value();
const auto actor = rpc_map->resolve(actor_s);
auto handler = rpc_map->find(actor);
if (!handler.has_value())
throw std::logic_error(
fmt::format("No frontend associated with actor {}", actor_s));
r_ctx->frontend = handler.value();
return r_ctx;
}
kv::Version ExecutorImpl::execute_request(
std::unique_ptr<RequestMessage> request, bool is_create_request)
{
std::shared_ptr<enclave::RpcContext>& ctx = request->get_request_ctx().ctx;
std::shared_ptr<enclave::RpcHandler>& frontend =
request->get_request_ctx().frontend;
ctx->pbft_raw.resize(request->size());
request->serialize_message(
NoNode, ctx->pbft_raw.data(), ctx->pbft_raw.size());
ctx->is_create_request = is_create_request;
ctx->set_apply_writes(true);
enclave::RpcHandler::ProcessPbftResp rep = frontend->process_pbft(ctx);
frontend->update_merkle_tree();
request->callback(rep.result);
return rep.version;
}
std::unique_ptr<aft::RequestMessage> ExecutorImpl::create_request_message(
const kv::TxHistory::RequestCallbackArgs& args)
{
Request request = {
args.caller_id, args.caller_cert, args.request, {}, args.frame_format};
auto serialized_req = request.serialise();
auto rep_cb = [=](
void*,
kv::TxHistory::RequestID caller_rid,
int status,
std::vector<uint8_t>& data) {
LOG_DEBUG_FMT("AFT reply callback status {}", status);
return rpc_sessions->reply_async(std::get<1>(caller_rid), data);
};
auto ctx = create_request_ctx(serialized_req.data(), serialized_req.size());
return std::make_unique<RequestMessage>(
std::move(serialized_req), args.rid, std::move(ctx), rep_cb);
}
kv::Version ExecutorImpl::commit_replayed_request(kv::Tx& tx)
{
auto tx_view = tx.get_view(pbft_requests_map);
auto req_v = tx_view->get(0);
CCF_ASSERT(
req_v.has_value(),
"Deserialised request but it was not found in the requests map");
pbft::Request request = req_v.value();
auto ctx = create_request_ctx(request);
auto request_message = RequestMessage::deserialize(
request.pbft_raw.data(),
request.pbft_raw.size(),
std::move(ctx),
nullptr);
return execute_request(std::move(request_message), state->commit_idx == 0);
}
}

Просмотреть файл

@ -0,0 +1,79 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include "consensus/aft/raft_types.h"
#include "consensus/pbft/pbft_requests.h"
#include "enclave/rpc_map.h"
#include "state.h"
namespace enclave
{
class RPCSessions;
class RPCMap;
}
namespace aft
{
class RequestMessage;
struct RequestCtx
{
std::shared_ptr<enclave::RpcContext> ctx;
std::shared_ptr<enclave::RpcHandler> frontend;
};
class Executor
{
public:
virtual ~Executor() = default;
virtual std::unique_ptr<RequestCtx> create_request_ctx(
uint8_t* req_start, size_t req_size) = 0;
virtual std::unique_ptr<RequestCtx> create_request_ctx(
pbft::Request& request) = 0;
virtual kv::Version execute_request(
std::unique_ptr<RequestMessage> request, bool is_create_request) = 0;
virtual std::unique_ptr<aft::RequestMessage> create_request_message(
const kv::TxHistory::RequestCallbackArgs& args) = 0;
virtual kv::Version commit_replayed_request(kv::Tx& tx) = 0;
};
class ExecutorImpl : public Executor
{
public:
ExecutorImpl(
pbft::RequestsMap& pbft_requests_map_,
std::shared_ptr<State> state_,
std::shared_ptr<enclave::RPCMap> rpc_map_,
std::shared_ptr<enclave::RPCSessions> rpc_sessions_) :
pbft_requests_map(pbft_requests_map_),
state(state_),
rpc_map(rpc_map_),
rpc_sessions(rpc_sessions_)
{}
std::unique_ptr<RequestCtx> create_request_ctx(
uint8_t* req_start, size_t req_size) override;
std::unique_ptr<RequestCtx> create_request_ctx(
pbft::Request& request) override;
kv::Version execute_request(
std::unique_ptr<RequestMessage> request, bool is_create_request) override;
std::unique_ptr<aft::RequestMessage> create_request_message(
const kv::TxHistory::RequestCallbackArgs& args) override;
kv::Version commit_replayed_request(kv::Tx& tx) override;
private:
pbft::RequestsMap& pbft_requests_map;
std::shared_ptr<State> state;
std::shared_ptr<enclave::RPCMap> rpc_map;
std::shared_ptr<enclave::RPCSessions> rpc_sessions;
};
}

Просмотреть файл

@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include <cstdint>
#include <limits>
namespace aft
{
class AbstractMessage
{
public:
AbstractMessage() = default;
virtual ~AbstractMessage() = default;
virtual bool should_encrypt() const = 0;
virtual void serialize_message(
aft::NodeId from_node, uint8_t* data, size_t size) const = 0;
virtual size_t size() const = 0;
};
}

Просмотреть файл

@ -0,0 +1,109 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include "consensus/aft/raft_types.h"
#include "ds/serialized.h"
#include "kv/kv_types.h"
#include "message.h"
#include <memory>
#include <vector>
namespace aft
{
// Request messages have the following format.
#pragma pack(push)
#pragma pack(1)
struct RequestMessageRep : public consensus::ConsensusHeader<RaftMsgType>
{
RequestMessageRep() = default;
RequestMessageRep(
aft::NodeId from_node,
uint16_t command_size_,
uint16_t session_id_,
kv::TxHistory::RequestID rid_) :
consensus::ConsensusHeader<RaftMsgType>(
RaftMsgType::bft_request, from_node),
command_size(command_size_),
session_id(session_id_),
rid(rid_)
{}
uint16_t command_size;
uint16_t session_id; // unique id of client who sends the request
kv::TxHistory::RequestID rid; // unique request identifier
};
#pragma pack(pop)
class RequestMessage : public AbstractMessage
{
public:
RequestMessage(
std::vector<uint8_t> request_,
kv::TxHistory::RequestID rid_,
std::unique_ptr<RequestCtx> ctx_,
ReplyCallback cb_) :
request(std::move(request_)),
rid(rid_),
ctx(std::move(ctx_)),
cb(std::move(cb_))
{}
bool should_encrypt() const override
{
return true;
}
RequestCtx& get_request_ctx() const
{
return *ctx;
}
void callback(std::vector<uint8_t>& data)
{
if (cb != nullptr)
{
cb(nullptr, rid, 0, data);
}
}
void serialize_message(
aft::NodeId from_node, uint8_t* data, size_t size) const override
{
RequestMessageRep rep(from_node, request.size(), 0, rid);
serialized::write(
data,
size,
reinterpret_cast<uint8_t*>(&rep),
sizeof(RequestMessageRep));
serialized::write(data, size, request.data(), request.size());
CCF_ASSERT(size == 0, "allocated buffer is too large");
}
static std::unique_ptr<RequestMessage> deserialize(
const uint8_t* data,
size_t size,
std::unique_ptr<RequestCtx> ctx,
ReplyCallback cb)
{
auto rep = serialized::read<RequestMessageRep>(data, size);
std::vector<uint8_t> request =
serialized::read(data, size, rep.command_size);
return std::make_unique<RequestMessage>(
std::move(request), rep.rid, std::move(ctx), std::move(cb));
}
size_t size() const override
{
return sizeof(RequestMessageRep) + request.size();
}
private:
std::vector<uint8_t> request;
kv::TxHistory::RequestID rid;
std::unique_ptr<RequestCtx> ctx;
ReplyCallback cb;
};
}

Просмотреть файл

@ -0,0 +1,108 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include "ds/logger.h"
#include "ds/spin_lock.h"
#include "kv/kv_types.h"
#include "node/rpc/tx_status.h"
#include "tls/key_pair.h"
#include "tls/verifier.h"
#include <map>
#include <set>
namespace aft
{
class ViewHistory
{
// Entry i stores the first version in view i+1
std::vector<kv::Version> views;
public:
static constexpr kv::Consensus::View InvalidView = ccf::VIEW_UNKNOWN;
void initialise(const std::vector<kv::Version>& terms_)
{
views.clear();
for (size_t i = 0; i < terms_.size(); ++i)
{
update(terms_[i], i + 1);
}
LOG_DEBUG_FMT("Initialised views: {}", fmt::join(views, ", "));
}
void update(kv::Version idx, kv::Consensus::View view)
{
LOG_DEBUG_FMT("Updating view to: {} at version: {}", view, idx);
if (!views.empty())
{
const auto current_latest_index = views.back();
if (idx < current_latest_index)
{
throw std::logic_error(fmt::format(
"version must not move backwards ({} < {})",
idx,
current_latest_index));
}
}
for (int64_t i = views.size(); i < view; ++i)
{
views.push_back(idx);
}
LOG_DEBUG_FMT("Resulting views: {}", fmt::join(views, ", "));
}
kv::Consensus::View term_at(kv::Version idx)
{
auto it = upper_bound(views.begin(), views.end(), idx);
// Indices before the version of the first view are unknown
if (it == views.begin())
{
return InvalidView;
}
return (it - views.begin());
}
};
class Replica
{
public:
Replica(kv::NodeId id_, const std::vector<uint8_t>& cert_) :
id(id_),
verifier(tls::make_unique_verifier(cert_))
{}
kv::NodeId get_id() const
{
return id;
}
private:
kv::NodeId id;
tls::VerifierUniquePtr verifier;
};
struct State
{
State(kv::NodeId my_node_id_) :
my_node_id(my_node_id_),
current_view(0),
last_idx(0),
commit_idx(0)
{}
SpinLock lock;
std::map<kv::NodeId, std::shared_ptr<Replica>> configuration;
kv::NodeId my_node_id;
kv::Consensus::View current_view;
kv::Version last_idx;
kv::Version commit_idx;
ViewHistory view_history;
};
}

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -4,45 +4,45 @@
#include "kv/kv_types.h"
#include "raft.h"
#include "request.h"
#include <memory>
namespace raft
namespace aft
{
// This class acts as an adapter between the generic Consensus API and
// the Raft API, allowing for a mapping between the generic consensus
// terminology and the terminology that is specific to Raft
// the AFT API, allowing for a mapping between the generic consensus
// terminology and the terminology that is specific to AFT
template <class... T>
class RaftConsensus : public kv::Consensus
class Consensus : public kv::Consensus
{
private:
std::unique_ptr<Raft<T...>> raft;
std::unique_ptr<Aft<T...>> aft;
ConsensusType consensus_type;
bool is_open;
public:
RaftConsensus(
std::unique_ptr<Raft<T...>> raft_, ConsensusType consensus_type_) :
Consensus(raft_->id()),
raft(std::move(raft_)),
Consensus(std::unique_ptr<Aft<T...>> raft_, ConsensusType consensus_type_) :
kv::Consensus(raft_->id()),
aft(std::move(raft_)),
consensus_type(consensus_type_),
is_open(false)
{}
bool is_primary() override
{
return raft->is_leader();
return aft->is_leader();
}
bool is_backup() override
{
return raft->is_follower();
return aft->is_follower();
}
void force_become_primary() override
{
raft->force_become_leader();
aft->force_become_leader();
}
void force_become_primary(
@ -51,68 +51,68 @@ namespace raft
const std::vector<kv::Version>& terms,
SeqNo commit_seqno) override
{
raft->force_become_leader(seqno, view, terms, commit_seqno);
aft->force_become_leader(seqno, view, terms, commit_seqno);
}
void init_as_backup(SeqNo seqno, View view) override
{
raft->init_as_follower(seqno, view);
aft->init_as_follower(seqno, view);
}
bool replicate(const kv::BatchVector& entries, View view) override
{
return raft->replicate(entries, view);
return aft->replicate(entries, view);
}
std::pair<View, SeqNo> get_committed_txid() override
{
return raft->get_commit_term_and_idx();
return aft->get_commit_term_and_idx();
}
View get_view(SeqNo seqno) override
{
return raft->get_term(seqno);
return aft->get_term(seqno);
}
View get_view() override
{
return raft->get_term();
return aft->get_term();
}
SeqNo get_committed_seqno() override
{
return raft->get_commit_idx();
return aft->get_commit_idx();
}
NodeId primary() override
{
return raft->leader();
return aft->leader();
}
void recv_message(OArray&& data) override
{
return raft->recv_message(data.data(), data.size());
return aft->recv_message(std::move(data));
}
void add_configuration(
SeqNo seqno, const Configuration::Nodes& conf) override
{
raft->add_configuration(seqno, conf);
aft->add_configuration(seqno, conf);
}
Configuration::Nodes get_latest_configuration() const override
{
return raft->get_latest_configuration();
return aft->get_latest_configuration();
}
void periodic(std::chrono::milliseconds elapsed) override
{
raft->periodic(elapsed);
aft->periodic(elapsed);
}
void enable_all_domains() override
{
raft->enable_all_domains();
aft->enable_all_domains();
}
void open_network() override
@ -121,16 +121,11 @@ namespace raft
return;
}
void emit_signature() override
{
throw std::logic_error(
"Method should not be called when using raft consensus");
}
void emit_signature() override {}
bool on_request(const kv::TxHistory::RequestCallbackArgs&) override
bool on_request(const kv::TxHistory::RequestCallbackArgs& args) override
{
throw ccf::ccf_logic_error("Not implemented");
return true;
return aft->on_request(args);
}
ConsensusType type() override

Просмотреть файл

@ -6,7 +6,7 @@
#include <unordered_set>
namespace raft
namespace aft
{
static constexpr auto replicate_type_raft = kv::ReplicateType::ALL;
static const std::unordered_set<std::string> replicated_tables_raft = {};

Просмотреть файл

@ -4,18 +4,27 @@
#include "consensus/consensus_types.h"
#include "ds/ring_buffer_types.h"
#include "enclave/rpc_context.h"
#include "enclave/rpc_handler.h"
#include "kv/kv_types.h"
#include <chrono>
#include <cstdint>
#include <limits>
namespace raft
namespace aft
{
using Index = int64_t;
using Term = int64_t;
using NodeId = uint64_t;
using Node2NodeMsg = uint64_t;
using ReplyCallback = std::function<bool(
void* owner,
kv::TxHistory::RequestID caller_rid,
int status,
std::vector<uint8_t>& data)>;
static constexpr NodeId NoNode = std::numeric_limits<NodeId>::max();
template <typename S>
@ -30,6 +39,11 @@ namespace raft
virtual void compact(Index v) = 0;
virtual void rollback(Index v, std::optional<Term> t = std::nullopt) = 0;
virtual void set_term(Term t) = 0;
virtual S deserialise_views(
const std::vector<uint8_t>& data,
bool public_only = false,
kv::Term* term = nullptr,
kv::Tx* tx = nullptr) = 0;
};
template <typename T, typename S>
@ -80,6 +94,18 @@ namespace raft
p->set_term(t);
}
}
S deserialise_views(
const std::vector<uint8_t>& data,
bool public_only = false,
kv::Term* term = nullptr,
kv::Tx* tx = nullptr) override
{
auto p = x.lock();
if (p)
return p->deserialise_views(data, public_only, term, tx);
return S::FAILED;
}
};
enum RaftMsgType : Node2NodeMsg
@ -88,6 +114,8 @@ namespace raft
raft_append_entries_response,
raft_request_vote,
raft_request_vote_response,
bft_request,
};
#pragma pack(push, 1)

Просмотреть файл

@ -0,0 +1,76 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include "ds/json.h"
#include "kv/map.h"
#include "node/entities.h"
#include <msgpack/msgpack.hpp>
#include <vector>
namespace aft
{
struct Request
{
uint64_t caller_id;
std::vector<uint8_t> caller_cert;
std::vector<uint8_t> raw;
std::vector<uint8_t> pbft_raw;
uint8_t frame_format = enclave::FrameFormat::http;
MSGPACK_DEFINE(caller_id, caller_cert, raw, pbft_raw, frame_format);
std::vector<uint8_t> serialise()
{
bool include_caller = false;
size_t size = sizeof(caller_id) + sizeof(bool) + sizeof(size_t) +
raw.size() + sizeof(size_t) + sizeof(enclave::FrameFormat) +
pbft_raw.size();
if (!caller_cert.empty())
{
size += sizeof(size_t) + caller_cert.size();
include_caller = true;
}
std::vector<uint8_t> serialized_req(size);
auto data_ = serialized_req.data();
auto size_ = serialized_req.size();
serialized::write(data_, size_, caller_id);
serialized::write(data_, size_, include_caller);
if (include_caller)
{
serialized::write(data_, size_, caller_cert.size());
serialized::write(data_, size_, caller_cert.data(), caller_cert.size());
}
serialized::write(data_, size_, raw.size());
serialized::write(data_, size_, raw.data(), raw.size());
serialized::write(data_, size_, pbft_raw.size());
serialized::write(data_, size_, pbft_raw.data(), pbft_raw.size());
serialized::write(data_, size_, frame_format);
return serialized_req;
}
void deserialise(const uint8_t* data_, size_t size_)
{
caller_id = serialized::read<uint64_t>(data_, size_);
auto includes_caller = serialized::read<bool>(data_, size_);
if (includes_caller)
{
auto caller_size = serialized::read<size_t>(data_, size_);
caller_cert = serialized::read(data_, size_, caller_size);
}
auto raw_size = serialized::read<size_t>(data_, size_);
raw = serialized::read(data_, size_, raw_size);
auto pbft_raw_size = serialized::read<size_t>(data_, size_);
pbft_raw = serialized::read(data_, size_, pbft_raw_size);
frame_format = serialized::read<enclave::FrameFormat>(data_, size_);
}
};
DECLARE_JSON_TYPE(Request);
DECLARE_JSON_REQUIRED_FIELDS(
Request, caller_id, caller_cert, raw, pbft_raw, frame_format);
}

Просмотреть файл

Просмотреть файл

@ -2,7 +2,7 @@
// Licensed under the Apache 2.0 License.
#pragma once
#include "consensus/raft/raft.h"
#include "consensus/aft/raft.h"
#include "ds/logger.h"
#include <chrono>
@ -16,10 +16,14 @@
#include "logging_stub.h"
using ms = std::chrono::milliseconds;
using TRaft = raft::
Raft<raft::LedgerStubProxy, raft::ChannelStubProxy, raft::StubSnapshotter>;
using Store = raft::LoggingStubStore;
using Adaptor = raft::Adaptor<Store, kv::DeserialiseSuccess>;
using TRaft =
aft::Aft<aft::LedgerStubProxy, aft::ChannelStubProxy, aft::StubSnapshotter>;
using Store = aft::LoggingStubStore;
using Adaptor = aft::Adaptor<Store, kv::DeserialiseSuccess>;
std::vector<uint8_t> cert;
kv::Map<size_t, pbft::Request> request_map(
nullptr, "test", kv::SecurityDomain::PUBLIC, true);
class RaftDriver
{
@ -30,8 +34,8 @@ private:
std::shared_ptr<TRaft> raft;
};
std::unordered_map<raft::NodeId, NodeDriver> _nodes;
std::set<std::pair<raft::NodeId, raft::NodeId>> _connections;
std::unordered_map<aft::NodeId, NodeDriver> _nodes;
std::set<std::pair<aft::NodeId, aft::NodeId>> _connections;
public:
RaftDriver(size_t number_of_nodes)
@ -40,15 +44,21 @@ public:
for (size_t i = 0; i < number_of_nodes; ++i)
{
raft::NodeId node_id = i;
aft::NodeId node_id = i;
auto kv = std::make_shared<Store>(node_id);
auto raft = std::make_shared<TRaft>(
ConsensusType::RAFT,
std::make_unique<Adaptor>(kv),
std::make_unique<raft::LedgerStubProxy>(node_id),
std::make_shared<raft::ChannelStubProxy>(),
std::make_shared<raft::StubSnapshotter>(),
node_id,
std::make_unique<aft::LedgerStubProxy>(node_id),
std::make_shared<aft::ChannelStubProxy>(),
std::make_shared<aft::StubSnapshotter>(),
nullptr,
nullptr,
cert,
request_map,
std::make_shared<aft::State>(node_id),
nullptr,
ms(10),
ms(i * 100));
@ -62,20 +72,20 @@ public:
}
}
void log(raft::NodeId first, raft::NodeId second, const std::string& message)
void log(aft::NodeId first, aft::NodeId second, const std::string& message)
{
std::cout << " Node" << first << "->>"
<< "Node" << second << ": " << message << std::endl;
}
void rlog(raft::NodeId first, raft::NodeId second, const std::string& message)
void rlog(aft::NodeId first, aft::NodeId second, const std::string& message)
{
std::cout << " Node" << first << "-->>"
<< "Node" << second << ": " << message << std::endl;
}
void log_msg_details(
raft::NodeId node_id, raft::NodeId tgt_node_id, raft::RequestVote rv)
aft::NodeId node_id, aft::NodeId tgt_node_id, aft::RequestVote rv)
{
std::ostringstream s;
s << "request_vote t: " << rv.term << ", lli: " << rv.last_commit_idx
@ -84,9 +94,7 @@ public:
}
void log_msg_details(
raft::NodeId node_id,
raft::NodeId tgt_node_id,
raft::RequestVoteResponse rv)
aft::NodeId node_id, aft::NodeId tgt_node_id, aft::RequestVoteResponse rv)
{
std::ostringstream s;
s << "request_vote_response t: " << rv.term << ", vg: " << rv.vote_granted;
@ -94,7 +102,7 @@ public:
}
void log_msg_details(
raft::NodeId node_id, raft::NodeId tgt_node_id, raft::AppendEntries ae)
aft::NodeId node_id, aft::NodeId tgt_node_id, aft::AppendEntries ae)
{
std::ostringstream s;
s << "append_entries i: " << ae.idx << ", t: " << ae.term
@ -104,9 +112,9 @@ public:
}
void log_msg_details(
raft::NodeId node_id,
raft::NodeId tgt_node_id,
raft::AppendEntriesResponse aer)
aft::NodeId node_id,
aft::NodeId tgt_node_id,
aft::AppendEntriesResponse aer)
{
std::ostringstream s;
s << "append_entries_response t: " << aer.term
@ -114,7 +122,7 @@ public:
rlog(node_id, tgt_node_id, s.str());
}
void connect(raft::NodeId first, raft::NodeId second)
void connect(aft::NodeId first, aft::NodeId second)
{
std::cout << " Node" << first << "-->Node" << second << ": connect"
<< std::endl;
@ -122,7 +130,7 @@ public:
_connections.insert(std::make_pair(second, first));
}
void periodic_one(raft::NodeId node_id, ms ms_)
void periodic_one(aft::NodeId node_id, ms ms_)
{
std::ostringstream s;
s << "periodic for " << std::to_string(ms_.count()) << " ms";
@ -138,7 +146,7 @@ public:
}
}
void state_one(raft::NodeId node_id)
void state_one(aft::NodeId node_id)
{
std::cout << " Note right of Node" << node_id << ": ";
auto raft = _nodes.at(node_id).raft;
@ -159,7 +167,7 @@ public:
}
template <class Messages>
size_t dispatch_one_queue(raft::NodeId node_id, Messages& messages)
size_t dispatch_one_queue(aft::NodeId node_id, Messages& messages)
{
size_t count = 0;
@ -185,13 +193,23 @@ public:
return count;
}
void dispatch_one(raft::NodeId node_id)
void dispatch_one(aft::NodeId node_id)
{
auto raft = _nodes.at(node_id).raft;
dispatch_one_queue(node_id, raft->channels->sent_request_vote);
dispatch_one_queue(node_id, raft->channels->sent_request_vote_response);
dispatch_one_queue(node_id, raft->channels->sent_append_entries);
dispatch_one_queue(node_id, raft->channels->sent_append_entries_response);
dispatch_one_queue(
node_id,
((aft::ChannelStubProxy*)raft->channels.get())->sent_request_vote);
dispatch_one_queue(
node_id,
((aft::ChannelStubProxy*)raft->channels.get())
->sent_request_vote_response);
dispatch_one_queue(
node_id,
((aft::ChannelStubProxy*)raft->channels.get())->sent_append_entries);
dispatch_one_queue(
node_id,
((aft::ChannelStubProxy*)raft->channels.get())
->sent_append_entries_response);
}
void dispatch_all_once()
@ -210,7 +228,9 @@ public:
_nodes.end(),
0,
[](int acc, auto& node) {
return node.second.raft->channels->sent_msg_count() + acc;
return ((aft::ChannelStubProxy*)node.second.raft->channels.get())
->sent_msg_count() +
acc;
}) &&
iterations++ < 5)
{
@ -219,8 +239,8 @@ public:
}
void replicate(
raft::NodeId node_id,
raft::Index idx,
aft::NodeId node_id,
aft::Index idx,
std::shared_ptr<std::vector<uint8_t>> data)
{
std::cout << " KV" << node_id << "->>Node" << node_id
@ -228,7 +248,7 @@ public:
_nodes.at(node_id).raft->replicate(kv::BatchVector{{idx, data, true}}, 1);
}
void disconnect(raft::NodeId left, raft::NodeId right)
void disconnect(aft::NodeId left, aft::NodeId right)
{
bool noop = true;
auto ltr = std::make_pair(left, right);
@ -250,7 +270,7 @@ public:
}
}
void disconnect_node(raft::NodeId node_id)
void disconnect_node(aft::NodeId node_id)
{
for (auto& node : _nodes)
{
@ -261,7 +281,7 @@ public:
}
}
void reconnect(raft::NodeId left, raft::NodeId right)
void reconnect(aft::NodeId left, aft::NodeId right)
{
std::cout << " Node" << left << "-->Node" << right << ": reconnect"
<< std::endl;
@ -269,7 +289,7 @@ public:
_connections.insert(std::make_pair(right, left));
}
void reconnect_node(raft::NodeId node_id)
void reconnect_node(aft::NodeId node_id)
{
for (auto& node : _nodes)
{

Просмотреть файл

Просмотреть файл

@ -2,14 +2,14 @@
// Licensed under the Apache 2.0 License.
#pragma once
#include "consensus/raft/raft.h"
#include "consensus/raft/raft_types.h"
#include "consensus/aft/raft.h"
#include "consensus/aft/raft_types.h"
#include <map>
#include <optional>
#include <vector>
namespace raft
namespace aft
{
class LedgerStubProxy
{
@ -68,7 +68,7 @@ namespace raft
void commit(Index idx) {}
};
class ChannelStubProxy
class ChannelStubProxy : public ccf::NodeToNode
{
public:
// Capture what is being sent out
@ -84,44 +84,43 @@ namespace raft
void create_channel(
NodeId peer_id,
const std::string& peer_hostname,
const std::string& peer_service)
const std::string& peer_service) override
{}
void destroy_channel(NodeId peer_id) {}
void destroy_channel(NodeId peer_id) override {}
void destroy_all_channels() {}
void destroy_all_channels() override {}
void close_all_outgoing() {}
bool send_authenticated(
const ccf::NodeMsgType& msg_type, NodeId to, const RequestVote& data)
{
sent_request_vote.push_back(std::make_pair(to, data));
return true;
}
bool send_authenticated(
const ccf::NodeMsgType& msg_type, NodeId to, const AppendEntries& data)
{
sent_append_entries.push_back(std::make_pair(to, data));
return true;
}
void close_all_outgoing() override {}
bool send_authenticated(
const ccf::NodeMsgType& msg_type,
NodeId to,
const RequestVoteResponse& data)
const uint8_t* data,
size_t size) override
{
sent_request_vote_response.push_back(std::make_pair(to, data));
return true;
}
switch (serialized::peek<RaftMsgType>(data, size))
{
case aft::RaftMsgType::raft_append_entries:
sent_append_entries.push_back(
std::make_pair(to, *(AppendEntries*)(data)));
break;
case aft::RaftMsgType::raft_request_vote:
sent_request_vote.push_back(
std::make_pair(to, *(RequestVote*)(data)));
break;
case aft::RaftMsgType::raft_request_vote_response:
sent_request_vote_response.push_back(
std::make_pair(to, *(RequestVoteResponse*)(data)));
break;
case aft::RaftMsgType::raft_append_entries_response:
sent_append_entries_response.push_back(
std::make_pair(to, *(AppendEntriesResponse*)(data)));
break;
default:
throw std::logic_error("unexpected response type");
}
bool send_authenticated(
const ccf::NodeMsgType& msg_type,
NodeId to,
const AppendEntriesResponse& data)
{
sent_append_entries_response.push_back(std::make_pair(to, data));
return true;
}
@ -131,20 +130,39 @@ namespace raft
sent_append_entries.size() + sent_append_entries_response.size();
}
template <class T>
const T& recv_authenticated(const uint8_t*& data, size_t& size)
bool recv_authenticated(
NodeId from_node, CBuffer cb, const uint8_t*& data, size_t& size) override
{
return serialized::overlay<T>(data, size);
return true;
}
void recv_message(OArray&& oa) override {}
void initialize(NodeId self_id, const tls::Pem& network_pkey) override {}
bool send_encrypted(
const ccf::NodeMsgType& msg_type,
CBuffer cb,
NodeId to,
const std::vector<uint8_t>& data) override
{
return true;
}
std::vector<uint8_t> recv_encrypted(
NodeId from_node, CBuffer cb, const uint8_t* data, size_t size) override
{
return {};
}
};
class LoggingStubStore
{
private:
raft::NodeId _id;
aft::NodeId _id;
public:
LoggingStubStore(raft::NodeId id) : _id(id) {}
LoggingStubStore(aft::NodeId id) : _id(id) {}
virtual void compact(Index i)
{
@ -179,12 +197,26 @@ namespace raft
{
return kv::DeserialiseSuccess::PASS;
}
kv::Version current_version()
{
return kv::NoVersion;
}
virtual kv::DeserialiseSuccess deserialise_views(
const std::vector<uint8_t>& data,
bool public_only = false,
kv::Term* term = nullptr,
kv::Tx* tx = nullptr)
{
return kv::DeserialiseSuccess::PASS;
}
};
class LoggingStubStoreSig : public LoggingStubStore
{
public:
LoggingStubStoreSig(raft::NodeId id) : LoggingStubStore(id) {}
LoggingStubStoreSig(aft::NodeId id) : LoggingStubStore(id) {}
kv::DeserialiseSuccess deserialise(
const std::vector<uint8_t>& data,

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -1,36 +1,36 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#include "consensus/raft/raft.h"
#include "consensus/aft/raft.h"
#include <doctest/doctest.h>
using namespace raft;
using namespace aft;
TEST_CASE("Advancing term history" * doctest::test_suite("termhistory"))
{
TermHistory history;
ViewHistory history;
{
INFO("Initial history is completely unknown");
CHECK(history.term_at(0) == TermHistory::InvalidTerm);
CHECK(history.term_at(1) == TermHistory::InvalidTerm);
CHECK(history.term_at(2) == TermHistory::InvalidTerm);
CHECK(history.term_at(3) == TermHistory::InvalidTerm);
CHECK(history.term_at(4) == TermHistory::InvalidTerm);
CHECK(history.term_at(0) == ViewHistory::InvalidView);
CHECK(history.term_at(1) == ViewHistory::InvalidView);
CHECK(history.term_at(2) == ViewHistory::InvalidView);
CHECK(history.term_at(3) == ViewHistory::InvalidView);
CHECK(history.term_at(4) == ViewHistory::InvalidView);
}
{
INFO("Advancing index gives term for current and future indices");
history.update(1, 1);
CHECK(history.term_at(0) == TermHistory::InvalidTerm);
CHECK(history.term_at(0) == ViewHistory::InvalidView);
CHECK(history.term_at(1) == 1);
CHECK(history.term_at(2) == 1);
CHECK(history.term_at(3) == 1);
CHECK(history.term_at(4) == 1);
history.update(2, 1);
CHECK(history.term_at(0) == TermHistory::InvalidTerm);
CHECK(history.term_at(0) == ViewHistory::InvalidView);
CHECK(history.term_at(1) == 1);
CHECK(history.term_at(2) == 1);
CHECK(history.term_at(3) == 1);
@ -40,14 +40,14 @@ TEST_CASE("Advancing term history" * doctest::test_suite("termhistory"))
{
INFO("Advancing term increases term of affected indices");
history.update(3, 2);
CHECK(history.term_at(0) == TermHistory::InvalidTerm);
CHECK(history.term_at(0) == ViewHistory::InvalidView);
CHECK(history.term_at(1) == 1);
CHECK(history.term_at(2) == 1);
CHECK(history.term_at(3) == 2);
CHECK(history.term_at(4) == 2);
history.update(4, 3);
CHECK(history.term_at(0) == TermHistory::InvalidTerm);
CHECK(history.term_at(0) == ViewHistory::InvalidView);
CHECK(history.term_at(1) == 1);
CHECK(history.term_at(2) == 1);
CHECK(history.term_at(3) == 2);
@ -59,22 +59,22 @@ TEST_CASE("Edge case term histories" * doctest::test_suite("termhistory"))
{
{
INFO("Index skips leave unknown indices");
TermHistory history;
ViewHistory history;
history.update(3, 1);
CHECK(history.term_at(0) == TermHistory::InvalidTerm);
CHECK(history.term_at(1) == TermHistory::InvalidTerm);
CHECK(history.term_at(2) == TermHistory::InvalidTerm);
CHECK(history.term_at(0) == ViewHistory::InvalidView);
CHECK(history.term_at(1) == ViewHistory::InvalidView);
CHECK(history.term_at(2) == ViewHistory::InvalidView);
CHECK(history.term_at(3) == 1);
CHECK(history.term_at(4) == 1);
}
{
INFO("Term skips advance to given term");
TermHistory history;
ViewHistory history;
history.update(3, 2);
CHECK(history.term_at(0) == TermHistory::InvalidTerm);
CHECK(history.term_at(1) == TermHistory::InvalidTerm);
CHECK(history.term_at(2) == TermHistory::InvalidTerm);
CHECK(history.term_at(0) == ViewHistory::InvalidView);
CHECK(history.term_at(1) == ViewHistory::InvalidView);
CHECK(history.term_at(2) == ViewHistory::InvalidView);
CHECK(history.term_at(3) == 2);
CHECK(history.term_at(4) == 2);
}
@ -82,10 +82,10 @@ TEST_CASE("Edge case term histories" * doctest::test_suite("termhistory"))
{
INFO(
"Subsequent calls on same term must not move backward from term start");
TermHistory history;
ViewHistory history;
history.update(2, 2);
CHECK(history.term_at(0) == TermHistory::InvalidTerm);
CHECK(history.term_at(1) == TermHistory::InvalidTerm);
CHECK(history.term_at(0) == ViewHistory::InvalidView);
CHECK(history.term_at(1) == ViewHistory::InvalidView);
CHECK(history.term_at(2) == 2);
CHECK(history.term_at(3) == 2);
CHECK(history.term_at(4) == 2);
@ -94,8 +94,8 @@ TEST_CASE("Edge case term histories" * doctest::test_suite("termhistory"))
CHECK_NOTHROW(history.update(3, 2));
CHECK_NOTHROW(history.update(2, 2));
CHECK_NOTHROW(history.update(4, 2));
CHECK(history.term_at(0) == TermHistory::InvalidTerm);
CHECK(history.term_at(1) == TermHistory::InvalidTerm);
CHECK(history.term_at(0) == ViewHistory::InvalidView);
CHECK(history.term_at(1) == ViewHistory::InvalidView);
CHECK(history.term_at(2) == 2);
CHECK(history.term_at(3) == 2);
CHECK(history.term_at(4) == 2);
@ -105,24 +105,24 @@ TEST_CASE("Edge case term histories" * doctest::test_suite("termhistory"))
{
INFO("Highest matching term is returned");
TermHistory history;
ViewHistory history;
history.update(2, 2);
CHECK(history.term_at(0) == TermHistory::InvalidTerm);
CHECK(history.term_at(1) == TermHistory::InvalidTerm);
CHECK(history.term_at(0) == ViewHistory::InvalidView);
CHECK(history.term_at(1) == ViewHistory::InvalidView);
CHECK(history.term_at(2) == 2);
CHECK(history.term_at(3) == 2);
CHECK(history.term_at(4) == 2);
history.update(2, 4);
CHECK(history.term_at(0) == TermHistory::InvalidTerm);
CHECK(history.term_at(1) == TermHistory::InvalidTerm);
CHECK(history.term_at(0) == ViewHistory::InvalidView);
CHECK(history.term_at(1) == ViewHistory::InvalidView);
CHECK(history.term_at(2) == 4);
CHECK(history.term_at(3) == 4);
CHECK(history.term_at(4) == 4);
history.update(2, 3);
CHECK(history.term_at(0) == TermHistory::InvalidTerm);
CHECK(history.term_at(1) == TermHistory::InvalidTerm);
CHECK(history.term_at(0) == ViewHistory::InvalidView);
CHECK(history.term_at(1) == ViewHistory::InvalidView);
CHECK(history.term_at(2) == 4);
CHECK(history.term_at(3) == 4);
CHECK(history.term_at(4) == 4);
@ -133,7 +133,7 @@ TEST_CASE("Initialised term histories" * doctest::test_suite("termhistory"))
{
{
INFO("Initialise validates the given term history");
TermHistory history;
ViewHistory history;
CHECK_NOTHROW(history.initialise({}));
CHECK_NOTHROW(history.initialise({1}));
CHECK_NOTHROW(history.initialise({2}));
@ -147,17 +147,17 @@ TEST_CASE("Initialised term histories" * doctest::test_suite("termhistory"))
{
INFO("Initialise overwrites term history");
TermHistory history;
ViewHistory history;
history.update(5, 1);
history.update(10, 2);
history.update(20, 3);
CHECK(history.term_at(4) == TermHistory::InvalidTerm);
CHECK(history.term_at(4) == ViewHistory::InvalidView);
CHECK(history.term_at(8) == 1);
CHECK(history.term_at(19) == 2);
CHECK(history.term_at(20) == 3);
history.initialise({6});
CHECK(history.term_at(4) == TermHistory::InvalidTerm);
CHECK(history.term_at(4) == ViewHistory::InvalidView);
CHECK(history.term_at(8) == 1);
CHECK(history.term_at(19) == 1);
CHECK(history.term_at(20) == 1);

Просмотреть файл

@ -26,6 +26,12 @@ namespace consensus
template <typename T>
struct ConsensusHeader
{
ConsensusHeader() = default;
ConsensusHeader(T msg_, ccf::NodeId from_node_) :
msg(msg_),
from_node(from_node_)
{}
T msg;
ccf::NodeId from_node;
};

Просмотреть файл

@ -4,6 +4,7 @@
//#define USE_MPSCQ
#include "ds/ccf_assert.h"
#include "ds/logger.h"
#include "ds/thread_ids.h"
#ifdef USE_MPSCQ
@ -11,6 +12,7 @@
#endif
#include <atomic>
#include <chrono>
#include <cstddef>
namespace threading
@ -57,6 +59,25 @@ namespace threading
ThreadMsg* local_msg = nullptr;
#endif
struct TimerEntry
{
std::chrono::milliseconds time_offset;
uint64_t counter;
};
struct TimerEntryCompare
{
bool operator()(const TimerEntry& lhs, const TimerEntry& rhs) const
{
if (lhs.time_offset != rhs.time_offset)
{
return lhs.time_offset < rhs.time_offset;
}
return lhs.counter < rhs.counter;
}
};
public:
Task()
{
@ -117,7 +138,43 @@ namespace threading
#endif
}
TimerEntry add_task_after(
std::unique_ptr<ThreadMsg> item, std::chrono::milliseconds ms)
{
TimerEntry entry = {time_offset + ms, time_entry_counter++};
timer_map.emplace(entry, std::move(item));
return entry;
}
bool cancel_timer_task(TimerEntry timer_entry)
{
auto num_erased = timer_map.erase(timer_entry);
CCF_ASSERT(num_erased <= 1, "Too many items erased");
return num_erased != 0;
}
void tick(std::chrono::milliseconds elapsed)
{
time_offset += elapsed;
while (!timer_map.empty() &&
timer_map.begin()->first.time_offset <= time_offset)
{
auto it = timer_map.begin();
auto& cb = it->second->cb;
auto msg = std::move(it->second);
timer_map.erase(it);
cb(std::move(msg));
}
}
private:
std::chrono::milliseconds time_offset;
uint64_t time_entry_counter = 0;
std::map<TimerEntry, std::unique_ptr<ThreadMsg>, TimerEntryCompare>
timer_map;
#ifndef USE_MPSCQ
void reverse_local_messages()
{
@ -236,6 +293,39 @@ namespace threading
task.add_task(reinterpret_cast<ThreadMsg*>(msg.release()));
}
template <typename Payload>
void add_task_after(
std::unique_ptr<Tmsg<Payload>> msg, std::chrono::milliseconds ms)
{
Task& task = tasks[get_current_thread_id()];
task.add_task_after(std::move(msg), ms);
}
void tick(std::chrono::milliseconds elapsed)
{
struct TickMsg
{
TickMsg(std::chrono::milliseconds elapsed_, Task& task_) :
elapsed(elapsed_),
task(task_)
{}
std::chrono::milliseconds elapsed;
Task& task;
};
for (auto& task : tasks)
{
auto msg = std::make_unique<Tmsg<TickMsg>>(
[](std::unique_ptr<Tmsg<TickMsg>> msg) {
msg->data.task.tick(msg->data.elapsed);
},
elapsed,
task);
task.add_task(msg.release());
}
}
static uint16_t get_execution_thread(uint32_t i)
{
uint16_t tid = MAIN_THREAD_ID;

Просмотреть файл

@ -15,7 +15,6 @@
#include "node/notifier.h"
#include "node/rpc/forwarder.h"
#include "node/rpc/node_frontend.h"
#include "node/timer.h"
#include "rpc_map.h"
#include "rpc_sessions.h"
@ -30,7 +29,6 @@ namespace enclave
ccf::NetworkState network;
ccf::ShareManager share_manager;
std::shared_ptr<ccf::NodeToNode> n2n_channels;
ccf::Timers timers;
std::shared_ptr<RPCMap> rpc_map;
std::shared_ptr<RPCSessions> rpcsessions;
std::unique_ptr<ccf::NodeState> node;
@ -72,7 +70,7 @@ namespace enclave
writer_factory(basic_writer_factory, enclave_config.writer_config),
network(consensus_type_),
share_manager(network),
n2n_channels(std::make_shared<ccf::NodeToNode>(writer_factory)),
n2n_channels(std::make_shared<ccf::NodeToNodeImpl>(writer_factory)),
rpc_map(std::make_shared<RPCMap>()),
rpcsessions(std::make_shared<RPCSessions>(writer_factory, rpc_map)),
cmd_forwarder(std::make_shared<ccf::Forwarder<ccf::NodeToNode>>(
@ -88,12 +86,7 @@ namespace enclave
to_host = writer_factory.create_writer_to_outside();
node = std::make_unique<ccf::NodeState>(
writer_factory,
network,
rpcsessions,
context.notifier,
timers,
share_manager);
writer_factory, network, rpcsessions, context.notifier, share_manager);
rpc_map->register_frontend<ccf::ActorsType::members>(
std::make_unique<ccf::MemberRpcFrontend>(
@ -225,7 +218,7 @@ namespace enclave
std::chrono::milliseconds elapsed_ms(ms_count);
logger::config::tick(elapsed_ms);
node->tick(elapsed_ms);
timers.tick(elapsed_ms);
threading::ThreadMessaging::thread_messaging.tick(elapsed_ms);
// When recovering, no signature should be emitted while the
// public ledger is being read
if (!node->is_reading_public_ledger())

Просмотреть файл

@ -9,6 +9,11 @@
#include <stdint.h>
#include <vector>
namespace kv
{
class Tx;
}
namespace enclave
{
class RpcHandler

Просмотреть файл

@ -2,8 +2,7 @@
// Licensed under the Apache 2.0 License.
#pragma once
#include "consensus/pbft/pbft_types.h"
#include "consensus/raft/raft_types.h"
#include "consensus/aft/raft_types.h"
#include "host/timer.h"
#include "ledger.h"
#include "node/node_types.h"
@ -234,10 +233,8 @@ namespace asynchost
auto msg_type = serialized::read<ccf::NodeMsgType>(data, size);
if (
msg_type == ccf::NodeMsgType::consensus_msg &&
(serialized::peek<raft::RaftMsgType>(data, size) ==
raft::raft_append_entries ||
serialized::peek<pbft::PbftMsgType>(data, size) ==
pbft::pbft_append_entries))
(serialized::peek<aft::RaftMsgType>(data, size) ==
aft::raft_append_entries))
{
// Parse the indices to be sent to the recipient.
auto p = data;

Просмотреть файл

@ -7,11 +7,11 @@
#include "code_id.h"
#include "config.h"
#include "consensus.h"
#include "consensus/aft/raft_tables.h"
#include "consensus/pbft/pbft_new_views.h"
#include "consensus/pbft/pbft_pre_prepares.h"
#include "consensus/pbft/pbft_requests.h"
#include "consensus/pbft/pbft_tables.h"
#include "consensus/raft/raft_tables.h"
#include "entities.h"
#include "governance_history.h"
#include "kv/map.h"
@ -102,7 +102,7 @@ namespace ccf
tables(
(consensus_type == ConsensusType::RAFT) ?
std::make_shared<kv::Store>(
raft::replicate_type_raft, raft::replicated_tables_raft) :
aft::replicate_type_raft, aft::replicated_tables_raft) :
std::make_shared<kv::Store>(
pbft::replicate_type_pbft, pbft::replicated_tables_pbft)),
members(

Просмотреть файл

@ -3,8 +3,8 @@
#pragma once
#include "call_types.h"
#include "consensus/aft/raft_consensus.h"
#include "consensus/ledger_enclave.h"
#include "consensus/raft/raft_consensus.h"
#include "crypto/crypto_box.h"
#include "ds/logger.h"
#include "enclave/rpc_sessions.h"
@ -22,7 +22,6 @@
#include "secret_share.h"
#include "share_manager.h"
#include "snapshotter.h"
#include "timer.h"
#include "tls/25519.h"
#include "tls/client.h"
#include "tls/entropy.h"
@ -63,9 +62,8 @@ namespace std
namespace ccf
{
using RaftConsensusType =
raft::RaftConsensus<consensus::LedgerEnclave, NodeToNode, Snapshotter>;
using RaftType =
raft::Raft<consensus::LedgerEnclave, NodeToNode, Snapshotter>;
aft::Consensus<consensus::LedgerEnclave, NodeToNode, Snapshotter>;
using RaftType = aft::Aft<consensus::LedgerEnclave, NodeToNode, Snapshotter>;
template <typename T>
class StateMachine
@ -154,7 +152,6 @@ namespace ccf
std::shared_ptr<Forwarder<NodeToNode>> cmd_forwarder;
std::shared_ptr<enclave::RPCSessions> rpcsessions;
ccf::Notifier& notifier;
Timers& timers;
std::shared_ptr<kv::TxHistory> history;
std::shared_ptr<kv::AbstractTxEncryptor> encryptor;
@ -162,11 +159,6 @@ namespace ccf
ShareManager& share_manager;
std::shared_ptr<Snapshotter> snapshotter;
//
// join protocol
//
std::shared_ptr<Timer> join_timer;
//
// recovery
//
@ -188,7 +180,6 @@ namespace ccf
NetworkState& network,
std::shared_ptr<enclave::RPCSessions> rpcsessions,
ccf::Notifier& notifier,
Timers& timers,
ShareManager& share_manager) :
sm(State::uninitialized),
self(INVALID_ID),
@ -199,7 +190,6 @@ namespace ccf
network(network),
rpcsessions(rpcsessions),
notifier(notifier),
timers(timers),
share_manager(share_manager)
{
::EverCrypt_AutoConfig2_init();
@ -477,8 +467,6 @@ namespace ccf
sm.advance(State::partOfNetwork);
}
join_timer.reset();
LOG_INFO_FMT(
"Node has now joined the network as node {}: {}",
self,
@ -526,17 +514,35 @@ namespace ccf
initiate_join(config);
join_timer = timers.new_timer(
std::chrono::milliseconds(config.joining.join_timer),
[this, &config]() {
if (sm.check(State::pending))
struct JoinTimeMsg
{
JoinTimeMsg(NodeState& self_, CCFConfig& config_) :
self(self_),
config(config_)
{}
NodeState& self;
CCFConfig& config;
};
auto join_timer_msg = std::make_unique<threading::Tmsg<JoinTimeMsg>>(
[](std::unique_ptr<threading::Tmsg<JoinTimeMsg>> msg) {
if (msg->data.self.sm.check(State::pending))
{
initiate_join(config);
return true;
msg->data.self.initiate_join(msg->data.config);
auto delay =
std::chrono::milliseconds(msg->data.config.joining.join_timer);
threading::ThreadMessaging::thread_messaging.add_task_after(
std::move(msg), delay);
}
return false;
});
join_timer->start();
},
*this,
config);
threading::ThreadMessaging::thread_messaging.add_task_after(
std::move(join_timer_msg),
std::chrono::milliseconds(config.joining.join_timer));
}
//
@ -1543,13 +1549,21 @@ namespace ccf
setup_n2n_channels();
setup_cmd_forwarder();
auto shared_state = std::make_shared<aft::State>(self);
auto raft = std::make_unique<RaftType>(
std::make_unique<raft::Adaptor<kv::Store, kv::DeserialiseSuccess>>(
network.consensus_type,
std::make_unique<aft::Adaptor<kv::Store, kv::DeserialiseSuccess>>(
network.tables),
std::make_unique<consensus::LedgerEnclave>(writer_factory),
n2n_channels,
snapshotter,
self,
rpcsessions,
rpc_map,
node_cert.raw(),
network.pbft_requests_map,
shared_state,
std::make_shared<aft::ExecutorImpl>(
network.pbft_requests_map, shared_state, rpc_map, rpcsessions),
std::chrono::milliseconds(consensus_config.raft_request_timeout),
std::chrono::milliseconds(consensus_config.raft_election_timeout),
public_only);

Просмотреть файл

@ -15,17 +15,106 @@ namespace ccf
{
class NodeToNode
{
public:
virtual ~NodeToNode() = default;
virtual void create_channel(
NodeId peer_id,
const std::string& peer_hostname,
const std::string& peer_service) = 0;
virtual void destroy_channel(NodeId peer_id) = 0;
virtual void close_all_outgoing() = 0;
virtual void destroy_all_channels() = 0;
template <class T>
bool send_authenticated(
const NodeMsgType& msg_type, NodeId to, const T& data)
{
return send_authenticated(
msg_type, to, reinterpret_cast<const uint8_t*>(&data), sizeof(T));
}
template <>
bool send_authenticated(
const NodeMsgType& msg_type, NodeId to, const std::vector<uint8_t>& data)
{
return send_authenticated(msg_type, to, data.data(), data.size());
}
virtual bool send_authenticated(
const ccf::NodeMsgType& msg_type,
NodeId to,
const uint8_t* data,
size_t size) = 0;
template <class T>
const T& recv_authenticated(const uint8_t*& data, size_t& size)
{
auto& t = serialized::overlay<T>(data, size);
if (!recv_authenticated(t.from_node, asCb(t), data, size))
{
throw std::logic_error(fmt::format(
"Invalid authenticated node2node message from node {}", t.from_node));
}
return t;
}
virtual bool recv_authenticated(
NodeId from_node, CBuffer cb, const uint8_t*& data, size_t& size) = 0;
virtual void recv_message(OArray&& oa) = 0;
virtual void initialize(NodeId self_id, const tls::Pem& network_pkey) = 0;
virtual bool send_encrypted(
const NodeMsgType& msg_type,
CBuffer cb,
NodeId to,
const std::vector<uint8_t>& data) = 0;
template <class T>
bool send_encrypted(
const NodeMsgType& msg_type,
NodeId to,
const std::vector<uint8_t>& data,
const T& msg_hdr)
{
return send_encrypted(msg_type, asCb(msg_hdr), to, data);
}
template <class T>
std::pair<T, std::vector<uint8_t>> recv_encrypted(
const uint8_t* data, size_t size)
{
auto t = serialized::read<T>(data, size);
std::vector<uint8_t> plain =
recv_encrypted(t.from_node, asCb(t), data, size);
return std::make_pair(t, plain);
}
virtual std::vector<uint8_t> recv_encrypted(
NodeId from_node, CBuffer cb, const uint8_t* data, size_t size) = 0;
};
class NodeToNodeImpl : public NodeToNode
{
private:
NodeId self;
std::unique_ptr<ChannelManager> channels;
ringbuffer::AbstractWriterFactory& writer_factory;
public:
NodeToNode(ringbuffer::AbstractWriterFactory& writer_factory_) :
NodeToNodeImpl(ringbuffer::AbstractWriterFactory& writer_factory_) :
writer_factory(writer_factory_)
{}
void initialize(NodeId self_id, const tls::Pem& network_pkey)
void initialize(NodeId self_id, const tls::Pem& network_pkey) override
{
self = self_id;
channels =
@ -33,7 +122,9 @@ namespace ccf
}
void create_channel(
NodeId peer_id, const std::string& hostname, const std::string& service)
NodeId peer_id,
const std::string& hostname,
const std::string& service) override
{
if (peer_id == self)
{
@ -43,7 +134,7 @@ namespace ccf
channels->create_channel(peer_id, hostname, service);
}
void destroy_channel(NodeId peer_id)
void destroy_channel(NodeId peer_id) override
{
if (peer_id == self)
{
@ -53,56 +144,41 @@ namespace ccf
channels->destroy_channel(peer_id);
}
void close_all_outgoing()
void close_all_outgoing() override
{
channels->close_all_outgoing();
}
void destroy_all_channels()
void destroy_all_channels() override
{
channels->destroy_all_channels();
}
template <class T>
bool send_authenticated(
const NodeMsgType& msg_type, NodeId to, const T& data)
const ccf::NodeMsgType& msg_type,
NodeId to,
const uint8_t* data,
size_t size) override
{
auto& n2n_channel = channels->get(to);
return n2n_channel.send(msg_type, asCb(data));
return n2n_channel.send(msg_type, {data, size});
}
template <>
bool send_authenticated(
const NodeMsgType& msg_type, NodeId to, const std::vector<uint8_t>& data)
bool recv_authenticated(
NodeId from_node, CBuffer cb, const uint8_t*& data, size_t& size) override
{
auto& n2n_channel = channels->get(to);
return n2n_channel.send(msg_type, data);
auto& n2n_channel = channels->get(from_node);
return n2n_channel.recv_authenticated(cb, data, size);
}
template <class T>
bool send_encrypted(
const NodeMsgType& msg_type,
CBuffer cb,
NodeId to,
const std::vector<uint8_t>& data,
const T& msg_hdr)
const std::vector<uint8_t>& data) override
{
auto& n2n_channel = channels->get(to);
return n2n_channel.send(msg_type, asCb(msg_hdr), data);
}
template <class T>
const T& recv_authenticated(const uint8_t*& data, size_t& size)
{
auto& t = serialized::overlay<T>(data, size);
auto& n2n_channel = channels->get(t.from_node);
if (!n2n_channel.recv_authenticated(asCb(t), data, size))
{
throw std::logic_error(fmt::format(
"Invalid authenticated node2node message from node {}", t.from_node));
}
return t;
return n2n_channel.send(msg_type, cb, data);
}
template <class T>
@ -126,21 +202,19 @@ namespace ccf
return t;
}
template <class T>
std::pair<T, std::vector<uint8_t>> recv_encrypted(
const uint8_t* data, size_t size)
std::vector<uint8_t> recv_encrypted(
NodeId from_node, CBuffer cb, const uint8_t* data, size_t size) override
{
auto t = serialized::read<T>(data, size);
auto& n2n_channel = channels->get(t.from_node);
auto& n2n_channel = channels->get(from_node);
auto plain = n2n_channel.recv_encrypted(asCb(t), data, size);
auto plain = n2n_channel.recv_encrypted(cb, data, size);
if (!plain.has_value())
{
throw std::logic_error(fmt::format(
"Invalid encrypted node2node message from node {}", t.from_node));
"Invalid encrypted node2node message from node {}", from_node));
}
return std::make_pair(t, plain.value());
return plain.value();
}
template <class T>
@ -182,7 +256,7 @@ namespace ccf
n2n_channel.load_peer_signed_public(true, data, size);
}
void recv_message(OArray&& oa)
void recv_message(OArray&& oa) override
{
const uint8_t* data = oa.data();
size_t size = oa.size();

Просмотреть файл

@ -1,137 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include "ds/logger.h"
#include "ds/spin_lock.h"
#include <chrono>
#include <functional>
#include <set>
namespace ccf
{
using TimerCallback = std::function<bool()>;
class TickingTimer
{
public:
virtual ~TickingTimer() {}
virtual void tick(std::chrono::milliseconds elapsed_) = 0;
};
class Timer
{
public:
virtual ~Timer() {}
virtual void start() = 0;
};
/**
* A timer class to trigger actions periodically.
*
* A shared_ptr to a new Timer is returned by Timers::new_timer(period, cb).
* The timer is only active when it is started (start()) and marked as expired
* when the callback is triggered. If the callback returns true, the timer
* continues ticking. Otherwise, the timer expires and will tick again only
* when it is explicitly re-started (start()).
*
* Note that if the timer's period is smaller than the period at which it is
* ticked, the callback is only called once per period.
*
**/
class TimerImpl : public TickingTimer, public Timer
{
private:
enum TimerState
{
STOPPED = 0,
STARTED,
EXPIRED
};
SpinLock lock;
std::chrono::milliseconds period;
std::chrono::milliseconds elapsed;
TimerCallback cb;
TimerState state;
public:
TimerImpl(std::chrono::milliseconds period_, TimerCallback cb_) :
period(period_),
elapsed(0),
cb(cb_),
state(TimerState::STOPPED)
{}
void start()
{
std::lock_guard<SpinLock> guard(lock);
state = TimerState::STARTED;
}
void tick(std::chrono::milliseconds elapsed_)
{
std::lock_guard<SpinLock> guard(lock);
if (state != TimerState::STARTED)
return;
elapsed += elapsed_;
if (elapsed >= period)
{
state = TimerState::EXPIRED;
if (cb())
state = TimerState::STARTED;
using namespace std::chrono_literals;
elapsed = 0ms;
}
}
};
class Timers
{
private:
SpinLock lock;
std::set<
std::weak_ptr<TickingTimer>,
std::owner_less<std::weak_ptr<TickingTimer>>>
timers;
public:
Timers() {}
void tick(std::chrono::milliseconds elapsed)
{
std::lock_guard<SpinLock> guard(lock);
auto it = timers.begin();
while (it != timers.end())
{
auto t = it->lock();
if (t)
{
t->tick(elapsed);
it++;
}
else
{
it = timers.erase(it);
}
}
}
std::shared_ptr<Timer> new_timer(
std::chrono::milliseconds period, TimerCallback cb_)
{
std::lock_guard<SpinLock> guard(lock);
auto timer = std::make_shared<TimerImpl>(period, cb_);
timers.emplace(timer);
return std::static_pointer_cast<Timer>(timer);
}
};
}