Add initial integration for supporting view changes with CCF & ePBFT.
Run the election test when running CCF with ePBFT
This commit is contained in:
Alex 2019-11-25 16:52:04 +00:00 коммит произвёл GitHub
Родитель 38874e7d1f
Коммит fc1d4fbc44
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
19 изменённых файлов: 456 добавлений и 95 удалений

Просмотреть файл

@ -335,10 +335,6 @@ if(BUILD_TESTS)
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_batched.py
ADDITIONAL_ARGS
--app-script ${CMAKE_SOURCE_DIR}/src/apps/batched/batched.lua)
if (BUILD_SMALLBANK)
include(${CMAKE_CURRENT_SOURCE_DIR}/samples/apps/smallbank/smallbank.cmake)
endif()
endif()
add_e2e_test(
@ -360,13 +356,6 @@ if(BUILD_TESTS)
--scenario ${CMAKE_SOURCE_DIR}/tests/simple_logging_scenario.json
)
add_e2e_test(
NAME election_tests
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/election.py
ADDITIONAL_ARGS
--election-timeout 2000
)
if (NOT SAN)
add_e2e_test(
NAME connections
@ -406,12 +395,22 @@ if(BUILD_TESTS)
else()
message(STATUS "Using PBFT as consensus")
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/pbft.cmake)
endif()
if (NOT HTTP)
if (BUILD_SMALLBANK)
include(${CMAKE_CURRENT_SOURCE_DIR}/samples/apps/smallbank/smallbank.cmake)
endif()
add_e2e_test(
NAME election_tests
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/election.py
ADDITIONAL_ARGS
--election-timeout 2000
)
endif()
if (EXTENSIVE_TESTS)
set_tests_properties(recovery_tests PROPERTIES TIMEOUT 2000)
endif()

Просмотреть файл

