This commit is contained in:
olgavrou 2020-06-03 21:11:41 +01:00 committed by GitHub
Parent 1c8df536ba
Commit 09addb09b5
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
16 changed files: 411 additions and 191 deletions

View file

@ -547,6 +547,12 @@ if(BUILD_TESTS)
CONSENSUS pbft
)
add_e2e_test(
NAME replay_new_view_pbft
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/replay_new_view.py
CONSENSUS pbft
)
if(BUILD_SMALLBANK)
include(${CMAKE_CURRENT_SOURCE_DIR}/samples/apps/smallbank/smallbank.cmake)
endif()

View file

@ -63,4 +63,9 @@ void LedgerWriter::write_new_view(New_view* nv)
{(const uint8_t*)nv->contents(),
(const uint8_t*)nv->contents() + nv->size()}},
pbft_new_views_map);
}
void LedgerWriter::write_new_view(kv::Tx& tx)
{
store.commit_tx(tx);
}

View file

@ -32,4 +32,5 @@ public:
kv::Version write_pre_prepare(Pre_prepare* pp, View view);
kv::Version write_pre_prepare(kv::Tx& tx, Pre_prepare* pp);
void write_new_view(New_view* nv);
void write_new_view(kv::Tx& tx);
};

View file

@ -460,7 +460,7 @@ Pre_prepare::ValidProofs_iter::ValidProofs_iter(Pre_prepare* m)
}
bool Pre_prepare::ValidProofs_iter::get(
int& id, bool& is_valid_proof, Digest& prepare_digest)
int& id, bool& is_valid_proof, Digest& prepare_digest, bool is_null_op)
{
if (proofs_left <= 0)
{
@ -480,7 +480,7 @@ bool Pre_prepare::ValidProofs_iter::get(
LOG_INFO_FMT("Sender principal has not been configured yet {}", id);
is_valid_proof = false;
}
else
else if (!is_null_op)
{
PrepareSignature s(prepare_digest, id, ic->nonce);

View file

@ -201,11 +201,16 @@ public:
// Requires: Pre_prepare is known to be valid
// Effects: Return an iterator for the valid principal prepare proofs in "m"
bool get(int& id, bool& is_valid_proof, Digest& prepare_digest);
bool get(
int& id,
bool& is_valid_proof,
Digest& prepare_digest,
bool is_null_op = false);
// Effects: Updates "proofs" to "point" to the next proof's "IncludedSig"
// pid in the Pre_prepare message and returns true. If there are no more
// proofs left to process, it returns false. "is_valid_proof" indicates
// whether the proof is valid or not
// whether the proof is valid or not. Null ops are not signed, and therefore
// their signature is not checked
private:
Pre_prepare* msg;
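
A minimal usage sketch (not part of the commit): it assumes a caller that already holds the previous pre-prepare, as add_certs_if_valid in replica.cpp does, and shows how the new is_null_op argument suppresses the signature check when that pre-prepare carried no big requests. The helper name count_valid_proofs is hypothetical.

// Sketch only: walk the prepare proofs embedded in "pp" against the digest
// of the previous pre-prepare. When prev_pp is a null op (no big requests)
// its proofs are unsigned, so is_null_op tells get() to skip the signature
// check instead of rejecting the proof.
static int count_valid_proofs(Pre_prepare* pp, Pre_prepare* prev_pp)
{
  Pre_prepare::ValidProofs_iter vp_iter(pp);
  int p_id;
  bool valid;
  int count = 0;
  const bool is_null_op = prev_pp->num_big_reqs() == 0;
  while (vp_iter.get(p_id, valid, prev_pp->digest(), is_null_op))
  {
    if (valid)
    {
      // a real caller such as add_certs_if_valid would record p_id as a
      // prepare proof for prev_pp here
      count++;
    }
  }
  return count;
}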

View file

@ -46,6 +46,7 @@ public:
virtual int my_id() const = 0;
virtual void emit_signature_on_next_pp(int64_t version) = 0;
virtual void playback_pre_prepare(kv::Tx& tx) = 0;
virtual void playback_new_view(kv::Tx& tx) = 0;
virtual void playback_request(kv::Tx& tx) = 0;
virtual char* create_response_message(
int client_id, Request_id rid, uint32_t size, uint64_t nonce) = 0;

View file

@ -373,6 +373,11 @@ void Replica::receive_message(const uint8_t* data, uint32_t size)
void Replica::update_gov_req_info(ByzInfo& info, Pre_prepare* pre_prepare)
{
if (pre_prepare->num_big_reqs() <= 0)
{
// null op
return;
}
info.last_exec_gov_req = gov_req_track.last_seqno();
if (info.did_exec_gov_req)
{
@ -490,7 +495,7 @@ void Replica::playback_request(kv::Tx& tx)
waiting_for_playback_pp = true;
vec_exec_cmds[0] = std::move(execute_tentative_request(
*req, playback_max_local_commit_value, true, &tx, true));
*req, playback_max_local_commit_value, true, &tx, -1));
exec_command(vec_exec_cmds, playback_byz_info, 1, 0, false);
did_exec_gov_req = did_exec_gov_req || playback_byz_info.did_exec_gov_req;
@ -508,7 +513,8 @@ void Replica::add_certs_if_valid(
Pre_prepare::ValidProofs_iter vp_iter(pp);
int p_id;
bool valid;
while (vp_iter.get(p_id, valid, prev_pp->digest()))
while (
vp_iter.get(p_id, valid, prev_pp->digest(), prev_pp->num_big_reqs() == 0))
{
if (valid)
{
@ -578,6 +584,14 @@ void Replica::playback_pre_prepare(kv::Tx& tx)
LOG_TRACE_FMT("Playback pre-prepare {}", pre_prepare.seqno);
auto executable_pp = create_message<Pre_prepare>(
pre_prepare.contents.data(), pre_prepare.contents.size());
if (!executable_pp->pre_verify())
{
LOG_INFO_FMT(
"Did not verify playback pre-prepare for seqno {} from node {}",
executable_pp->seqno(),
executable_pp->id());
return;
}
auto seqno = executable_pp->seqno();
playback_pp_seqno = seqno;
waiting_for_playback_pp = false;
@ -587,7 +601,17 @@ void Replica::playback_pre_prepare(kv::Tx& tx)
update_gov_req_info(playback_byz_info, executable_pp.get());
did_exec_gov_req = false;
if (compare_execution_results(playback_byz_info, executable_pp.get()))
if (executable_pp->num_big_reqs() == 0)
{
// null op pre prepare, we need to advance last tentative exec but nothing
// will be executed
ByzInfo info;
execute_tentative(executable_pp.get(), info, executable_pp->get_nonce());
}
if (
executable_pp->num_big_reqs() == 0 /*null op*/ ||
compare_execution_results(playback_byz_info, executable_pp.get()))
{
next_pp_seqno = seqno;
@ -597,9 +621,7 @@ void Replica::playback_pre_prepare(kv::Tx& tx)
}
LOG_TRACE_FMT("Storing pre prepare at seqno {}", seqno);
last_te_version = ledger_writer->write_pre_prepare(tx, executable_pp.get());
global_commit(executable_pp.get());
last_executed++;
@ -610,7 +632,6 @@ void Replica::playback_pre_prepare(kv::Tx& tx)
"prepare");
populate_certificates(executable_pp.get());
auto& prepared_cert = plog.fetch(executable_pp->seqno());
prepared_cert.add(executable_pp.release());
@ -637,6 +658,42 @@ void Replica::playback_pre_prepare(kv::Tx& tx)
}
}
void Replica::playback_new_view(kv::Tx& tx)
{
auto view = tx.get_view(pbft_new_views_map);
auto nv = view->get(0);
PBFT_ASSERT(
nv.has_value(),
"Deserialised new view but it was not found in the new-views map");
auto new_view_val = nv.value();
LOG_TRACE_FMT(
"Playback new-view with view {} for node {}",
new_view_val.view,
new_view_val.node_id);
auto new_view = create_message<New_view>(
new_view_val.contents.data(), new_view_val.contents.size());
if (!new_view->pre_verify())
{
LOG_INFO_FMT(
"Did not verify playback new-view for view {} from node {}",
new_view->view(),
new_view->id());
return;
}
ledger_writer->write_new_view(tx);
// enter the new view
v = new_view->view();
cur_primary = v % num_replicas;
vi.add(new_view.release());
vi.set_new_view(v);
if (encryptor)
{
encryptor->set_iv_id(v);
}
LOG_INFO_FMT("Done with process new view {}", v);
}
void Replica::init_state()
{
// Compute digest of initial state and first checkpoint.
@ -1103,7 +1160,6 @@ void Replica::send_prepare(Seqno seqno, std::optional<ByzInfo> byz_info)
{
is_exec_pending = true;
Prepared_cert& pc = plog.fetch(seqno);
if (pc.my_prepare() == 0 && pc.is_pp_complete())
{
bool send_only_to_self = (f() == 0);
@ -1235,7 +1291,7 @@ void Replica::handle(Prepare* m)
}
const Seqno ms = m->seqno();
LOG_DEBUG_FMT("handle prepare {}", ms);
LOG_DEBUG_FMT("handle prepare {} from {}", ms, m->id());
// Only accept prepare messages that are not sent by the primary for
// current view.
if (
@ -1561,6 +1617,17 @@ void Replica::handle(Status* m)
m->last_executed(),
max_out);
if (
last_stable > m->last_stable() && last_executed > m->last_executed() + 1)
{
LOG_TRACE_FMT(
"Sending append entries to {} since we are way off", m->id());
Append_entries ae;
send(&ae, m->id());
delete m;
return;
}
if (m->view() < v)
{
// Retransmit my latest view-change message
@ -1581,83 +1648,71 @@ void Replica::handle(Status* m)
{
min = std::max(last_stable + 1, m->last_executed() + 1);
LOG_TRACE_FMT("Retransmitting from min {} to max {}", min, max);
if (
last_stable > m->last_stable() &&
last_executed > m->last_executed() + 1)
for (Seqno n = min; n <= max; n++)
{
LOG_TRACE_FMT(
"Sending append entries to {} since we are way off", m->id());
Append_entries ae;
send(&ae, m->id());
}
else
{
for (Seqno n = min; n <= max; n++)
if (m->is_committed(n))
{
if (m->is_committed(n))
{
// No need for retransmission of commit or pre-prepare/prepare
// message.
continue;
}
Commit* c = clog.fetch(n).mine(t_sent);
if (c != 0)
{
retransmit(c, current, t_sent, p.get());
}
if (m->is_prepared(n))
{
// No need for retransmission of pre-prepare/prepare message.
continue;
}
// If I have a pre-prepare/prepare send it, provided I have sent
// a pre-prepare/prepare for view v.
if (primary() == node_id)
{
Pre_prepare* pp = plog.fetch(n).my_pre_prepare(t_sent);
if (pp != 0)
{
retransmit(pp, current, t_sent, p.get());
}
}
else
{
Prepare* pr = plog.fetch(n).my_prepare(t_sent);
if (pr != 0)
{
retransmit(pr, current, t_sent, p.get());
}
}
// No need for retransmission of commit or pre-prepare/prepare
// message.
continue;
}
if (id() == primary())
Commit* c = clog.fetch(n).mine(t_sent);
if (c != 0)
{
// For now only primary retransmits big requests.
Status::BRS_iter gen(m);
retransmit(c, current, t_sent, p.get());
}
int count = 0;
Seqno ppn;
BR_map mrmap;
while (gen.get(ppn, mrmap) && count <= max_ret_bytes)
if (m->is_prepared(n))
{
// No need for retransmission of pre-prepare/prepare message.
continue;
}
// If I have a pre-prepare/prepare send it, provided I have sent
// a pre-prepare/prepare for view v.
if (primary() == node_id)
{
Pre_prepare* pp = plog.fetch(n).my_pre_prepare(t_sent);
if (pp != 0)
{
if (plog.within_range(ppn))
retransmit(pp, current, t_sent, p.get());
}
}
else
{
Prepare* pr = plog.fetch(n).my_prepare(t_sent);
if (pr != 0)
{
retransmit(pr, current, t_sent, p.get());
}
}
}
if (id() == primary())
{
// For now only primary retransmits big requests.
Status::BRS_iter gen(m);
int count = 0;
Seqno ppn;
BR_map mrmap;
while (gen.get(ppn, mrmap) && count <= max_ret_bytes)
{
if (plog.within_range(ppn))
{
Pre_prepare_info::BRS_iter gen(
plog.fetch(ppn).prep_info(), mrmap);
Request* r;
while (gen.get(r))
{
Pre_prepare_info::BRS_iter gen(
plog.fetch(ppn).prep_info(), mrmap);
Request* r;
while (gen.get(r))
{
LOG_TRACE_FMT(
"Retransmitting request with id {} and cid {}",
r->request_id(),
r->client_id());
INCR_OP(message_counts_retransmitted[m->tag()]);
send(r, m->id());
count += r->size();
}
LOG_TRACE_FMT(
"Retransmitting request with id {} and cid {}",
r->request_id(),
r->client_id());
INCR_OP(message_counts_retransmitted[m->tag()]);
send(r, m->id());
count += r->size();
}
}
}
@ -2029,18 +2084,9 @@ void Replica::process_new_view(Seqno min, Digest d, Seqno max, Seqno ms)
Prepared_cert& pc = plog.fetch(i);
PBFT_ASSERT(pp != 0 && pp->digest() == d, "Invalid state");
Pre_prepare::Requests_iter iter(pp);
Request request;
size_t req_in_pp = 0;
while (iter.get(request))
{
req_in_pp++;
}
if (encryptor)
if (encryptor && pp->num_big_reqs() > 0)
{
// don't change encryptor if nullop
encryptor->set_iv_id(prev_view);
}
@ -2067,12 +2113,7 @@ void Replica::process_new_view(Seqno min, Digest d, Seqno max, Seqno ms)
{
last_te_version = ledger_writer->write_pre_prepare(pp, prev_view);
}
if (req_in_pp > 0)
{
// not a null op
update_gov_req_info(info, pp);
}
update_gov_req_info(info, pp);
}
if (i <= last_executed || pc.is_complete())
@ -2112,7 +2153,7 @@ void Replica::process_new_view(Seqno min, Digest d, Seqno max, Seqno ms)
{
encryptor->set_iv_id(v);
}
LOG_INFO << "Done with process new view " << v << std::endl;
LOG_INFO_FMT("Done with process new view {}", v);
}
Pre_prepare* Replica::prepared_pre_prepare(Seqno n, bool was_f_0)
@ -2471,7 +2512,6 @@ void Replica::execute_committed(bool was_f_0)
}
Pre_prepare* pp = committed(last_executed + 1, was_f_0);
if (pp && pp->view() == view())
{
// Can execute the requests in the message with sequence number

View file

@ -235,6 +235,10 @@ public:
// Effects: pre-prepare is verified, if merkle roots match
// we update the pre-prepare related meta-data, if not we rollback
void playback_new_view(kv::Tx& tx);
// Effects: if the new-view message is verified then the replica enters
// the new view
bool IsExecutionPending()
{
return is_exec_pending;

View file

@ -286,10 +286,11 @@ TEST_CASE("Test Ledger Replay")
// initiate replica with stub consensus to be used on replay
auto write_consensus =
std::make_shared<kv::StubConsensus>(ConsensusType::PBFT);
NodeInfo write_node_info;
INFO("Create dummy pre-prepares and write them to ledger");
{
PbftState pbft_state;
init_test_state(pbft_state);
write_node_info = init_test_state(pbft_state);
pbft_state.store->set_consensus(write_consensus);
auto& write_derived_map =
@ -347,6 +348,7 @@ TEST_CASE("Test Ledger Replay")
info.replicated_state_merkle_root.data()[0] = i + 1;
pp->set_merkle_roots_and_ctx(info.replicated_state_merkle_root, info.ctx);
pp->set_digest();
ledger_writer.write_pre_prepare(pp.get());
}
@ -359,7 +361,12 @@ TEST_CASE("Test Ledger Replay")
"corrupt, and in order");
{
PbftState pbft_state;
init_test_state(pbft_state);
NodeId node_id = 1;
init_test_state(pbft_state, node_id);
// let this node know about the node that signed the pre prepare
// otherwise we can't verify the pre prepare during playback
pbft::GlobalState::get_node().add_principal(
write_node_info.general_info.principal_info[0]);
auto consensus = std::make_shared<kv::StubConsensus>(ConsensusType::PBFT);
pbft_state.store->set_consensus(consensus);

View file

@ -260,6 +260,12 @@ void View_info::discard_old_and_resize_if_needed()
}
}
void View_info::set_new_view(View vi)
{
v = vi;
last_nvs[pbft::GlobalState::get_node().primary(v)].make_complete(v);
}
void View_info::view_change(View vi, Seqno last_executed, State* state)
{
v = vi;

View file

@ -69,6 +69,9 @@ public:
// view "v", and sends view-change acks for any logged view-change
// messages from other replicas with view "v".
void set_new_view(View v);
// during playback sets the new view to v and marks the view as complete
//
// Handling received messages:
//

View file

@ -868,6 +868,11 @@ namespace pbft
message_receiver_base->playback_pre_prepare(tx);
break;
}
case kv::DeserialiseSuccess::PASS_NEW_VIEW:
{
message_receiver_base->playback_new_view(tx);
break;
}
default:
throw std::logic_error("Unknown DeserialiseSuccess value");
}

View file

@ -57,6 +57,7 @@ namespace pbft
ccf::Signatures& signatures) = 0;
virtual kv::Version commit_tx(
kv::Tx& tx, CBuffer root, ccf::Signatures& signatures) = 0;
virtual kv::Version commit_tx(kv::Tx& tx) = 0;
virtual void commit_new_view(
const pbft::NewView& new_view, pbft::NewViewsMap& pbft_new_views_map) = 0;
virtual std::shared_ptr<kv::AbstractTxEncryptor> get_encryptor() = 0;
@ -122,15 +123,20 @@ namespace pbft
}
kv::Version commit_tx(kv::Tx& tx, CBuffer root, ccf::Signatures& signatures)
{
auto sig_view = tx.get_view(signatures);
ccf::Signature sig_value(root);
sig_view->put(0, sig_value);
return commit_tx(tx);
}
kv::Version commit_tx(kv::Tx& tx)
{
while (true)
{
auto p = x.lock();
if (p)
{
auto sig_view = tx.get_view(signatures);
ccf::Signature sig_value(root);
sig_view->put(0, sig_value);
auto success = tx.commit();
if (success == kv::CommitSuccess::OK)
{
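
Because the rendered diff drops the +/- markers, the old and new bodies are interleaved above. As a rough reconstruction (a sketch only, assuming the adaptor's existing members and retry loop keep their shape; the tail of the loop is outside the visible hunk):

kv::Version commit_tx(kv::Tx& tx, CBuffer root, ccf::Signatures& signatures)
{
  // new shape: record the signature in the tx, then delegate to the plain
  // commit_tx(tx) overload added by this commit
  auto sig_view = tx.get_view(signatures);
  ccf::Signature sig_value(root);
  sig_view->put(0, sig_value);
  return commit_tx(tx);
}

kv::Version commit_tx(kv::Tx& tx)
{
  while (true)
  {
    auto p = x.lock();
    if (p)
    {
      auto success = tx.commit();
      // ... the rest of the loop (checking success == kv::CommitSuccess::OK,
      // returning the committed version) is unchanged and not shown in the
      // visible hunk
    }
  }
}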

114
tests/infra/suspension.py Normal file
View file

@ -0,0 +1,114 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
from threading import Timer
import time
import suite.test_requirements as reqs
import infra.ccf
import random
from enum import Enum
from loguru import logger as LOG
class LateJoinerStatus(Enum):
# late joiner is stuck and needs all other replicas to view change
Stuck = "STUCK"
# late joiner is not accepting the getLocalCommit RPCs
NotReady = "NOT_READY"
# late joiner has caught up with the other replicas
Ready = "READY"
def timeout_handler(node, suspend, election_timeout):
if suspend:
# We want to suspend the node's process, so we need to initiate a new timer to wake it up eventually
node.suspend()
next_timeout = random.uniform(2 * election_timeout, 3 * election_timeout)
LOG.info(f"New timer set for node {node.node_id} is {next_timeout} seconds")
t = Timer(next_timeout, timeout_handler, args=[node, False, 0])
t.start()
else:
node.resume()
def update_view_info(network, view_info):
try:
cur_primary, cur_view = network.find_primary()
view_info[cur_view] = cur_primary.node_id
except TimeoutError:
LOG.warning("Trying to access a suspended network")
def get_node_local_commit(node):
with node.node_client() as c:
r = c.get("commit")
return r.seqno, r.global_commit
def wait_for_late_joiner(old_node, late_joiner, strict=False, timeout=30):
old_node_lc, old_node_gc = get_node_local_commit(old_node)
LOG.success(
f"node {old_node.node_id} is at state local_commit:{old_node_lc}, global_commit:{old_node_gc}"
)
local_commit = 0
end = time.time() + timeout
while time.time() <= end:
try:
local_commit, gc = get_node_local_commit(late_joiner)
LOG.success(
f"late joiner {late_joiner.node_id} is at state local_commit:{local_commit}, global_commit:{gc}"
)
if local_commit >= old_node_lc:
return LateJoinerStatus.Ready
time.sleep(1)
except (
TimeoutError,
infra.clients.CCFConnectionException,
):
LOG.warning(
f"late joiner with node id {late_joiner.node_id} isn't quite ready yet"
)
return_state = None
if local_commit == 0:
return_state = LateJoinerStatus.NotReady
else:
return_state = LateJoinerStatus.Stuck
if strict:
raise AssertionError(
f"late joiner with node id {late_joiner.node_id} failed to catch up, it's final state is {return_state}"
)
else:
return return_state
@reqs.description("Suspend nodes")
def test_suspend_nodes(network, args, nodes=None):
cur_primary, _ = network.find_primary()
if nodes is None:
nodes = network.get_joined_nodes()
# first timer determines after how many seconds each node will be suspended
timeouts = []
for i, node in enumerate(nodes):
# if pbft suspend half of them including the primary
if i % 2 != 0 and args.consensus == "pbft":
continue
LOG.success(f"Will suspend node with id {node.node_id}")
t = random.uniform(0, 2)
LOG.info(f"Initial timer for node {node.node_id} is {t} seconds...")
timeouts.append((t, node))
for t, node in timeouts:
suspend_time = (
args.pbft_view_change_timeout / 1000
if args.consensus == "pbft"
else args.raft_election_timeout / 1000
)
if node.node_id == cur_primary.node_id and args.consensus == "pbft":
# if pbft suspend the primary for more than twice the election timeout
# in order to make sure view changes will be triggered
suspend_time = 2.5 * suspend_time
tm = Timer(t, timeout_handler, args=[node, True, suspend_time])
tm.start()
return network

97
tests/replay_new_view.py Normal file
View file

@ -0,0 +1,97 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import infra.ccf
import infra.proc
import infra.notification
import infra.net
import infra.e2e_args
import infra.suspension as suspend
import random
import infra.logging_app as app
from loguru import logger as LOG
TOTAL_REQUESTS = 9 # x2 is 18 since LoggingTxs app sends a private and a public request for each tx index
def run(args):
hosts = ["localhost", "localhost", "localhost"]
LOG.info(f"setting seed to {args.seed}")
random.seed(args.seed)
txs = app.LoggingTxs()
with infra.ccf.network(
hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb, txs=txs
) as network:
network.start_and_join(args)
original_nodes = network.get_joined_nodes()
view_info = {}
suspend.update_view_info(network, view_info)
app.test_run_txs(network=network, args=args, num_txs=TOTAL_REQUESTS)
suspend.test_suspend_nodes(network, args)
# run txs while nodes get suspended
app.test_run_txs(
network=network,
args=args,
num_txs=4 * TOTAL_REQUESTS,
ignore_failures=True,
)
suspend.update_view_info(network, view_info)
late_joiner = network.create_and_trust_node(args.package, "localhost", args)
# some requests to be processed while the late joiner catches up
# (no strict checking that these requests are actually being processed simultaneously with the node catchup)
app.test_run_txs(
network=network,
args=args,
num_txs=int(TOTAL_REQUESTS / 2),
nodes=original_nodes, # doesn't contain late joiner
verify=False, # will try to verify for late joiner and it might not be ready yet
)
caught_up = suspend.wait_for_late_joiner(original_nodes[0], late_joiner)
if caught_up == suspend.LateJoinerStatus.Stuck:
# should be removed when node configuration has been implemented to allow
# a late joiner to force a view change
LOG.warning("late joiner is stuck, stop trying if catchup fails again")
suspend.wait_for_late_joiner(original_nodes[0], late_joiner, True)
elif caught_up == suspend.LateJoinerStatus.NotReady:
while caught_up == suspend.LateJoinerStatus.NotReady:
LOG.warning("late joiner is not ready to accept RPC's yet")
caught_up = suspend.wait_for_late_joiner(original_nodes[0], late_joiner)
elif caught_up == suspend.LateJoinerStatus.Ready:
LOG.success("late joiner caught up successfully")
# check nodes have resumed normal execution before shutting down
app.test_run_txs(
network=network, args=args, num_txs=len(network.get_joined_nodes())
)
# assert that view changes actually did occur
assert len(view_info) > 1
LOG.success("----------- views and primaries recorded -----------")
for view, primary in view_info.items():
LOG.success(f"view {view} - primary {primary}")
if __name__ == "__main__":
def add(parser):
parser.add_argument(
"--seed",
help="seed used to randomise the node suspension timeouts",
default=random.randint(1, 10),
)
args = infra.e2e_args.cli_args(add)
if args.js_app_script:
args.package = "libjs_generic"
elif args.app_script:
args.package = "liblua_generic"
else:
args.package = "liblogging"
run(args)

View file

@ -5,10 +5,8 @@ import infra.proc
import infra.notification
import infra.net
import infra.e2e_args
from threading import Timer
import time
import infra.suspension as suspend
import random
import suite.test_requirements as reqs
import infra.logging_app as app
from loguru import logger as LOG
@ -20,84 +18,6 @@ from loguru import logger as LOG
TOTAL_REQUESTS = 9 # x2 is 18 since LoggingTxs app sends a private and a public request for each tx index
def timeout_handler(node, suspend, election_timeout):
if suspend:
# We want to suspend the nodes' process so we need to initiate a new timer to wake it up eventually
node.suspend()
next_timeout = random.uniform(2 * election_timeout, 3 * election_timeout)
LOG.info(f"New timer set for node {node.node_id} is {next_timeout} seconds")
t = Timer(next_timeout, timeout_handler, args=[node, False, 0])
t.start()
else:
node.resume()
def update_view_info(network, view_info):
try:
cur_primary, cur_view = network.find_primary()
view_info[cur_view] = cur_primary.node_id
except TimeoutError:
LOG.warning("Trying to access a suspended network")
def get_node_local_commit(node):
with node.node_client() as c:
r = c.get("commit")
return r.seqno, r.global_commit
def wait_for_late_joiner(old_node, late_joiner, timeout=30):
old_node_lc, old_node_gc = get_node_local_commit(old_node)
LOG.success(
f"node {old_node.node_id} is at state local_commit:{old_node_lc}, global_commit:{old_node_gc}"
)
end = time.time() + timeout
while time.time() <= end:
try:
lc, gc = get_node_local_commit(late_joiner)
LOG.success(
f"late joiner {late_joiner.node_id} is at state local_commit:{lc}, global_commit:{gc}"
)
if lc >= old_node_lc:
return
time.sleep(1)
except (
TimeoutError,
infra.clients.CCFConnectionException,
):
LOG.warning(f"late joiner {late_joiner.node_id} isn't quite ready yet")
@reqs.description("Suspend nodes")
def test_suspend_nodes(network, args, nodes_to_keep):
cur_primary, _ = network.find_primary()
# first timer determines after how many seconds each node will be suspended
timeouts = []
for i, node in enumerate(nodes_to_keep):
# if pbft suspend half of them including the primary
if i % 2 != 0 and args.consensus == "pbft":
continue
LOG.success(f"Will suspend node with id {node.node_id}")
t = random.uniform(0, 2)
LOG.info(f"Initial timer for node {node.node_id} is {t} seconds...")
timeouts.append((t, node))
for t, node in timeouts:
suspend_time = (
args.pbft_view_change_timeout / 1000
if args.consensus == "pbft"
else args.raft_election_timeout / 1000
)
if node.node_id == cur_primary.node_id and args.consensus == "pbft":
# if pbft suspend the primary for more than twice the election timeout
# in order to make sure view changes will be triggered
suspend_time = 2.5 * suspend_time
tm = Timer(t, timeout_handler, args=[node, True, suspend_time])
tm.start()
return network
def run(args):
hosts = ["localhost", "localhost", "localhost"]
@ -111,10 +31,10 @@ def run(args):
network.start_and_join(args)
original_nodes = network.get_joined_nodes()
view_info = {}
update_view_info(network, view_info)
suspend.update_view_info(network, view_info)
app.test_run_txs(network=network, args=args, num_txs=TOTAL_REQUESTS)
update_view_info(network, view_info)
suspend.update_view_info(network, view_info)
nodes_to_kill = [network.find_any_backup()]
nodes_to_keep = [n for n in original_nodes if n not in nodes_to_kill]
@ -133,7 +53,7 @@ def run(args):
verify=False, # will try to verify for late joiner and it might not be ready yet
)
wait_for_late_joiner(original_nodes[0], late_joiner)
suspend.wait_for_late_joiner(original_nodes[0], late_joiner)
# kill the old node(s) and ensure we are still making progress
for backup_to_retire in nodes_to_kill:
@ -152,7 +72,7 @@ def run(args):
# take longer than usual to complete and we don't want the test to break here
)
test_suspend_nodes(network, args, nodes_to_keep)
suspend.test_suspend_nodes(network, args, nodes_to_keep)
# run txs while nodes get suspended
app.test_run_txs(
@ -162,7 +82,7 @@ def run(args):
ignore_failures=True,
)
update_view_info(network, view_info)
suspend.update_view_info(network, view_info)
# check nodes have resumed normal execution before shutting down
app.test_run_txs(network=network, args=args, num_txs=len(nodes_to_keep))