Do not try to parse uncommitted chunks by default (#2499)

This commit is contained in:
Amaury Chamayou 2021-04-27 19:57:46 +01:00 коммит произвёл GitHub
Родитель e7aa0ebead
Коммит 13fea52eee
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
10 изменённых файлов: 130 добавлений и 100 удалений

Просмотреть файл

@ -539,13 +539,26 @@ class Ledger:
# Initialize LedgerValidator instance which will be passed to LedgerChunks.
self._ledger_validator = LedgerValidator()
def __init__(self, directories: List[str]):
@classmethod
def _range_from_filename(cls, filename: str) -> Tuple[str, str]:
elements = (
os.path.basename(filename)
.replace(".committed", "")
.replace("ledger_", "")
.split("-")
)
assert len(elements) == 2
return (elements[0], elements[1])
def __init__(self, directories: List[str], committed_only: bool = True):
self._filenames = []
ledger_files = []
for directory in directories:
for path in os.listdir(directory):
if committed_only and not path.endswith(".committed"):
continue
chunk = os.path.join(directory, path)
if os.path.isfile(chunk):
ledger_files.append(chunk)
@ -554,16 +567,17 @@ class Ledger:
# the ledger is verified in sequence
self._filenames = sorted(
ledger_files,
key=lambda x: int(
os.path.basename(x)
.replace(".committed", "")
.replace("ledger_", "")
.split("-")[0]
),
key=lambda x: int(Ledger._range_from_filename(x)[0]),
)
self._reset_iterators()
@property
def last_committed_chunk_range(self) -> Tuple[int, int]:
last_chunk_name = self._filenames[-1]
start, end = Ledger._range_from_filename(last_chunk_name)
return (int(start), int(end))
def __next__(self) -> LedgerChunk:
self._fileindex += 1
if len(self._filenames) > self._fileindex:

Просмотреть файл

@ -61,7 +61,7 @@ def test_cert_store(network, args):
)
stored_cert = json.loads(
primary.get_ledger_public_state_at(set_proposal.completed_seqno)[
network.get_ledger_public_state_at(set_proposal.completed_seqno)[
"public:ccf.gov.tls.ca_cert_bundles"
][raw_cert_name]
)
@ -74,7 +74,7 @@ def test_cert_store(network, args):
remove_proposal = network.consortium.remove_ca_cert_bundle(primary, cert_name)
assert (
primary.get_ledger_public_state_at(remove_proposal.completed_seqno)[
network.get_ledger_public_state_at(remove_proposal.completed_seqno)[
"public:ccf.gov.tls.ca_cert_bundles"
][raw_cert_name]
== None

Просмотреть файл

@ -113,9 +113,7 @@ def test_no_quote(network, args):
@reqs.description("Check member data")
def test_member_data(network, args):
assert args.initial_operator_count > 0
primary, _ = network.find_nodes()
latest_public_tables, _ = primary.get_latest_ledger_public_state()
latest_public_tables, _ = network.get_latest_ledger_public_state()
members_info = latest_public_tables["public:ccf.gov.members.info"]
md_count = 0
@ -155,7 +153,7 @@ def test_service_principals(network, args):
principal_id = "0xdeadbeef"
# Initially, there is nothing in this table
latest_public_tables, _ = node.get_latest_ledger_public_state()
latest_public_tables, _ = network.get_latest_ledger_public_state()
assert "public:ccf.gov.service_principals" not in latest_public_tables
# Create and accept a proposal which populates an entry in this table
@ -173,7 +171,7 @@ def test_service_principals(network, args):
network.consortium.vote_using_majority(node, proposal, ballot)
# Confirm it can be read
latest_public_tables, _ = node.get_latest_ledger_public_state()
latest_public_tables, _ = network.get_latest_ledger_public_state()
assert (
json.loads(
latest_public_tables["public:ccf.gov.service_principals"][
@ -191,7 +189,7 @@ def test_service_principals(network, args):
network.consortium.vote_using_majority(node, proposal, ballot)
# Confirm it is gone
latest_public_tables, _ = node.get_latest_ledger_public_state()
latest_public_tables, _ = network.get_latest_ledger_public_state()
assert (
principal_id.encode()
not in latest_public_tables["public:ccf.gov.service_principals"]

Просмотреть файл

@ -15,14 +15,10 @@ from loguru import logger as LOG
import suite.test_requirements as reqs
def count_governance_operations(ledger):
def check_operations(ledger, operations):
LOG.debug("Audit the ledger file for governance operations")
members = {}
verified_votes = 0
verified_proposals = 0
verified_withdrawals = 0
for chunk in ledger:
for tr in chunk:
tables = tr.get_public_domain().get_tables()
@ -35,7 +31,6 @@ def count_governance_operations(ledger):
governance_history_table = tables["public:ccf.gov.history"]
for member_id, signed_request in governance_history_table.items():
assert member_id in members
signed_request = json.loads(signed_request)
cert = members[member_id]
@ -50,14 +45,19 @@ def count_governance_operations(ledger):
request_target_line = req.decode().splitlines()[0]
if "/gov/proposals" in request_target_line:
vote_suffix = "/ballots"
elements = request_target_line.split("/")
if request_target_line.endswith(vote_suffix):
verified_votes += 1
op = (elements[-2], member_id.decode(), "vote")
elif request_target_line.endswith("/withdraw"):
verified_withdrawals += 1
op = (elements[-2], member_id.decode(), "withdraw")
else:
verified_proposals += 1
(proposal_id,) = tables["public:ccf.gov.proposals"].keys()
op = (proposal_id.decode(), member_id.decode(), "propose")
return (verified_proposals, verified_votes, verified_withdrawals)
if op in operations:
operations.remove(op)
assert operations == set(), operations
def check_all_tables_are_documented(ledger, doc_path):
@ -100,10 +100,8 @@ def test_ledger_is_readable(network, args):
def run(args):
# Keep track of how many propose, vote and withdraw are issued in this test
proposals_issued = 0
votes_issued = 0
withdrawals_issued = 0
# Keep track of governance operations that happened in the test
governance_operations = set()
with infra.network.network(
args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
@ -112,14 +110,6 @@ def run(args):
primary, _ = network.find_primary()
ledger_directories = primary.remote.ledger_paths()
ledger = ccf.ledger.Ledger(ledger_directories)
(
original_proposals,
original_votes,
original_withdrawals,
) = count_governance_operations(ledger)
LOG.info("Add new member proposal (implicit vote)")
(
new_member_proposal,
@ -128,20 +118,31 @@ def run(args):
) = network.consortium.generate_and_propose_new_member(
primary, curve=infra.network.EllipticCurve.secp256r1
)
proposals_issued += 1
member = network.consortium.get_member_by_local_id(
new_member_proposal.proposer_id
)
governance_operations.add(
(new_member_proposal.proposal_id, member.service_id, "propose")
)
LOG.info("2/3 members accept the proposal")
p = network.consortium.vote_using_majority(
primary, new_member_proposal, careful_vote
)
votes_issued += p.votes_for
for voter in p.voters:
governance_operations.add((p.proposal_id, voter, "vote"))
assert new_member_proposal.state == infra.proposal.ProposalState.ACCEPTED
LOG.info("Create new proposal but withdraw it before it is accepted")
new_member_proposal, _, _ = network.consortium.generate_and_propose_new_member(
primary, curve=infra.network.EllipticCurve.secp256r1
)
proposals_issued += 1
member = network.consortium.get_member_by_local_id(
new_member_proposal.proposer_id
)
governance_operations.add(
(new_member_proposal.proposal_id, member.service_id, "propose")
)
with primary.client() as c:
response = network.consortium.get_member_by_local_id(
@ -150,30 +151,21 @@ def run(args):
infra.checker.Checker(c)(response)
assert response.status_code == http.HTTPStatus.OK.value
assert response.body.json()["state"] == ProposalState.WITHDRAWN.value
withdrawals_issued += 1
member = network.consortium.get_member_by_local_id(
new_member_proposal.proposer_id
)
governance_operations.add(
(new_member_proposal.proposal_id, member.service_id, "withdraw")
)
# Force ledger flush of all transactions so far
network.get_latest_ledger_public_state()
ledger = ccf.ledger.Ledger(ledger_directories)
check_operations(ledger, governance_operations)
test_ledger_is_readable(network, args)
test_tables_doc(network, args)
# Refresh ledger to beginning
ledger = ccf.ledger.Ledger(ledger_directories)
(
final_proposals,
final_votes,
final_withdrawals,
) = count_governance_operations(ledger)
assert (
final_proposals == original_proposals + proposals_issued
), f"Unexpected number of propose operations recorded in the ledger (expected {original_proposals + proposals_issued}, found {final_proposals})"
assert (
final_votes == original_votes + votes_issued
), f"Unexpected number of vote operations recorded in the ledger (expected {original_votes + votes_issued}, found {final_votes})"
assert (
final_withdrawals == original_withdrawals + withdrawals_issued
), f"Unexpected number of withdraw operations recorded in the ledger (expected {original_withdrawals + withdrawals_issued}, found {final_withdrawals})"
if __name__ == "__main__":
args = infra.e2e_args.cli_args()

Просмотреть файл

@ -246,7 +246,7 @@ class Consortium:
proposal.state = infra.proposal.ProposalState(
response.body.json()["state"]
)
proposal.increment_votes_for()
proposal.increment_votes_for(member.service_id)
# Wait for proposal completion to be committed, even if no votes are issued
if wait_for_global_commit:
@ -347,6 +347,18 @@ class Consortium:
proposal = self.get_any_active_member().propose(remote_node, proposal)
return self.vote_using_majority(remote_node, proposal, careful_vote)
def create_and_withdraw_large_proposal(self, remote_node):
"""
This is useful to force a ledger chunk to be produced, which is desirable
when trying to use ccf.ledger to read ledger entries.
"""
proposal, _ = self.make_proposal(
"set_user", self.user_cert_path("user0"), {"padding": " " * 4096 * 5}
)
m = self.get_any_active_member()
p = m.propose(remote_node, proposal)
m.withdraw(remote_node, p)
def add_users(self, remote_node, users):
for u in users:
self.add_user(remote_node, u)

Просмотреть файл

@ -938,6 +938,34 @@ class Network:
return node.get_committed_snapshots(wait_for_snapshots_to_be_committed)
def _get_ledger_public_view_at(self, node, call, seqno, timeout=3):
end_time = time.time() + timeout
while time.time() < end_time:
try:
return call(seqno)
except Exception:
self.consortium.create_and_withdraw_large_proposal(node)
raise TimeoutError(
f"Could not read transaction at seqno {seqno} from ledger {node.remote.ledger_paths()}"
)
def get_ledger_public_state_at(self, seqno, timeout=3):
primary, _ = self.find_primary()
return self._get_ledger_public_view_at(
primary, primary.get_ledger_public_tables_at, seqno, timeout
)
def get_latest_ledger_public_state(self, timeout=3):
primary, _ = self.find_primary()
with primary.client() as nc:
resp = nc.get("/node/commit")
body = resp.body.json()
tx_id = TxID.from_str(body["transaction_id"])
return self._get_ledger_public_view_at(
primary, primary.get_ledger_public_state_at, tx_id.seqno, timeout
)
@contextmanager
def network(

Просмотреть файл

@ -12,7 +12,6 @@ import ccf.ledger
import os
import socket
import re
import time
from loguru import logger as LOG
@ -312,32 +311,16 @@ class Node:
f"Node {self.local_node_id} failed to join the network"
) from e
def get_ledger_public_state_at(self, seqno, timeout=3):
end_time = time.time() + timeout
while time.time() < end_time:
try:
ledger = ccf.ledger.Ledger(self.remote.ledger_paths())
tx = ledger.get_transaction(seqno)
return tx.get_public_domain().get_tables()
except Exception:
time.sleep(0.1)
def get_ledger_public_tables_at(self, seqno):
ledger = ccf.ledger.Ledger(self.remote.ledger_paths())
assert ledger.last_committed_chunk_range[1] >= seqno
tx = ledger.get_transaction(seqno)
return tx.get_public_domain().get_tables()
raise TimeoutError(
f"Could not read transaction at seqno {seqno} from ledger {self.remote.ledger_paths()}"
)
def get_latest_ledger_public_state(self, timeout=3):
end_time = time.time() + timeout
while time.time() < end_time:
try:
ledger = ccf.ledger.Ledger(self.remote.ledger_paths())
return ledger.get_latest_public_state()
except Exception:
time.sleep(0.1)
raise TimeoutError(
f"Could not read latest state from ledger {self.remote.ledger_paths()}"
)
def get_ledger_public_state_at(self, seqno):
ledger = ccf.ledger.Ledger(self.remote.ledger_paths())
assert ledger.last_committed_chunk_range[1] >= seqno
return ledger.get_latest_public_state()
def get_ledger(self, include_read_only_dirs=False):
"""

Просмотреть файл

@ -37,7 +37,8 @@ class Proposal:
self.proposer_id = proposer_id
self.proposal_id = proposal_id
self.state = state
self.votes_for = 0
self.voters = []
self.view = view
self.seqno = seqno
@ -48,5 +49,9 @@ class Proposal:
self.completed_seqno = seqno
self.completed_view = view
def increment_votes_for(self):
self.votes_for += 1
def increment_votes_for(self, member_id):
self.voters.append(member_id)
@property
def votes_for(self):
return len(self.voters)

Просмотреть файл

@ -70,7 +70,7 @@ def test_app_bundle(network, args):
assert (
raw_module_name
in primary.get_ledger_public_state_at(set_js_proposal.completed_seqno)[
in network.get_ledger_public_state_at(set_js_proposal.completed_seqno)[
"public:ccf.gov.modules"
]
), "Module was not added"
@ -101,7 +101,7 @@ def test_app_bundle(network, args):
assert r.status_code == http.HTTPStatus.NOT_FOUND, r.status_code
assert (
primary.get_ledger_public_state_at(remove_js_proposal.completed_seqno)[
network.get_ledger_public_state_at(remove_js_proposal.completed_seqno)[
"public:ccf.gov.modules"
][raw_module_name]
is None
@ -323,7 +323,7 @@ def test_npm_app(network, args):
r = c.post("/app/rpc/apply_writes")
assert r.status_code == http.HTTPStatus.BAD_REQUEST, r.status_code
val = primary.get_ledger_public_state_at(r.seqno)["public:apply_writes"][
val = network.get_ledger_public_state_at(r.seqno)["public:apply_writes"][
"foo".encode()
]
assert val == b"bar", val

Просмотреть файл

@ -81,7 +81,7 @@ def test_jwt_without_key_policy(network, args):
primary, issuer, jwks_fp.name
)
stored_jwt_signing_key = primary.get_ledger_public_state_at(
stored_jwt_signing_key = network.get_ledger_public_state_at(
set_jwt_proposal.completed_seqno
)["public:ccf.gov.jwt.public_signing_keys"][raw_kid]
@ -94,7 +94,7 @@ def test_jwt_without_key_policy(network, args):
remove_jwt_proposal = network.consortium.remove_jwt_issuer(primary, issuer)
assert (
primary.get_ledger_public_state_at(remove_jwt_proposal.completed_seqno)[
network.get_ledger_public_state_at(remove_jwt_proposal.completed_seqno)[
"public:ccf.gov.jwt.public_signing_keys"
][raw_kid]
is None
@ -106,7 +106,7 @@ def test_jwt_without_key_policy(network, args):
metadata_fp.flush()
set_jwt_issuer = network.consortium.set_jwt_issuer(primary, metadata_fp.name)
stored_jwt_signing_key = primary.get_ledger_public_state_at(
stored_jwt_signing_key = network.get_ledger_public_state_at(
set_jwt_issuer.completed_seqno
)["public:ccf.gov.jwt.public_signing_keys"][raw_kid]
@ -230,7 +230,7 @@ def test_jwt_with_sgx_key_filter(network, args):
primary, issuer, jwks_fp.name
)
stored_jwt_signing_keys = primary.get_ledger_public_state_at(
stored_jwt_signing_keys = network.get_ledger_public_state_at(
set_jwt_proposal.completed_seqno
)["public:ccf.gov.jwt.public_signing_keys"]
@ -296,9 +296,7 @@ class OpenIDProviderServer(AbstractContextManager):
def check_kv_jwt_key_matches(network, kid, cert_pem):
primary, _ = network.find_nodes()
latest_public_state, _ = primary.get_latest_ledger_public_state()
latest_public_state, _ = network.get_latest_ledger_public_state()
latest_jwt_signing_key = latest_public_state[
"public:ccf.gov.jwt.public_signing_keys"
]