@ -15,18 +15,28 @@ add_enclave_lib(smallbankenc
if(BUILD_TESTS)
## Small Bank end to end and performance test
if (PBFT)
set(SMALL_BANK_SIGNED_VERIFICATION_FILE ${CMAKE_CURRENT_LIST_DIR}/tests/verify_small_bank_20k.json)
set(SMALL_BANK_SIGNED_ITERATIONS 20000)
else ()
set(SMALL_BANK_SIGNED_VERIFICATION_FILE ${CMAKE_CURRENT_LIST_DIR}/tests/verify_small_bank.json)
set(SMALL_BANK_SIGNED_ITERATIONS 200000)
endif ()
add_perf_test(
NAME small_bank_client_test
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/small_bank_client.py
CLIENT_BIN ./small_bank_client
VERIFICATION_FILE ${CMAKE_CURRENT_LIST_DIR}/tests/verify_small_bank.json
VERIFICATION_FILE ${SMALL_BANK_SIGNED_VERIFICATION_FILE}
ITERATIONS ${SMALL_BANK_SIGNED_ITERATIONS}
ADDITIONAL_ARGS
--label Small_Bank_ClientCpp
--max-writes-ahead 1000
--metrics-file small_bank_metrics.json
)
if (${SERVICE_IDENTITY_CURVE_CHOICE} STREQUAL "secp256k1_bitcoin")
if (PBFT)
set(SMALL_BANK_SIGNED_VERIFICATION_FILE ${CMAKE_CURRENT_LIST_DIR}/tests/verify_small_bank_20k.json)
set(SMALL_BANK_SIGNED_ITERATIONS 20000)
elseif (${SERVICE_IDENTITY_CURVE_CHOICE} STREQUAL "secp256k1_bitcoin")
set(SMALL_BANK_SIGNED_VERIFICATION_FILE ${CMAKE_CURRENT_LIST_DIR}/tests/verify_small_bank_50k.json)
set(SMALL_BANK_SIGNED_ITERATIONS 50000)
else ()

Просмотреть файл

@ -0,0 +1,91 @@
{
"params": {
"seed": 42,
"transactions": 20000,
"accounts": 10
},
"initial": [
{
"account": 0,
"balance": 2000
},
{
"account": 1,
"balance": 2000
},
{
"account": 2,
"balance": 2000
},
{
"account": 3,
"balance": 2000
},
{
"account": 4,
"balance": 2000
},
{
"account": 5,
"balance": 2000
},
{
"account": 6,
"balance": 2000
},
{
"account": 7,
"balance": 2000
},
{
"account": 8,
"balance": 2000
},
{
"account": 9,
"balance": 2000
}
],
"final": [
{
"account": 0,
"balance": 120018
},
{
"account": 1,
"balance": 0
},
{
"account": 2,
"balance": 187
},
{
"account": 3,
"balance": 42
},
{
"account": 4,
"balance": -35
},
{
"account": 5,
"balance": 36
},
{
"account": 6,
"balance": 1189
},
{
"account": 7,
"balance": 6
},
{
"account": 8,
"balance": 0
},
{
"account": 9,
"balance": 39
}
]
}

Просмотреть файл

@ -44,7 +44,7 @@ class Certificate
// true if successful and false otherwise.
public:
Certificate(int complete = 0);
Certificate(std::function<int()> complete = nullptr);
// Requires: "complete" >= f+1 or 0
// Effects: Creates an empty certificate. The certificate is
// complete when it contains at least "complete" matching messages
@ -96,6 +96,7 @@ public:
// Effects: Returns the number of messages with the correct value
// in this.
int num_complete() const;
bool is_complete() const;
void make_complete();
// Effects: If cvalue() is not null, makes the certificate
@ -173,7 +174,9 @@ private:
Message_val* c; // correct certificate value or 0 if unknown.
int complete; // certificate is complete if "num_correct() >= complete"
int comp; // the value of complete as sent into the ctor
std::function<int()>
comp; // the value of complete as sent into the ctor through a function so
// if f() changes it can be recalculated
T* mym; // my message in this or null if I have no message in this
Time t_sent; // time at which mym was last sent
@ -215,6 +218,12 @@ inline int Certificate<T>::num_correct() const
return (c) ? c->count : 0;
}
template <class T>
inline int Certificate<T>::num_complete() const
{
return complete;
}
template <class T>
inline bool Certificate<T>::is_complete() const
{
@ -275,13 +284,22 @@ inline bool Certificate<T>::Val_iter::get(T*& m, int& count)
}
template <class T>
Certificate<T>::Certificate(int comp_) : f(node->f()), comp(comp_)
Certificate<T>::Certificate(std::function<int()> comp_) :
f(node->f()),
comp(comp_)
{
max_size = f + 1;
vals = new Message_val[max_size];
cur_size = 0;
correct = f + 1;
complete = (comp == 0) ? node->num_correct_replicas() : comp;
if (comp_ != nullptr)
{
complete = comp_();
}
else
{
complete = node->num_correct_replicas();
}
c = 0;
mym = 0;
t_sent = 0;
@ -302,7 +320,14 @@ void Certificate<T>::reset_f()
vals = new Message_val[max_size];
cur_size = 0;
correct = f + 1;
complete = (comp == 0) ? node->num_correct_replicas() : comp;
if (comp != nullptr)
{
complete = comp();
}
else
{
complete = node->num_correct_replicas();
}
}
template <class T>

Просмотреть файл

