From e432c813cf3ecc5faafcf4fc5bffa096c74d3ed7 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Fri, 22 May 2020 14:56:21 +0100 Subject: [PATCH] View seqno rename (#1206) --- doc/users/issue_commands.rst | 15 ++++++--------- samples/perf_client/timing.h | 4 ++-- src/http/http_consts.h | 8 +++++--- src/http/http_rpc_context.h | 4 ++-- src/http/ws_parser.h | 8 ++++---- tests/e2e_logging.py | 6 +++--- tests/election.py | 26 ++++++++++++-------------- tests/infra/ccf.py | 24 ++++++++++++------------ tests/infra/checker.py | 14 +++++++------- tests/infra/clients.py | 31 ++++++++++++++++--------------- tests/infra/member.py | 2 +- tests/receipts.py | 2 +- tests/suspend_nodes.py | 24 ++++++++++++------------ tests/ws_scaffold.py | 2 -- 14 files changed, 83 insertions(+), 87 deletions(-) diff --git a/doc/users/issue_commands.rst b/doc/users/issue_commands.rst index a3711e60c3..ec72f9809b 100644 --- a/doc/users/issue_commands.rst +++ b/doc/users/issue_commands.rst @@ -19,17 +19,15 @@ For example, to record a message at a specific id with the :ref:`developers/exam HTTP/1.1 200 OK content-length: 5 content-type: application/json - x-ccf-commit: 23 - x-ccf-global-commit: 22 - x-ccf-term: 2 + x-ccf-tx-seqno: 23 + x-ccf-tx-view: 2 true The HTTP response some CCF commit information in the headers: -- ``"x-ccf-commit"`` is the unique version at which the request was executed -- ``"x-ccf-global-commit"`` is the latest version agreed on by the network and forever committed to the ledger, at the time the request was executed, as seen by the contacted node -- ``"x-ccf-term"`` indicates the consensus term at which the request was executed +- ``"x-ccf-tx-seqno"`` is the unique version at which the request was executed +- ``"x-ccf-tx-view"`` indicates the consensus view at which the request was executed The response body (the JSON value ``true``) indicates that the request was executed successfully. For many RPCs this will be a JSON object with more details about the execution result. @@ -60,9 +58,8 @@ To guarantee that their request is successfully committed to the ledger, a user HTTP/1.1 200 OK content-length: 23 content-type: application/json - x-ccf-commit: 42 - x-ccf-global-commit: 40 - x-ccf-term: 5 + x-ccf-tx-seqno: 42 + x-ccf-tx-view: 5 {"status":"COMMITTED"} diff --git a/samples/perf_client/timing.h b/samples/perf_client/timing.h index d2071f8fb4..f6767bd0d2 100644 --- a/samples/perf_client/timing.h +++ b/samples/perf_client/timing.h @@ -166,7 +166,7 @@ namespace timing static CommitIDs parse_commit_ids(const RpcTlsClient::Response& response) { const auto& h = response.headers; - const auto local_commit_it = h.find(http::headers::CCF_COMMIT); + const auto local_commit_it = h.find(http::headers::CCF_TX_SEQNO); if (local_commit_it == h.end()) { throw std::logic_error("Missing commit response header"); @@ -178,7 +178,7 @@ namespace timing throw std::logic_error("Missing global commit response header"); } - const auto term_it = h.find(http::headers::CCF_TERM); + const auto term_it = h.find(http::headers::CCF_TX_VIEW); if (term_it == h.end()) { throw std::logic_error("Missing term response header"); diff --git a/src/http/http_consts.h b/src/http/http_consts.h index 3aa3b1fffb..968720777f 100644 --- a/src/http/http_consts.h +++ b/src/http/http_consts.h @@ -15,10 +15,12 @@ namespace http static constexpr auto LOCATION = "location"; static constexpr auto WWW_AUTHENTICATE = "www-authenticate"; - static constexpr auto CCF_COMMIT = "x-ccf-commit"; - static constexpr auto CCF_GLOBAL_COMMIT = "x-ccf-global-commit"; + static constexpr auto CCF_TX_SEQNO = "x-ccf-tx-seqno"; + static constexpr auto CCF_TX_VIEW = "x-ccf-tx-view"; static constexpr auto CCF_READ_ONLY = "x-ccf-read-only"; - static constexpr auto CCF_TERM = "x-ccf-term"; + + // Deprecated, will be removed in a later release + static constexpr auto CCF_GLOBAL_COMMIT = "x-ccf-global-commit"; } namespace headervalues diff --git a/src/http/http_rpc_context.h b/src/http/http_rpc_context.h index 360fb44b1b..7834e31ffc 100644 --- a/src/http/http_rpc_context.h +++ b/src/http/http_rpc_context.h @@ -198,12 +198,12 @@ namespace http virtual void set_commit(kv::Version cv) override { - set_response_header(http::headers::CCF_COMMIT, fmt::format("{}", cv)); + set_response_header(http::headers::CCF_TX_SEQNO, fmt::format("{}", cv)); } virtual void set_term(kv::Consensus::View t) override { - set_response_header(http::headers::CCF_TERM, fmt::format("{}", t)); + set_response_header(http::headers::CCF_TX_VIEW, fmt::format("{}", t)); } virtual void set_global_commit(kv::Version gc) override diff --git a/src/http/ws_parser.h b/src/http/ws_parser.h index 684be7e39b..1e14d2d710 100644 --- a/src/http/ws_parser.h +++ b/src/http/ws_parser.h @@ -14,8 +14,8 @@ namespace ws { static constexpr size_t INITIAL_READ = 2; static constexpr size_t OUT_CCF_HEADER_SIZE = - sizeof(uint16_t) /* return code */ + sizeof(size_t) /* commit */ + - sizeof(size_t) /* term */ + sizeof(size_t) /* global_commit */; + sizeof(uint16_t) /* return code */ + sizeof(size_t) /* seqno */ + + sizeof(size_t) /* view */ + sizeof(size_t) /* global_commit */; static size_t in_header_size(const std::string& path) { @@ -128,8 +128,8 @@ namespace ws proc.handle_response( (http_status)status, - {{http::headers::CCF_COMMIT, fmt::format("{}", commit)}, - {http::headers::CCF_TERM, fmt::format("{}", term)}, + {{http::headers::CCF_TX_SEQNO, fmt::format("{}", commit)}, + {http::headers::CCF_TX_VIEW, fmt::format("{}", term)}, {http::headers::CCF_GLOBAL_COMMIT, fmt::format("{}", global_commit)}}, std::move(body)); diff --git a/tests/e2e_logging.py b/tests/e2e_logging.py index b1c7bad2c2..6d14207de5 100644 --- a/tests/e2e_logging.py +++ b/tests/e2e_logging.py @@ -288,7 +288,7 @@ def test_view_history(network, args): r = c.get("getCommit") check(c) - commit_view = r.term + commit_view = r.view commit_seqno = r.global_commit # Temporarily disable logging of RPCs for readability @@ -390,8 +390,8 @@ def test_tx_statuses(network, args): check(r) # Until this tx is globally committed, poll for the status of this and some other # related transactions around it (and also any historical transactions we're tracking) - target_view = r.term - target_seqno = r.commit + target_view = r.view + target_seqno = r.seqno SentTxs.update_status(target_view, target_seqno) SentTxs.update_status(target_view, target_seqno + 1) SentTxs.update_status(target_view - 1, target_seqno, TxStatus.Invalid) diff --git a/tests/election.py b/tests/election.py index b2c51325f8..957ed96510 100644 --- a/tests/election.py +++ b/tests/election.py @@ -16,15 +16,15 @@ from loguru import logger as LOG # as F > N/2). -def wait_for_index_globally_committed(index, term, nodes): +def wait_for_seqno_to_commit(seqno, view, nodes): """ - Wait for a specific version at a specific term to be committed on all nodes. + Wait for a specific seqno at a specific view to be committed on all nodes. """ for _ in range(infra.ccf.Network.replication_delay * 10): up_to_date_f = [] for f in nodes: with f.node_client() as c: - r = c.get("tx", {"view": term, "seqno": index}) + r = c.get("tx", {"view": view, "seqno": seqno}) assert ( r.status == http.HTTPStatus.OK ), f"tx request returned HTTP status {r.status}" @@ -33,7 +33,7 @@ def wait_for_index_globally_committed(index, term, nodes): up_to_date_f.append(f.node_id) elif status == TxStatus.Invalid: raise RuntimeError( - f"Node {f.node_id} reports transaction ID {term}.{index} is invalid and will never be committed" + f"Node {f.node_id} reports transaction ID {view}.{seqno} is invalid and will never be committed" ) else: pass @@ -56,7 +56,7 @@ def run(args): check = infra.checker.Checker() network.start_and_join(args) - current_term = None + current_view = None # Time before an election completes max_election_duration = ( @@ -74,31 +74,29 @@ def run(args): # Note that for the first iteration, the primary is known in advance anyway LOG.debug("Find freshly elected primary") # After a view change in pbft, finding the new primary takes longer - primary, current_term = network.find_primary( + primary, current_view = network.find_primary( request_timeout=(30 if args.consensus == "pbft" else 3) ) LOG.debug( - "Commit new transactions, primary:{}, current_term:{}".format( - primary.node_id, current_term + "Commit new transactions, primary:{}, current_view:{}".format( + primary.node_id, current_view ) ) with primary.user_client() as c: res = c.rpc( "LOG_record", { - "id": current_term, - "msg": "This log is committed in term {}".format(current_term), + "id": current_view, + "msg": "This log is committed in view {}".format(current_view), }, readonly_hint=None, ) check(res, result=True) - commit_index = res.commit + seqno = res.seqno LOG.debug("Waiting for transaction to be committed by all nodes") - wait_for_index_globally_committed( - commit_index, current_term, network.get_joined_nodes() - ) + wait_for_seqno_to_commit(seqno, current_view, network.get_joined_nodes()) LOG.debug("Stopping primary") primary.stop() diff --git a/tests/infra/ccf.py b/tests/infra/ccf.py index 0768c92f52..0ac9dd2165 100644 --- a/tests/infra/ccf.py +++ b/tests/infra/ccf.py @@ -483,10 +483,10 @@ class Network: def find_primary(self, timeout=3, request_timeout=3): """ Find the identity of the primary in the network and return its identity - and the current term. + and the current view. """ primary_id = None - term = None + view = None end_time = time.time() + timeout while time.time() < end_time: @@ -496,7 +496,7 @@ class Network: res = c.get("getPrimaryInfo") if res.error is None: primary_id = res.result["primary_id"] - term = res.result["current_term"] + view = res.result["current_term"] break else: assert "Primary unknown" in res.error, res.error @@ -508,7 +508,7 @@ class Network: if primary_id is None: raise PrimaryNotFound - return (self._get_node_by_id(primary_id), term) + return (self._get_node_by_id(primary_id), view) def find_backups(self, primary=None, timeout=3): if primary is None: @@ -538,20 +538,20 @@ class Network: while time.time() < end_time: with primary.node_client() as c: resp = c.get("getCommit") - commit_leader = resp.result["commit"] - term_leader = resp.result["term"] - if commit_leader != 0: + seqno = resp.result["commit"] + view = resp.result["term"] + if seqno != 0: break time.sleep(0.1) assert ( - commit_leader != 0 - ), f"Primary {primary.node_id} has not made any progress yet (term: {term_leader}, commit: {commit_leader})" + seqno != 0 + ), f"Primary {primary.node_id} has not made any progress yet (view: {view}, seqno: {seqno})" while time.time() < end_time: caught_up_nodes = [] for node in self.get_joined_nodes(): with node.node_client() as c: - resp = c.get("tx", {"view": term_leader, "seqno": commit_leader},) + resp = c.get("tx", {"view": view, "seqno": seqno}) if resp.error is not None: # Node may not have joined the network yet, try again break @@ -560,7 +560,7 @@ class Network: caught_up_nodes.append(node) elif status == TxStatus.Invalid: raise RuntimeError( - f"Node {node.node_id} reports transaction ID {term_leader}.{commit_leader} is invalid and will never be committed" + f"Node {node.node_id} reports transaction ID {view}.{seqno} is invalid and will never be committed" ) else: pass @@ -583,7 +583,7 @@ class Network: for node in self.get_joined_nodes(): with node.node_client() as c: r = c.get("getCommit") - commits.append(r.commit) + commits.append(r.seqno) if [commits[0]] * len(commits) == commits: break time.sleep(0.1) diff --git a/tests/infra/checker.py b/tests/infra/checker.py index 5d936e6fb3..b822a59a8c 100644 --- a/tests/infra/checker.py +++ b/tests/infra/checker.py @@ -8,11 +8,11 @@ import time from infra.tx_status import TxStatus -def wait_for_global_commit(node_client, commit_index, term, mksign=False, timeout=3): +def wait_for_global_commit(node_client, seqno, view, mksign=False, timeout=3): """ - Given a client to a CCF network and a commit_index/term pair, this function + Given a client to a CCF network and a seqno/view pair, this function waits for this specific commit index to be globally committed by the - network in this term. + network in this view. A TimeoutError exception is raised if the commit index is not globally committed within the given timeout. """ @@ -27,7 +27,7 @@ def wait_for_global_commit(node_client, commit_index, term, mksign=False, timeou end_time = time.time() + timeout while time.time() < end_time: - r = node_client.get("tx", {"view": term, "seqno": commit_index}) + r = node_client.get("tx", {"view": view, "seqno": seqno}) assert ( r.status == http.HTTPStatus.OK ), f"tx request returned HTTP status {r.status}" @@ -36,7 +36,7 @@ def wait_for_global_commit(node_client, commit_index, term, mksign=False, timeou return elif status == TxStatus.Invalid: raise RuntimeError( - f"Transaction ID {term}.{commit_index} is marked invalid and will never be committed" + f"Transaction ID {view}.{seqno} is marked invalid and will never be committed" ) else: time.sleep(0.1) @@ -71,7 +71,7 @@ class Checker: if self.node_client: wait_for_global_commit( - self.node_client, rpc_result.commit, rpc_result.term + self.node_client, rpc_result.seqno, rpc_result.view ) if self.notification_queue: @@ -84,7 +84,7 @@ class Checker: n > self.notified_commit ), f"Received notification of commit {n} after commit {self.notified_commit}" self.notified_commit = n - if n >= rpc_result.commit: + if n >= rpc_result.seqno: return time.sleep(0.5) raise TimeoutError("Timed out waiting for notification") diff --git a/tests/infra/clients.py b/tests/infra/clients.py index 3b6acf5f5a..269136e99e 100644 --- a/tests/infra/clients.py +++ b/tests/infra/clients.py @@ -28,10 +28,11 @@ def truncate(string, max_len=256): return string -CCF_COMMIT_HEADER = "x-ccf-commit" -CCF_TERM_HEADER = "x-ccf-term" -CCF_GLOBAL_COMMIT_HEADER = "x-ccf-global-commit" +CCF_TX_SEQNO_HEADER = "x-ccf-tx-seqno" +CCF_TX_VIEW_HEADER = "x-ccf-tx-view" CCF_READ_ONLY_HEADER = "x-ccf-read-only" +# Deprecated, will be removed +CCF_GLOBAL_COMMIT_HEADER = "x-ccf-global-commit" class Request: @@ -61,19 +62,19 @@ class FakeSocket: class Response: - def __init__(self, status, result, error, commit, term, global_commit): + def __init__(self, status, result, error, seqno, view, global_commit): self.status = status self.result = result self.error = error - self.commit = commit - self.term = term + self.seqno = seqno + self.view = view self.global_commit = global_commit def to_dict(self): d = { - "commit": self.commit, + "seqno": self.seqno, "global_commit": self.global_commit, - "term": self.term, + "view": self.view, } if self.result is not None: d["result"] = self.result @@ -97,8 +98,8 @@ class Response: status=rr.status_code, result=parsed_body if rr.ok else None, error=None if rr.ok else parsed_body, - commit=int_or_none(rr.headers.get(CCF_COMMIT_HEADER)), - term=int_or_none(rr.headers.get(CCF_TERM_HEADER)), + seqno=int_or_none(rr.headers.get(CCF_TX_SEQNO_HEADER)), + view=int_or_none(rr.headers.get(CCF_TX_VIEW_HEADER)), global_commit=int_or_none(rr.headers.get(CCF_GLOBAL_COMMIT_HEADER)), ) @@ -124,8 +125,8 @@ class Response: status=response.status, result=parsed_body if ok else None, error=None if ok else parsed_body, - commit=int_or_none(response.getheader(CCF_COMMIT_HEADER)), - term=int_or_none(response.getheader(CCF_TERM_HEADER)), + seqno=int_or_none(response.getheader(CCF_TX_SEQNO_HEADER)), + view=int_or_none(response.getheader(CCF_TX_VIEW_HEADER)), global_commit=int_or_none(response.getheader(CCF_GLOBAL_COMMIT_HEADER)), ) @@ -404,8 +405,8 @@ class WSClient: self.ws.send_frame(frame) out = self.ws.recv_frame().data (status,) = struct.unpack(" 1 + assert len(view_info) > 1 - LOG.success("----------- terms and primaries recorded -----------") - for term, primary in term_info.items(): - LOG.success(f"term {term} - primary {primary}") + LOG.success("----------- views and primaries recorded -----------") + for view, primary in view_info.items(): + LOG.success(f"view {view} - primary {primary}") if __name__ == "__main__": diff --git a/tests/ws_scaffold.py b/tests/ws_scaffold.py index 2be6119552..ccf9555ef6 100644 --- a/tests/ws_scaffold.py +++ b/tests/ws_scaffold.py @@ -21,14 +21,12 @@ def test(network, args, notifications_queue=None): with primary.user_client(ws=True) as c: for i in range(1, 501): r = c.rpc("LOG_record", {"id": 42, "msg": msg * i}) - print(f"c: {r.commit}, v: {r.term}, gc: {r.global_commit}") assert r.result == True, r.result LOG.info("Write on secondary through forwarding") with other.user_client(ws=True) as c: for i in range(1, 501): r = c.rpc("LOG_record", {"id": 42, "msg": msg * i}) - print(f"c: {r.commit}, v: {r.term}, gc: {r.global_commit}") assert r.result == True, r.result return network