This commit is contained in:
Amaury Chamayou 2020-05-22 14:56:21 +01:00 коммит произвёл GitHub
Родитель 48768a90de
Коммит e432c813cf
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
14 изменённых файлов: 83 добавлений и 87 удалений

Просмотреть файл

@ -19,17 +19,15 @@ For example, to record a message at a specific id with the :ref:`developers/exam
HTTP/1.1 200 OK
content-length: 5
content-type: application/json
x-ccf-commit: 23
x-ccf-global-commit: 22
x-ccf-term: 2
x-ccf-tx-seqno: 23
x-ccf-tx-view: 2
true
The HTTP response some CCF commit information in the headers:
- ``"x-ccf-commit"`` is the unique version at which the request was executed
- ``"x-ccf-global-commit"`` is the latest version agreed on by the network and forever committed to the ledger, at the time the request was executed, as seen by the contacted node
- ``"x-ccf-term"`` indicates the consensus term at which the request was executed
- ``"x-ccf-tx-seqno"`` is the unique version at which the request was executed
- ``"x-ccf-tx-view"`` indicates the consensus view at which the request was executed
The response body (the JSON value ``true``) indicates that the request was executed successfully. For many RPCs this will be a JSON object with more details about the execution result.
@ -60,9 +58,8 @@ To guarantee that their request is successfully committed to the ledger, a user
HTTP/1.1 200 OK
content-length: 23
content-type: application/json
x-ccf-commit: 42
x-ccf-global-commit: 40
x-ccf-term: 5
x-ccf-tx-seqno: 42
x-ccf-tx-view: 5
{"status":"COMMITTED"}

Просмотреть файл

@ -166,7 +166,7 @@ namespace timing
static CommitIDs parse_commit_ids(const RpcTlsClient::Response& response)
{
const auto& h = response.headers;
const auto local_commit_it = h.find(http::headers::CCF_COMMIT);
const auto local_commit_it = h.find(http::headers::CCF_TX_SEQNO);
if (local_commit_it == h.end())
{
throw std::logic_error("Missing commit response header");
@ -178,7 +178,7 @@ namespace timing
throw std::logic_error("Missing global commit response header");
}
const auto term_it = h.find(http::headers::CCF_TERM);
const auto term_it = h.find(http::headers::CCF_TX_VIEW);
if (term_it == h.end())
{
throw std::logic_error("Missing term response header");

Просмотреть файл

@ -15,10 +15,12 @@ namespace http
static constexpr auto LOCATION = "location";
static constexpr auto WWW_AUTHENTICATE = "www-authenticate";
static constexpr auto CCF_COMMIT = "x-ccf-commit";
static constexpr auto CCF_GLOBAL_COMMIT = "x-ccf-global-commit";
static constexpr auto CCF_TX_SEQNO = "x-ccf-tx-seqno";
static constexpr auto CCF_TX_VIEW = "x-ccf-tx-view";
static constexpr auto CCF_READ_ONLY = "x-ccf-read-only";
static constexpr auto CCF_TERM = "x-ccf-term";
// Deprecated, will be removed in a later release
static constexpr auto CCF_GLOBAL_COMMIT = "x-ccf-global-commit";
}
namespace headervalues

Просмотреть файл

@ -198,12 +198,12 @@ namespace http
virtual void set_commit(kv::Version cv) override
{
set_response_header(http::headers::CCF_COMMIT, fmt::format("{}", cv));
set_response_header(http::headers::CCF_TX_SEQNO, fmt::format("{}", cv));
}
virtual void set_term(kv::Consensus::View t) override
{
set_response_header(http::headers::CCF_TERM, fmt::format("{}", t));
set_response_header(http::headers::CCF_TX_VIEW, fmt::format("{}", t));
}
virtual void set_global_commit(kv::Version gc) override

Просмотреть файл

@ -14,8 +14,8 @@ namespace ws
{
static constexpr size_t INITIAL_READ = 2;
static constexpr size_t OUT_CCF_HEADER_SIZE =
sizeof(uint16_t) /* return code */ + sizeof(size_t) /* commit */ +
sizeof(size_t) /* term */ + sizeof(size_t) /* global_commit */;
sizeof(uint16_t) /* return code */ + sizeof(size_t) /* seqno */ +
sizeof(size_t) /* view */ + sizeof(size_t) /* global_commit */;
static size_t in_header_size(const std::string& path)
{
@ -128,8 +128,8 @@ namespace ws
proc.handle_response(
(http_status)status,
{{http::headers::CCF_COMMIT, fmt::format("{}", commit)},
{http::headers::CCF_TERM, fmt::format("{}", term)},
{{http::headers::CCF_TX_SEQNO, fmt::format("{}", commit)},
{http::headers::CCF_TX_VIEW, fmt::format("{}", term)},
{http::headers::CCF_GLOBAL_COMMIT,
fmt::format("{}", global_commit)}},
std::move(body));

Просмотреть файл

@ -288,7 +288,7 @@ def test_view_history(network, args):
r = c.get("getCommit")
check(c)
commit_view = r.term
commit_view = r.view
commit_seqno = r.global_commit
# Temporarily disable logging of RPCs for readability
@ -390,8 +390,8 @@ def test_tx_statuses(network, args):
check(r)
# Until this tx is globally committed, poll for the status of this and some other
# related transactions around it (and also any historical transactions we're tracking)
target_view = r.term
target_seqno = r.commit
target_view = r.view
target_seqno = r.seqno
SentTxs.update_status(target_view, target_seqno)
SentTxs.update_status(target_view, target_seqno + 1)
SentTxs.update_status(target_view - 1, target_seqno, TxStatus.Invalid)

Просмотреть файл

@ -16,15 +16,15 @@ from loguru import logger as LOG
# as F > N/2).
def wait_for_index_globally_committed(index, term, nodes):
def wait_for_seqno_to_commit(seqno, view, nodes):
"""
Wait for a specific version at a specific term to be committed on all nodes.
Wait for a specific seqno at a specific view to be committed on all nodes.
"""
for _ in range(infra.ccf.Network.replication_delay * 10):
up_to_date_f = []
for f in nodes:
with f.node_client() as c:
r = c.get("tx", {"view": term, "seqno": index})
r = c.get("tx", {"view": view, "seqno": seqno})
assert (
r.status == http.HTTPStatus.OK
), f"tx request returned HTTP status {r.status}"
@ -33,7 +33,7 @@ def wait_for_index_globally_committed(index, term, nodes):
up_to_date_f.append(f.node_id)
elif status == TxStatus.Invalid:
raise RuntimeError(
f"Node {f.node_id} reports transaction ID {term}.{index} is invalid and will never be committed"
f"Node {f.node_id} reports transaction ID {view}.{seqno} is invalid and will never be committed"
)
else:
pass
@ -56,7 +56,7 @@ def run(args):
check = infra.checker.Checker()
network.start_and_join(args)
current_term = None
current_view = None
# Time before an election completes
max_election_duration = (
@ -74,31 +74,29 @@ def run(args):
# Note that for the first iteration, the primary is known in advance anyway
LOG.debug("Find freshly elected primary")
# After a view change in pbft, finding the new primary takes longer
primary, current_term = network.find_primary(
primary, current_view = network.find_primary(
request_timeout=(30 if args.consensus == "pbft" else 3)
)
LOG.debug(
"Commit new transactions, primary:{}, current_term:{}".format(
primary.node_id, current_term
"Commit new transactions, primary:{}, current_view:{}".format(
primary.node_id, current_view
)
)
with primary.user_client() as c:
res = c.rpc(
"LOG_record",
{
"id": current_term,
"msg": "This log is committed in term {}".format(current_term),
"id": current_view,
"msg": "This log is committed in view {}".format(current_view),
},
readonly_hint=None,
)
check(res, result=True)
commit_index = res.commit
seqno = res.seqno
LOG.debug("Waiting for transaction to be committed by all nodes")
wait_for_index_globally_committed(
commit_index, current_term, network.get_joined_nodes()
)
wait_for_seqno_to_commit(seqno, current_view, network.get_joined_nodes())
LOG.debug("Stopping primary")
primary.stop()

Просмотреть файл

@ -483,10 +483,10 @@ class Network:
def find_primary(self, timeout=3, request_timeout=3):
"""
Find the identity of the primary in the network and return its identity
and the current term.
and the current view.
"""
primary_id = None
term = None
view = None
end_time = time.time() + timeout
while time.time() < end_time:
@ -496,7 +496,7 @@ class Network:
res = c.get("getPrimaryInfo")
if res.error is None:
primary_id = res.result["primary_id"]
term = res.result["current_term"]
view = res.result["current_term"]
break
else:
assert "Primary unknown" in res.error, res.error
@ -508,7 +508,7 @@ class Network:
if primary_id is None:
raise PrimaryNotFound
return (self._get_node_by_id(primary_id), term)
return (self._get_node_by_id(primary_id), view)
def find_backups(self, primary=None, timeout=3):
if primary is None:
@ -538,20 +538,20 @@ class Network:
while time.time() < end_time:
with primary.node_client() as c:
resp = c.get("getCommit")
commit_leader = resp.result["commit"]
term_leader = resp.result["term"]
if commit_leader != 0:
seqno = resp.result["commit"]
view = resp.result["term"]
if seqno != 0:
break
time.sleep(0.1)
assert (
commit_leader != 0
), f"Primary {primary.node_id} has not made any progress yet (term: {term_leader}, commit: {commit_leader})"
seqno != 0
), f"Primary {primary.node_id} has not made any progress yet (view: {view}, seqno: {seqno})"
while time.time() < end_time:
caught_up_nodes = []
for node in self.get_joined_nodes():
with node.node_client() as c:
resp = c.get("tx", {"view": term_leader, "seqno": commit_leader},)
resp = c.get("tx", {"view": view, "seqno": seqno})
if resp.error is not None:
# Node may not have joined the network yet, try again
break
@ -560,7 +560,7 @@ class Network:
caught_up_nodes.append(node)
elif status == TxStatus.Invalid:
raise RuntimeError(
f"Node {node.node_id} reports transaction ID {term_leader}.{commit_leader} is invalid and will never be committed"
f"Node {node.node_id} reports transaction ID {view}.{seqno} is invalid and will never be committed"
)
else:
pass
@ -583,7 +583,7 @@ class Network:
for node in self.get_joined_nodes():
with node.node_client() as c:
r = c.get("getCommit")
commits.append(r.commit)
commits.append(r.seqno)
if [commits[0]] * len(commits) == commits:
break
time.sleep(0.1)

Просмотреть файл

@ -8,11 +8,11 @@ import time
from infra.tx_status import TxStatus
def wait_for_global_commit(node_client, commit_index, term, mksign=False, timeout=3):
def wait_for_global_commit(node_client, seqno, view, mksign=False, timeout=3):
"""
Given a client to a CCF network and a commit_index/term pair, this function
Given a client to a CCF network and a seqno/view pair, this function
waits for this specific commit index to be globally committed by the
network in this term.
network in this view.
A TimeoutError exception is raised if the commit index is not globally
committed within the given timeout.
"""
@ -27,7 +27,7 @@ def wait_for_global_commit(node_client, commit_index, term, mksign=False, timeou
end_time = time.time() + timeout
while time.time() < end_time:
r = node_client.get("tx", {"view": term, "seqno": commit_index})
r = node_client.get("tx", {"view": view, "seqno": seqno})
assert (
r.status == http.HTTPStatus.OK
), f"tx request returned HTTP status {r.status}"
@ -36,7 +36,7 @@ def wait_for_global_commit(node_client, commit_index, term, mksign=False, timeou
return
elif status == TxStatus.Invalid:
raise RuntimeError(
f"Transaction ID {term}.{commit_index} is marked invalid and will never be committed"
f"Transaction ID {view}.{seqno} is marked invalid and will never be committed"
)
else:
time.sleep(0.1)
@ -71,7 +71,7 @@ class Checker:
if self.node_client:
wait_for_global_commit(
self.node_client, rpc_result.commit, rpc_result.term
self.node_client, rpc_result.seqno, rpc_result.view
)
if self.notification_queue:
@ -84,7 +84,7 @@ class Checker:
n > self.notified_commit
), f"Received notification of commit {n} after commit {self.notified_commit}"
self.notified_commit = n
if n >= rpc_result.commit:
if n >= rpc_result.seqno:
return
time.sleep(0.5)
raise TimeoutError("Timed out waiting for notification")

Просмотреть файл

@ -28,10 +28,11 @@ def truncate(string, max_len=256):
return string
CCF_COMMIT_HEADER = "x-ccf-commit"
CCF_TERM_HEADER = "x-ccf-term"
CCF_GLOBAL_COMMIT_HEADER = "x-ccf-global-commit"
CCF_TX_SEQNO_HEADER = "x-ccf-tx-seqno"
CCF_TX_VIEW_HEADER = "x-ccf-tx-view"
CCF_READ_ONLY_HEADER = "x-ccf-read-only"
# Deprecated, will be removed
CCF_GLOBAL_COMMIT_HEADER = "x-ccf-global-commit"
class Request:
@ -61,19 +62,19 @@ class FakeSocket:
class Response:
def __init__(self, status, result, error, commit, term, global_commit):
def __init__(self, status, result, error, seqno, view, global_commit):
self.status = status
self.result = result
self.error = error
self.commit = commit
self.term = term
self.seqno = seqno
self.view = view
self.global_commit = global_commit
def to_dict(self):
d = {
"commit": self.commit,
"seqno": self.seqno,
"global_commit": self.global_commit,
"term": self.term,
"view": self.view,
}
if self.result is not None:
d["result"] = self.result
@ -97,8 +98,8 @@ class Response:
status=rr.status_code,
result=parsed_body if rr.ok else None,
error=None if rr.ok else parsed_body,
commit=int_or_none(rr.headers.get(CCF_COMMIT_HEADER)),
term=int_or_none(rr.headers.get(CCF_TERM_HEADER)),
seqno=int_or_none(rr.headers.get(CCF_TX_SEQNO_HEADER)),
view=int_or_none(rr.headers.get(CCF_TX_VIEW_HEADER)),
global_commit=int_or_none(rr.headers.get(CCF_GLOBAL_COMMIT_HEADER)),
)
@ -124,8 +125,8 @@ class Response:
status=response.status,
result=parsed_body if ok else None,
error=None if ok else parsed_body,
commit=int_or_none(response.getheader(CCF_COMMIT_HEADER)),
term=int_or_none(response.getheader(CCF_TERM_HEADER)),
seqno=int_or_none(response.getheader(CCF_TX_SEQNO_HEADER)),
view=int_or_none(response.getheader(CCF_TX_VIEW_HEADER)),
global_commit=int_or_none(response.getheader(CCF_GLOBAL_COMMIT_HEADER)),
)
@ -404,8 +405,8 @@ class WSClient:
self.ws.send_frame(frame)
out = self.ws.recv_frame().data
(status,) = struct.unpack("<h", out[:2])
(commit,) = struct.unpack("<Q", out[2:10])
(term,) = struct.unpack("<Q", out[10:18])
(seqno,) = struct.unpack("<Q", out[2:10])
(view,) = struct.unpack("<Q", out[10:18])
(global_commit,) = struct.unpack("<Q", out[18:26])
payload = out[26:]
if status == 200:
@ -414,7 +415,7 @@ class WSClient:
else:
result = None
error = payload.decode()
return Response(status, result, error, commit, term, global_commit)
return Response(status, result, error, seqno, view, global_commit)
class CCFClient:

Просмотреть файл

@ -111,7 +111,7 @@ class Member:
and wait_for_global_commit
):
with remote_node.node_client() as mc:
infra.checker.wait_for_global_commit(mc, r.commit, r.term, True)
infra.checker.wait_for_global_commit(mc, r.seqno, r.view, True)
return r

Просмотреть файл

@ -27,7 +27,7 @@ def test(network, args, notifications_queue=None):
check_commit(c.rpc("LOG_record", {"id": 42, "msg": msg}), result=True)
r = c.get("LOG_get", {"id": 42})
check(r, result={"msg": msg})
r = c.get("getReceipt", {"commit": r.commit})
r = c.get("getReceipt", {"commit": r.seqno})
check(
c.rpc("verifyReceipt", {"receipt": r.result["receipt"]}),
result={"valid": True},

Просмотреть файл

@ -32,10 +32,10 @@ def timeout_handler(node, suspend, election_timeout):
node.resume()
def update_term_info(network, term_info):
def update_view_info(network, view_info):
try:
cur_primary, cur_term = network.find_primary()
term_info[cur_term] = cur_primary.node_id
cur_primary, cur_view = network.find_primary()
view_info[cur_view] = cur_primary.node_id
except TimeoutError:
LOG.warning("Trying to access a suspended network")
@ -43,7 +43,7 @@ def update_term_info(network, term_info):
def get_node_local_commit(node):
with node.node_client() as c:
r = c.get("debug/getLocalCommit")
return r.commit, r.global_commit
return r.seqno, r.global_commit
def wait_for_late_joiner(old_node, late_joiner, timeout=30):
@ -110,11 +110,11 @@ def run(args):
) as network:
network.start_and_join(args)
original_nodes = network.get_joined_nodes()
term_info = {}
update_term_info(network, term_info)
view_info = {}
update_view_info(network, view_info)
app.test_run_txs(network=network, args=args, num_txs=TOTAL_REQUESTS)
update_term_info(network, term_info)
update_view_info(network, view_info)
nodes_to_kill = [network.find_any_backup()]
nodes_to_keep = [n for n in original_nodes if n not in nodes_to_kill]
@ -162,18 +162,18 @@ def run(args):
ignore_failures=True,
)
update_term_info(network, term_info)
update_view_info(network, view_info)
# check nodes have resumed normal execution before shutting down
app.test_run_txs(network=network, args=args, num_txs=len(nodes_to_keep))
# we have asserted that all nodes are caught up
# assert that view changes actually did occur
assert len(term_info) > 1
assert len(view_info) > 1
LOG.success("----------- terms and primaries recorded -----------")
for term, primary in term_info.items():
LOG.success(f"term {term} - primary {primary}")
LOG.success("----------- views and primaries recorded -----------")
for view, primary in view_info.items():
LOG.success(f"view {view} - primary {primary}")
if __name__ == "__main__":

Просмотреть файл

@ -21,14 +21,12 @@ def test(network, args, notifications_queue=None):
with primary.user_client(ws=True) as c:
for i in range(1, 501):
r = c.rpc("LOG_record", {"id": 42, "msg": msg * i})
print(f"c: {r.commit}, v: {r.term}, gc: {r.global_commit}")
assert r.result == True, r.result
LOG.info("Write on secondary through forwarding")
with other.user_client(ws=True) as c:
for i in range(1, 501):
r = c.rpc("LOG_record", {"id": 42, "msg": msg * i})
print(f"c: {r.commit}, v: {r.term}, gc: {r.global_commit}")
assert r.result == True, r.result
return network