@ -26,8 +26,8 @@
Client::Client(const NodeInfo& node_info, INetwork* network) :
Node(node_info),
t_reps(2 * f() + 1),
c_reps(f() + 1)
t_reps([this]() { return 2 * f() + 1; }),
c_reps([this]() { return f() + 1; })
{
// Fail if node is a replica.
LOG_INFO << "my id " << id() << std::endl;

Просмотреть файл

@ -77,6 +77,7 @@ private:
std::unique_ptr<Request> req);
T caller_rid;
size_t f;
ReplyCallback cb;
C* owner;
@ -99,16 +100,28 @@ private:
int rtimeout; // Timeout period in msecs
// Maximum retransmission timeout in msecs
static const int Max_rtimeout = 200;
static const int Max_rtimeout = 3000;
// Minimum retransmission timeout after retransmission
// in msecs
static const int Min_rtimeout = 100;
static const int Min_rtimeout = 2000;
void increase_retransmission_timeout();
void decrease_retransmission_timeout();
Cycle_counter latency; // Used to measure latency.
// Multiplier used to obtain retransmission timeout from avg_latency
static const int Rtimeout_mult = 4;
static void rtimer_handler(void* owner);
std::unique_ptr<ITimer> rtimer; // Retransmission timer
bool primary_only_execution; // true iff f == 0
void retransmit();
// Effects: Retransmits any outstanding request at the head of
// the queue.
};
template <class T, class C>
@ -116,7 +129,11 @@ ClientProxy<T, C>::ClientProxy(IMessageReceiveBase& my_replica) :
my_replica(my_replica),
out_reqs(Max_outstanding),
head(nullptr),
tail(nullptr)
tail(nullptr),
n_retrans(0),
rtimeout(150),
rtimer(new ITimer(rtimeout, rtimer_handler, this)),
primary_only_execution(my_replica.f() == 0)
{}
template <class T, class C>
@ -127,10 +144,11 @@ ClientProxy<T, C>::RequestContext::RequestContext(
C* owner,
std::unique_ptr<Request> req) :
caller_rid(caller_rid),
f(replica.f()),
cb(cb),
owner(owner),
t_reps(2 * replica.f() + 1),
c_reps(replica.f() + 1),
t_reps([this]() { return 2 * f + 1; }),
c_reps([this]() { return f + 1; }),
req(std::move(req)),
next(nullptr),
prev(nullptr)
@ -179,6 +197,8 @@ bool ClientProxy<T, C>::send_request(
{
head = tail = ctx.get();
ctx->prev = ctx->next = nullptr;
n_retrans = 0;
rtimer->start();
}
else
{
@ -269,6 +289,8 @@ void ClientProxy<T, C>::recv_reply(Reply* reply)
return;
}
rtimer->stop();
int reply_len;
char* reply_buffer = reply->reply(reply_len);
@ -301,4 +323,99 @@ void ClientProxy<T, C>::recv_reply(Reply* reply)
out_reqs.erase(it);
delete reply;
decrease_retransmission_timeout();
n_retrans = 0;
if (head != nullptr)
{
rtimer->start();
}
}
template <class T, class C>
void ClientProxy<T, C>::rtimer_handler(void* owner)
{
((ClientProxy*)owner)->retransmit();
}
template <class T, class C>
void ClientProxy<T, C>::increase_retransmission_timeout()
{
rtimeout = rtimeout * 2;
if (rtimeout > Max_rtimeout)
{
rtimeout = Max_rtimeout;
}
rtimer->adjust(rtimeout);
}
template <class T, class C>
void ClientProxy<T, C>::decrease_retransmission_timeout()
{
rtimeout = rtimeout - 100;
if (rtimeout < Min_rtimeout)
{
rtimeout = Min_rtimeout;
}
rtimer->adjust(rtimeout);
}
template <class T, class C>
void ClientProxy<T, C>::retransmit()
{
// Retransmit any outstanding request.
static const int thresh = 1;
if (head != nullptr)
{
RequestContext* ctx = head;
Request* out_req = ctx->req.get();
LOG_DEBUG << "Retransmitting req id: " << out_req->request_id()
<< std::endl;
INCR_OP(req_retrans);
#ifndef ENFORCE_EXACTLY_ONCE
ctx->t_reps.clear();
ctx->c_reps.clear();
#endif
n_retrans++;
bool ro = out_req->is_read_only();
bool change = (ro || out_req->replier() >= 0) && n_retrans > thresh;
if (change)
{
// Compute new authenticator for request
out_req->re_authenticate(change);
if (ro && change)
{
ctx->t_reps.clear();
}
}
if (
out_req->is_read_only() || n_retrans > thresh ||
out_req->size() > Request::big_req_thresh)
{
// read-only requests, requests retransmitted more than
// thresh times, and big requests are multicast to all
// replicas.
auto req_clone = out_req->clone();
execute_request(req_clone);
}
else
{
// read-write requests are sent to the primary only.
my_replica.send(out_req, my_replica.primary());
}
}
if (n_retrans > thresh)
{
increase_retransmission_timeout();
}
rtimer->restart();
}

Просмотреть файл

