This commit is contained in:
Julien Maffre 2020-07-29 11:03:53 +01:00 коммит произвёл GitHub
Родитель 08ebb13a0d
Коммит beedc9568d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 234 добавлений и 118 удалений

Просмотреть файл

@ -1,13 +1,11 @@
Parsing the Ledger with Python
==============================
This page describes the Python API of the :py:class:`ccf.ledger` module which can be used by auditors to parse a CCF ledger.
This page describes the Python API of the :py:class:`ccf.ledger` module which can be used by auditors to parse a CCF ledger. To install the `ccf` Python package, run:
.. note:: To install the ``ccf`` python package, run:
.. code-block:: bash
.. code-block:: bash
$ pip install ccf
$ pip install ccf
Tutorial
--------
@ -36,7 +34,7 @@ In this particular example, a target table is set. This is a public table that c
:start-after: SNIPPET: target_table
:lines: 1
.. note:: In practice, it is likely that auditors will want to run more complicated logic when parsing the ledger. For example, this might involve verifying signatures transactions or auditing governance operations and looping over multiple tables.
.. note:: In practice, it is likely that auditors will want to run more elaborate logic when parsing the ledger. For example, this might involve verifying signatures transactions or auditing governance operations and looping over multiple tables.
Finally, the ledger can be iterated over. For each transaction in the ledger, the public tables changed within that transaction are observed. If the target table is included, we loop through all keys and values modified in that transaction.

Просмотреть файл

