Python clients cleanup and docs (#1429)

This commit is contained in:
Julien Maffre 2020-07-27 17:34:16 +01:00 коммит произвёл GitHub
Родитель c5cd8656cf
Коммит f798d8b902
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
37 изменённых файлов: 667 добавлений и 376 удалений

Просмотреть файл

@ -27,6 +27,7 @@ jobs:
source env/bin/activate
pip install wheel
pip install -U -r doc/requirements.txt
pip install -U -e ./python
sphinx-multiversion doc build/html
displayName: Sphinx

Просмотреть файл

@ -7,30 +7,8 @@ steps:
displayName: 'Install'
- script: |
INSTALL_PREFIX=`cat /tmp/install_prefix`
mkdir -p nested/run
cd nested/run
python3.7 -m venv env
source env/bin/activate
python -m pip install -U -r $INSTALL_PREFIX/bin/requirements.txt
pip freeze > $INSTALL_PREFIX/bin/requirements.txt
timeout --signal=SIGINT --kill-after=30s --preserve-status 30s \
python $INSTALL_PREFIX/bin/start_network.py \
-p ../../build/liblogging \
-b $INSTALL_PREFIX/bin \
-g $(pwd)/../../src/runtime_config/gov.lua \
-v
cp -r ./workspace/start_network_0/0.ledger .
cp ./workspace/start_network_0/network_enc_pubk.pem .
timeout --signal=SIGINT --kill-after=30s --preserve-status 30s \
python $INSTALL_PREFIX/bin/start_network.py \
-p ../../build/liblogging \
-b $INSTALL_PREFIX/bin \
-v \
--recover \
--ledger-dir 0.ledger \
--network-enc-pubk network_enc_pubk.pem \
--common-dir ./workspace/start_network_common/
cat /tmp/install_prefix | xargs ./test_install.sh
workingDirectory: build
displayName: Test installed CCF
- script: |

Просмотреть файл

@ -101,7 +101,9 @@ foreach(UTILITY ${CCF_UTILITIES})
endforeach()
# Copy utilities from tests directory
set(CCF_TEST_UTILITIES tests.sh cimetrics_env.sh upload_pico_metrics.py)
set(CCF_TEST_UTILITIES tests.sh cimetrics_env.sh upload_pico_metrics.py
test_install.sh
)
foreach(UTILITY ${CCF_TEST_UTILITIES})
configure_file(
${CCF_DIR}/tests/${UTILITY} ${CMAKE_CURRENT_BINARY_DIR} COPYONLY

Просмотреть файл

@ -14,9 +14,9 @@
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
# import os
# import sys
# sys.path.insert(0, os.path.abspath('.'))
import os
import sys
sys.path.insert(0, os.path.abspath('../python'))
# -- Project information -----------------------------------------------------
@ -51,6 +51,7 @@ extensions = [
"sphinx.ext.githubpages",
"sphinx_multiversion",
"sphinx_copybutton",
"sphinx.ext.autodoc"
]
autosectionlabel_prefix_document = True
@ -205,6 +206,12 @@ html_context = {
"doc_path": "doc/",
}
# Python autodoc options
autodoc_default_options = {
'member-order': 'bysource',
}
def setup(self):
import subprocess

Просмотреть файл

@ -24,7 +24,7 @@ The following command will run a simple 3-node test network on a single machine:
$ cd CCF/build
$ ../start_test_network.sh --package ./liblogging.enclave.so.signed
$ ../start_test_network.sh --package ./liblogging.enclave.so.signed
Setting up Python environment...
...
Python environment successfully setup
@ -34,7 +34,7 @@ The following command will run a simple 3-node test network on a single machine:
[2020-07-17 15:27:46.643] Node [ 1] = 127.98.174.190:36795
[2020-07-17 15:27:46.643] Node [ 2] = 127.113.40.231:33227
[2020-07-17 15:27:46.644] You can now issue business transactions to the ./liblogging.enclave.so.signed application.
[2020-07-17 15:27:46.644] Certificates have been copied to /data/src/CCF/build/workspace/test_network_common
[2020-07-17 15:27:46.644] Keys and certificates have been copied to the common folder: /data/src/CCF/build/workspace/test_network_common
[2020-07-17 15:27:46.644] See https://microsoft.github.io/CCF/users/issue_commands.html for more information.
[2020-07-17 15:27:46.644] Press Ctrl+C to shutdown the network.

Просмотреть файл

@ -63,6 +63,7 @@ You can quickly spin up a CCF network and start :ref:`issuing commands to the de
[2019-10-29 14:48:12.138] Node [ 1] = 127.169.74.37:58343
[2019-10-29 14:48:12.138] Node [ 2] = 127.131.108.179:50532
[2019-10-29 14:48:12.138] You can now issue business transactions to the ./liblogging.enclave.so.signed application.
[2019-10-29 14:48:12.138] Keys and certificates have been copied to the common folder: /data/src/CCF/build/workspace/test_network_common
[2019-10-29 14:48:12.138] See https://microsoft.github.io/CCF/users/issue_commands.html for more information.
[2019-10-29 14:48:12.138] Press Ctrl+C to shutdown the network.

Просмотреть файл

@ -19,6 +19,7 @@ For example, deploying the ``liblogging`` example application:
[2019-10-29 14:48:12.138] Node [ 1] = 127.169.74.37:58343
[2019-10-29 14:48:12.138] Node [ 2] = 127.131.108.179:50532
[2019-10-29 14:48:12.138] You can now issue business transactions to the ./liblogging.enclave.so.signed application.
[2019-10-29 14:48:12.138] Keys and certificates have been copied to the common folder: /path/to/test_network_common
[2019-10-29 14:48:12.138] See https://microsoft.github.io/CCF/users/issue_commands.html for more information.
[2019-10-29 14:48:12.138] Press Ctrl+C to shutdown the network.
@ -54,6 +55,7 @@ The ``start_test_network.sh`` script can also be used to automatically recover a
[2020-05-14 14:50:24.388] Node [ 4] = 127.184.250.157:35113
[2020-05-14 14:50:24.388] Node [ 5] = 127.175.51.36:34699
[2020-05-14 14:50:24.388] You can now issue business transactions to the liblogging.enclave.so.signed application.
[2020-05-14 14:50:24.388] Keys and certificates have been copied to the common folder: /path/to/test_network_common
[2020-05-14 14:50:24.388] See https://microsoft.github.io/CCF/users/issue_commands.html for more information.
[2020-05-14 14:50:24.388] Press Ctrl+C to shutdown the network.

Просмотреть файл

@ -20,4 +20,6 @@ Before issuing business transactions to CCF, the certificates of trusted users n
deploy_app
issue_commands
python_tutorial
python_api
rpc_api

10
doc/users/python_api.rst Normal file
Просмотреть файл

@ -0,0 +1,10 @@
Python Client API
=================
.. autoclass:: ccf.clients.CCFClient
:members:
.. autoclass:: ccf.clients.Response
:members:
.. autoexception:: ccf.clients.CCFConnectionException

Просмотреть файл

@ -0,0 +1,87 @@
Python Client Tutorial
======================
This tutorial describes how a Python client can securely issue requests to a running CCF network. It is assumed that the CCF network has already been started (e.g. after having :ref:`deployed a test network <users/deploy_app:Deploying an Application>`).
.. note:: See :ref:`Python Client API <users/python_api:Python Client API>` for the complete API specification.
First, install the CCF Python package:
.. code-block:: bash
$ pip install ccf
Then, in the Python interpreter or new file:
.. literalinclude:: ../../python/tutorial.py
:language: py
:start-after: SNIPPET: import_clients
:lines: 1
Set the following CCF node variables:
.. code-block:: python
host = "<node-host>" # Node address or domain (str)
port = <node-port> # Node port (int)
ca = "<path/to/network/cert>" # Network certificate path
.. note:: :ref:`When deploying a test network <users/deploy_app:Deploying an Application>`, use any node's IP address and port number. All certificates and keys can be found in the associated ``common_dir`` folder.
Create a new :py:class:`ccf.clients.CCFClient` instance which will create a secure TLS connection to the target node part of the network specified via ``ca``:
.. literalinclude:: ../../python/tutorial.py
:language: py
:start-after: SNIPPET: anonymous_client
:lines: 1
You can then use the ``anonymous_client`` to issue requests that do not require authentication (typically, ``GET`` endpoints under ``/node``). Every call returns a :py:class:`ccf.clients.Response` object associated with the HTTP response.
.. literalinclude:: ../../python/tutorial.py
:language: py
:start-after: SNIPPET_START: anonymous_requests
:end-before: SNIPPET_END: anonymous_requests
To create an authenticated client and issue application or governance requests, the client identity (certificate and private key) should be specified:
.. code-block:: python
cert = "</path/to/client/cert>" # Client certificate path
key = "</path/to/client/private/key>" # Private key certificate path
Create a new instance of :py:class:`ccf.clients.CCFClient`, this time specifying the client's certificate and private key:
.. literalinclude:: ../../python/tutorial.py
:language: py
:start-after: SNIPPET: authenticated_client
:lines: 1
The authenticated client can then be used to issue ``POST`` requests, e.g. registering new public and private messages to the default logging application:
.. literalinclude:: ../../python/tutorial.py
:language: py
:start-after: SNIPPET_START: authenticated_post_requests
:end-before: SNIPPET_END: authenticated_post_requests
It is possible to use the same :py:class:`ccf.clients.CCFClient` instance to wait for the transaction to be committed by the network:
.. literalinclude:: ../../python/tutorial.py
:language: py
:start-after: SNIPPET: wait_for_commit
:lines: 1
In fact, even an anonymous client can be used to verify that a transaction is committed. This is because only the sequence number and view associated with the transaction are required to verify that a transaction is committed.
.. literalinclude:: ../../python/tutorial.py
:language: py
:start-after: SNIPPET: any_client_can_wait
:lines: 1
.. warning:: This does not imply that the content of a confidential transaction issued by an authenticated client is visible by an unauthenticated client. Access control to the confidential resource is handled by the CCF application logic.
Finally, the authenticated client can be used to issue ``GET`` requests and verify that the previous messages have successfully been recorded:
.. literalinclude:: ../../python/tutorial.py
:language: py
:start-after: SNIPPET_START: authenticated_get_requests
:end-before: SNIPPET_END: authenticated_get_requests

Просмотреть файл

@ -6,7 +6,7 @@ import time
import os
import subprocess
import tempfile
import urllib.parse
from dataclasses import dataclass
from http.client import HTTPResponse
from io import BytesIO
from requests.adapters import HTTPAdapter
@ -15,12 +15,15 @@ from cryptography import x509
from cryptography.hazmat.backends import default_backend
import struct
import base64
from typing import Union, Optional
import requests
from loguru import logger as LOG
from requests_http_signature import HTTPSignatureAuth
import websocket
import ccf.commit
def truncate(string, max_len=256):
if len(string) > max_len:
@ -36,31 +39,28 @@ CCF_GLOBAL_COMMIT_HEADER = "x-ccf-global-commit"
DEFAULT_CONNECTION_TIMEOUT_SEC = 3
DEFAULT_REQUEST_TIMEOUT_SEC = 3
DEFAULT_COMMIT_TIMEOUT_SEC = 3
@dataclass
class Request:
def __init__(
self, path, params=None, http_verb="POST", headers=None, params_in_query=None
):
if headers is None:
headers = {}
# TODO: remove
if params_in_query is None:
params_in_query = http_verb == "GET"
self.path = path
self.params = params
self.http_verb = http_verb
self.headers = headers
self.params_in_query = params_in_query
#: Resource path (with optional query string)
path: str
#: Body of request
body: Optional[Union[dict, str]]
#: HTTP verb
http_verb: Optional[int]
#: HTTP headers
headers: dict
def __str__(self):
return (
f"{self.http_verb} {self.path} {self.headers} " + truncate(f"{self.params}")
if self.params is not None
else ""
)
string = f"{self.http_verb} {self.path}"
if self.headers:
string += f" {self.headers}"
if self.body is not None:
string += f'{truncate(f"{self.body}")}'
return string
def int_or_none(v):
@ -75,28 +75,29 @@ class FakeSocket:
return self.file
@dataclass
class Response:
def __init__(self, status, body, seqno, view, global_commit, headers):
self.status = status
self.body = body
self.seqno = seqno
self.view = view
self.global_commit = global_commit
self.headers = headers
"""
Response to request sent via :py:class:`ccf.clients.CCFClient`
"""
# TODO: what's this for?
def to_dict(self):
return {
"seqno": self.seqno,
"global_commit": self.global_commit,
"view": self.view,
"body": self.body,
}
#: Response HTTP status code
status_code: int
#: Response body
body: Optional[Union[str, dict]]
#: CCF sequence number
seqno: Optional[int]
#: CCF consensus view
view: Optional[int]
#: CCF global commit sequence number (deprecated)
global_commit: Optional[int]
#: Response HTTP headers
headers: dict
def __str__(self):
versioned = (self.view, self.seqno) != (None, None)
return (
f"{self.status} "
f"{self.status_code} "
+ (f"@{self.view}.{self.seqno} " if versioned else "")
+ truncate(f"{self.body}")
)
@ -114,7 +115,7 @@ class Response:
raise ValueError(f"Unhandled content type: {content_type}")
return Response(
status=rr.status_code,
status_code=rr.status_code,
body=parsed_body,
seqno=int_or_none(rr.headers.get(CCF_TX_SEQNO_HEADER)),
view=int_or_none(rr.headers.get(CCF_TX_VIEW_HEADER)),
@ -140,7 +141,7 @@ class Response:
raise ValueError(f"Unhandled content type: {content_type}")
return Response(
status=response.status,
response.status,
body=parsed_body,
seqno=int_or_none(response.getheader(CCF_TX_SEQNO_HEADER)),
view=int_or_none(response.getheader(CCF_TX_VIEW_HEADER)),
@ -159,14 +160,10 @@ def human_readable_size(n):
class CCFConnectionException(Exception):
pass
def build_query_string(params):
return "&".join(
f"{urllib.parse.quote_plus(k)}={urllib.parse.quote_plus(json.dumps(v))}"
for k, v in params.items()
)
"""
Exception raised if a :py:class:`ccf.clients.CCFClient` instance cannot successfully establish
a connection with a target CCF node.
"""
def get_curve(ca_file):
@ -179,28 +176,16 @@ def get_curve(ca_file):
class CurlClient:
"""
We keep this around in a limited fashion still, because
the resulting logs nicely illustrate manual usage in a way using the requests API doesn't
This client uses Curl to send HTTP requests to CCF, and logs all Curl commands it runs.
These commands could also be run manually, or used by another client tool.
"""
def __init__(
self,
host,
port,
cert=None,
key=None,
ca=None,
binary_dir=".",
request_timeout=DEFAULT_REQUEST_TIMEOUT_SEC,
**kwargs,
):
def __init__(self, host, port, ca=None, cert=None, key=None):
self.host = host
self.port = port
self.ca = ca
self.cert = cert
self.key = key
self.ca = ca
self.binary_dir = binary_dir
self.request_timeout = request_timeout
ca_curve = get_curve(self.ca)
if ca_curve.name == "secp256k1":
@ -209,37 +194,33 @@ class CurlClient:
"Use RequestClient class instead."
)
def request(self, request, is_signed=False):
def request(self, request, signed=False, timeout=DEFAULT_REQUEST_TIMEOUT_SEC):
with tempfile.NamedTemporaryFile() as nf:
if is_signed:
cmd = [os.path.join(self.binary_dir, "scurl.sh")]
if signed:
cmd = ["scurl.sh"]
else:
cmd = ["curl"]
url = f"https://{self.host}:{self.port}{request.path}"
if request.params_in_query:
if request.params is not None:
url += f"?{build_query_string(request.params)}"
cmd += [
url,
"-X",
request.http_verb,
"-i",
f"-m {self.request_timeout}",
f"-m {timeout}",
]
if not request.params_in_query and request.params is not None:
if isinstance(request.params, str) and request.params.startswith("@"):
if request.body is not None:
if isinstance(request.body, str) and request.body.startswith("@"):
# Request is already a file path - pass it directly
cmd.extend(["--data-binary", request.params])
cmd.extend(["--data-binary", request.body])
else:
# Write request to temp file
if isinstance(request.params, bytes):
msg_bytes = request.params
# Write request body to temp file
if isinstance(request.body, bytes):
msg_bytes = request.body
else:
msg_bytes = json.dumps(request.params).encode()
msg_bytes = json.dumps(request.body).encode()
LOG.debug(f"Writing request body: {truncate(msg_bytes)}")
nf.write(msg_bytes)
nf.flush()
@ -273,6 +254,10 @@ class CurlClient:
class TlsAdapter(HTTPAdapter):
"""
Support for secp256k1 as node and network identity curve.
"""
def __init__(self, ca_file):
self.ca_curve = None
if ca_file is not None:
@ -289,6 +274,10 @@ class TlsAdapter(HTTPAdapter):
class HTTPSignatureAuth_AlwaysDigest(HTTPSignatureAuth):
"""
Support for HTTP signatures with empty body.
"""
def add_digest(self, request):
# Add digest of empty body, never leave it blank
if request.body is None:
@ -301,33 +290,27 @@ class HTTPSignatureAuth_AlwaysDigest(HTTPSignatureAuth):
class RequestClient:
def __init__(
self,
host,
port,
cert=None,
key=None,
ca=None,
request_timeout=DEFAULT_REQUEST_TIMEOUT_SEC,
**kwargs,
):
"""
CCF default client and wrapper around Python Requests, handling HTTP signatures.
"""
def __init__(self, host, port, ca, cert=None, key=None):
self.host = host
self.port = port
self.ca = ca
self.cert = cert
self.key = key
self.ca = ca
self.request_timeout = request_timeout
self.session = requests.Session()
self.session.verify = self.ca
self.session.cert = (self.cert, self.key)
self.session.mount("https://", TlsAdapter(self.ca))
def request(self, request, is_signed=False):
def request(self, request, signed=False, timeout=DEFAULT_REQUEST_TIMEOUT_SEC):
extra_headers = {}
extra_headers.update(request.headers)
auth_value = None
if is_signed:
if signed:
auth_value = HTTPSignatureAuth_AlwaysDigest(
algorithm="ecdsa-sha256",
key=open(self.key, "rb").read(),
@ -344,21 +327,16 @@ class RequestClient:
"allow_redirects": False,
}
if request.params is not None:
request_params = request.params
if isinstance(request.params, str) and request.params.startswith("@"):
if request.body is not None:
if isinstance(request.body, str) and request.body.startswith("@"):
# Request is a file path - read contents, assume json
request_params = json.load(open(request.params[1:]))
if request.params_in_query:
request_args["params"] = build_query_string(request_params)
request_body = json.load(open(request.body[1:]))
else:
request_args["json"] = request_params
request_body = request.body
request_args["json"] = request_body
try:
response = self.session.request(
timeout=self.request_timeout, **request_args
)
response = self.session.request(timeout=timeout, **request_args)
except requests.exceptions.ReadTimeout as exc:
raise TimeoutError from exc
except requests.exceptions.SSLError as exc:
@ -370,26 +348,32 @@ class RequestClient:
class WSClient:
def __init__(
self,
host,
port,
cert=None,
key=None,
ca=None,
request_timeout=DEFAULT_REQUEST_TIMEOUT_SEC,
**kwargs,
):
"""
CCF WebSocket client implementation.
Note: Client signatures over WebSocket are not supported by CCF.
"""
def __init__(self, host, port, ca, cert=None, key=None):
self.host = host
self.port = port
self.ca = ca
self.cert = cert
self.key = key
self.ca = ca
self.request_timeout = request_timeout
self.ws = None
def request(self, request, is_signed=False):
assert not is_signed
ca_curve = get_curve(self.ca)
if ca_curve.name == "secp256k1":
raise RuntimeError(
f"WSClient cannot perform TLS handshake with {ca_curve.name} ECDH curve. "
"Use RequestClient class instead."
)
def request(self, request, signed=False, timeout=DEFAULT_REQUEST_TIMEOUT_SEC):
if signed:
raise RuntimeError(
"Client signatures over WebSocket are not supported by CCF"
)
if not self.ws:
LOG.info("Creating WSS connection")
@ -401,11 +385,11 @@ class WSClient:
"keyfile": self.key,
"ca_certs": self.ca,
},
timeout=self.request_timeout,
timeout=timeout,
)
except Exception as exc:
raise CCFConnectionException from exc
payload = json.dumps(request.params).encode()
payload = json.dumps(request.body).encode()
path = (request.path).encode()
header = struct.pack("<h", len(path)) + path
# FIN, no RSV, BIN, UNMASKED every time, because it's all we support right now
@ -414,58 +398,114 @@ class WSClient:
)
self.ws.send_frame(frame)
out = self.ws.recv_frame().data
(status,) = struct.unpack("<h", out[:2])
(status_code,) = struct.unpack("<h", out[:2])
(seqno,) = struct.unpack("<Q", out[2:10])
(view,) = struct.unpack("<Q", out[10:18])
(global_commit,) = struct.unpack("<Q", out[18:26])
payload = out[26:]
# TODO: move out the decoding!
if status == 200:
if status_code == 200:
body = json.loads(payload) if payload else None
else:
body = payload.decode()
return Response(status, body, seqno, view, global_commit, headers={})
return Response(status_code, body, seqno, view, global_commit, headers={})
class CCFClient:
def __init__(self, host, port, *args, **kwargs):
self.description = (
kwargs.pop("description") if "description" in kwargs else None
)
self.connection_timeout = (
kwargs.pop("connection_timeout")
if "connection_timeout" in kwargs
else DEFAULT_CONNECTION_TIMEOUT_SEC
)
"""
Client used to connect securely and issue requests to a given CCF node.
This is a very thin wrapper around Python Requests with TLS with added:
- Retry logic when connecting to nodes that are joining the network
- Support for HTTP signatures (https://tools.ietf.org/html/draft-cavage-http-signatures-12).
Note: Experimental support for WebSocket is also available by setting the ``ws`` parameter to ``True``.
:param str host: RPC IP address or domain name of node to connect to.
:param int port: RPC port number of node to connect to.
:param str ca: Path to CCF network certificate.
:param str cert: Path to client certificate (optional).
:param str key: Path to client private key (optional).
:param int connection_timeout: Maximum time to wait for successful connection establishment before giving up.
:param str description: Message to print on each request emitted with this client.
:param bool ws: Use WebSocket client (experimental).
A :py:exc:`CCFConnectionException` exception is raised if the connection is not established successfully within ``connection_timeout`` seconds.
"""
def __init__(
self,
host,
port,
ca,
cert=None,
key=None,
connection_timeout=DEFAULT_CONNECTION_TIMEOUT_SEC,
description=None,
ws=False,
):
self.connection_timeout = connection_timeout
self.description = description
self.name = f"[{host}:{port}]"
if os.getenv("CURL_CLIENT"):
self.client_impl = CurlClient(host, port, *args, **kwargs)
elif os.getenv("WEBSOCKETS_CLIENT") or kwargs.get("ws"):
self.client_impl = WSClient(host, port, *args, **kwargs)
self.client_impl = CurlClient(host, port, ca, cert, key)
elif os.getenv("WEBSOCKETS_CLIENT") or ws:
self.client_impl = WSClient(host, port, ca, cert, key)
else:
self.client_impl = RequestClient(host, port, *args, **kwargs)
self.client_impl = RequestClient(host, port, ca, cert, key)
def _response(self, response):
LOG.info(response)
return response
# pylint: disable=method-hidden
def _direct_call(self, method, *args, **kwargs):
is_signed = "signed" in kwargs and kwargs.pop("signed")
r = Request(method, *args, **kwargs)
def _direct_call(
self,
path,
body=None,
http_verb="POST",
headers=None,
signed=False,
timeout=DEFAULT_REQUEST_TIMEOUT_SEC,
):
description = ""
if self.description:
description = f" ({self.description})" + (" [signed]" if is_signed else "")
LOG.info(f"{self.name} {r} ({description})")
return self._response(self.client_impl.request(r, is_signed))
description = f"({self.description})" + (" [signed]" if signed else "")
def call(self, *args, **kwargs):
if headers is None:
headers = {}
r = Request(path, body, http_verb, headers)
LOG.info(f"{self.name} {r} {description}")
return self._response(self.client_impl.request(r, signed, timeout))
def call(
self,
path,
body=None,
http_verb="POST",
headers=None,
signed=False,
timeout=DEFAULT_REQUEST_TIMEOUT_SEC,
):
"""
Issues one request, synchronously, and returns the response.
:param str path: URI of the targeted resource.
:param dict body: Request body (optional).
:param http_verb: HTTP verb (e.g. "POST" or "GET").
:param headers: HTTP request headers (optional).
:param bool signed: Sign request with client private key.
:param int timeout: Maximum time to wait corresponding response before giving up.
:return: :py:class:`ccf.clients.Response`
"""
end_time = time.time() + self.connection_timeout
while True:
try:
response = self._direct_call(*args, **kwargs)
response = self._direct_call(
path, body, http_verb, headers, signed, timeout
)
# Only the first request gets this timeout logic - future calls
# call _direct_call directly
self.call = self._direct_call
@ -481,45 +521,65 @@ class CCFClient:
time.sleep(0.1)
def get(self, *args, **kwargs):
"""
Issue ``GET`` request.
See :py:meth:`ccf.clients.CCFClient.call`.
:return: :py:class:`ccf.clients.Response`
"""
return self.call(*args, http_verb="GET", **kwargs)
def post(self, *args, **kwargs):
"""
Issue ``POST`` request.
See :py:meth:`ccf.clients.CCFClient.call`.
:return: :py:class:`ccf.clients.Response`
"""
return self.call(*args, http_verb="POST", **kwargs)
def put(self, *args, **kwargs):
"""
Issue ``PUT`` request.
See :py:meth:`ccf.clients.CCFClient.call`.
:return: :py:class:`ccf.clients.Response`
"""
return self.call(*args, http_verb="PUT", **kwargs)
def delete(self, *args, **kwargs):
"""
Issue ``DELETE`` request.
See :py:meth:`ccf.clients.CCFClient.call`.
:return: :py:class:`ccf.clients.Response`
"""
return self.call(*args, http_verb="DELETE", **kwargs)
def head(self, *args, **kwargs):
"""
Issue ``HEAD`` request.
See :py:meth:`ccf.clients.CCFClient.call`.
:return: :py:class:`ccf.clients.Response`
"""
return self.call(*args, http_verb="HEAD", **kwargs)
def wait_for_commit(self, response, timeout=DEFAULT_COMMIT_TIMEOUT_SEC):
"""
Given a :py:class:`ccf.clients.Response`, this functions waits
for the associated sequence number and view to be committed by the CCF network.
The client will poll the ``/node/tx`` endpoint until ``COMMITTED`` is returned.
:param ccf.clients.Response response: Response returned by :py:meth:`ccf.clients.CCFClient.call`
:param int timeout: Maximum time (secs) to wait for commit before giving up.
A ``TimeoutError`` exception is raised if the transaction is not committed within ``timeout`` seconds.
"""
ccf.commit.wait_for_commit(self, response.seqno, response.view, timeout)
@contextlib.contextmanager
def client(
host,
port,
cert=None,
key=None,
ca=None,
description=None,
binary_dir=".",
connection_timeout=DEFAULT_CONNECTION_TIMEOUT_SEC,
request_timeout=DEFAULT_REQUEST_TIMEOUT_SEC,
ws=False,
):
c = CCFClient(
host=host,
port=port,
cert=cert,
key=key,
ca=ca,
description=description,
binary_dir=binary_dir,
connection_timeout=connection_timeout,
request_timeout=request_timeout,
ws=ws,
)
yield c
def client(*args, **kwargs):
yield CCFClient(*args, **kwargs)

35
python/ccf/commit.py Normal file
Просмотреть файл

@ -0,0 +1,35 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import http
import time
from ccf.tx_status import TxStatus
def wait_for_commit(client, seqno, view, timeout=3):
"""
Given a client to a CCF network and a seqno/view pair, this function
waits for this specific commit index to be committed by the
network in this view.
A TimeoutError exception is raised if the commit index is not globally
committed within the given timeout.
"""
end_time = time.time() + timeout
while time.time() < end_time:
r = client.get(f"/node/tx?view={view}&seqno={seqno}")
assert (
r.status_code == http.HTTPStatus.OK
), f"tx request returned HTTP status {r.status_code}"
status = TxStatus(r.body["status"])
if status == TxStatus.Committed:
return
elif status == TxStatus.Invalid:
raise RuntimeError(
f"Transaction ID {view}.{seqno} is marked invalid and will never be committed"
)
else:
time.sleep(0.1)
raise TimeoutError("Timed out waiting for commit")

65
python/tutorial.py Normal file
Просмотреть файл

@ -0,0 +1,65 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import sys
import http
import json
import os
# SNIPPET: import_clients
import ccf.clients
# Load client info file.
if len(sys.argv) < 2:
print("Client info file should be specified as first argument")
sys.exit(1)
client_info_file = sys.argv[1]
client_info = {}
with open(sys.argv[1]) as client_info_file:
client_info = json.load(client_info_file)
host = client_info["host"]
port = client_info["port"]
common_dir = client_info["common_dir"]
ca = os.path.join(common_dir, "networkcert.pem")
cert = os.path.join(common_dir, "user0_cert.pem")
key = os.path.join(common_dir, "user0_privk.pem")
# Client info loaded. Tutorial starts below.
# SNIPPET: anonymous_client
anonymous_client = ccf.clients.CCFClient(host, port, ca)
# SNIPPET_START: anonymous_requests
r = anonymous_client.get("/node/state")
assert r.status_code == http.HTTPStatus.OK
r = anonymous_client.get("/node/network")
assert r.status_code == http.HTTPStatus.OK
# SNIPPET_END: anonymous_requests
# SNIPPET: authenticated_client
user_client = ccf.clients.CCFClient(host, port, ca, cert, key)
# SNIPPET_START: authenticated_post_requests
r = user_client.post("/app/log/private", body={"id": 0, "msg": "Private message"})
assert r.status_code == http.HTTPStatus.OK
r = user_client.post("/app/log/public", body={"id": 0, "msg": "Public message"})
assert r.status_code == http.HTTPStatus.OK
# SNIPPET_END: authenticated_post_requests
# SNIPPET: wait_for_commit
user_client.wait_for_commit(r)
# SNIPPET: any_client_can_wait
anonymous_client.wait_for_commit(r)
# SNIPPET_START: authenticated_get_requests
r = user_client.get("/app/log/private?id=0")
assert r.status_code == http.HTTPStatus.OK
assert r.body == {"msg": "Private message"}
r = user_client.get("/app/log/public?id=0")
assert r.status_code == http.HTTPStatus.OK
assert r.body == {"msg": "Public message"}
# SNIPPET_END: authenticated_get_requests

Просмотреть файл

@ -3,7 +3,7 @@
import infra.e2e_args
import infra.network
import ccf.proposal_generator
import ccf.checker
import infra.checker
import os
import logging
from time import gmtime, strftime, perf_counter
@ -98,7 +98,7 @@ def run(args):
for regulator in regulators:
with primary.user_client(format="msgpack", user_id=regulator.name) as c:
check = ccf.checker.Checker()
check = infra.checker.Checker()
check(
c.post(
@ -125,7 +125,7 @@ def run(args):
with primary.user_client(format="msgpack", user_id=regulators[0].name) as c:
for bank in banks:
check = ccf.checker.Checker()
check = infra.checker.Checker()
check(
c.post(

Просмотреть файл

@ -2,7 +2,7 @@
# Licensed under the Apache 2.0 License.
import infra.e2e_args
import infra.network
import ccf.checker
import infra.checker
import ccf.proposal_generator
import logging
@ -41,7 +41,7 @@ def run(args):
with infra.network.network(
hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
) as network:
check = ccf.checker.Checker()
check = infra.checker.Checker()
network.start_and_join(args)
primary, others = network.find_nodes()
@ -107,7 +107,7 @@ def run(args):
# As permissioned manager, register regulator and banks
with primary.client() as mc:
check_commit = ccf.checker.Checker(mc)
check_commit = infra.checker.Checker(mc)
with primary.client(f"user{manager.name}") as c:
check(

Просмотреть файл

@ -320,7 +320,8 @@ namespace http
}
// On any exception, close the connection.
LOG_TRACE_FMT("Closing connection due to exception: {}", e.what());
LOG_FAIL_FMT("Closing connection");
LOG_DEBUG_FMT("Closing connection due to exception: {}", e.what());
close();
throw;
}

Просмотреть файл

@ -4,7 +4,7 @@ import infra.e2e_args
import time
import infra.network
import infra.proc
import ccf.checker
import infra.checker
import contextlib
import resource
import psutil
@ -19,7 +19,7 @@ def run(args):
with infra.network.network(
hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
) as network:
check = ccf.checker.Checker()
check = infra.checker.Checker()
network.start_and_join(args)
primary, _ = network.find_nodes()

Просмотреть файл

@ -8,7 +8,7 @@ import infra.network
import infra.proc
import infra.notification
import infra.net
import ccf.checker
import infra.checker
import suite.test_requirements as reqs
import infra.e2e_args
@ -24,8 +24,8 @@ def test(network, args, batch_size=100, write_key_divisor=1, write_size_multipli
primary, _ = network.find_primary()
# Set extended timeout, since some of these successful transactions will take many seconds
with primary.client("user0", request_timeout=30) as c:
check = ccf.checker.Checker()
with primary.client("user0") as c:
check = infra.checker.Checker()
message_ids = [next(id_gen) for _ in range(batch_size)]
messages = [
@ -42,6 +42,7 @@ def test(network, args, batch_size=100, write_key_divisor=1, write_size_multipli
"write_key_divisor": write_key_divisor,
"write_size_multiplier": write_size_multiplier,
},
timeout=30,
),
result=len(messages),
)
@ -50,7 +51,7 @@ def test(network, args, batch_size=100, write_key_divisor=1, write_size_multipli
f"Submitting {batch_size} new keys took {post_submit - pre_submit}s"
)
fetch_response = c.post("/app/BATCH_fetch", message_ids)
fetch_response = c.post("/app/BATCH_fetch", message_ids, timeout=30)
if write_key_divisor == 1 and write_size_multiplier == 1:
check(fetch_response, result=messages)
@ -100,6 +101,7 @@ def run_to_destruction(args):
) as network:
network.start_and_join(args)
LOG.warning("About to issue transactions until destruction")
try:
wsm = 5000
while True:
@ -123,7 +125,7 @@ def run_to_destruction(args):
break
if time.time() > end_time:
raise TimeoutError(f"Node took longer than {end_time}s to terminate")
raise TimeoutError(f"Node took longer than {timeout}s to terminate")
network.ignore_errors_on_shutdown()

Просмотреть файл

@ -6,7 +6,7 @@ import suite.test_requirements as reqs
import infra.logging_app as app
import infra.e2e_args
from ccf.tx_status import TxStatus
import ccf.checker
import infra.checker
import inspect
import http
import ssl
@ -80,8 +80,8 @@ def test_large_messages(network, args):
primary, _ = network.find_primary()
with primary.client() as nc:
check_commit = ccf.checker.Checker(nc)
check = ccf.checker.Checker()
check_commit = infra.checker.Checker(nc)
check = infra.checker.Checker()
with primary.client("user0") as c:
log_id = 44
@ -91,9 +91,7 @@ def test_large_messages(network, args):
c.post("/app/log/private", {"id": log_id, "msg": long_msg}),
result=True,
)
check(
c.get("/app/log/private", {"id": log_id}), result={"msg": long_msg}
)
check(c.get(f"/app/log/private?id={log_id}"), result={"msg": long_msg})
log_id += 1
return network
@ -107,8 +105,8 @@ def test_remove(network, args):
primary, _ = network.find_primary()
with primary.client() as nc:
check_commit = ccf.checker.Checker(nc)
check = ccf.checker.Checker()
check_commit = infra.checker.Checker(nc)
check = infra.checker.Checker()
with primary.client("user0") as c:
log_id = 44
@ -119,12 +117,11 @@ def test_remove(network, args):
check_commit(
c.post(resource, {"id": log_id, "msg": msg}), result=True,
)
check(c.get(resource, {"id": log_id}), result={"msg": msg})
check(c.get(f"{resource}?id={log_id}"), result={"msg": msg})
check(
c.delete(resource, {"id": log_id}, params_in_query=True),
result=None,
c.delete(f"{resource}?id={log_id}"), result=None,
)
get_r = c.get(resource, {"id": log_id})
get_r = c.get(f"{resource}?id={log_id}")
if args.package == "libjs_generic":
check(
get_r, result={"error": "No such key"},
@ -154,7 +151,7 @@ def test_cert_prefix(network, args):
log_id = 101
msg = "This message will be prefixed"
c.post("/app/log/private/prefix_cert", {"id": log_id, "msg": msg})
r = c.get("/app/log/private", {"id": log_id})
r = c.get(f"/app/log/private?id={log_id}")
assert f"CN=user{user_id}" in r.body["msg"], r
else:
@ -179,11 +176,11 @@ def test_anonymous_caller(network, args):
with primary.client("user4") as c:
r = c.post("/app/log/private/anonymous", {"id": log_id, "msg": msg})
assert r.body == True
r = c.get("/app/log/private", {"id": log_id})
assert r.status == 403, r
r = c.get(f"/app/log/private?id={log_id}")
assert r.status_code == http.HTTPStatus.FORBIDDEN.value, r
with primary.client("user0") as c:
r = c.get("/app/log/private", {"id": log_id})
r = c.get(f"/app/log/private?id={log_id}")
assert msg in r.body["msg"], r
else:
@ -208,8 +205,8 @@ def test_raw_text(network, args):
msg,
headers={"content-type": "text/plain"},
)
assert r.status == http.HTTPStatus.OK.value
r = c.get("/app/log/private", {"id": log_id})
assert r.status_code == http.HTTPStatus.OK.value
r = c.get(f"/app/log/private?id={log_id}")
assert msg in r.body["msg"], r
else:
@ -241,7 +238,7 @@ def test_metrics(network, args):
with primary.client() as c:
r = c.get("/app/endpoint_metrics")
assert r.status == http.HTTPStatus.FORBIDDEN.value
assert r.status_code == http.HTTPStatus.FORBIDDEN.value
with primary.client("user0") as c:
r = c.get("/app/endpoint_metrics")
@ -261,8 +258,8 @@ def test_historical_query(network, args):
primary, _ = network.find_primary()
with primary.client() as nc:
check_commit = ccf.checker.Checker(nc)
check = ccf.checker.Checker()
check_commit = infra.checker.Checker(nc)
check = infra.checker.Checker()
with primary.client("user0") as c:
log_id = 10
@ -276,7 +273,7 @@ def test_historical_query(network, args):
check_commit(
c.post("/app/log/private", {"id": log_id, "msg": msg2}), result=True
)
check(c.get("/app/log/private", {"id": log_id}), result={"msg": msg2})
check(c.get(f"/app/log/private?id={log_id}"), result={"msg": msg2})
timeout = 15
found = False
@ -284,32 +281,31 @@ def test_historical_query(network, args):
ccf.clients.CCF_TX_VIEW_HEADER: str(view),
ccf.clients.CCF_TX_SEQNO_HEADER: str(seqno),
}
params = {"id": log_id}
end_time = time.time() + timeout
while time.time() < end_time:
get_response = c.get(
"/app/log/private/historical", params, headers=headers
f"/app/log/private/historical?id={log_id}", headers=headers
)
if get_response.status == http.HTTPStatus.ACCEPTED:
if get_response.status_code == http.HTTPStatus.ACCEPTED:
retry_after = get_response.headers.get("retry-after")
if retry_after is None:
raise ValueError(
f"Response with status {get_response.status} is missing 'retry-after' header"
f"Response with status {get_response.status_code} is missing 'retry-after' header"
)
retry_after = int(retry_after)
time.sleep(retry_after)
elif get_response.status == http.HTTPStatus.OK:
elif get_response.status_code == http.HTTPStatus.OK:
assert get_response.body["msg"] == msg, get_response
found = True
break
elif get_response.status == http.HTTPStatus.NO_CONTENT:
elif get_response.status_code == http.HTTPStatus.NO_CONTENT:
raise ValueError(
f"Historical query response claims there was no write to {log_id} at {view}.{seqno}"
)
else:
raise ValueError(
f"Unexpected response status {get_response.status}: {get_response.body}"
f"Unexpected response status code {get_response.status_code}: {get_response.body}"
)
if not found:
@ -332,19 +328,19 @@ def test_forwarding_frontends(network, args):
backup = network.find_any_backup()
with backup.client() as c:
check_commit = ccf.checker.Checker(c)
check_commit = infra.checker.Checker(c)
ack = network.consortium.get_any_active_member().ack(backup)
check_commit(ack)
with backup.client("user0") as c:
check_commit = ccf.checker.Checker(c)
check = ccf.checker.Checker()
check_commit = infra.checker.Checker(c)
check = infra.checker.Checker()
msg = "forwarded_msg"
log_id = 123
check_commit(
c.post("/app/log/private", {"id": log_id, "msg": msg}), result=True,
)
check(c.get("/app/log/private", {"id": log_id}), result={"msg": msg})
check(c.get(f"/app/log/private?id={log_id}"), result={"msg": msg})
return network
@ -356,7 +352,7 @@ def test_update_lua(network, args):
LOG.info("Updating Lua application")
primary, _ = network.find_primary()
check = ccf.checker.Checker()
check = infra.checker.Checker()
# Create a new lua application file (minimal app)
new_app_file = "new_lua_app.lua"
@ -393,7 +389,7 @@ def test_update_lua(network, args):
@reqs.description("Check for commit of every prior transaction")
@reqs.supports_methods("/node/commit", "/node/tx")
@reqs.supports_methods("/node/commit")
def test_view_history(network, args):
if args.consensus == "pbft":
# This appears to work in PBFT, but it is unacceptably slow:
@ -405,7 +401,7 @@ def test_view_history(network, args):
LOG.warning("Skipping view reconstruction in PBFT")
return network
check = ccf.checker.Checker()
check = infra.checker.Checker()
for node in network.get_joined_nodes():
with node.client("user0") as c:
@ -420,7 +416,7 @@ def test_view_history(network, args):
for seqno in range(1, commit_seqno + 1):
views = []
for view in range(1, commit_view + 1):
r = c.get("/node/tx", {"view": view, "seqno": seqno})
r = c.get(f"/node/tx?view={view}&seqno={seqno}")
check(r)
status = TxStatus(r.body["status"])
if status == TxStatus.Committed:
@ -496,12 +492,12 @@ class SentTxs:
@reqs.description("Build a list of Tx IDs, check they transition states as expected")
@reqs.supports_methods("log/private", "/node/tx")
@reqs.supports_methods("log/private")
def test_tx_statuses(network, args):
primary, _ = network.find_primary()
with primary.client("user0") as c:
check = ccf.checker.Checker()
check = infra.checker.Checker()
r = c.post("/app/log/private", {"id": 0, "msg": "Ignored"})
check(r)
# Until this tx is globally committed, poll for the status of this and some other
@ -521,7 +517,7 @@ def test_tx_statuses(network, args):
done = False
for view, seqno in SentTxs.get_all_tx_ids():
r = c.get("/node/tx", {"view": view, "seqno": seqno})
r = c.get(f"/node/tx?view={view}&seqno={seqno}")
check(r)
status = TxStatus(r.body["status"])
SentTxs.update_status(view, seqno, status)
@ -547,13 +543,13 @@ def test_primary(network, args, notifications_queue=None, verify=True):
LOG.error(f"PRIMARY {primary.pubhost}")
with primary.client() as c:
r = c.head("/node/primary")
assert r.status == http.HTTPStatus.OK.value
assert r.status_code == http.HTTPStatus.OK.value
backup = network.find_any_backup()
LOG.error(f"BACKUP {backup.pubhost}")
with backup.client() as c:
r = c.head("/node/primary")
assert r.status == http.HTTPStatus.PERMANENT_REDIRECT.value
assert r.status_code == http.HTTPStatus.PERMANENT_REDIRECT.value
assert (
r.headers["location"]
== f"https://{primary.pubhost}:{primary.rpc_port}/node/primary"

Просмотреть файл

@ -7,7 +7,7 @@ import random
import infra.network
import infra.proc
import infra.e2e_args
import ccf.checker
import infra.checker
from loguru import logger as LOG
@ -36,8 +36,8 @@ def run(args):
with primary.client() as mc:
check = ccf.checker.Checker()
check_commit = ccf.checker.Checker(mc)
check = infra.checker.Checker()
check_commit = infra.checker.Checker(mc)
for connection in scenario["connections"]:
with (
@ -54,7 +54,7 @@ def run(args):
for tx in txs:
r = client.call(
tx["method"],
params=tx["params"],
body=tx["body"],
http_verb=tx.get("verb", "POST"),
)

Просмотреть файл

@ -5,7 +5,7 @@ import math
import infra.network
import infra.proc
import infra.e2e_args
import ccf.checker
import infra.checker
import http
import suite.test_requirements as reqs
@ -42,10 +42,10 @@ def wait_for_seqno_to_commit(seqno, view, nodes):
up_to_date_f = []
for f in nodes:
with f.client() as c:
r = c.get("/node/tx", {"view": view, "seqno": seqno})
r = c.get(f"/node/tx?view={view}&seqno={seqno}")
assert (
r.status == http.HTTPStatus.OK
), f"tx request returned HTTP status {r.status}"
r.status_code == http.HTTPStatus.OK
), f"tx request returned HTTP status {r.status_code}"
status = TxStatus(r.body["status"])
if status == TxStatus.Committed:
up_to_date_f.append(f.node_id)
@ -71,7 +71,7 @@ def run(args):
with infra.network.network(
hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
) as network:
check = ccf.checker.Checker()
check = infra.checker.Checker()
network.start_and_join(args)
current_view = None

Просмотреть файл

@ -2,6 +2,7 @@
# Licensed under the Apache 2.0 License.
import os
import sys
import http
import subprocess
import infra.network
import infra.path
@ -74,7 +75,7 @@ def test_user(network, args, notifications_queue=None, verify=True):
network.consortium.remove_user(primary, new_user_id)
with primary.client(f"user{new_user_id}") as c:
r = c.get("/app/log/private")
assert r.status == 403
assert r.status_code == http.HTTPStatus.FORBIDDEN.value
return network

Просмотреть файл

@ -92,8 +92,8 @@ def run(args):
response = network.consortium.get_member_by_id(
new_member_proposal.proposer_id
).withdraw(primary, new_member_proposal)
ccf.checker.Checker(c)(response)
assert response.status == http.HTTPStatus.OK.value
infra.checker.Checker(c)(response)
assert response.status_code == http.HTTPStatus.OK.value
assert response.body["state"] == ProposalState.Withdrawn.value
withdrawals_issued += 1

Просмотреть файл

@ -2,36 +2,9 @@
# Licensed under the Apache 2.0 License.
import json
import http
import time
from ccf.tx_status import TxStatus
def wait_for_global_commit(client, seqno, view, timeout=3):
"""
Given a client to a CCF network and a seqno/view pair, this function
waits for this specific commit index to be globally committed by the
network in this view.
A TimeoutError exception is raised if the commit index is not globally
committed within the given timeout.
"""
end_time = time.time() + timeout
while time.time() < end_time:
r = client.get("/node/tx", {"view": view, "seqno": seqno})
assert (
r.status == http.HTTPStatus.OK
), f"tx request returned HTTP status {r.status}"
status = TxStatus(r.body["status"])
if status == TxStatus.Committed:
return
elif status == TxStatus.Invalid:
raise RuntimeError(
f"Transaction ID {view}.{seqno} is marked invalid and will never be committed"
)
else:
time.sleep(0.1)
raise TimeoutError("Timed out waiting for commit")
from ccf.commit import wait_for_commit
class Checker:
@ -45,8 +18,8 @@ class Checker:
if error is not None:
if callable(error):
assert error(
rpc_result.status, rpc_result.body
), f"{rpc_result.status}: {rpc_result.body}"
rpc_result.status_code, rpc_result.body
), f"{rpc_result.status_code}: {rpc_result.body}"
else:
assert rpc_result.body == error, "Expected {}, got {}".format(
error, rpc_result.body
@ -64,7 +37,7 @@ class Checker:
assert rpc_result.seqno and rpc_result.view, rpc_result
if self.client:
wait_for_global_commit(self.client, rpc_result.seqno, rpc_result.view)
wait_for_commit(self.client, rpc_result.seqno, rpc_result.view)
if self.notification_queue:
end_time = time.time() + timeout

Просмотреть файл

@ -8,7 +8,7 @@ import http
import random
import infra.network
import infra.proc
import ccf.checker
import infra.checker
import infra.node
import infra.crypto
import infra.member
@ -156,7 +156,7 @@ class Consortium:
accept=True,
wait_for_global_commit=wait_for_global_commit,
)
assert response.status == http.HTTPStatus.OK.value
assert response.status_code == http.HTTPStatus.OK.value
proposal.state = infra.proposal.ProposalState(response.body["state"])
proposal.increment_votes_for()
@ -177,7 +177,7 @@ class Consortium:
proposals = []
with remote_node.client(f"member{self.get_any_active_member().member_id}") as c:
r = c.post("/gov/query", {"text": script})
assert r.status == http.HTTPStatus.OK.value
assert r.status_code == http.HTTPStatus.OK.value
for proposal_id, attr in r.body.items():
has_proposer_voted_for = False
for vote in attr["votes"]:
@ -231,7 +231,7 @@ class Consortium:
proposal = self.get_any_active_member().propose(remote_node, proposal_body)
proposal.vote_for = careful_vote
self.vote_using_majority(remote_node, proposal)
member_to_retire.status = infra.member.MemberStatus.RETIRED
member_to_retire.status_code = infra.member.MemberStatus.RETIRED
def open_network(self, remote_node, pbft_open=False):
"""
@ -304,7 +304,7 @@ class Consortium:
def recover_with_shares(self, remote_node, defunct_network_enc_pubk):
submitted_shares_count = 0
with remote_node.client() as nc:
check_commit = ccf.checker.Checker(nc)
check_commit = infra.checker.Checker(nc)
for m in self.get_active_members():
r = m.get_and_submit_recovery_share(
@ -348,10 +348,7 @@ class Consortium:
"""
# When opening the service in PBFT, the first transaction to be
# completed when f = 1 takes a significant amount of time
with remote_node.client(
f"member{self.get_any_active_member().member_id}",
request_timeout=(30 if pbft_open else 3),
) as c:
with remote_node.client(f"member{self.get_any_active_member().member_id}") as c:
r = c.post(
"/gov/query",
{
@ -361,7 +358,7 @@ class Consortium:
LOG_DEBUG("Service is nil")
else
LOG_DEBUG("Service version: ", tostring(service.version))
LOG_DEBUG("Service status: ", tostring(service.status))
LOG_DEBUG("Service status: ", tostring(service.status_code))
cert_len = #service.cert
LOG_DEBUG("Service cert len: ", tostring(cert_len))
LOG_DEBUG("Service cert bytes: " ..
@ -373,6 +370,7 @@ class Consortium:
return service
"""
},
timeout=(30 if pbft_open else 3),
)
current_status = r.body["status"]
current_cert = array.array("B", r.body["cert"]).tobytes()
@ -391,7 +389,7 @@ class Consortium:
with remote_node.client(f"member{self.get_any_active_member().member_id}") as c:
r = c.post("/gov/read", {"table": "ccf.nodes", "key": node_id})
if r.status != 200 or (
if r.status_code != http.HTTPStatus.OK.value or (
node_status and r.body["status"] != node_status.name
):
return False

Просмотреть файл

@ -1,10 +1,11 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import ccf.checker
import infra.checker
import ccf.clients
import suite.test_requirements as reqs
import time
import http
from loguru import logger as LOG
@ -89,8 +90,8 @@ class LoggingTxs:
):
LOG.success(f"Applying {number_txs} logging txs to node {remote_node.node_id}")
with remote_node.client() as mc:
check_commit = ccf.checker.Checker(mc)
check_commit_n = ccf.checker.Checker(mc, self.notifications_queue)
check_commit = infra.checker.Checker(mc)
check_commit_n = infra.checker.Checker(mc, self.notifications_queue)
with remote_node.client(self.user) as uc:
for _ in range(number_txs):
@ -161,12 +162,12 @@ class LoggingTxs:
end_time = time.time() + timeout
while time.time() < end_time:
with node.client(self.user) as uc:
rep = uc.get(cmd, {"id": idx})
if rep.status == 404:
rep = uc.get(f"{cmd}?id={idx}")
if rep.status_code == http.HTTPStatus.NOT_FOUND.value:
LOG.warning("User frontend is not yet opened")
time.sleep(0.1)
else:
check = ccf.checker.Checker(uc)
check = infra.checker.Checker(uc)
check(
rep, result={"msg": txs[idx]},
)

Просмотреть файл

@ -7,7 +7,7 @@ import infra.node
import infra.proposal
import infra.crypto
import ccf.clients
import ccf.checker
import infra.checker
import http
import os
import base64
@ -29,7 +29,7 @@ class Member:
def __init__(self, member_id, curve, common_dir, share_script, key_generator=None):
self.common_dir = common_dir
self.member_id = member_id
self.status = MemberStatus.ACCEPTED
self.status_code = MemberStatus.ACCEPTED
self.share_script = share_script
if key_generator is not None:
@ -59,16 +59,16 @@ class Member:
)
def is_active(self):
return self.status == MemberStatus.ACTIVE
return self.status_code == MemberStatus.ACTIVE
def set_active(self):
# Use this with caution (i.e. only when the network is opening)
self.status = MemberStatus.ACTIVE
self.status_code = MemberStatus.ACTIVE
def propose(self, remote_node, proposal, has_proposer_voted_for=True):
with remote_node.client(f"member{self.member_id}") as mc:
r = mc.post("/gov/proposals", proposal, signed=True,)
if r.status != http.HTTPStatus.OK.value:
if r.status_code != http.HTTPStatus.OK.value:
raise infra.proposal.ProposalNotCreated(r)
return infra.proposal.Proposal(
@ -84,11 +84,11 @@ class Member:
with remote_node.client(f"member{self.member_id}") as mc:
r = mc.post(
f"/gov/proposals/{proposal.proposal_id}/votes",
params=proposal.vote_for,
body=proposal.vote_for,
signed=True,
)
if r.status != 200:
if r.status_code != 200:
return r
# If the proposal was accepted, wait for it to be globally committed
@ -103,37 +103,37 @@ class Member:
# can only commit after it has successfully joined and caught up.
# Given that the retry timer on join RPC is 4 seconds, anything less is very
# likely to time out!
ccf.checker.wait_for_global_commit(mc, r.seqno, r.view, timeout=6)
ccf.commit.wait_for_commit(mc, r.seqno, r.view, timeout=6)
return r
def withdraw(self, remote_node, proposal):
with remote_node.client(f"member{self.member_id}") as c:
r = c.post(f"/gov/proposals/{proposal.proposal_id}/withdraw", signed=True)
if r.status == http.HTTPStatus.OK.value:
if r.status_code == http.HTTPStatus.OK.value:
proposal.state = infra.proposal.ProposalState.Withdrawn
return r
def update_ack_state_digest(self, remote_node):
with remote_node.client(f"member{self.member_id}") as mc:
r = mc.post("/gov/ack/update_state_digest")
assert r.status == 200, f"Error ack/update_state_digest: {r}"
assert r.status_code == 200, f"Error ack/update_state_digest: {r}"
return bytearray(r.body["state_digest"])
def ack(self, remote_node):
state_digest = self.update_ack_state_digest(remote_node)
with remote_node.client(f"member{self.member_id}") as mc:
r = mc.post(
"/gov/ack", params={"state_digest": list(state_digest)}, signed=True
"/gov/ack", body={"state_digest": list(state_digest)}, signed=True
)
assert r.status == 200, f"Error ACK: {r}"
self.status = MemberStatus.ACTIVE
assert r.status_code == 200, f"Error ACK: {r}"
self.status_code = MemberStatus.ACTIVE
return r
def get_and_decrypt_recovery_share(self, remote_node, defunct_network_enc_pubk):
with remote_node.client(f"member{self.member_id}") as mc:
r = mc.get("/gov/recovery_share")
if r.status != http.HTTPStatus.OK.value:
if r.status_code != http.HTTPStatus.OK.value:
raise NoRecoveryShareFound(r)
ctx = infra.crypto.CryptoBoxCtx(

Просмотреть файл

@ -520,10 +520,10 @@ class Network:
end_time = time.time() + timeout
while time.time() < end_time:
for node in self.get_joined_nodes():
with node.client(request_timeout=request_timeout) as c:
with node.client() as c:
try:
res = c.get("/node/primary_info")
if res.status == 200:
res = c.get("/node/primary_info", timeout=request_timeout)
if res.status_code == 200:
primary_id = res.body["primary_id"]
view = res.body["current_view"]
break
@ -580,8 +580,8 @@ class Network:
caught_up_nodes = []
for node in self.get_joined_nodes():
with node.client() as c:
resp = c.get("/node/tx", {"view": view, "seqno": seqno})
if resp.status != 200:
resp = c.get(f"/node/tx?view={view}&seqno={seqno}")
if resp.status_code != 200:
# Node may not have joined the network yet, try again
break
status = TxStatus(resp.body["status"])

Просмотреть файл

@ -236,7 +236,7 @@ class Node:
with self.client(connection_timeout=timeout) as nc:
rep = nc.get("/node/commit")
assert (
rep.status == 200
rep.status_code == 200
), f"An error occured after node {self.node_id} joined the network: {rep.body}"
except ccf.clients.CCFConnectionException:
raise TimeoutError(f"Node {self.node_id} failed to join the network")
@ -254,7 +254,6 @@ class Node:
else None,
"ca": os.path.join(self.common_dir, "networkcert.pem"),
"description": f"node {self.node_id} as {identity or 'unauthenticated'}",
"binary_dir": self.binary_dir,
}
akwargs.update(kwargs)
return ccf.clients.client(self.pubhost, self.rpc_port, **akwargs)

Просмотреть файл

@ -80,7 +80,7 @@ def test_missing_signature(network, args):
member = network.consortium.get_any_active_member()
with primary.client(f"member{member.member_id}") as mc:
r = mc.post("/gov/proposals", signed=False)
assert r.status == http.HTTPStatus.UNAUTHORIZED, r.status
assert r.status_code == http.HTTPStatus.UNAUTHORIZED, r.status_code
www_auth = "www-authenticate"
assert www_auth in r.headers, r.headers
auth_header = r.headers[www_auth]
@ -187,25 +187,25 @@ def run(args):
assert (
network.consortium.get_member_by_id(0)
.vote(primary, new_member_proposal, accept=True)
.status
.status_code
== params_error
)
assert (
network.consortium.get_member_by_id(0)
.vote(primary, new_member_proposal, accept=False)
.status
.status_code
== params_error
)
assert (
network.consortium.get_member_by_id(1)
.vote(primary, new_member_proposal, accept=True)
.status
.status_code
== params_error
)
assert (
network.consortium.get_member_by_id(1)
.vote(primary, new_member_proposal, accept=False)
.status
.status_code
== params_error
)
@ -213,7 +213,7 @@ def run(args):
response = network.consortium.get_member_by_id(
new_member_proposal.proposer_id
).withdraw(primary, new_member_proposal)
assert response.status == params_error
assert response.status_code == params_error
LOG.info("New non-active member should get insufficient rights response")
proposal_trust_0, careful_vote = ccf.proposal_generator.trust_node(
@ -225,7 +225,7 @@ def run(args):
False
), "New non-active member should get insufficient rights response"
except infra.proposal.ProposalNotCreated as e:
assert e.response.status == http.HTTPStatus.FORBIDDEN.value
assert e.response.status_code == http.HTTPStatus.FORBIDDEN.value
LOG.debug("New member ACK")
new_member.ack(primary)
@ -248,11 +248,11 @@ def run(args):
response = network.consortium.get_member_by_id(1).withdraw(
primary, trust_node_proposal
)
assert response.status == http.HTTPStatus.FORBIDDEN.value
assert response.status_code == http.HTTPStatus.FORBIDDEN.value
LOG.debug("Proposer withdraws their proposal")
response = new_member.withdraw(primary, trust_node_proposal)
assert response.status == http.HTTPStatus.OK.value
assert response.status_code == http.HTTPStatus.OK.value
assert trust_node_proposal.state == infra.proposal.ProposalState.Withdrawn
proposals = network.consortium.get_proposals(primary)
@ -265,14 +265,14 @@ def run(args):
LOG.debug("Further withdraw proposals fail")
response = new_member.withdraw(primary, trust_node_proposal)
assert response.status == params_error
assert response.status_code == params_error
LOG.debug("Further votes fail")
response = new_member.vote(primary, trust_node_proposal, accept=True)
assert response.status == params_error
assert response.status_code == params_error
response = new_member.vote(primary, trust_node_proposal, accept=False)
assert response.status == params_error
assert response.status_code == params_error
# Membership changes trigger re-sharing and re-keying and are
# only supported with Raft
@ -289,7 +289,7 @@ def run(args):
)
assert False, "Retired member cannot make a new proposal"
except infra.proposal.ProposalNotCreated as e:
assert e.response.status == http.HTTPStatus.FORBIDDEN.value
assert e.response.status_code == http.HTTPStatus.FORBIDDEN.value
assert e.response.body == "Member is not active"
LOG.debug("New member should still be able to make a new proposal")

Просмотреть файл

@ -6,7 +6,7 @@ import infra.notification
import infra.net
import suite.test_requirements as reqs
import infra.e2e_args
import ccf.checker
import infra.checker
from loguru import logger as LOG
@ -18,8 +18,8 @@ def test(network, args, notifications_queue=None):
primary, _ = network.find_primary_and_any_backup()
with primary.client() as mc:
check_commit = ccf.checker.Checker(mc, notifications_queue)
check = ccf.checker.Checker()
check_commit = infra.checker.Checker(mc, notifications_queue)
check = infra.checker.Checker()
msg = "Hello world"
@ -27,7 +27,7 @@ def test(network, args, notifications_queue=None):
with primary.client("user0") as c:
r = c.post("/app/log/private", {"id": 42, "msg": msg})
check_commit(r, result=True)
check(c.get("/app/log/private", {"id": 42}), result={"msg": msg})
check(c.get("/app/log/private?id=42"), result={"msg": msg})
for _ in range(10):
c.post(
"/app/log/private", {"id": 43, "msg": "Additional messages"},
@ -36,7 +36,7 @@ def test(network, args, notifications_queue=None):
c.post("/app/log/private", {"id": 43, "msg": "A final message"}),
result=True,
)
r = c.get("/app/receipt", {"commit": r.seqno})
r = c.get(f"/app/receipt?commit={r.seqno}")
check(
c.post("/app/receipt/verify", {"receipt": r.body["receipt"]}),
result={"valid": True},

Просмотреть файл

@ -3,7 +3,7 @@
import infra.e2e_args
import infra.network
import infra.logging_app as app
import ccf.checker
import infra.checker
import suite.test_requirements as reqs
import time
@ -51,7 +51,7 @@ def test_share_resilience(network, args):
last_member_to_submit = m
break
check_commit = ccf.checker.Checker(nc)
check_commit = infra.checker.Checker(nc)
check_commit(
m.get_and_submit_recovery_share(primary, defunct_network_enc_pubk)
)

Просмотреть файл

@ -7,7 +7,7 @@ import http
import infra.network
import infra.proc
import infra.e2e_args
import ccf.checker
import infra.checker
from loguru import logger as LOG
@ -41,9 +41,7 @@ def run(args):
for method in [m["path"] for m in methods]:
schema_found = False
schema_response = client.get(
f"/{prefix}/api/schema", params={"method": method}
)
schema_response = client.get(f'/{prefix}/api/schema?method="{method}"')
check(
schema_response,
error=lambda status, msg: status == http.HTTPStatus.OK.value,
@ -92,7 +90,7 @@ def run(args):
network.start_and_join(args)
primary, _ = network.find_primary()
check = ccf.checker.Checker()
check = infra.checker.Checker()
with primary.client("user0") as user_client:
LOG.info("user frontend")

Просмотреть файл

@ -9,7 +9,7 @@
"transactions": [
{
"method": "/app/log/private",
"params": {
"body": {
"id": 42,
"msg": "Hello world"
},
@ -17,10 +17,8 @@
},
{
"verb": "GET",
"method": "/app/log/private",
"params": {
"id": 42
},
"method": "/app/log/private?id=42",
"body": {},
"expected_result": {
"msg": "Hello world"
}
@ -32,7 +30,7 @@
"transactions": [
{
"method": "/app/log/private",
"params": {
"body": {
"id": 42,
"msg": "Hello world"
},
@ -40,10 +38,8 @@
},
{
"verb": "GET",
"method": "/app/log/private",
"params": {
"id": 42
},
"method": "/app/log/private?id=42",
"body": {},
"expected_result": {
"msg": "Hello world"
}

Просмотреть файл

@ -4,10 +4,24 @@ import infra.e2e_args
import infra.network
import time
import sys
import json
import os
from loguru import logger as LOG
def dump_network_info(path, network, node):
network_info = {}
network_info["host"] = node.pubhost
network_info["port"] = node.rpc_port
network_info["common_dir"] = network.common_dir
with open(path, "w") as network_info_file:
json.dump(network_info, network_info_file)
LOG.debug(f"Dumped network information to {os.path.abspath(path)}")
def run(args):
hosts = args.node or ["localhost"] * 3
@ -50,10 +64,16 @@ def run(args):
for b in backups:
LOG.info(" Node [{:2d}] = {}:{}".format(b.node_id, b.pubhost, b.rpc_port))
# Dump primary info to file for tutorial testing
if args.network_info_file is not None:
dump_network_info(args.network_info_file, network, primary)
LOG.info(
f"You can now issue business transactions to the {args.package} application."
)
LOG.info(f"Certificates have been copied to {network.common_dir}")
LOG.info(
f"Keys and certificates have been copied to the common folder: {network.common_dir}"
)
LOG.info(
"See https://microsoft.github.io/CCF/users/issue_commands.html for more information."
)
@ -91,6 +111,11 @@ if __name__ == "__main__":
action="store_true",
default=False,
)
parser.add_argument(
"--network-info-file",
help="Path to output file where network information will be dumped to (useful for scripting)",
default=None,
)
parser.add_argument(
"-r",
"--recover",

51
tests/test_install.sh Executable file
Просмотреть файл

@ -0,0 +1,51 @@
#!/bin/bash
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
set -ex
if [ "$#" -ne 1 ]; then
echo "Install prefix should be passed as first argument to $0"
exit 1
fi
echo "Install prefix is ${1}"
# Setup env
INSTALL_PREFIX="$1"
mkdir -p nested/run
cd nested/run
python3.7 -m venv env
source env/bin/activate
python -m pip install -U -r "$INSTALL_PREFIX"/bin/requirements.txt
pip freeze > "$INSTALL_PREFIX"/bin/requirements.txt
# Start ephemeral network in the background
network_info_file="network_info.txt"
timeout --signal=SIGINT --kill-after=30s --preserve-status 30s \
python "$INSTALL_PREFIX"/bin/start_network.py \
-p ../../../build/liblogging \
-b "$INSTALL_PREFIX"/bin \
-g "$(pwd)"/../../../src/runtime_config/gov.lua \
--network-info-file "$network_info_file" \
-v &
# Issue tutorial transactions to ephemeral network
sleep 20
python ../../../python/tutorial.py "$network_info_file"
sleep 15
# Recover network
cp -r ./workspace/start_network_0/0.ledger .
cp ./workspace/start_network_0/network_enc_pubk.pem .
timeout --signal=SIGINT --kill-after=30s --preserve-status 30s \
python "$INSTALL_PREFIX"/bin/start_network.py \
-p ../../../build/liblogging \
-b "$INSTALL_PREFIX"/bin \
-v \
--recover \
--ledger-dir 0.ledger \
--network-enc-pubk network_enc_pubk.pem \
--common-dir ./workspace/start_network_common/

Просмотреть файл

@ -32,11 +32,11 @@ def test(network, args, notifications_queue=None):
with other.client("user0") as nc:
while time.time() < end_time:
r = nc.post("/app/log/private", {"id": 42, "msg": msg * i})
if r.status == 200:
if r.status_code == 200:
break
else:
time.sleep(0.1)
assert r.status == 200, r
assert r.status_code == 200, r
LOG.info("Write on secondary through forwarding")
with other.client("user0", ws=True) as c: