зеркало из https://github.com/microsoft/CCF.git
Add basic batched app load test (#568)
* Rename lua_logging_app requiremet to lua_generic_app * Add skeleton batched app, e2e test * Working e2e_batched test * Test until we crash * Back off around limit of batch sizes * Tidy test case for initial PR * Increment id inside loop * Remove unnecessary decode * Disable large batches for CI * Improve error handling for CurlClient * Batched sends large requests - don't run in HTTP
This commit is contained in:
Родитель
0d2830fc74
Коммит
6cddb38c6e
|
@ -324,6 +324,12 @@ if(BUILD_TESTS)
|
|||
--enforce-reqs
|
||||
)
|
||||
|
||||
add_e2e_test(
|
||||
NAME lua_end_to_end_batched
|
||||
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_batched.py
|
||||
ADDITIONAL_ARGS
|
||||
--app-script ${CMAKE_SOURCE_DIR}/src/apps/batched/batched.lua)
|
||||
|
||||
if (BUILD_SMALLBANK)
|
||||
include(${CMAKE_CURRENT_SOURCE_DIR}/samples/apps/smallbank/smallbank.cmake)
|
||||
endif()
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
-- Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
-- Licensed under the Apache 2.0 License.
|
||||
|
||||
return {
|
||||
__environment = [[
|
||||
function env.jsucc(result)
|
||||
return {result = result}
|
||||
end
|
||||
|
||||
function env.jerr(code, message)
|
||||
return {error = {code = code, message = message}}
|
||||
end
|
||||
]],
|
||||
|
||||
BATCH_submit = [[
|
||||
tables, gov_tables, args = ...
|
||||
count = 0
|
||||
for n, e in ipairs(args.params) do
|
||||
id = e.id
|
||||
msg = e.msg
|
||||
tables.priv0:put(id, msg)
|
||||
count = count + 1
|
||||
end
|
||||
return env.jsucc(count)
|
||||
]],
|
||||
|
||||
BATCH_fetch = [[
|
||||
tables, gov_tables, args = ...
|
||||
results = {}
|
||||
for n, id in ipairs(args.params) do
|
||||
table.insert(results, {id = id, msg = tables.priv0:get(id)})
|
||||
end
|
||||
return env.jsucc(results)
|
||||
]]
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
# Enclave settings:
|
||||
Debug=1
|
||||
NumHeapPages=50000
|
||||
NumStackPages=1024
|
||||
NumTCS=8
|
||||
ProductID=1
|
||||
SecurityVersion=1
|
|
@ -0,0 +1,80 @@
|
|||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the Apache 2.0 License.
|
||||
from hashlib import md5
|
||||
import itertools
|
||||
import time
|
||||
|
||||
import infra.ccf
|
||||
import infra.proc
|
||||
import infra.jsonrpc
|
||||
import infra.notification
|
||||
import infra.net
|
||||
import suite.test_requirements as reqs
|
||||
import e2e_args
|
||||
|
||||
from loguru import logger as LOG
|
||||
|
||||
id_gen = itertools.count()
|
||||
|
||||
|
||||
@reqs.lua_generic_app
|
||||
def test(network, args, batch_size=100):
|
||||
LOG.info(f"Running batch submission of {batch_size} new entries")
|
||||
primary, _ = network.find_primary()
|
||||
|
||||
with primary.user_client() as c:
|
||||
message_ids = [next(id_gen) for _ in range(batch_size)]
|
||||
messages = [
|
||||
{"id": i, "msg": f"A unique message: {md5(bytes(i)).hexdigest()}"}
|
||||
for i in message_ids
|
||||
]
|
||||
|
||||
pre_submit = time.time()
|
||||
submit_response = c.rpc("BATCH_submit", messages)
|
||||
post_submit = time.time()
|
||||
LOG.warning(
|
||||
f"Submitting {batch_size} new keys took {post_submit - pre_submit}s"
|
||||
)
|
||||
assert submit_response.result == len(messages)
|
||||
|
||||
fetch_response = c.rpc("BATCH_fetch", message_ids)
|
||||
assert fetch_response.result is not None
|
||||
assert len(fetch_response.result) == len(message_ids)
|
||||
for n, m in enumerate(messages):
|
||||
fetched = fetch_response.result[n]
|
||||
assert m["id"] == fetched["id"]
|
||||
assert m["msg"] == fetched["msg"]
|
||||
|
||||
return network
|
||||
|
||||
|
||||
def run(args):
|
||||
hosts = ["localhost", "localhost", "localhost"]
|
||||
|
||||
with infra.ccf.network(
|
||||
hosts, args.build_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
|
||||
) as network:
|
||||
network.start_and_join(args)
|
||||
|
||||
network = test(network, args, batch_size=1)
|
||||
network = test(network, args, batch_size=10)
|
||||
network = test(network, args, batch_size=100)
|
||||
network = test(network, args, batch_size=1000)
|
||||
|
||||
# TODO: CI already takes ~25s for batch of 10k, so avoid large batches for now
|
||||
# bs = 10000
|
||||
# step_size = 10000
|
||||
|
||||
# # TODO: This tests fails with larger batch sizes, and with any transaction
|
||||
# # larger than ~2MB. Investigate why, then expand this test
|
||||
# while bs <= 30000:
|
||||
# network = test(network, args, batch_size=bs)
|
||||
# bs += step_size
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
args = e2e_args.cli_args()
|
||||
args.package = "libluagenericenc"
|
||||
args.enforce_reqs = True
|
||||
|
||||
run(args)
|
|
@ -18,7 +18,7 @@ import e2e_args
|
|||
from loguru import logger as LOG
|
||||
|
||||
|
||||
@reqs.lua_logging_app
|
||||
@reqs.lua_generic_app
|
||||
def test_update_lua(network, args):
|
||||
if args.package == "libluagenericenc":
|
||||
LOG.info("Updating Lua application")
|
||||
|
@ -111,7 +111,7 @@ def test(network, args, notifications_queue=None):
|
|||
c.rpc("LOG_record", {"id": id, "msg": long_msg}), result=True,
|
||||
)
|
||||
check(c.rpc("LOG_get", {"id": id}), result={"msg": long_msg})
|
||||
id += 1
|
||||
id += 1
|
||||
|
||||
return network
|
||||
|
||||
|
|
|
@ -420,9 +420,10 @@ class CurlClient:
|
|||
cmd.extend(["--cert", self.cert])
|
||||
LOG.debug(f"Running: {' '.join(cmd)}")
|
||||
rc = subprocess.run(cmd, capture_output=True)
|
||||
LOG.debug(f"Received {rc.stdout.decode()}")
|
||||
LOG.debug(f"Received {rc.stdout}")
|
||||
if rc.returncode != 0:
|
||||
LOG.debug(f"ERR {rc.stderr.decode()}")
|
||||
LOG.error(rc.stderr)
|
||||
raise RuntimeError("Curl failed")
|
||||
self.stream.update(rc.stdout)
|
||||
return r.id
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import infra.remote
|
|||
import infra.net
|
||||
import infra.path
|
||||
import infra.jsonrpc
|
||||
import time
|
||||
|
||||
from loguru import logger as LOG
|
||||
|
||||
|
@ -174,9 +175,12 @@ class Node:
|
|||
"""
|
||||
for _ in range(timeout):
|
||||
with self.node_client() as mc:
|
||||
rep = mc.do("getCommit", {})
|
||||
if rep.error == None and rep.result is not None:
|
||||
return
|
||||
try:
|
||||
rep = mc.do("getCommit", {})
|
||||
if rep.error == None and rep.result is not None:
|
||||
return
|
||||
except:
|
||||
pass
|
||||
time.sleep(1)
|
||||
raise TimeoutError(f"Node {self.node_id} failed to join the network")
|
||||
|
||||
|
|
|
@ -62,7 +62,7 @@ def logging_app(func):
|
|||
return wrapper
|
||||
|
||||
|
||||
def lua_logging_app(func):
|
||||
def lua_generic_app(func):
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
if args[1].enforce_reqs is False:
|
||||
|
@ -72,7 +72,7 @@ def lua_logging_app(func):
|
|||
# Lua app is by looking at the package passed to the nodes at startup
|
||||
args_ = args[1]
|
||||
if args_.package is not "libluagenericenc":
|
||||
raise TestRequirementsNotMet("Lua logging app not installed")
|
||||
raise TestRequirementsNotMet("Lua generic app not installed")
|
||||
|
||||
return func(*args, **kwargs)
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче