Basic consistency TV app and client (#6116)

This commit is contained in:
Amaury Chamayou 2024-04-11 10:32:11 +01:00 коммит произвёл GitHub
Родитель cf812afe35
Коммит 1d094c5150
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
5 изменённых файлов: 226 добавлений и 0 удалений

Просмотреть файл

@ -1432,6 +1432,11 @@ if(BUILD_TESTS)
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/connections.py
)
add_e2e_test(
NAME consistency_trace_validation
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/consistency_trace_validation.py
)
add_e2e_test(
NAME fuzz_test PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/fuzzing.py
)

Просмотреть файл

@ -0,0 +1,22 @@
{
"endpoints": {
"/records/{key}": {
"get": {
"js_module": "basic.js",
"js_function": "get_record",
"forwarding_required": "always",
"authn_policies": ["no_auth"],
"mode": "readonly",
"openapi": {}
},
"put": {
"js_module": "basic.js",
"js_function": "put_record",
"forwarding_required": "always",
"authn_policies": ["no_auth"],
"mode": "readwrite",
"openapi": {}
}
}
}
}

Просмотреть файл

@ -0,0 +1,46 @@
let records_table = ccf.kv["records"];
export function put_record(request) {
const key = request.params.key;
if (key === undefined) {
return { statusCode: 404, body: "Missing key" };
}
records_table.set(ccf.strToBuf(key), request.body.arrayBuffer());
return {
statusCode: 204,
};
}
export function get_record(request) {
const key = request.params.key;
if (key === undefined) {
return { statusCode: 404, body: "Missing key" };
}
const val = records_table.get(ccf.strToBuf(key));
if (val === undefined) {
return { statusCode: 404, body: "No such key" };
}
return {
statusCode: 200,
headers: {
"content-type": "text/plain",
},
body: val,
};
}
export function post_records(request) {
const records = request.body.json();
for (let key in records) {
records_table.set(ccf.strToBuf(key), ccf.strToBuf(records[key]));
}
return {
statusCode: 204,
};
}

Просмотреть файл

@ -0,0 +1,46 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import infra.e2e_args
import subprocess
import sys
import time
def run(args):
with infra.network.network(
args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
) as network:
network.start_and_open(args)
targets = [
f"https://{n.get_public_rpc_address()}" for n in network.get_joined_nodes()
]
cli = [sys.executable, "../tests/tvc.py"]
for target in targets:
cli.append("-t")
cli.append(target)
cli.append("--ca")
cli.append(network.cert_path)
with open("consistency_trace.ndjson", "w") as trace_file:
tvc = subprocess.Popen(cli, stdout=trace_file)
# Do some normal transactions
time.sleep(2)
# Suspend the primary long enough to cause an election
primary, _ = network.find_primary()
primary.suspend()
time.sleep(network.election_duration * 2)
primary.resume()
# Do some more transactions
time.sleep(5)
tvc.send_signal(subprocess.signal.SIGINT)
tvc.wait()
if __name__ == "__main__":
args = infra.e2e_args.cli_args()
args.package = "libjs_generic"
args.js_app_bundle = "../samples/apps/basic_tv/js/"
args.nodes = infra.e2e_args.nodes(args, 5)
# Long signature interval to maximise the chace of an InvalidStatus transaction
args.sig_ms_interval = 1000
run(args)

107
tests/tvc.py Normal file
Просмотреть файл

@ -0,0 +1,107 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import httpx
import random
import json
import argparse
"""
1. Run sandbox
~/CCF/build$ ../tests/sandbox/sandbox.sh --js-app-bundle ../samples/apps/basic_tv/js/ -n local://127.0.0.1:8000 -n local://127.0.0.1:8001
...
2. Run tvc.py
~/CCF/tests$ python3 tvc.py -t https://127.0.0.1:8000 -t https://127.0.0.1:8001 --ca ../build/workspace/sandbox_common/service_cert.pem
3. Trace is printed to stdout
{"action": "RwTxRequestAction", "type": "RwTxRequest", "tx": 0}
{"action": "RwTxResponseAction", "type": "RwTxRequest", "tx": 0, "tx_id": [2, 197]}
{"action": "StatusCommittedResponseAction", "type": "TxStatusReceived", "tx_id": [2, 197], "status": "CommittedStatus"}
{"action": "RwTxRequestAction", "type": "RwTxRequest", "tx": 1}
{"action": "RwTxResponseAction", "type": "RwTxRequest", "tx": 1, "tx_id": [2, 199]}
{"action": "StatusCommittedResponseAction", "type": "TxStatusReceived", "tx_id": [2, 199], "status": "CommittedStatus"}
{"action": "RwTxRequestAction", "type": "RwTxRequest", "tx": 2}
{"action": "RwTxResponseAction", "type": "RwTxRequest", "tx": 2, "tx_id": [2, 201]}
"""
KEY = "0"
VALUE = "value"
def log(**kwargs):
print(json.dumps(kwargs))
def tx_id(string):
view, seqno = string.split(".")
return int(view), int(seqno)
def run(targets, cacert):
session = httpx.Client(verify=cacert)
tx = 0
while True:
target = random.choice(targets)
# Always start with a write, to avoid having to handle missing values
txtype = random.choice(["Ro", "Rw"]) if tx else "Rw"
if txtype == "Ro":
response = session.get(f"{target}/records/{KEY}")
if response.status_code == 200:
log(action=f"{txtype}TxRequestAction", type=f"{txtype}TxRequest", tx=tx)
assert response.text == VALUE
txid = response.headers["x-ms-ccf-transaction-id"]
log(
action="RoTxResponseAction",
type="RoTxRequest",
tx=tx,
tx_id=tx_id(txid),
)
elif txtype == "Rw":
response = session.put(f"{target}/records/{KEY}", data=VALUE)
if response.status_code == 204:
log(action=f"{txtype}TxRequestAction", type=f"{txtype}TxRequest", tx=tx)
txid = response.headers["x-ms-ccf-transaction-id"]
log(
action="RwTxResponseAction",
type="RwTxRequest",
tx=tx,
tx_id=tx_id(txid),
)
status = "Pending"
final = False
while not final:
response = session.get(f"{target}/tx?transaction_id={txid}")
if response.status_code == 200:
status = response.json()["status"]
if status in ("Committed", "Invalid"):
log(
action=f"Status{status}ResponseAction",
type="TxStatusReceived",
tx_id=tx_id(txid),
status=f"{status}Status",
)
final = True
else:
# if our target has gone away or does not know who is primary, try another one
target = random.choice(targets)
else:
raise ValueError(f"Unknown Tx type: {txtype}")
tx += 1
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Run --txs steps, a ~50% mix of reads and writes, randomly distributed across --target nodes"
)
parser.add_argument("-t", "--target", help="Host to connect to", action="append")
parser.add_argument("--ca", help="CA for the server")
args = parser.parse_args()
try:
run(args.target, args.ca)
except KeyboardInterrupt:
pass