@ -10,7 +10,7 @@ from dataclasses import dataclass
from http.client import HTTPResponse
from io import BytesIO
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
from urllib3.util.ssl_ import create_urllib3_context # type: ignore
from cryptography import x509
from cryptography.hazmat.backends import default_backend
import struct
@ -18,14 +18,14 @@ import base64
from typing import Union, Optional
import requests
from loguru import logger as LOG
from requests_http_signature import HTTPSignatureAuth
import websocket
from loguru import logger as LOG # type: ignore
from requests_http_signature import HTTPSignatureAuth # type: ignore
import websocket # type: ignore
import ccf.commit
def truncate(string, max_len=256):
def truncate(string: str, max_len: int = 256):
if len(string) > max_len:
return f"{string[: max_len]} + {len(string) - max_len} chars"
else:
@ -49,7 +49,7 @@ class Request:
#: Body of request
body: Optional[Union[dict, str]]
#: HTTP verb
http_verb: Optional[int]
http_verb: str
#: HTTP headers
headers: dict
@ -294,7 +294,14 @@ class RequestClient:
CCF default client and wrapper around Python Requests, handling HTTP signatures.
"""
def __init__(self, host, port, ca, cert=None, key=None):
def __init__(
self,
host: str,
port: int,
ca: str,
cert: Optional[str] = None,
key: Optional[str] = None,
):
self.host = host
self.port = port
self.ca = ca
@ -302,15 +309,24 @@ class RequestClient:
self.key = key
self.session = requests.Session()
self.session.verify = self.ca
self.session.cert = (self.cert, self.key)
if self.cert is not None and self.key is not None:
self.session.cert = (self.cert, self.key)
self.session.mount("https://", TlsAdapter(self.ca))
def request(self, request, signed=False, timeout=DEFAULT_REQUEST_TIMEOUT_SEC):
def request(
self,
request: Request,
signed: bool = False,
timeout: int = DEFAULT_REQUEST_TIMEOUT_SEC,
):
extra_headers = {}
extra_headers.update(request.headers)
auth_value = None
if signed:
if self.key is None:
raise ValueError("Cannot sign request if client has no key")
auth_value = HTTPSignatureAuth_AlwaysDigest(
algorithm="ecdsa-sha256",
key=open(self.key, "rb").read(),
@ -327,6 +343,7 @@ class RequestClient:
"allow_redirects": False,
}
request_body = None
if request.body is not None:
if isinstance(request.body, str) and request.body.startswith("@"):
# Request is a file path - read contents, assume json
@ -336,7 +353,15 @@ class RequestClient:
request_args["json"] = request_body
try:
response = self.session.request(timeout=timeout, **request_args)
response = self.session.request(
method=request.http_verb,
url=f"https://{self.host}:{self.port}{request.path}",
auth=auth_value,
headers=extra_headers,
allow_redirects=False,
json=request_body,
timeout=timeout,
)
except requests.exceptions.ReadTimeout as exc:
raise TimeoutError from exc
except requests.exceptions.SSLError as exc:
@ -354,7 +379,14 @@ class WSClient:
Note: Client signatures over WebSocket are not supported by CCF.
"""
def __init__(self, host, port, ca, cert=None, key=None):
def __init__(
self,
host: str,
port: int,
ca: str,
cert: Optional[str] = None,
key: Optional[str] = None,
):
self.host = host
self.port = port
self.ca = ca
@ -369,13 +401,18 @@ class WSClient:
"Use RequestClient class instead."
)
def request(self, request, signed=False, timeout=DEFAULT_REQUEST_TIMEOUT_SEC):
def request(
self,
request: Request,
signed: bool = False,
timeout: int = DEFAULT_REQUEST_TIMEOUT_SEC,
):
if signed:
raise RuntimeError(
raise ValueError(
"Client signatures over WebSocket are not supported by CCF"
)
if not self.ws:
if self.ws is None:
LOG.info("Creating WSS connection")
try:
self.ws = websocket.create_connection(
@ -389,6 +426,9 @@ class WSClient:
)
except Exception as exc:
raise CCFConnectionException from exc
assert self.ws is not None
payload = json.dumps(request.body).encode()
path = (request.path).encode()
header = struct.pack("<h", len(path)) + path
@ -433,20 +473,23 @@ class CCFClient:
A :py:exc:`CCFConnectionException` exception is raised if the connection is not established successfully within ``connection_timeout`` seconds.
"""
client_impl: Union[CurlClient, WSClient, RequestClient]
def __init__(
self,
host,
port,
ca,
cert=None,
key=None,
connection_timeout=DEFAULT_CONNECTION_TIMEOUT_SEC,
description=None,
ws=False,
host: str,
port: int,
ca: str,
cert: Optional[str] = None,
key: Optional[str] = None,
connection_timeout: int = DEFAULT_CONNECTION_TIMEOUT_SEC,
description: Optional[str] = None,
ws: bool = False,
):
self.connection_timeout = connection_timeout
self.description = description
self.name = f"[{host}:{port}]"
self.is_connected = False
if os.getenv("CURL_CLIENT"):
self.client_impl = CurlClient(host, port, ca, cert, key)
@ -455,20 +498,19 @@ class CCFClient:
else:
self.client_impl = RequestClient(host, port, ca, cert, key)
def _response(self, response):
def _response(self, response: Response) -> Response:
LOG.info(response)
return response
# pylint: disable=method-hidden
def _direct_call(
self,
path,
body=None,
http_verb="POST",
headers=None,
signed=False,
timeout=DEFAULT_REQUEST_TIMEOUT_SEC,
):
path: str,
body: Optional[Union[str, dict]] = None,
http_verb: str = "POST",
headers: Optional[dict] = None,
signed: bool = False,
timeout: int = DEFAULT_REQUEST_TIMEOUT_SEC,
) -> Response:
description = ""
if self.description:
description = f"({self.description})" + (" [signed]" if signed else "")
@ -481,25 +523,28 @@ class CCFClient:
def call(
self,
path,
body=None,
http_verb="POST",
headers=None,
signed=False,
timeout=DEFAULT_REQUEST_TIMEOUT_SEC,
):
path: str,
body: Optional[Union[str, dict]] = None,
http_verb: str = "POST",
headers: Optional[dict] = None,
signed: bool = False,
timeout: int = DEFAULT_REQUEST_TIMEOUT_SEC,
) -> Response:
"""
Issues one request, synchronously, and returns the response.
:param str path: URI of the targeted resource.
:param dict body: Request body (optional).
:param http_verb: HTTP verb (e.g. "POST" or "GET").
:param headers: HTTP request headers (optional).
:param str http_verb: HTTP verb (e.g. "POST" or "GET").
:param dict headers: HTTP request headers (optional).
:param bool signed: Sign request with client private key.
:param int timeout: Maximum time to wait corresponding response before giving up.
:param int timeout: Maximum time to wait for a response before giving up.
:return: :py:class:`ccf.clients.Response`
"""
if self.is_connected:
return self._direct_call(path, body, http_verb, headers, signed, timeout)
end_time = time.time() + self.connection_timeout
while True:
try:
@ -507,8 +552,8 @@ class CCFClient:
path, body, http_verb, headers, signed, timeout
)
# Only the first request gets this timeout logic - future calls
# call _direct_call directly
self.call = self._direct_call
# call _direct_call
self.is_connected = True
return response
except (CCFConnectionException, TimeoutError) as e:
# If the initial connection fails (e.g. due to node certificate
@ -520,52 +565,74 @@ class CCFClient:
LOG.debug(f"Got exception: {e}")
time.sleep(0.1)
def get(self, *args, **kwargs):
def get(self, *args, **kwargs) -> Response:
"""
Issue ``GET`` request.
See :py:meth:`ccf.clients.CCFClient.call`.
:return: :py:class:`ccf.clients.Response`
"""
return self.call(*args, http_verb="GET", **kwargs)
if "http_verb" in kwargs:
raise ValueError('"http_verb" should not be specified')
def post(self, *args, **kwargs):
kwargs["http_verb"] = "GET"
return self.call(*args, **kwargs)
def post(self, *args, **kwargs) -> Response:
"""
Issue ``POST`` request.
See :py:meth:`ccf.clients.CCFClient.call`.
:return: :py:class:`ccf.clients.Response`
"""
return self.call(*args, http_verb="POST", **kwargs)
if "http_verb" in kwargs:
raise ValueError('"http_verb" should not be specified')
def put(self, *args, **kwargs):
kwargs["http_verb"] = "POST"
return self.call(*args, **kwargs)
def put(self, *args, **kwargs) -> Response:
"""
Issue ``PUT`` request.
See :py:meth:`ccf.clients.CCFClient.call`.
:return: :py:class:`ccf.clients.Response`
"""
return self.call(*args, http_verb="PUT", **kwargs)
if "http_verb" in kwargs:
raise ValueError('"http_verb" should not be specified')
def delete(self, *args, **kwargs):
kwargs["http_verb"] = "PUT"
return self.call(*args, **kwargs)
def delete(self, *args, **kwargs) -> Response:
"""
Issue ``DELETE`` request.
See :py:meth:`ccf.clients.CCFClient.call`.
:return: :py:class:`ccf.clients.Response`
"""
return self.call(*args, http_verb="DELETE", **kwargs)
if "http_verb" in kwargs:
raise ValueError('"http_verb" should not be specified')
def head(self, *args, **kwargs):
kwargs["http_verb"] = "DELETE"
return self.call(*args, **kwargs)
def head(self, *args, **kwargs) -> Response:
"""
Issue ``HEAD`` request.
See :py:meth:`ccf.clients.CCFClient.call`.
:return: :py:class:`ccf.clients.Response`
"""
return self.call(*args, http_verb="HEAD", **kwargs)
if "http_verb" in kwargs:
raise ValueError('"http_verb" should not be specified')
def wait_for_commit(self, response, timeout=DEFAULT_COMMIT_TIMEOUT_SEC):
kwargs["http_verb"] = "HEAD"
return self.call(*args, **kwargs)
def wait_for_commit(
self, response: Response, timeout: int = DEFAULT_COMMIT_TIMEOUT_SEC
):
"""
Given a :py:class:`ccf.clients.Response`, this functions waits
for the associated sequence number and view to be committed by the CCF network.
@ -577,6 +644,9 @@ class CCFClient:
A ``TimeoutError`` exception is raised if the transaction is not committed within ``timeout`` seconds.
"""
if response.seqno is None or response.view is None:
raise ValueError("Response seqno and view should not be None")
ccf.commit.wait_for_commit(self, response.seqno, response.view, timeout)

Просмотреть файл

@ -8,14 +8,17 @@ import time
from ccf.tx_status import TxStatus
def wait_for_commit(client, seqno, view, timeout=3):
def wait_for_commit(client, seqno: int, view: int, timeout: int = 3) -> None:
"""
Given a client to a CCF network and a seqno/view pair, this function
waits for this specific commit index to be committed by the
network in this view.
Waits for a specific seqno/view pair to be committed by the network,
as per the node to which client is connected to.
A TimeoutError exception is raised if the commit index is not globally
committed within the given timeout.
:param client: Instance of :py:class:`ccf.clients.CCFClient`
:param int seqno: Transaction sequence number.
:param int view: Consensus view.
:param str timeout: Maximum time to wait for this seqno/view pair to be committed before giving up.
A TimeoutError exception is raised if the commit index is not committed within the given timeout.
"""
end_time = time.time() + timeout
while time.time() < end_time:
@ -23,6 +26,7 @@ def wait_for_commit(client, seqno, view, timeout=3):
assert (
r.status_code == http.HTTPStatus.OK
), f"tx request returned HTTP status {r.status_code}"
assert isinstance(r.body, dict), "/node/tx should return a JSON object"
status = TxStatus(r.body["status"])
if status == TxStatus.Committed:
return

Просмотреть файл

@ -1,11 +1,15 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import io
import msgpack.fallback as msgpack # Default implementation has buggy interaction between read_bytes and tell, so use fallback
# Default implementation has buggy interaction between read_bytes and tell, so use fallback
import msgpack.fallback as msgpack # type: ignore
import struct
import os
from loguru import logger as LOG
from loguru import logger as LOG # type: ignore
from typing import BinaryIO, Optional, Set
GCM_SIZE_TAG = 16
GCM_SIZE_IV = 12
@ -45,7 +49,15 @@ class PublicDomain:
All public tables within a :py:class:`ccf.ledger.Transaction`.
"""
def __init__(self, buffer):
_buffer: io.BytesIO
_buffer_size: int
_unpacker: msgpack.Unpacker
_is_snapshot: bool
_version: int
_tables: dict
_msgpacked_tables: Set[str]
def __init__(self, buffer: io.BytesIO):
self._buffer = buffer
self._buffer_size = buffer.getbuffer().nbytes
self._unpacker = msgpack.Unpacker(self._buffer, **UNPACK_ARGS)
@ -112,9 +124,9 @@ class PublicDomain:
f"Found {read_count} reads, {write_count} writes, and {remove_count} removes"
)
def get_tables(self):
def get_tables(self) -> dict:
"""
Returns a dictionnary of all public tables (with their content) in a :py:class:`ccf.ledger.Transaction`.
Returns a dictionary of all public tables (with their content) in a :py:class:`ccf.ledger.Transaction`.
:return: Dictionnary of public tables with their content.
"""
@ -136,17 +148,20 @@ class Transaction:
A transaction represents one entry in the CCF ledger.
"""
_file = None
_total_size = 0
_public_domain_size = 0
_next_offset = LEDGER_HEADER_SIZE
_public_domain = None
_file_size = 0
gcm_header = None
_tx_offset = 0
_file: Optional[BinaryIO] = None
_total_size: int = 0
_public_domain_size: int = 0
_next_offset: int = LEDGER_HEADER_SIZE
_public_domain: Optional[PublicDomain] = None
_file_size: int = 0
gcm_header: Optional[GcmHeader] = None
_tx_offset: int = 0
def __init__(self, filename):
def __init__(self, filename: str):
self._file = open(filename, mode="rb")
if self._file is None:
raise RuntimeError(f"Ledger file {filename} could not be opened")
self._file_size = int.from_bytes(
_byte_read_safe(self._file, LEDGER_HEADER_SIZE), byteorder="little"
)
@ -170,10 +185,12 @@ class Transaction:
buffer = _byte_read_safe(self._file, LEDGER_DOMAIN_SIZE)
self._public_domain_size = to_uint_64(buffer)
def get_public_domain(self):
def get_public_domain(self) -> Optional[PublicDomain]:
"""
Retrieve the public (i.e. non-encrypted) domain for that transaction.
Note: If the transaction is private-only, nothing is returned.
:return: :py:class:`ccf.ledger.PublicDomain`
"""
if self._public_domain == None:
@ -187,6 +204,8 @@ class Transaction:
:return: Raw transaction bytes.
"""
assert self._file is not None
# remember where the pointer is in the file before we go back for the transaction bytes
header = self._file.tell()
self._file.seek(self._tx_offset)
@ -220,12 +239,16 @@ class Ledger:
:param str name: Ledger directory for a single CCF node.
"""
def __init__(self, name: str):
_filenames: list
_fileindex: int
_current_tx: Transaction
def __init__(self, directory: str):
self._filenames = []
self._fileindex = 0
ledgers = os.listdir(name)
ledgers = os.listdir(directory)
# Sorts the list based off the first number after ledger_ so that
# the ledger is verified in sequence
sorted_ledgers = sorted(
@ -236,14 +259,14 @@ class Ledger:
)
for chunk in sorted_ledgers:
if os.path.isfile(os.path.join(name, chunk)):
if os.path.isfile(os.path.join(directory, chunk)):
if not chunk.endswith(".committed"):
LOG.warning(f"The file {chunk} has not been committed")
self._filenames.append(os.path.join(name, chunk))
self._filenames.append(os.path.join(directory, chunk))
self._current_tx = Transaction(self._filenames[0])
def __next__(self):
def __next__(self) -> Transaction:
try:
return next(self._current_tx)
except StopIteration:

Просмотреть файл

@ -8,11 +8,12 @@ import json
import os
import sys
import functools
from typing import Union, Optional, Any
from loguru import logger as LOG
from loguru import logger as LOG # type: ignore
def dump_to_file(output_path, obj, dump_args):
def dump_to_file(output_path: str, obj: dict, dump_args: dict):
with open(output_path, "w") as f:
json.dump(obj, f, **dump_args)
@ -21,10 +22,6 @@ def list_as_lua_literal(l):
return str(l).translate(str.maketrans("[]", "{}"))
def script_to_vote_object(script):
return {"ballot": {"text": script}}
LUA_FUNCTION_EQUAL_ARRAYS = """function equal_arrays(a, b)
if #a ~= #b then
return false
@ -43,7 +40,9 @@ DEFAULT_VOTE_OUTPUT = "{proposal_name}_vote_for.json"
def complete_proposal_output_path(
proposal_name, proposal_output_path=None, common_dir="."
proposal_name: str,
proposal_output_path: Optional[str] = None,
common_dir: str = ".",
):
if proposal_output_path is None:
proposal_output_path = DEFAULT_PROPOSAL_OUTPUT.format(
@ -58,7 +57,9 @@ def complete_proposal_output_path(
return proposal_output_path
def complete_vote_output_path(proposal_name, vote_output_path=None, common_dir="."):
def complete_vote_output_path(
proposal_name: str, vote_output_path: Optional[str] = None, common_dir: str = "."
):
if vote_output_path is None:
vote_output_path = DEFAULT_VOTE_OUTPUT.format(proposal_name=proposal_name)
@ -70,7 +71,11 @@ def complete_vote_output_path(proposal_name, vote_output_path=None, common_dir="
return vote_output_path
def add_arg_construction(lines, arg, arg_name="args"):
def add_arg_construction(
lines: list,
arg: Union[str, collections.abc.Sequence, collections.abc.Mapping],
arg_name: str = "args",
):
if isinstance(arg, str):
lines.append(f"{arg_name} = [====[{arg}]====]")
elif isinstance(arg, collections.abc.Sequence):
@ -83,7 +88,12 @@ def add_arg_construction(lines, arg, arg_name="args"):
lines.append(f"{arg_name} = {arg}")
def add_arg_checks(lines, arg, arg_name="args", added_equal_arrays_fn=False):
def add_arg_checks(
lines: list,
arg: Union[str, collections.abc.Sequence, collections.abc.Mapping],
arg_name: str = "args",
added_equal_arrays_fn: bool = False,
):
lines.append(f"if {arg_name} == nil then return false end")
if isinstance(arg, str):
lines.append(f"if not {arg_name} == [====[{arg}]====] then return false end")
@ -110,7 +120,12 @@ def add_arg_checks(lines, arg, arg_name="args", added_equal_arrays_fn=False):
lines.append(f"if not {arg_name} == {arg} then return false end")
def build_proposal(proposed_call, args=None, inline_args=False, vote_against=False):
def build_proposal(
proposed_call: str,
args: Optional[Any] = None,
inline_args: bool = False,
vote_against: bool = False,
):
LOG.trace(f"Generating {proposed_call} proposal")
proposal_script_lines = []
@ -157,7 +172,7 @@ def cli_proposal(func):
@cli_proposal
def new_member(member_cert_path, member_enc_pubk_path, **kwargs):
def new_member(member_cert_path: str, member_enc_pubk_path: str, **kwargs):
LOG.debug("Generating new_member proposal")
# Read certs
@ -216,60 +231,60 @@ def new_member(member_cert_path, member_enc_pubk_path, **kwargs):
@cli_proposal
def retire_member(member_id, **kwargs):
def retire_member(member_id: int, **kwargs):
return build_proposal("retire_member", member_id, **kwargs)
@cli_proposal
def new_user(user_cert_path, **kwargs):
def new_user(user_cert_path: str, **kwargs):
user_cert = open(user_cert_path).read()
return build_proposal("new_user", user_cert, **kwargs)
@cli_proposal
def remove_user(user_id, **kwargs):
def remove_user(user_id: int, **kwargs):
return build_proposal("remove_user", user_id, **kwargs)
@cli_proposal
def set_user_data(user_id, user_data, **kwargs):
def set_user_data(user_id: int, user_data: dict, **kwargs):
proposal_args = {"user_id": user_id, "user_data": user_data}
return build_proposal("set_user_data", proposal_args, **kwargs)
@cli_proposal
def set_lua_app(app_script_path, **kwargs):
def set_lua_app(app_script_path: str, **kwargs):
with open(app_script_path) as f:
app_script = f.read()
return build_proposal("set_lua_app", app_script, **kwargs)
@cli_proposal
def set_js_app(app_script_path, **kwargs):
def set_js_app(app_script_path: str, **kwargs):
with open(app_script_path) as f:
app_script = f.read()
return build_proposal("set_js_app", app_script, **kwargs)
@cli_proposal
def trust_node(node_id, **kwargs):
def trust_node(node_id: int, **kwargs):
return build_proposal("trust_node", node_id, **kwargs)
@cli_proposal
def retire_node(node_id, **kwargs):
def retire_node(node_id: int, **kwargs):
return build_proposal("retire_node", node_id, **kwargs)
@cli_proposal
def new_node_code(code_digest, **kwargs):
def new_node_code(code_digest: Union[str, list], **kwargs):
if isinstance(code_digest, str):
code_digest = list(bytearray.fromhex(code_digest))
return build_proposal("new_node_code", code_digest, **kwargs)
@cli_proposal
def new_user_code(code_digest, **kwargs):
def new_user_code(code_digest: Union[str, list], **kwargs):
if isinstance(code_digest, str):
code_digest = list(bytearray.fromhex(code_digest))
return build_proposal("new_user_code", code_digest, **kwargs)
@ -296,29 +311,32 @@ def update_recovery_shares(**kwargs):
@cli_proposal
def set_recovery_threshold(threshold, **kwargs):
def set_recovery_threshold(threshold: int, **kwargs):
return build_proposal("set_recovery_threshold", threshold, **kwargs)
class ProposalGenerator:
def __init__(self, common_dir="."):
def __init__(self, common_dir: str = "."):
self.common_dir = common_dir
# Auto-generate methods wrapping inspected functions, dumping outputs to file
def wrapper(func):
@functools.wraps(func)
def wrapper_func(
*args, proposal_output_path=None, vote_output_path=None, **kwargs,
*args,
proposal_output_path_: Optional[str] = None,
vote_output_path_: Optional[str] = None,
**kwargs,
):
proposal_output_path = complete_proposal_output_path(
func.__name__,
proposal_output_path=proposal_output_path,
proposal_output_path=proposal_output_path_,
common_dir=self.common_dir,
)
vote_output_path = complete_vote_output_path(
func.__name__,
vote_output_path=vote_output_path,
vote_output_path=vote_output_path_,
common_dir=self.common_dir,
)

Просмотреть файл

@ -1,7 +1,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
from setuptools import setup
from setuptools import setup # type: ignore
from os import path
PACKAGE_NAME = "ccf"

Просмотреть файл

@ -14,10 +14,10 @@ if len(sys.argv) < 2:
print("Client info file should be specified as first argument")
sys.exit(1)
client_info_file = sys.argv[1]
client_info_file_path = sys.argv[1]
client_info = {}
with open(sys.argv[1]) as client_info_file:
with open(client_info_file_path) as client_info_file:
client_info = json.load(client_info_file)
host = client_info["host"]

Просмотреть файл

@ -41,7 +41,7 @@ if [ ! -f "scripts/env/bin/activate" ]
fi
source scripts/env/bin/activate
pip --disable-pip-version-check install black pylint 1>/dev/null
pip --disable-pip-version-check install black pylint mypy 1>/dev/null
echo "Python format"
if [ $FIX -ne 0 ]; then
@ -54,4 +54,7 @@ fi
pip --disable-pip-version-check install -U -r tests/requirements.txt 1>/dev/null
echo "Python lint"
find tests/ python/ -type f -name "*.py" -exec python -m pylint {} +
find tests/ python/ -type f -name "*.py" -exec python -m pylint {} +
echo "Python types"
mypy python/