ePBFT - smallbank and other things (#532)

- Get smallbank to work with the pbft backend
- Create a python pbft flag and then use that to make network.start_and_join work for both raft and pbft
- Bug fix where we do not return the KV version properly
- Change to use the standard allocator for pbft message allocations
This commit is contained in:
Alex 2019-11-07 11:55:23 +00:00 коммит произвёл GitHub
Родитель d106f9dc03
Коммит de6e09bb17
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
15 изменённых файлов: 110 добавлений и 85 удалений

Просмотреть файл

@ -388,6 +388,10 @@ if(BUILD_TESTS)
else()
message(STATUS "Using PBFT as consensus")
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/pbft.cmake)
if (BUILD_SMALLBANK)
include(${CMAKE_CURRENT_SOURCE_DIR}/samples/apps/smallbank/smallbank.cmake)
endif()
endif()
if (EXTENSIVE_TESTS)

Просмотреть файл

@ -557,11 +557,18 @@ add_enclave_library_c(http_parser.host "${HTTP_PARSER_SOURCES}")
set_property(TARGET http_parser.host PROPERTY POSITION_INDEPENDENT_CODE ON)
# Common test args for Python scripts starting up CCF networks
if(PBFT)
set(PBFT_ARG "--pbft")
else()
unset(PBFT_ARG)
endif()
set(CCF_NETWORK_TEST_ARGS
${TEST_IGNORE_QUOTE}
${TEST_ENCLAVE_TYPE}
-l ${TEST_HOST_LOGGING_LEVEL}
-g ${CCF_DIR}/src/runtime_config/gov.lua
${PBFT_ARG}
)
# Lua generic app

Просмотреть файл

@ -34,33 +34,35 @@ if(BUILD_TESTS)
set(SMALL_BANK_SIGNED_ITERATIONS 2000)
endif ()
add_perf_test(
NAME small_bank_sigs_client_test
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/small_bank_client.py
CLIENT_BIN ./small_bank_client
VERIFICATION_FILE ${SMALL_BANK_SIGNED_VERIFICATION_FILE}
ITERATIONS ${SMALL_BANK_SIGNED_ITERATIONS}
ADDITIONAL_ARGS
--label Small_Bank_Client_Sigs
--max-writes-ahead 1000
--sign
--metrics-file small_bank_sigs_metrics.json
)
if(NOT PBFT)
add_perf_test(
NAME small_bank_sigs_client_test
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/small_bank_client.py
CLIENT_BIN ./small_bank_client
VERIFICATION_FILE ${SMALL_BANK_SIGNED_VERIFICATION_FILE}
ITERATIONS ${SMALL_BANK_SIGNED_ITERATIONS}
ADDITIONAL_ARGS
--label Small_Bank_Client_Sigs
--max-writes-ahead 1000
--sign
--metrics-file small_bank_sigs_metrics.json
)
# It is better to run performance tests with forwarding on different machines
# (i.e. nodes and clients)
add_perf_test(
NAME small_bank_sigs_forwarding
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/small_bank_client.py
CLIENT_BIN ./small_bank_client
ITERATIONS ${SMALL_BANK_SIGNED_ITERATIONS}
ADDITIONAL_ARGS
--label Small_Bank_ClientSigs_Forwarding
--max-writes-ahead 1000
--metrics-file small_bank_fwd_metrics.json
-n localhost -n localhost
-cn localhost
--send-tx-to backups
--sign
)
# It is better to run performance tests with forwarding on different machines
# (i.e. nodes and clients)
add_perf_test(
NAME small_bank_sigs_forwarding
PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/small_bank_client.py
CLIENT_BIN ./small_bank_client
ITERATIONS ${SMALL_BANK_SIGNED_ITERATIONS}
ADDITIONAL_ARGS
--label Small_Bank_ClientSigs_Forwarding
--max-writes-ahead 1000
--metrics-file small_bank_fwd_metrics.json
-n localhost -n localhost
-cn localhost
--send-tx-to backups
--sign
)
endif()
endif()

Просмотреть файл

