Proposing more compact logging in tests (#1610)

This commit is contained in:
Amaury Chamayou 2020-09-17 16:56:53 +01:00 коммит произвёл GitHub
Родитель 85e2b06e7f
Коммит c1c038bdd9
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 88 добавлений и 25 удалений

Просмотреть файл

@ -16,7 +16,7 @@ from cryptography import x509
from cryptography.hazmat.backends import default_backend
import struct
import base64
from typing import Union, Optional
from typing import Union, Optional, List
import requests
from loguru import logger as LOG # type: ignore
@ -24,9 +24,10 @@ from requests_http_signature import HTTPSignatureAuth # type: ignore
import websocket # type: ignore
import ccf.commit
from ccf.log_capture import flush_info
def truncate(string: str, max_len: int = 256):
def truncate(string: str, max_len: int = 128):
if len(string) > max_len:
return f"{string[: max_len]} + {len(string) - max_len} chars"
else:
@ -59,11 +60,11 @@ class Request:
headers: dict
def __str__(self):
string = f"{self.http_verb} {self.path}"
string = f"<cyan>{self.http_verb}</> <green>{self.path}</>"
if self.headers:
string += f" {self.headers}"
if self.body is not None:
string += f'{truncate(f"{self.body}")}'
string += f' {truncate(f"{self.body}")}'
return string
@ -101,10 +102,11 @@ class Response:
def __str__(self):
versioned = (self.view, self.seqno) != (None, None)
status_color = "red" if self.status_code / 100 in (4, 5) else "green"
return (
f"{self.status_code} "
+ (f"@{self.view}.{self.seqno} " if versioned else "")
+ truncate(f"{self.body}")
f"<{status_color}>{self.status_code}</> "
+ (f"@<magenta>{self.view}.{self.seqno}</> " if versioned else "")
+ f"<yellow>{truncate(str(self.body))}</>"
)
@staticmethod
@ -537,7 +539,7 @@ class CCFClient:
LOG.info(response)
return response
def _direct_call(
def _call(
self,
path: str,
body: Optional[Union[str, dict, bytes]] = None,
@ -545,16 +547,22 @@ class CCFClient:
headers: Optional[dict] = None,
signed: bool = False,
timeout: int = DEFAULT_REQUEST_TIMEOUT_SEC,
log_capture: Optional[list] = None,
) -> Response:
description = ""
if self.description:
description = f"({self.description})" + (" [signed]" if signed else "")
description = f"{self.description}{signed * 's'}"
else:
description = self.name
if headers is None:
headers = {}
r = Request(path, body, http_verb, headers)
LOG.info(f"{self.name} {r} {description}")
return self._response(self.client_impl.request(r, signed, timeout))
flush_info([f"{description} {r}"], log_capture, 3)
response = self.client_impl.request(r, signed, timeout)
flush_info([str(response)], log_capture, 3)
return response
def call(
self,
@ -564,6 +572,7 @@ class CCFClient:
headers: Optional[dict] = None,
signed: bool = False,
timeout: int = DEFAULT_REQUEST_TIMEOUT_SEC,
log_capture: Optional[list] = None,
) -> Response:
"""
Issues one request, synchronously, and returns the response.
@ -575,33 +584,40 @@ class CCFClient:
:param dict headers: HTTP request headers (optional).
:param bool signed: Sign request with client private key.
:param int timeout: Maximum time to wait for a response before giving up.
:param list log_capture: Rather than emit to default handler, capture log lines to list (optional).
:return: :py:class:`ccf.clients.Response`
"""
if not path.startswith("/"):
raise ValueError(f"URL path '{path}' is invalid, must start with /")
logs: List[str] = []
if self.is_connected:
return self._direct_call(path, body, http_verb, headers, signed, timeout)
r = self._call(path, body, http_verb, headers, signed, timeout, logs)
flush_info(logs, log_capture, 2)
return r
end_time = time.time() + self.connection_timeout
while True:
try:
response = self._direct_call(
path, body, http_verb, headers, signed, timeout
logs = []
response = self._call(
path, body, http_verb, headers, signed, timeout, logs
)
# Only the first request gets this timeout logic - future calls
# call _direct_call
# call _call
self.is_connected = True
flush_info(logs, log_capture, 2)
return response
except (CCFConnectionException, TimeoutError) as e:
# If the initial connection fails (e.g. due to node certificate
# not yet being endorsed by the network) sleep briefly and try again
if time.time() > end_time:
flush_info(logs, log_capture, 2)
raise CCFConnectionException(
f"Connection still failing after {self.connection_timeout}s"
) from e
LOG.debug(f"Got exception: {e}")
time.sleep(0.1)
def get(self, *args, **kwargs) -> Response:

Просмотреть файл

@ -4,11 +4,15 @@
import http
import time
from typing import Optional, List
from ccf.tx_status import TxStatus
from ccf.log_capture import flush_info
def wait_for_commit(client, seqno: int, view: int, timeout: int = 3) -> None:
def wait_for_commit(
client, seqno: int, view: int, timeout: int = 3, log_capture: Optional[list] = None
) -> None:
"""
Waits for a specific seqno/view pair to be committed by the network,
as per the node to which client is connected to.
@ -17,18 +21,22 @@ def wait_for_commit(client, seqno: int, view: int, timeout: int = 3) -> None:
:param int seqno: Transaction sequence number.
:param int view: Consensus view.
:param str timeout: Maximum time to wait for this seqno/view pair to be committed before giving up.
:param list log_capture: Rather than emit to default handler, capture log lines to list (optional).
A TimeoutError exception is raised if the commit index is not committed within the given timeout.
"""
logs: List[str] = []
end_time = time.time() + timeout
while time.time() < end_time:
r = client.get(f"/node/tx?view={view}&seqno={seqno}")
logs = []
r = client.get(f"/node/tx?view={view}&seqno={seqno}", log_capture=logs)
assert (
r.status_code == http.HTTPStatus.OK
), f"tx request returned HTTP status {r.status_code}"
assert isinstance(r.body, dict), "/node/tx should return a JSON object"
status = TxStatus(r.body["status"])
if status == TxStatus.Committed:
flush_info(logs, log_capture, 1)
return
elif status == TxStatus.Invalid:
raise RuntimeError(
@ -36,4 +44,5 @@ def wait_for_commit(client, seqno: int, view: int, timeout: int = 3) -> None:
)
else:
time.sleep(0.1)
flush_info(logs, log_capture, 1)
raise TimeoutError("Timed out waiting for commit")

13
python/ccf/log_capture.py Normal file
Просмотреть файл

@ -0,0 +1,13 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
from loguru import logger as LOG # type: ignore
def flush_info(lines, log_capture=None, depth=0):
for line in lines:
if log_capture is None:
LOG.opt(colors=True, depth=depth + 1).info(line)
else:
log_capture.append(line)

Просмотреть файл

@ -11,6 +11,7 @@ import http
import suite.test_requirements as reqs
from ccf.tx_status import TxStatus
from ccf.log_capture import flush_info
from loguru import logger as LOG
# This test starts from a given number of nodes (hosts), commits
@ -36,9 +37,13 @@ def wait_for_seqno_to_commit(seqno, view, nodes):
"""
for _ in range(infra.network.Network.replication_delay * 10):
up_to_date_f = []
logs = {}
for f in nodes:
with f.client() as c:
r = c.get(f"/node/tx?view={view}&seqno={seqno}")
logs[f.node_id] = []
r = c.get(
f"/node/tx?view={view}&seqno={seqno}", log_capture=logs[f.node_id]
)
assert (
r.status_code == http.HTTPStatus.OK
), f"tx request returned HTTP status {r.status_code}"
@ -46,6 +51,7 @@ def wait_for_seqno_to_commit(seqno, view, nodes):
if status == TxStatus.Committed:
up_to_date_f.append(f.node_id)
elif status == TxStatus.Invalid:
flush_info(logs[f.node_id], None, 0)
raise RuntimeError(
f"Node {f.node_id} reports transaction ID {view}.{seqno} is invalid and will never be committed"
)
@ -54,6 +60,8 @@ def wait_for_seqno_to_commit(seqno, view, nodes):
if len(up_to_date_f) == len(nodes):
break
time.sleep(0.1)
for lines in logs.values():
flush_info(lines, None, 0)
assert len(up_to_date_f) == len(
nodes
), "Only {} out of {} nodes are up to date".format(len(up_to_date_f), len(nodes))

Просмотреть файл

@ -6,6 +6,8 @@ import infra.path
import infra.network
import sys
from loguru import logger as LOG
def absolute_path_to_existing_file(arg):
if not os.path.isabs(arg):
@ -16,6 +18,12 @@ def absolute_path_to_existing_file(arg):
def cli_args(add=lambda x: None, parser=None, accept_unknown=False):
LOG.remove()
LOG.add(
sys.stdout,
format="<green>{time:HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
)
if parser is None:
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter

Просмотреть файл

@ -5,7 +5,7 @@ import time
import logging
from contextlib import contextmanager
from enum import Enum, IntEnum
from ccf.clients import CCFConnectionException
from ccf.clients import CCFConnectionException, flush_info
import infra.path
import infra.proc
import infra.node
@ -550,7 +550,7 @@ class Network:
def _get_node_by_id(self, node_id):
return next((node for node in self.nodes if node.node_id == node_id), None)
def find_primary(self, timeout=3):
def find_primary(self, timeout=3, log_capture=None):
"""
Find the identity of the primary in the network and return its identity
and the current view.
@ -558,19 +558,21 @@ class Network:
primary_id = None
view = None
logs = []
end_time = time.time() + timeout
while time.time() < end_time:
for node in self.get_joined_nodes():
with node.client() as c:
try:
res = c.get("/node/primary_info")
logs = []
res = c.get("/node/primary_info", log_capture=logs)
if res.status_code == 200:
primary_id = res.body["primary_id"]
view = res.body["current_view"]
break
else:
assert "Primary unknown" in res.body, res
LOG.warning("Primary unknown. Retrying...")
except CCFConnectionException:
LOG.warning(
f"Could not successful connect to node {node.node_id}. Retrying..."
@ -580,7 +582,10 @@ class Network:
time.sleep(0.1)
if primary_id is None:
flush_info(logs, log_capture, 0)
raise PrimaryNotFound
flush_info(logs, log_capture, 0)
return (self._get_node_by_id(primary_id), view)
def find_backups(self, primary=None, timeout=3):
@ -673,16 +678,20 @@ class Network:
)
end_time = time.time() + timeout
error = TimeoutError
logs = []
while time.time() < end_time:
try:
new_primary, new_term = self.find_primary()
logs = []
new_primary, new_term = self.find_primary(log_capture=logs)
if new_primary.node_id != old_primary_id:
flush_info(logs, None)
return (new_primary, new_term)
except PrimaryNotFound:
error = PrimaryNotFound
except Exception:
pass
time.sleep(0.1)
flush_info(logs, None)
raise error(f"A new primary was not elected after {timeout} seconds")

Просмотреть файл

@ -266,7 +266,7 @@ class Node:
if identity
else None,
"ca": os.path.join(self.common_dir, "networkcert.pem"),
"description": f"node {self.node_id} as {identity or 'unauthenticated'}",
"description": f"[{self.node_id}{'|' + identity if identity is not None else ''}]",
}
akwargs.update(kwargs)
return ccf.clients.client(self.pubhost, self.rpc_port, **akwargs)