@ -59,7 +59,8 @@ NV_info::NV_info() :
nv(0),
vc_target(0),
vc_cur(0),
vcs(node->num_of_replicas())
vcs(
64) // TODO: this is not great - https://github.com/microsoft/CCF/issues/385
{
chosen_ckpt = -1;
max = -1;
@ -362,7 +363,7 @@ void NV_info::summarize(View_change* vc)
// Search view-changes in new-view for proofs
Digest d;
for (int i = 0; i < vcs.size(); i++)
for (int i = 0; i < node->num_of_replicas(); i++)
{
if (nv->view_change(i) && vcs[i].vc->ckpt(vcn, d) && d == vclc)
{
@ -411,7 +412,7 @@ void NV_info::choose_ckpt(int index)
}
// Summarize requests for all view-change messages in new-view.
for (int i = 0; i < vcs.size(); i++)
for (int i = 0; i < node->num_of_replicas(); i++)
{
if (nv->view_change(i))
{
@ -662,7 +663,7 @@ void NV_info::summarize_reqs(View_change* vc)
Req_sum& cur = reqsi.emplace_back(rv, rd, n_le + 1, vc->id(), 0, 0);
// Search view-changes for proofs
for (int j = 0; j < vcs.size(); j++)
for (int j = 0; j < node->num_of_replicas(); j++)
{
if (vcs[j].req_sum)
{
@ -734,7 +735,7 @@ bool NV_info::check_new_view()
// Search view-changes for proofs
Digest dd;
for (int i = 0; i < vcs.size(); i++)
for (int i = 0; i < node->num_of_replicas(); i++)
{
if (i != cid && vcs[i].vc)
{
@ -761,7 +762,7 @@ bool NV_info::check_new_view()
// propose any pre-prepared or prepared request with sequence number
// greater than or equal to nv->max().
int n_lt = 0;
for (int i = 0; i < vcs.size(); i++)
for (int i = 0; i < node->num_of_replicas(); i++)
{
auto vc = vcs[i].vc.get();
if (vc == 0)
@ -813,7 +814,7 @@ bool NV_info::check_new_view()
View v = vc->req(i, d);
Req_sum& cur = reqs[i - base].emplace_back(v, d, 0, vc->id(), 0, 0);
// Search view-changes for proofs
for (int j = 0; j < vcs.size(); j++)
for (int j = 0; j < node->num_of_replicas(); j++)
{
if (vcs[j].vc && i > vcs[j].vc->last_stable())
{
@ -1041,7 +1042,7 @@ void NV_info::dump_state(std::ostream& os)
<< " nv_sent: " << nv_sent << std::endl;
os << " View changes vcs: " << std::endl;
for (int i = 0; i < vcs.size(); i++)
for (int i = 0; i < node->num_of_replicas(); i++)
{
os << " i: " << i << " vc: " << (void*)vcs[i].vc.get()
<< " ack_count: " << vcs[i].ack_count << " ack_reps: " << vcs[i].ack_reps

Просмотреть файл

@ -38,6 +38,11 @@ public:
// Effects: Discards any new-view, view-change and view-change ack
// messages stored in this.
NV_info(const NV_info&) = delete;
NV_info& operator=(const NV_info&) = delete;
NV_info(NV_info&&) = default;
NV_info& operator=(NV_info&&) = default;
void clear();
// Effects: Makes this empty -- deletes all contained messages
// and sets view() == 0.

Просмотреть файл

@ -8,7 +8,7 @@
#include "Node.h"
Prepared_cert::Prepared_cert() :
prepare_cert(node->num_correct_replicas() - 1),
prepare_cert([]() { return node->num_correct_replicas() - 1; }),
primary(false)
{}

Просмотреть файл

@ -235,8 +235,18 @@ inline int Prepared_cert::num_correct()
inline bool Prepared_cert::is_complete()
{
return pp_info.is_complete() && prepare_cert.is_complete() &&
pp_info.pre_prepare()->match(prepare_cert.cvalue());
if (pp_info.is_complete())
{
if (prepare_cert.num_complete() == 0)
{
return true;
}
return prepare_cert.is_complete() &&
pp_info.pre_prepare()->match(prepare_cert.cvalue());
}
return false;
}
inline bool Prepared_cert::is_pp_complete()

Просмотреть файл

@ -87,7 +87,10 @@ Replica::Replica(
rep_cb(nullptr),
global_commit_cb(nullptr),
state(this, mem, nbytes),
vi(node_id, 0)
vi(
node_id,
0,
64) // make this dynamic - https://github.com/microsoft/CCF/issues/385
{
// Fail if node is not a replica.
if (!is_replica(id()))
@ -212,18 +215,14 @@ void Replica::receive_message(const uint8_t* data, uint32_t size)
{
if (size > Max_message_size)
{
LOG_DEBUG
<< "Received message will not be processed, size exceeds message limits: "
<< size << std::endl;
return;
LOG_FAIL << "Received message size exceeds message: " << size << std::endl;
}
Message* m = new Message(Max_message_size);
uint64_t alloc_size = std::max(size, (uint32_t)Max_message_size);
Message* m = new Message(alloc_size);
// TODO: remove this memcpy
memcpy(m->contents(), data, size);
if (pre_verify(m))
{
PBFT_ASSERT(
Max_message_size >= size, "size must be less than Max_message_size");
recv_process_one_msg(m);
}
else
@ -291,7 +290,8 @@ bool Replica::apply_ledger_data(const std::vector<uint8_t>& data)
if (global_commit_cb != nullptr)
{
global_commit_cb(executable_pp->get_ctx(), global_commit_ctx);
global_commit_cb(
executable_pp->get_ctx(), executable_pp->view(), global_commit_ctx);
}
last_executed++;
@ -516,6 +516,14 @@ void Replica::handle(Request* m)
{
bool ro = m->is_read_only();
Digest rd = m->digest();
LOG_TRACE << "Received request with rid: " << m->request_id()
<< " id:" << id() << " primary:" << primary()
<< " with cid: " << m->client_id()
<< " current seqno: " << next_pp_seqno
<< " last executed: " << last_executed << " digest: " << rd.hash()
<< std::endl;
if (has_complete_new_view())
{
LOG_TRACE << "Received request with rid: " << m->request_id()
@ -729,6 +737,11 @@ void Replica::handle(Pre_prepare* m)
b.contents = m->choices(b.size);
LOG_TRACE << "Received pre prepare with seqno: " << ms
<< ", in_wv:" << (in_wv(m) ? "true" : "false")
<< ", low_bound:" << low_bound << ", has complete_new_view:"
<< (has_complete_new_view() ? "true" : "false") << std::endl;
if (in_wv(m) && ms > low_bound && has_complete_new_view())
{
LOG_TRACE << "processing pre prepare with seqno: " << ms << std::endl;
@ -1340,7 +1353,7 @@ void Replica::handle(Status* m)
void Replica::handle(View_change* m)
{
LOG_INFO << "Received view change for " << m->view() << " from " << m->id()
<< "\n";
<< ", v:" << v << std::endl;
if (m->id() == primary() && m->view() > v)
{
@ -1385,7 +1398,7 @@ void Replica::handle(View_change* m)
void Replica::handle(New_view* m)
{
LOG_INFO << "Received new view for " << m->view() << " from " << m->id()
<< "\n";
<< std::endl;
vi.add(m);
}
@ -1580,7 +1593,7 @@ void Replica::process_new_view(Seqno min, Digest d, Seqno max, Seqno ms)
if (primary(v) == id())
{
New_view* nv = vi.my_new_view();
LOG_INFO << "Sending new view for " << nv->view() << "\n";
LOG_INFO << "Sending new view for " << nv->view() << std::endl;
send(nv, All_replicas);
}
@ -1675,7 +1688,7 @@ void Replica::process_new_view(Seqno min, Digest d, Seqno max, Seqno ms)
{
start_vtimer_if_request_waiting();
}
LOG_INFO << "Done with process new view " << v << "\n";
LOG_INFO << "Done with process new view " << v << std::endl;
}
Pre_prepare* Replica::prepared_pre_prepare(Seqno n)
@ -1837,7 +1850,7 @@ void Replica::execute_prepared(bool committed)
if (global_commit_cb != nullptr)
{
LOG_TRACE << "Global_commit:" << pp->get_ctx() << std::endl;
global_commit_cb(pp->get_ctx(), global_commit_ctx);
global_commit_cb(pp->get_ctx(), pp->view(), global_commit_ctx);
}
}
}

Просмотреть файл

@ -42,10 +42,6 @@ View_change::View_change(View v, Seqno ls, int id) :
void View_change::add_checkpoint(Seqno n, Digest& d)
{
PBFT_ASSERT(n % checkpoint_interval == 0, "Invalid argument");
PBFT_ASSERT(
(last_stable() <= n) && (n <= last_stable() + max_out), "Invalid argument");
int index = (n - last_stable()) / checkpoint_interval;
rep().ckpts[index] = d;

Просмотреть файл

@ -20,7 +20,7 @@ View_info::VCA_info::VCA_info() : v(0), vacks(node->num_of_replicas(), nullptr)
void View_info::VCA_info::clear()
{
for (int i = 0; i < node->num_of_replicas(); i++)
for (int i = 0; i < vacks.size(); i++)
{
delete vacks[i];
vacks[i] = 0;
@ -28,17 +28,18 @@ void View_info::VCA_info::clear()
v = 0;
}
View_info::View_info(int ident, View vi) :
View_info::View_info(int ident, View vi, uint64_t num_replicas) :
v(vi),
id(ident),
last_stable(0),
oplog(max_out),
last_views(node->num_of_replicas(), (View)0),
last_vcs(node->num_of_replicas()),
my_vacks(node->num_of_replicas(), nullptr),
last_nvs(node->num_of_replicas())
last_vcs(num_replicas),
my_vacks(num_replicas, nullptr),
last_nvs(num_replicas)
{
vacks.resize(node->num_of_replicas());
last_views.resize(num_replicas);
std::fill(std::begin(last_views), std::end(last_views), 0);
vacks.resize(1);
}
View_info::~View_info()
@ -201,38 +202,63 @@ bool View_info::prepare(Seqno n, Digest& d)
return false;
}
void View_info::discard_old()
void View_info::discard_old_and_resize_if_needed()
{
// Discard view-changes, view-change acks, and new views with view
// less than "v"
for (int i = 0; i < node->num_of_replicas(); i++)
for (int i = 0; i < last_vcs.size(); i++)
{
if (last_vcs[i] && last_vcs[i]->view() < v)
{
last_vcs[i] = 0;
}
}
if (last_vcs.size() != node->num_of_replicas())
{
last_vcs.resize(node->num_of_replicas());
}
for (int i = 0; i < my_vacks.size(); i++)
{
delete my_vacks[i];
my_vacks[i] = 0;
}
if (my_vacks.size() != node->num_of_replicas())
{
my_vacks.resize(node->num_of_replicas());
}
for (int i = 0; i < vacks.size(); i++)
{
if (vacks[i].v < v)
{
vacks[i].clear();
vacks[i].v = v;
}
}
if (vacks.size() != node->num_of_replicas())
{
vacks.resize(node->num_of_replicas());
}
for (int i = 0; i < last_nvs.size(); i++)
{
if (last_nvs[i].view() < v)
{
last_nvs[i].clear();
}
}
if (last_nvs.size() != node->num_of_replicas())
{
last_nvs.resize(node->num_of_replicas());
}
}
void View_info::view_change(View vi, Seqno last_executed, State* state)
{
v = vi;
discard_old();
discard_old_and_resize_if_needed();
// Create my view-change message for "v".
auto vc = std::make_unique<View_change>(v, last_stable, id);
@ -318,7 +344,7 @@ void View_info::view_change(View vi, Seqno last_executed, State* state)
n.add(nv, this);
// Move any view-change messages for view "v" to "n".
for (int i = 0; i < node->num_of_replicas(); i++)
for (int i = 0; i < last_vcs.size(); i++)
{
auto vc = last_vcs[i].get();
if (vc && vc->view() == v && n.can_add(vc))
@ -514,7 +540,7 @@ void View_info::set_received_vcs(Status* m)
}
else
{
for (int i = 0; i < node->num_of_replicas(); i++)
for (int i = 0; i < last_vcs.size(); i++)
{
if (last_vcs[i] != 0 && last_vcs[i]->view() == v)
{
@ -590,15 +616,29 @@ void View_info::clear()
{
oplog.clear(last_stable + 1);
for (int i = 0; i < node->num_of_replicas(); i++)
for (int i = 0; i < last_vcs.size(); i++)
{
last_vcs[i] = 0;
last_views[i] = v;
vacks[i].clear();
}
for (int i = 0; i < last_views.size(); i++)
{
last_views[i] = v;
}
for (int i = 0; i < vacks.size(); i++)
{
vacks[i].clear();
}
for (int i = 0; i < my_vacks.size(); i++)
{
delete my_vacks[i];
my_vacks[i] = 0;
}
for (int i = 0; i < last_nvs.size(); i++)
{
last_nvs[i].clear();
}
vc_sent = zero_time();
@ -734,7 +774,13 @@ bool View_info::enforce_bound(Seqno b, Seqno ks, bool corrupt)
void View_info::mark_stale()
{
for (int i = 0; i < node->num_of_replicas(); i++)
PBFT_ASSERT(last_vcs.size() == last_views.size(), "sizes do not match");
PBFT_ASSERT(last_vcs.size() == my_vacks.size(), "sizes do not match");
PBFT_ASSERT(last_vcs.size() == last_nvs.size(), "sizes do not match");
PBFT_ASSERT(last_vcs.size() == last_vcs.size(), "sizes do not match");
PBFT_ASSERT(last_vcs.size() == vacks.size(), "sizes do not match");
for (int i = 0; i < last_vcs.size(); i++)
{
if (i != id)
{

Просмотреть файл

@ -27,7 +27,7 @@ class View_info
// Holds information for the view-change protocol.
//
public:
View_info(int id, View v = 0);
View_info(int id, View v, uint64_t num_replicas);
// Effects: Create a view-info object for replica "id" with initial
// view "v".
@ -280,8 +280,9 @@ private:
// Auxiliary methods:
//
void discard_old();
void discard_old_and_resize_if_needed();
// Effects: Discards messages with views lower than "v"
// then checks the number of replicas and resizes arrays if required
View k_max(int k) const;
// Effects: Returns the value "v" in "last_views" such that there

Просмотреть файл

@ -16,7 +16,8 @@ public:
virtual void receive_message(const uint8_t* data, uint32_t size) = 0;
typedef void (*reply_handler_cb)(Reply* m, void* ctx);
virtual void register_reply_handler(reply_handler_cb cb, void* ctx) = 0;
typedef void (*global_commit_handler_cb)(int64_t tx_ctx, void* cb_ctx);
typedef void (*global_commit_handler_cb)(
int64_t tx_ctx, View view, void* cb_ctx);
virtual void register_global_commit(
global_commit_handler_cb cb, void* ctx) = 0;
virtual size_t num_correct_replicas() const = 0;

Просмотреть файл

@ -105,12 +105,28 @@ namespace pbft
std::unique_ptr<ClientProxy<kv::TxHistory::RequestID, void>> client_proxy;
std::shared_ptr<enclave::RPCSessions> rpcsessions;
SeqNo global_commit_seqno;
View last_commit_view;
std::unique_ptr<pbft::Store> store;
struct view_change_info
{
view_change_info(View view_, SeqNo min_global_commit_) :
min_global_commit(min_global_commit_),
view(view_)
{}
SeqNo min_global_commit;
View view;
};
std::vector<view_change_info> view_change_list;
struct register_global_commit_info
{
pbft::Store* store;
SeqNo* global_commit_seqno;
View* last_commit_view;
std::vector<view_change_info>* view_change_list;
} register_global_commit_ctx;
public:
@ -125,7 +141,9 @@ namespace pbft
channels(channels_),
rpcsessions(rpcsessions_),
global_commit_seqno(1),
store(std::move(store_))
last_commit_view(0),
store(std::move(store_)),
view_change_list(1, view_change_info(0, 0))
{
// configure replica
GeneralInfo general_info;
@ -195,18 +213,25 @@ namespace pbft
message_receiver_base->register_reply_handler(
reply_handler_cb, client_proxy.get());
auto global_commit_cb = [](kv::Version version, void* ctx) {
auto global_commit_cb = [](kv::Version version, ::View view, void* ctx) {
auto p = static_cast<register_global_commit_info*>(ctx);
if (version == kv::NoVersion || version < *p->global_commit_seqno)
{
return;
}
*p->global_commit_seqno = version;
if (*p->last_commit_view < view)
{
p->view_change_list->emplace_back(view, version);
}
p->store->compact(version);
};
register_global_commit_ctx.store = store.get();
register_global_commit_ctx.global_commit_seqno = &global_commit_seqno;
register_global_commit_ctx.last_commit_view = &last_commit_view;
register_global_commit_ctx.view_change_list = &view_change_list;
message_receiver_base->register_global_commit(
global_commit_cb, &register_global_commit_ctx);
@ -239,17 +264,23 @@ namespace pbft
client_proxy.get());
}
// TODO(#PBFT): PBFT consensus should implement the following functions to
// return meaningful information to clients (e.g. global commit, term/view)
// https://github.com/microsoft/CCF/issues/57
View get_view() override
{
return 2;
return message_receiver_base->view() + 2;
}
View get_view(SeqNo seqno) override
{
return 2;
for (auto rit = view_change_list.rbegin(); rit != view_change_list.rend();
++rit)
{
view_change_info& info = *rit;
if (info.min_global_commit <= seqno)
{
return info.view + 2;
}
}
throw std::logic_error("should never be here");
}
SeqNo get_commit_seqno() override
@ -259,7 +290,7 @@ namespace pbft
kv::NodeId primary() override
{
return 0;
return message_receiver_base->primary();
}
void add_configuration(

Просмотреть файл

@ -14,7 +14,7 @@ namespace pbft
enum PbftMsgType : Node2NodeMsg
{
pbft_message = 0,
pbft_message = 1000,
};
#pragma pack(push, 1)

Просмотреть файл

@ -28,7 +28,7 @@ def wait_for_index_globally_committed(index, term, nodes):
with f.node_client() as c:
id = c.request("getCommit", {"commit": index})
res = c.response(id)
if res.result["term"] == term and res.global_commit > index:
if res.result["term"] == term and (res.global_commit >= index):
up_to_date_f.append(f.node_id)
if len(up_to_date_f) == len(nodes):
break
@ -41,7 +41,11 @@ def wait_for_index_globally_committed(index, term, nodes):
def run(args):
# Three nodes minimum to make sure that the raft network can still make progress
# if one node stops
hosts = ["localhost", "localhost", "localhost"]
if args.consensus == "pbft":
hosts = ["localhost", "localhost", "localhost", "localhost"]
else:
hosts = ["localhost", "localhost", "localhost"]
with infra.ccf.network(
hosts, args.build_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
@ -55,13 +59,19 @@ def run(args):
# Number of nodes F to stop until network cannot make progress
nodes_to_stop = math.ceil(len(hosts) / 2)
if args.consensus == "pbft":
nodes_to_stop = math.ceil(len(hosts) / 3)
for _ in range(nodes_to_stop):
# Note that for the first iteration, the primary is known in advance anyway
LOG.debug("Find freshly elected primary")
primary, current_term = network.find_primary()
LOG.debug("Commit new transactions")
LOG.debug(
"Commit new transactions, primary:{}, current_term:{}".format(
primary, current_term
)
)
commit_index = None
with primary.user_client(format="json") as c:
res = c.do(
@ -95,10 +105,14 @@ def run(args):
try:
primary, current_term = network.find_primary()
assert False, "Primary should not be found"
except TypeError:
assert args.consensus == "pbft", "Unexpected error"
except AssertionError:
LOG.info(
"As expected, primary could not be found after election timeout. Test ended successfully."
)
assert args.consensus == "raft", "Unexpected error"
LOG.info(
"As expected, primary could not be found after election timeout. Test ended successfully."
)
if __name__ == "__main__":

Просмотреть файл

@ -102,12 +102,13 @@ class Network:
)
# If the network is opening, node are trusted without consortium approval
if self.status == ServiceStatus.OPENING and args.consensus != "pbft":
try:
node.wait_for_node_to_join()
except TimeoutError:
LOG.error(f"New node {node.node_id} failed to join the network")
raise
if self.status == ServiceStatus.OPENING:
if args.consensus != "pbft":
try:
node.wait_for_node_to_join()
except TimeoutError:
LOG.error(f"New node {node.node_id} failed to join the network")
raise
node.network_state = infra.node.NodeNetworkState.joined
def _start_all_nodes(