Add support for compacting the KV from ePBFT (#405)

Add support for compacting the KV from ePBFT
This commit is contained in:
Alex 2019-10-04 09:35:15 +01:00 коммит произвёл GitHub
Родитель f9ccfe64cc
Коммит 079e830948
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 73 добавлений и 11 удалений

Просмотреть файл

@ -4,6 +4,7 @@
#include "consensus/ledgerenclave.h"
#include "consensus/pbft/pbftconfig.h"
#include "consensus/pbft/pbfttypes.h"
#include "ds/logger.h"
#include "enclave/rpcmap.h"
#include "enclave/rpcsessions.h"
@ -14,6 +15,7 @@
#include "libbyz/libbyz.h"
#include "libbyz/network.h"
#include "libbyz/receive_message_base.h"
#include "node/consensustypes.h"
#include "node/nodetypes.h"
#include "node/rpc/jsonrpc.h"
@ -106,9 +108,17 @@ namespace pbft
enclave::RPCSessions& rpcsessions;
std::unique_ptr<consensus::LedgerEnclave> ledger;
SeqNo global_commit_seqno;
std::unique_ptr<pbft::Store> store;
struct register_global_commit_info
{
pbft::Store* store;
SeqNo* global_commit_seqno;
} register_global_commit_ctx;
public:
Pbft(
std::unique_ptr<pbft::Store> store_,
std::shared_ptr<ChannelProxy> channels_,
NodeId id,
std::unique_ptr<consensus::LedgerEnclave> ledger_,
@ -118,7 +128,8 @@ namespace pbft
channels(channels_),
rpcsessions(rpcsessions_),
ledger(std::move(ledger_)),
global_commit_seqno(0)
global_commit_seqno(1),
store(std::move(store_))
{
// configure replica
GeneralInfo general_info;
@ -203,8 +214,24 @@ namespace pbft
ledger->put_entry(entry);
};
auto global_commit_cb = [](kv::Version version, void* ctx) {
auto p = static_cast<register_global_commit_info*>(ctx);
if (version == kv::NoVersion || version < *p->global_commit_seqno)
{
return;
}
*p->global_commit_seqno = version;
p->store->compact(version);
};
message_receiver_base->register_append_ledger_entry_cb(
append_ledger_entry_cb, ledger.get());
register_global_commit_ctx.store = store.get();
register_global_commit_ctx.global_commit_seqno = &global_commit_seqno;
message_receiver_base->register_global_commit(
global_commit_cb, &register_global_commit_ctx);
}
bool on_request(const kv::TxHistory::RequestCallbackArgs& args) override
@ -303,13 +330,6 @@ namespace pbft
const std::vector<std::tuple<SeqNo, std::vector<uint8_t>, bool>>& entries)
override
{
for (auto&& [seqno, data, globally_committable] : entries)
{
if (seqno != global_commit_seqno + 1)
return false;
global_commit_seqno = seqno;
}
return true;
}

Просмотреть файл

@ -138,6 +138,7 @@ namespace pbft
std::begin(rep.merkle_root.h),
std::end(rep.merkle_root.h),
std::begin(info.merkle_root));
info.ctx = rep.version;
outb->size = rep.result.size();
auto outb_ptr = (uint8_t*)outb->contents;

Просмотреть файл

@ -24,4 +24,36 @@ namespace pbft
NodeId from_node;
};
#pragma pack(pop)
class Store
{
public:
virtual ~Store() {}
virtual void compact(Index v) = 0;
virtual void rollback(Index v) = 0;
};
template <typename T>
class Adaptor : public pbft::Store
{
private:
std::weak_ptr<T> x;
public:
Adaptor(std::shared_ptr<T> x) : x(x) {}
void compact(Index v)
{
auto p = x.lock();
if (p)
p->compact(v);
}
void rollback(Index v)
{
auto p = x.lock();
if (p)
p->rollback(v);
}
};
}

Просмотреть файл

@ -23,6 +23,7 @@ namespace enclave
{
std::vector<uint8_t> result;
crypto::Sha256Hash merkle_root;
kv::Version version;
};
virtual ProcessPbftResp process_pbft(

Просмотреть файл

@ -1001,7 +1001,9 @@ namespace kv
auto h = store->get_history();
if (h != nullptr)
{
h->add_result(req_id, version);
// This tx does not have a write set, so this is a read only tx
// because of this we are returning NoVersion
h->add_result(req_id, NoVersion);
}
return CommitSuccess::OK;
}

Просмотреть файл

@ -1286,6 +1286,7 @@ namespace ccf
accept_user_connections();
consensus = std::make_shared<PbftConsensusType>(
std::make_unique<pbft::Adaptor<Store>>(network.tables),
n2n_channels,
self,
std::make_unique<consensus::LedgerEnclave>(writer_factory),

Просмотреть файл

@ -588,6 +588,7 @@ namespace ccf
// TODO(#PBFT): Refactor this with process_forwarded().
Store::Tx tx;
crypto::Sha256Hash merkle_root;
kv::Version version = kv::NoVersion;
auto pack = detect_pack(input);
if (!pack.has_value())
@ -614,9 +615,13 @@ namespace ccf
auto& unsigned_rpc = *rpc_;
bool has_updated_merkle_root = false;
auto cb = [&merkle_root, &has_updated_merkle_root](
auto cb = [&merkle_root, &version, &has_updated_merkle_root](
kv::TxHistory::ResultCallbackArgs args) -> bool {
merkle_root = args.merkle_root;
if (args.version != kv::NoVersion)
{
version = args.version;
}
has_updated_merkle_root = true;
return true;
};
@ -636,7 +641,7 @@ namespace ccf
// if (history)
// history->add_response(reqid, rv);
return {jsonrpc::pack(rep.value(), pack.value()), merkle_root};
return {jsonrpc::pack(rep.value(), pack.value()), merkle_root, version};
}
/** Process a serialised input forwarded from another node