@ -216,6 +216,8 @@ bool Big_req_table::add_unmatched(Request* r, Request*& old_req)
if (centry.num_requests >= Max_unmatched_requests_per_client)
{
LOG_FAIL << "Too many unbuffered requests for client_id:" << r->client_id()
<< std::endl;
old_req = centry.requests.back();
centry.requests.pop_back();
}

Просмотреть файл

@ -78,7 +78,9 @@ public:
void dump_state(std::ostream& os);
// Effects: logs state for debugging
static const int Max_unmatched_requests_per_client = 8;
static const int Max_unmatched_requests_per_client = 1024;
// Effect: The maximum number of requests we will store per client.
// This functions as an LRU cache.
private:
bool check_pcerts(BR_entry* bre);

Просмотреть файл

@ -88,7 +88,7 @@ private:
RequestContext* prev;
};
std::unordered_map<Request_id, std::unique_ptr<RequestContext>> out_reqs;
static const int Max_outstanding = 128;
static const int Max_outstanding = 1000 * 100;
// list of outstanding requests used for retransmissions
// (we only retransmit the request at the head of the queue)
@ -147,6 +147,7 @@ bool ClientProxy<T, C>::send_request(
{
if (out_reqs.size() >= Max_outstanding)
{
LOG_FAIL << "Too many outstanding requests, rejecting!" << std::endl;
return false;
}

Просмотреть файл

@ -34,7 +34,7 @@
# define DEBUG_ALLOC 1
#endif
//#define USE_STD_MALLOC
#define USE_STD_MALLOC
class Log_allocator
{

Просмотреть файл

@ -93,6 +93,12 @@ void Message::trim()
void Message::set_size(int size)
{
PBFT_ASSERT(msg && ALIGNED(msg), "Invalid state");
if (!(max_size < 0 || ALIGNED_SIZE(size) <= max_size))
{
LOG_INFO << "Error - size:" << size
<< ", aligned_size:" << ALIGNED_SIZE(size)
<< ", max_size:" << max_size << std::endl;
}
PBFT_ASSERT(max_size < 0 || ALIGNED_SIZE(size) <= max_size, "Invalid state");
int aligned = ALIGNED_SIZE(size);
for (int i = size; i < aligned; i++)

Просмотреть файл

@ -110,6 +110,8 @@ Pre_prepare::Pre_prepare(
INCR_CNT(sum_batch_size, requests_in_batch);
INCR_OP(batch_size_histogram[requests_in_batch]);
LOG_TRACE << "request in batch:" << requests_in_batch << std::endl;
// Compute authenticator and update size.
int old_size = sizeof(Pre_prepare_rep) + rep().rset_size +
rep().n_big_reqs * sizeof(Digest) + rep().non_det_size;

Просмотреть файл

@ -1836,6 +1836,7 @@ void Replica::execute_prepared(bool committed)
if (global_commit_cb != nullptr)
{
LOG_TRACE << "Global_commit:" << pp->get_ctx() << std::endl;
global_commit_cb(pp->get_ctx(), global_commit_ctx);
}
}
@ -1857,6 +1858,7 @@ bool Replica::execute_tentative(Pre_prepare* pp, ByzInfo& info)
// each of them.
Pre_prepare::Requests_iter iter(pp);
Request request;
int64_t max_local_commit_value = INT64_MIN;
while (iter.get(request))
{
@ -1914,9 +1916,15 @@ bool Replica::execute_tentative(Pre_prepare* pp, ByzInfo& info)
// Finish constructing the reply.
LOG_DEBUG << "Executed from tentative exec: " << pp->seqno()
<< " from client: " << client_id
<< " rid: " << request.request_id() << " ctx: " << info.ctx
<< std::endl;
<< " rid: " << request.request_id()
<< " commit_id: " << info.ctx << std::endl;
if (info.ctx > max_local_commit_value)
{
max_local_commit_value = info.ctx;
}
info.ctx = max_local_commit_value;
#ifdef ENFORCE_EXACTLY_ONCE
replies.end_reply(client_id, request.request_id(), outb.size);
#else
@ -1924,6 +1932,9 @@ bool Replica::execute_tentative(Pre_prepare* pp, ByzInfo& info)
client_id, request.request_id(), last_tentative_execute, outb.size);
#endif
}
LOG_DEBUG << "Executed from tentative exec: " << pp->seqno()
<< " rid: " << request.request_id() << " commit_id: " << info.ctx
<< std::endl;
if (last_tentative_execute % checkpoint_interval == 0)
{

Просмотреть файл

@ -128,8 +128,8 @@ void setup_client_proxy()
auto req_timer_cb = [](void* ctx) {
auto cp = (ClientProxy<uint64_t, void>*)ctx;
while (request_count - reply_count <
Big_req_table::Max_unmatched_requests_per_client - 1)
static const uint32_t max_pending_requests = 7;
while (request_count - reply_count < max_pending_requests)
{
uint8_t request_buffer[8];
auto request = new (request_buffer) test_req;

Просмотреть файл

@ -100,6 +100,9 @@ def cli_args(add=lambda x: None, parser=None, accept_unknown=False):
parser.add_argument(
"--label", help="Unique identifier for the test", default=default_label
)
parser.add_argument(
"--pbft", help="Opens network the PBFT way", action="store_true",
)
add(parser)
if accept_unknown:

Просмотреть файл

@ -18,27 +18,12 @@ from loguru import logger as LOG
def run(args):
hosts = ["localhost"]
hosts = ["localhost"] * 4
with infra.ccf.network(
hosts, args.build_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
) as network:
primary, _ = network.start_and_join(args, open_network=False)
for i in range(1, 4):
LOG.info(f"Adding node {i}")
assert network.create_and_trust_node(
args.package, "localhost", args, target_node=None, should_wait=False
)
network.add_users(primary, network.initial_users)
LOG.info("Initial set of users added")
# network.open_network(primary)
script = None
result = network.propose(1, primary, script, None, "open_network")
network.vote_using_majority(primary, result[1]["id"], False)
LOG.info("***** Network is now open *****")
primary, _ = network.start_and_join(args)
with primary.node_client() as mc:
check_commit = infra.ccf.Checker(mc)

Просмотреть файл

@ -189,9 +189,10 @@ class Network:
primary, _ = self.find_primary()
self.check_for_service(primary, status=ServiceStatus.OPENING)
return primary
def start_and_join(self, args, open_network=True):
def start_and_join(self, args):
"""
Starts a CCF network.
:param args: command line arguments to configure the CCF nodes.
@ -211,11 +212,8 @@ class Network:
primary = self._start_all_nodes(args)
if not open_network:
LOG.warning("Network still needs to be opened")
return primary, self.nodes[1:]
self.wait_for_all_nodes_to_catch_up(primary)
if not args.pbft:
self.wait_for_all_nodes_to_catch_up(primary)
LOG.success("All nodes joined network")
if args.app_script:
@ -225,7 +223,7 @@ class Network:
self.add_users(primary, self.initial_users)
LOG.info("Initial set of users added")
self.open_network(primary)
self.open_network(args, primary)
LOG.success("***** Network is now open *****")
return primary, self.nodes[1:]
@ -251,7 +249,7 @@ class Network:
def remove_last_node(self):
last_node = self.nodes.pop()
def _add_node(self, node, lib_name, args, target_node=None, should_wait=True):
def _add_node(self, node, lib_name, args, target_node=None):
forwarded_args = {
arg: getattr(args, arg) for arg in infra.ccf.Network.node_args_to_forward
}
@ -269,7 +267,7 @@ class Network:
)
# If the network is opening, nodes are trusted without consortium approval
if self.status == ServiceStatus.OPENING and should_wait:
if self.status == ServiceStatus.OPENING and not args.pbft:
try:
node.wait_for_node_to_join()
except TimeoutError:
@ -292,15 +290,13 @@ class Network:
+ getattr(node_status, f" with status {node_status.name}", "")
)
def create_and_add_pending_node(
self, lib_name, host, args, target_node=None, should_wait=True
):
def create_and_add_pending_node(self, lib_name, host, args, target_node=None):
"""
Create a new node and add it to the network. Note that the new node
still needs to be trusted by members to complete the join protocol.
"""
new_node = self.create_node(host)
self._add_node(new_node, lib_name, args, target_node, should_wait)
self._add_node(new_node, lib_name, args, target_node)
primary, _ = self.find_primary()
try:
self._wait_for_node_to_exist_in_store(
@ -322,17 +318,12 @@ class Network:
return new_node
# TODO: should_wait should disappear once nodes can join a network and catch up in PBFT
def create_and_trust_node(
self, lib_name, host, args, target_node=None, should_wait=True
):
def create_and_trust_node(self, lib_name, host, args, target_node=None):
"""
Create a new node, add it to the network and let members vote to trust
it so that it becomes part of the consensus protocol.
"""
new_node = self.create_and_add_pending_node(
lib_name, host, args, target_node, should_wait
)
new_node = self.create_and_add_pending_node(lib_name, host, args, target_node)
if new_node is None:
return None
@ -340,7 +331,7 @@ class Network:
try:
if self.status is ServiceStatus.OPEN:
self.trust_node(primary, new_node.node_id)
if should_wait:
if not args.pbft:
new_node.wait_for_node_to_join()
except (ValueError, TimeoutError):
LOG.error(f"New trusted node {new_node.node_id} failed to join the network")
@ -348,7 +339,7 @@ class Network:
return None
new_node.network_state = NodeNetworkState.joined
if should_wait:
if not args.pbft:
self.wait_for_all_nodes_to_catch_up(primary)
return new_node
@ -556,7 +547,7 @@ class Network:
f"--member-cert={new_member_cert}",
)
def open_network(self, node):
def open_network(self, args, node):
"""
Assuming a network in state OPENING, this function creates a new
proposal and makes members vote to transition the network to state
@ -569,7 +560,8 @@ class Network:
return Calls:call("open_network")
"""
result = self.propose(1, node, script, None, "open_network")
self.vote_using_majority(node, result[1]["id"])
self.vote_using_majority(node, result[1]["id"], not args.pbft)
self.check_for_service(node)
self.status = ServiceStatus.OPEN

Просмотреть файл

@ -21,11 +21,15 @@ logging.getLogger("matplotlib").setLevel(logging.WARNING)
logging.getLogger("paramiko").setLevel(logging.WARNING)
def number_of_local_nodes():
def number_of_local_nodes(args):
"""
On 2-core VMs, we start only one node, but on 4 core, we want to start 2.
If we are using pbft then we need to have 4 nodes. Otherwise with CFT
on 2-core VMs, we start only one node, but on 4 core, we want to start 2.
Not 3, because the client is typically running two threads.
"""
if args.pbft:
return 4
if multiprocessing.cpu_count() > 2:
return 2
else:
@ -97,7 +101,7 @@ def run(build_directory, get_command, args):
hosts = args.nodes
if not hosts:
hosts = ["localhost"] * number_of_local_nodes()
hosts = ["localhost"] * number_of_local_nodes(args)
LOG.info("Starting nodes on {}".format(hosts))
@ -141,13 +145,17 @@ def run(build_directory, get_command, args):
break
time.sleep(1)
tx_rates.get_metrics()
for remote_client in clients:
remote_client.print_and_upload_result(args.label, metrics)
remote_client.stop()
# For now we will not collect metrics with PBFT as the messages that
# can be created when collecting the metrics are too large.
# https://github.com/microsoft/CCF/issues/534
if not args.pbft:
tx_rates.get_metrics()
for remote_client in clients:
remote_client.print_and_upload_result(args.label, metrics)
remote_client.stop()
LOG.info(f"Rates:\n{tx_rates}")
tx_rates.save_results(args.metrics_file)
LOG.info(f"Rates:\n{tx_rates}")
tx_rates.save_results(args.metrics_file)
except Exception:
for remote_client in clients: