# CCF/tests/js-custom-authorization/custom_authorization.py

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
import http
import infra.network
import infra.path
import infra.proc
import infra.net
import infra.e2e_args
import infra.crypto
import infra.commit
import suite.test_requirements as reqs
from infra.runner import ConcurrentRunner
import os
import tempfile
import base64
import json
import time
import infra.jwt_issuer
import datetime
import re
import uuid
from http import HTTPStatus
import subprocess
from contextlib import contextmanager

from loguru import logger as LOG

@reqs.description("Test custom authorization")
def test_custom_auth(network, args):
    primary, _ = network.find_nodes()
    with primary.client("user0") as c:
        r = c.get("/app/custom_auth", headers={"Authorization": "Bearer 42"})
        assert r.status_code == http.HTTPStatus.OK, r.status_code
        assert r.body.json()
    return network

def run(args):
    with infra.network.network(
        args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
    ) as network:
        network.start_and_open(args)
        network = test_custom_auth(network, args)

# Context manager to temporarily set JS execution limits.
# NB: Limits are currently applied to governance runtimes as well, so limits
# must be high enough that a proposal to restore the defaults can pass.
@contextmanager
def temporary_js_limits(network, primary, **kwargs):
    with primary.client() as c:
        # Fetch defaults from the js_metrics endpoint
        r = c.get("/node/js_metrics")
        assert r.status_code == http.HTTPStatus.OK, r.status_code
        body = r.body.json()
        default_max_heap_size = body["max_heap_size"]
        default_max_stack_size = body["max_stack_size"]
        default_max_execution_time = body["max_execution_time"]

    default_kwargs = {
        "max_heap_bytes": default_max_heap_size,
        "max_stack_bytes": default_max_stack_size,
        "max_execution_time_ms": default_max_execution_time,
        "return_exception_details": True,
    }

    temp_kwargs = default_kwargs.copy()
    temp_kwargs.update(**kwargs)
    LOG.info(f"Setting JS runtime options: {temp_kwargs}")
    network.consortium.set_js_runtime_options(
        primary,
        **temp_kwargs,
    )

    yield

    # Restore defaults
    network.consortium.set_js_runtime_options(primary, **default_kwargs)

def set_issuer_with_a_key(primary, network, issuer, kid, constraint):
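    """Register a JWT issuer via governance, with a single RSA signing key
    and a per-key issuer constraint."""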
    with tempfile.NamedTemporaryFile(prefix="ccf", mode="w+") as metadata_fp:
        jwt_cert_der = infra.crypto.cert_pem_to_der(issuer.cert_pem)
        der_b64 = base64.b64encode(jwt_cert_der).decode("ascii")
        data = {
            "issuer": issuer.issuer_url,
            "auto_refresh": False,
            "jwks": {
                "keys": [
                    {
                        "kty": "RSA",
                        "kid": kid,
                        "x5c": [der_b64],
                        "issuer": constraint,
                    }
                ]
            },
        }
        json.dump(data, metadata_fp)
        metadata_fp.flush()
        network.consortium.set_jwt_issuer(primary, metadata_fp.name)

def try_auth(primary, issuer, kid, iss, tid):
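    """Issue a JWT with the given kid/iss/tid claims and return the status
    code of a request to /app/jwt authenticated with it."""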
    with primary.client("user0") as c:
        LOG.info(f"Creating JWT with kid={kid} iss={iss} tenant={tid}")
        r = c.get(
            "/app/jwt",
            headers=infra.jwt_issuer.make_bearer_header(
                issuer.issue_jwt(kid, claims={"iss": iss, "tid": tid})
            ),
        )
        return r.status_code

@reqs.description("Test stack size limit")
def test_stack_size_limit(network, args):
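    """Check that the JS stack limit is enforced: recursion beyond the limit
    returns an error rather than crashing the node, and the default limit can
    be restored afterwards."""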
    primary, _ = network.find_nodes()

    safe_depth = 1
    depth = safe_depth
    max_depth = 8192

    with primary.client("user0") as c:
        r = c.post("/app/recursive", body={"depth": safe_depth})
        assert r.status_code == http.HTTPStatus.OK, r.status_code

        # Lower than the 1024 * 1024 default, but high enough that a proposal
        # to restore the limit can still pass
        max_stack_bytes = 512 * 1024
        with temporary_js_limits(network, primary, max_stack_bytes=max_stack_bytes):
            while depth <= max_depth:
                depth *= 2
                r = c.post("/app/recursive", body={"depth": depth})
                if r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR:
                    message = r.body.json()["error"]["details"][0]["message"]
                    assert message == "InternalError: stack overflow", message
                    LOG.info(
                        f"Stack overflow at depth={depth} with max_stack_bytes={max_stack_bytes}"
                    )
                    break
            assert depth < max_depth, f"No stack overflow trigger at max depth {depth}"

        r = c.post("/app/recursive", body={"depth": safe_depth})
        assert r.status_code == http.HTTPStatus.OK, r

        # Lower the cap until we likely run out of stack inside user code, and
        # check that we don't crash and that an error message is returned
        cap = max_stack_bytes
        while cap > 0:
            cap //= 2
            LOG.info(f"Max stack size: {cap}")
            with temporary_js_limits(network, primary, max_stack_bytes=cap):
                r = c.post("/app/recursive", body={"depth": 1})
                if r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR:
                    message = r.body.json()["error"]["message"]
                    assert message == "Exception thrown while executing.", message
                    break

        # Cap is so low that we must run out before we even enter user code
        with temporary_js_limits(network, primary, max_stack_bytes=100):
            r = c.post("/app/recursive", body={"depth": 1})
            assert r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
            message = r.body.json()["error"]["message"]
            assert message == "Exception thrown while executing.", message

    return network

@reqs.description("Test heap size limit")
def test_heap_size_limit(network, args):
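    """Check that the JS heap limit is enforced: allocations beyond the limit
    fail with an error rather than crashing the node."""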
    primary, _ = network.find_nodes()

    safe_size = 5 * 1024 * 1024
    unsafe_size = 500 * 1024 * 1024

    with primary.client("user0") as c:
        r = c.post("/app/alloc", body={"size": safe_size})
        assert r.status_code == http.HTTPStatus.OK, r

        with temporary_js_limits(network, primary, max_heap_bytes=3 * 1024 * 1024):
            r = c.post("/app/alloc", body={"size": safe_size})
            assert r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR, r
            message = r.body.json()["error"]["details"][0]["message"]
            assert message == "InternalError: out of memory", message

        r = c.post("/app/alloc", body={"size": safe_size})
        assert r.status_code == http.HTTPStatus.OK, r

        r = c.post("/app/alloc", body={"size": unsafe_size})
        message = r.body.json()["error"]["details"][0]["message"]
        assert message == "InternalError: out of memory", message

        # Lower the cap until we likely run out of heap inside user code, and
        # check that we don't crash and that an error message is returned
        cap = safe_size
        while cap > 0:
            cap //= 2
            LOG.info(f"Heap max size: {cap}")
            with temporary_js_limits(network, primary, max_heap_bytes=cap):
                r = c.post("/app/alloc", body={"size": 1})
                if r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR:
                    message = r.body.json()["error"]["message"]
                    assert message == "Exception thrown while executing.", message
                    break

        # Cap is so low that we must run out before we even enter user code
        with temporary_js_limits(network, primary, max_heap_bytes=100):
            r = c.post("/app/alloc", body={"size": 1})
            assert r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
            message = r.body.json()["error"]["message"]
            assert message == "Exception thrown while executing.", message

    return network

@reqs.description("Test execution time limit")
def test_execution_time_limit(network, args):
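    """Check that the JS execution time limit is enforced: requests that run
    past the limit are interrupted with an error rather than hanging the node."""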
    primary, _ = network.find_nodes()

    safe_time = 50
    unsafe_time = 5000

    with primary.client("user0") as c:
        r = c.post("/app/sleep", body={"time": safe_time})
        assert r.status_code == http.HTTPStatus.OK, r

        with temporary_js_limits(network, primary, max_execution_time_ms=30):
            r = c.post("/app/sleep", body={"time": safe_time})
            assert r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR, r
            message = r.body.json()["error"]["details"][0]["message"]
            assert message == "InternalError: interrupted", message

        r = c.post("/app/sleep", body={"time": safe_time})
        assert r.status_code == http.HTTPStatus.OK, r

        r = c.post("/app/sleep", body={"time": unsafe_time})
        assert r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR, r
        message = r.body.json()["error"]["details"][0]["message"]
        assert message == "InternalError: interrupted", message

        # Lower the cap until we likely run out of execution time inside user
        # code, and check that we don't crash and that an error message is returned
        cap = safe_time
        while cap > 0:
            cap //= 2
            LOG.info(f"Max exec time: {cap}")
            with temporary_js_limits(network, primary, max_execution_time_ms=cap):
                r = c.post("/app/sleep", body={"time": 10})
                if r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR:
                    message = r.body.json()["error"]["message"]
                    assert message == "Operation took too long to complete.", message
                    break

        # Cap is so low that we must run out before we even enter user code
        with temporary_js_limits(network, primary, max_execution_time_ms=0):
            r = c.post("/app/sleep", body={"time": 10})
            assert r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
            message = r.body.json()["error"]["message"]
            assert message == "Operation took too long to complete.", message

    return network

def run_limits(args):
    with infra.network.network(
        args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
    ) as network:
        network.start_and_open(args)
        network = test_stack_size_limit(network, args)
        network = test_heap_size_limit(network, args)
        network = test_execution_time_limit(network, args)

@reqs.description("Cert authentication")
def test_cert_auth(network, args):
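    """Check that expired, not-yet-valid, and just-expired user certs are all
    rejected, i.e. cert validity periods are enforced without leeway."""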
    def create_keypair(local_id, valid_from, validity_days):
        privk_pem, _ = infra.crypto.generate_ec_keypair()
        with open(
            os.path.join(network.common_dir, f"{local_id}_privk.pem"),
            "w",
            encoding="ascii",
        ) as f:
            f.write(privk_pem)
        cert = infra.crypto.generate_cert(
            privk_pem,
            valid_from=valid_from,
            validity_days=validity_days,
        )
        with open(
            os.path.join(network.common_dir, f"{local_id}_cert.pem"),
            "w",
            encoding="ascii",
        ) as f:
            f.write(cert)

    primary, _ = network.find_primary()

    LOG.info("User with old cert cannot call user-authenticated endpoint")
    local_user_id = "in_the_past"
    create_keypair(
        local_user_id, datetime.datetime.utcnow() - datetime.timedelta(days=50), 3
    )
    network.consortium.add_user(primary, local_user_id)
    with primary.client(local_user_id) as c:
        r = c.get("/app/cert")
        assert r.status_code == HTTPStatus.UNAUTHORIZED, r
        assert "Not After" in r.body.json()["error"]["details"][0]["message"], r

    LOG.info("User with future cert cannot call user-authenticated endpoint")
    local_user_id = "in_the_future"
    create_keypair(
        local_user_id, datetime.datetime.utcnow() + datetime.timedelta(days=50), 3
    )
    network.consortium.add_user(primary, local_user_id)
    with primary.client(local_user_id) as c:
        r = c.get("/app/cert")
        assert r.status_code == HTTPStatus.UNAUTHORIZED, r
        assert "Not Before" in r.body.json()["error"]["details"][0]["message"], r

    LOG.info("No leeway added to cert time evaluation")
    local_user_id = "just_expired"
    valid_from = datetime.datetime.utcnow() - datetime.timedelta(days=1, seconds=2)
    create_keypair(local_user_id, valid_from, 1)
    network.consortium.add_user(primary, local_user_id)
    with primary.client(local_user_id) as c:
        r = c.get("/app/cert")
        assert r.status_code == HTTPStatus.UNAUTHORIZED, r
        assert "Not After" in r.body.json()["error"]["details"][0]["message"], r

    return network

@reqs.description("JWT authentication as by OpenID spec")
def test_jwt_auth(network, args):
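    """Check basic JWT validation: garbage tokens, tokens signed by an unknown
    key, and tokens with invalid nbf/exp claims are rejected; a well-formed
    token from the registered issuer is accepted."""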
    primary, _ = network.find_nodes()

    issuer = infra.jwt_issuer.JwtIssuer("https://example.issuer")
    jwt_kid = "my_key_id"

    LOG.info("Add JWT issuer with initial keys")
    set_issuer_with_a_key(primary, network, issuer, jwt_kid, issuer.name)

    LOG.info("Calling jwt endpoint after storing keys")
    with primary.client("user0") as c:
        r = c.get("/app/jwt", headers=infra.jwt_issuer.make_bearer_header("garbage"))
        assert r.status_code == HTTPStatus.UNAUTHORIZED, r.status_code

        jwt_mismatching_key_priv_pem, _ = infra.crypto.generate_rsa_keypair(2048)
        jwt = infra.crypto.create_jwt({}, jwt_mismatching_key_priv_pem, jwt_kid)
        r = c.get("/app/jwt", headers=infra.jwt_issuer.make_bearer_header(jwt))
        assert r.status_code == HTTPStatus.UNAUTHORIZED, r.status_code

        r = c.get(
            "/app/jwt",
            headers=infra.jwt_issuer.make_bearer_header(issuer.issue_jwt(jwt_kid)),
        )
        assert r.status_code == HTTPStatus.OK, r.status_code

        LOG.info("Calling JWT with too-late nbf")
        r = c.get(
            "/app/jwt",
            headers=infra.jwt_issuer.make_bearer_header(
                issuer.issue_jwt(jwt_kid, claims={"nbf": time.time() + 60})
            ),
        )
        assert r.status_code == HTTPStatus.UNAUTHORIZED, r.status_code

        LOG.info("Calling JWT with too-early exp")
        r = c.get(
            "/app/jwt",
            headers=infra.jwt_issuer.make_bearer_header(
                issuer.issue_jwt(jwt_kid, claims={"exp": time.time() - 60})
            ),
        )
        assert r.status_code == HTTPStatus.UNAUTHORIZED, r.status_code

    network.consortium.remove_jwt_issuer(primary, issuer.name)

    return network

@reqs.description("JWT authentication as by MSFT Entra (single tenant)")
def test_jwt_auth_msft_single_tenant(network, args):
"""For a specific tenant, only tokens with this issuer+tenant can auth."""
primary, _ = network.find_nodes()
TENANT_ID = "9188050d-6c67-4c5b-b112-36a304b66da"
ISSUER_TENANT = (
"https://login.microsoftonline.com/9188050d-6c67-4c5b-b112-36a304b66da/v2.0"
)
issuer = infra.jwt_issuer.JwtIssuer(name="https://login.microsoftonline.com")
jwt_kid = "my_key_id"
set_issuer_with_a_key(primary, network, issuer, jwt_kid, ISSUER_TENANT)
assert (
try_auth(primary, issuer, jwt_kid, ISSUER_TENANT, "garbage_tenant")
== HTTPStatus.UNAUTHORIZED
)
assert (
try_auth(primary, issuer, jwt_kid, ISSUER_TENANT, "{tenantid}")
== HTTPStatus.UNAUTHORIZED
)
assert try_auth(primary, issuer, jwt_kid, ISSUER_TENANT, TENANT_ID) == HTTPStatus.OK
network.consortium.remove_jwt_issuer(primary, issuer.name)
assert (
try_auth(primary, issuer, jwt_kid, ISSUER_TENANT, TENANT_ID)
== HTTPStatus.UNAUTHORIZED
)
return network
@reqs.description("JWT authentication as by MSFT Entra (multiple tenants)")
def test_jwt_auth_msft_multitenancy(network, args):
"""For a common tenant, all tokens from this issuer can auth,
no matter which tenant is specified."""
primary, _ = network.find_nodes()
COMMNON_ISSUER = "https://login.microsoftonline.com/{tenantid}/v2.0"
TENANT_ID = "9188050d-6c67-4c5b-b112-36a304b66da"
ISSUER_TENANT = f"https://login.microsoftonline.com/{TENANT_ID}/v2.0"
ANOTHER_TENANT_ID = "deadbeef-6c67-4c5b-b112-36a304b66da"
ISSUER_ANOTHER = f"https://login.microsoftonline.com/{ANOTHER_TENANT_ID}/v2.0"
issuer = infra.jwt_issuer.JwtIssuer(name="https://login.microsoftonline.com")
jwt_kid_1 = "my_key_id_1"
jwt_kid_2 = "my_key_id_2"
with tempfile.NamedTemporaryFile(prefix="ccf", mode="w+") as metadata_fp:
jwt_cert_der = infra.crypto.cert_pem_to_der(issuer.cert_pem)
der_b64 = base64.b64encode(jwt_cert_der).decode("ascii")
data = {
"issuer": issuer.issuer_url,
"auto_refresh": False,
"jwks": {
"keys": [
{
"kty": "RSA",
"kid": jwt_kid_1,
"x5c": [der_b64],
"issuer": COMMNON_ISSUER,
},
{
"kty": "RSA",
"kid": jwt_kid_2,
"x5c": [der_b64],
"issuer": ISSUER_TENANT,
},
]
},
}
json.dump(data, metadata_fp)
metadata_fp.flush()
network.consortium.set_jwt_issuer(primary, metadata_fp.name)
assert (
try_auth(primary, issuer, jwt_kid_1, ISSUER_TENANT, TENANT_ID) == HTTPStatus.OK
)
assert (
try_auth(primary, issuer, jwt_kid_1, ISSUER_ANOTHER, ANOTHER_TENANT_ID)
== HTTPStatus.OK
)
assert (
try_auth(primary, issuer, jwt_kid_1, ISSUER_TENANT, ANOTHER_TENANT_ID)
== HTTPStatus.UNAUTHORIZED
)
assert (
try_auth(primary, issuer, jwt_kid_2, ISSUER_TENANT, TENANT_ID) == HTTPStatus.OK
)
assert (
try_auth(primary, issuer, jwt_kid_2, ISSUER_ANOTHER, ANOTHER_TENANT_ID)
== HTTPStatus.UNAUTHORIZED
)
network.consortium.remove_jwt_issuer(primary, issuer.name)
assert (
try_auth(primary, issuer, jwt_kid_1, ISSUER_TENANT, TENANT_ID)
== HTTPStatus.UNAUTHORIZED
)
assert (
try_auth(primary, issuer, jwt_kid_2, ISSUER_TENANT, TENANT_ID)
== HTTPStatus.UNAUTHORIZED
)
return network
@reqs.description("JWT authentication with same kids for different issuers")
def test_jwt_auth_msft_same_kids_different_issuers(network, args):
"""Multiple issuer can share same kid with their own constraints specified.
So when issuer1 adds it with constraint1, and issuer2 added it with constraint2,
then tokens from both issuer1 and issuer2 can go.
However, when issuer1 is removed, issuer2 has to remain capable of
authenticating with their tokens."""
primary, _ = network.find_nodes()
TENANT_ID = "9188050d-6c67-4c5b-b112-36a304b66da"
ISSUER_TENANT = f"https://login.microsoftonline.com/{TENANT_ID}/v2.0"
ANOTHER_TENANT_ID = "deadbeef-6c67-4c5b-b112-36a304b66da"
ISSUER_ANOTHER = f"https://login.microsoftonline.com/{ANOTHER_TENANT_ID}/v2.0"
issuer = infra.jwt_issuer.JwtIssuer(name=ISSUER_TENANT)
another = infra.jwt_issuer.JwtIssuer(name=ISSUER_ANOTHER)
# Immitate same key sharing
another.cert_pem, another.key_priv_pem = issuer.cert_pem, issuer.key_priv_pem
jwt_kid = "my_key_id"
set_issuer_with_a_key(primary, network, issuer, jwt_kid, ISSUER_TENANT)
assert try_auth(primary, issuer, jwt_kid, ISSUER_TENANT, TENANT_ID) == HTTPStatus.OK
assert (
try_auth(primary, another, jwt_kid, ISSUER_ANOTHER, ANOTHER_TENANT_ID)
== HTTPStatus.UNAUTHORIZED
)
set_issuer_with_a_key(primary, network, another, jwt_kid, ISSUER_ANOTHER)
assert try_auth(primary, issuer, jwt_kid, ISSUER_TENANT, TENANT_ID) == HTTPStatus.OK
assert (
try_auth(primary, another, jwt_kid, ISSUER_ANOTHER, ANOTHER_TENANT_ID)
== HTTPStatus.OK
)
network.consortium.remove_jwt_issuer(primary, issuer.name)
assert (
try_auth(primary, issuer, jwt_kid, ISSUER_TENANT, TENANT_ID)
== HTTPStatus.UNAUTHORIZED
)
assert (
try_auth(primary, another, jwt_kid, ISSUER_ANOTHER, ANOTHER_TENANT_ID)
== HTTPStatus.OK
)
network.consortium.remove_jwt_issuer(primary, another.name)
assert (
try_auth(primary, issuer, jwt_kid, ISSUER_TENANT, TENANT_ID)
== HTTPStatus.UNAUTHORIZED
)
assert (
try_auth(primary, another, jwt_kid, ISSUER_ANOTHER, ANOTHER_TENANT_ID)
== HTTPStatus.UNAUTHORIZED
)
return network
@reqs.description("JWT authentication with same kid with constraint overwrite")
def test_jwt_auth_msft_same_kids_overwrite_constraint(network, args):
"""If issuer sets the same kid with different constraint we have to
overwrite it. Test exists because this was found as a bug during feature
development and was very easy to miss updating the constraint for
existing kids."""
primary, _ = network.find_nodes()
COMMNON_ISSUER = "https://login.microsoftonline.com/{tenantid}/v2.0"
TENANT_ID = "9188050d-6c67-4c5b-b112-36a304b66da"
ISSUER_TENANT = f"https://login.microsoftonline.com/{TENANT_ID}/v2.0"
ANOTHER_TENANT_ID = "deadbeef-6c67-4c5b-b112-36a304b66da"
ISSUER_ANOTHER = f"https://login.microsoftonline.com/{ANOTHER_TENANT_ID}/v2.0"
issuer = infra.jwt_issuer.JwtIssuer(name=ISSUER_TENANT)
jwt_kid = "my_key_id"
set_issuer_with_a_key(primary, network, issuer, jwt_kid, COMMNON_ISSUER)
assert try_auth(primary, issuer, jwt_kid, ISSUER_TENANT, TENANT_ID) == HTTPStatus.OK
assert (
try_auth(primary, issuer, jwt_kid, ISSUER_ANOTHER, ANOTHER_TENANT_ID)
== HTTPStatus.OK
)
set_issuer_with_a_key(primary, network, issuer, jwt_kid, ISSUER_TENANT)
assert try_auth(primary, issuer, jwt_kid, ISSUER_TENANT, TENANT_ID) == HTTPStatus.OK
assert (
try_auth(primary, issuer, jwt_kid, ISSUER_ANOTHER, ANOTHER_TENANT_ID)
== HTTPStatus.UNAUTHORIZED
)
network.consortium.remove_jwt_issuer(primary, issuer.name)
assert (
try_auth(primary, issuer, jwt_kid, ISSUER_TENANT, TENANT_ID)
== HTTPStatus.UNAUTHORIZED
)
assert (
try_auth(primary, issuer, jwt_kid, ISSUER_ANOTHER, ANOTHER_TENANT_ID)
== HTTPStatus.UNAUTHORIZED
)
return network
@reqs.description("Role-based access")
def test_role_based_access(network, args):
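    """Check the sample role-based access app: users can only read or write
    the secret once granted the corresponding role, and roles can only be
    assigned by users holding the role_master role."""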
    primary, _ = network.find_nodes()

    users = network.users
    # users[0] through users[3] are referenced below
    assert len(users) >= 4

    # Confirm that initially, no user can read or write the secret, or modify roles
    for user in users:
        with primary.client(user.local_id) as c:
            r = c.get("/app/secret")
            assert r.status_code == 401, r.status_code

            r = c.post("/app/secret", {"new_secret": "I'm in charge"})
            assert r.status_code == 401, r.status_code

            r = c.post(
                "/app/roles",
                {"target_id": user.service_id, "target_role": "secret_reader"},
            )
            assert r.status_code == 401, r.status_code

    # Bootstrap with a member-governance operation to give the first user a
    # special role, allowing them to set other users' roles
    network.consortium.set_user_data(
        primary, users[0].service_id, {"role": "role_master"}
    )

    readers = []
    writers = []

    # First user can now assign roles to itself and others
    with primary.client(users[0].local_id) as c:
        r = c.post(
            "/app/roles",
            {"target_id": users[0].service_id, "target_role": "secret_reader"},
        )
        assert r.status_code == 204, r.status_code
        readers.append(users[0])

        r = c.post(
            "/app/roles",
            {"target_id": users[1].service_id, "target_role": "role_master"},
        )
        assert r.status_code == 204, r.status_code

    # Second user can now assign roles, thanks to the assignment from the first user
    with primary.client(users[1].local_id) as c:
        r = c.post(
            "/app/roles",
            {"target_id": users[2].service_id, "target_role": "secret_reader"},
        )
        assert r.status_code == 204, r.status_code
        readers.append(users[2])

        r = c.post(
            "/app/roles",
            {"target_id": users[2].service_id, "target_role": "secret_writer"},
        )
        assert r.status_code == 204, r.status_code
        writers.append(users[2])

        r = c.post(
            "/app/roles",
            {"target_id": users[3].service_id, "target_role": "secret_reader"},
        )
        assert r.status_code == 204, r.status_code
        readers.append(users[3])

    # Those role assignments allow some users to read and write
    for user in users:
        with primary.client(user.local_id) as c:
            r = c.post(
                "/app/secret", {"new_secret": f"My favourite person is {user.local_id}"}
            )
            if user in writers:
                assert r.status_code == 204, r.status_code
            else:
                assert r.status_code == 401, r.status_code

            r = c.get("/app/secret")
            if user in readers:
                assert r.status_code == 200, r.status_code
            else:
                assert r.status_code == 401, r.status_code

    return network

def run_authn(args):
    with infra.network.network(
        args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
    ) as network:
        network.start_and_open(args)
        network = test_cert_auth(network, args)
        network = test_jwt_auth(network, args)
        network = test_jwt_auth_msft_single_tenant(network, args)
        network = test_jwt_auth_msft_multitenancy(network, args)
        network = test_jwt_auth_msft_same_kids_different_issuers(network, args)
        network = test_jwt_auth_msft_same_kids_overwrite_constraint(network, args)
        network = test_role_based_access(network, args)

@reqs.description("Test content types")
def test_content_types(network, args):
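    """Check that text, JSON, binary, and custom content types round-trip
    through app endpoints with the expected content-type headers."""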
    primary, _ = network.find_nodes()

    with primary.client("user0") as c:
        r = c.post("/app/text", body="text")
        assert r.status_code == http.HTTPStatus.OK, r.status_code
        assert r.headers["content-type"] == "text/plain"
        assert r.body.text() == "text"

        r = c.post("/app/json", body={"foo": "bar"})
        assert r.status_code == http.HTTPStatus.OK, r.status_code
        assert r.headers["content-type"] == "application/json"
        assert r.body.json() == {"foo": "bar"}

        r = c.post("/app/binary", body=b"\x00" * 42)
        assert r.status_code == http.HTTPStatus.OK, r.status_code
        assert r.headers["content-type"] == "application/octet-stream"
        assert r.body.data() == b"\x00" * 42, r.body

        r = c.post("/app/custom", body="text", headers={"content-type": "foo/bar"})
        assert r.status_code == http.HTTPStatus.OK, r.status_code
        assert r.headers["content-type"] == "text/plain"
        assert r.body.text() == "text"

    return network

@reqs.description("Test accept header")
def test_accept_header(network, args):
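    """Check Accept header handling: malformed headers are rejected as bad
    requests, unsupported media types are not acceptable, and wildcard or
    matching media types succeed."""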
    primary, _ = network.find_nodes()

    with primary.client() as c:
        r = c.get("/node/commit", headers={"accept": "nonsense"})
        assert r.status_code == http.HTTPStatus.BAD_REQUEST.value

        r = c.get("/node/commit", headers={"accept": "text/html"})
        assert r.status_code == http.HTTPStatus.NOT_ACCEPTABLE.value

        r = c.get(
            "/node/commit",
            headers={"accept": "text/html;q=0.9,image/jpeg;video/mpeg;q=0.8"},
        )
        assert r.status_code == http.HTTPStatus.NOT_ACCEPTABLE.value

        r = c.get("/node/commit", headers={"accept": "*/*"})
        assert r.status_code == http.HTTPStatus.OK.value

        r = c.get("/node/commit", headers={"accept": "application/*"})
        assert r.status_code == http.HTTPStatus.OK.value

        r = c.get("/node/commit", headers={"accept": "application/json"})
        assert r.status_code == http.HTTPStatus.OK.value
        assert r.headers["content-type"] == "application/json"

        r = c.get(
            "/node/commit",
            headers={"accept": "text/html;q=0.9,image/jpeg;video/mpeg;q=0.8,*/*;q=0.1"},
        )
        assert r.status_code == http.HTTPStatus.OK.value

        r = c.get(
            "/node/commit",
            headers={
                "accept": "text/html;q=0.9,image/jpeg;video/mpeg;q=0.8,application/json;q=0.1"
            },
        )
        assert r.status_code == http.HTTPStatus.OK.value
        assert r.headers["content-type"] == "application/json"

    return network

@reqs.description("Test supported methods")
def test_supported_methods(network, args):
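    """Check that disallowed methods return 405 with an Allow header, and that
    OPTIONS reports the supported methods for both app and framework endpoints."""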
    primary, _ = network.find_nodes()

    with primary.client("user0") as c:
        # Test Allow header when the wrong method is used
        r = c.delete("/app/text")
        assert r.status_code == http.HTTPStatus.METHOD_NOT_ALLOWED
        allow = r.headers.get("allow")
        assert allow is not None
        assert "OPTIONS" in allow
        assert "POST" in allow

        # Test Allow header when OPTIONS is used on a POST-only app endpoint
        r = c.options("/app/text")
        assert r.status_code == http.HTTPStatus.NO_CONTENT
        allow = r.headers.get("allow")
        assert allow is not None
        assert "OPTIONS" in allow
        assert "POST" in allow

        # Test Allow header when OPTIONS is used on a GET-only framework endpoint
        r = c.options("/node/commit")
        assert r.status_code == http.HTTPStatus.NO_CONTENT
        allow = r.headers.get("allow")
        assert allow is not None
        assert "OPTIONS" in allow
        assert "GET" in allow

    return network

@reqs.description("Test unknown path")
def test_unknown_path(network, args):
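    """Check that requests to unknown paths return 404 for both authenticated
    and anonymous clients, regardless of method."""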
    primary, _ = network.find_nodes()

    with primary.client("user0") as c:
        r = c.get("/app/not/a/real/path")
        assert r.status_code == http.HTTPStatus.NOT_FOUND, r.status_code
        r = c.post("/app/not/a/real/path")
        assert r.status_code == http.HTTPStatus.NOT_FOUND, r.status_code
        r = c.delete("/app/not/a/real/path")
        assert r.status_code == http.HTTPStatus.NOT_FOUND, r.status_code

        r = c.post("/app/unknown")
        assert r.status_code == http.HTTPStatus.NOT_FOUND, r.status_code

    with primary.client() as c:
        r = c.get("/app/not/a/real/path")
        assert r.status_code == http.HTTPStatus.NOT_FOUND, r.status_code
        r = c.post("/app/not/a/real/path")
        assert r.status_code == http.HTTPStatus.NOT_FOUND, r.status_code
        r = c.delete("/app/not/a/real/path")
        assert r.status_code == http.HTTPStatus.NOT_FOUND, r.status_code

        r = c.post("/app/unknown")
        assert r.status_code == http.HTTPStatus.NOT_FOUND, r.status_code

    return network

def run_content_types(args):
    with infra.network.network(
        args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
    ) as network:
        network.start_and_open(args)
        network = test_content_types(network, args)
        network = test_accept_header(network, args)
        network = test_supported_methods(network, args)
        network = test_unknown_path(network, args)

@reqs.description("Test request object API")
def test_random_api(args):
# Test that Math.random() does not repeat values:
# - on multiple invocations within a single request
# - on multiple requests
# - on network restarts
seen_values = set()
def assert_fresh(n):
assert n not in seen_values, f"{n} has already been returned"
seen_values.add(n)
n_repeats = 3
for _ in range(n_repeats):
with infra.network.network(
args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
) as network:
network.start_and_open(args)
primary, _ = network.find_nodes()
for _ in range(n_repeats):
with primary.client() as c:
r = c.get("/app/make_randoms")
assert r.status_code == 200, r
for _, n in r.body.json().items():
assert_fresh(n)
@reqs.description("Test request object API")
def test_request_object_api(network, args):
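    """Check the fields exposed on the JS request object (method, url, route,
    path, query, hostname, caller) for various URLs and auth policies."""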
    primary, _ = network.find_nodes()

    def test_expectations(expected_fields, response):
        assert response.status_code == http.HTTPStatus.OK, response
        j = response.body.json()
        for k, v in expected_fields.items():
            ks = k.split(".")
            current = j
            for k_ in ks:
                assert k_ in current, f"Missing field {k} from response body"
                current = current[k_]
            actual = current
            assert (
                v == actual
            ), f"Mismatch at field {k}. Expected '{v}', response contains '{actual}'"
            # LOG.success(f"{k} == {v}")

    def test_url(expected_fields, client, base_url, query="", url_params=None):
        url = base_url
        expected_fields["route"] = base_url

        if url_params is not None:
            url = url.format(**url_params)
        expected_fields["path"] = url

        if len(query) > 0:
            url += f"?{query}"
        expected_fields["query"] = query
        expected_fields["url"] = url

        expected_fields["method"] = "GET"
        test_expectations(expected_fields, client.get(url))

        expected_fields["method"] = "POST"
        test_expectations(expected_fields, client.post(url))

        expected_fields["method"] = "DELETE"
        test_expectations(expected_fields, client.delete(url))

    def test_client(expected_fields, client):
        test_url(
            {**expected_fields, "hostname": f"{client.hostname}"},
            client,
            "/app/echo",
        )
        test_url(
            {**expected_fields, "hostname": f"{client.hostname}"},
            client,
            "/app/echo",
            query="a=42&b=hello_world",
        )
        test_url(
            {**expected_fields, "hostname": f"{client.hostname}"},
            client,
            "/app/echo/{foo}",
            url_params={"foo": "bar"},
        )
        test_url(
            {**expected_fields, "hostname": f"{client.hostname}"},
            client,
            "/app/echo/{foo}",
            query="a=42&b=hello_world",
            url_params={"foo": "bar"},
        )

    user = network.users[0]
    with primary.client(user.local_id) as c:
        test_client({"caller.policy": "user_cert", "caller.id": user.service_id}, c)

    with primary.client() as c:
        test_client({"caller.policy": "no_auth"}, c)

    return network

@reqs.description("Test Date API")
def test_datetime_api(network, args):
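    """Check the JS Date API: the default timestamp matches the request-time
    timestamp, service time is close to local time, and epoch construction works."""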
    primary, _ = network.find_nodes()

    with primary.client() as c:
        r = c.get("/time_now")
        local_time = datetime.datetime.now(datetime.timezone.utc)
        assert r.status_code == http.HTTPStatus.OK, r
        body = r.body.json()

        # Python's datetime.fromisoformat doesn't parse the Z suffix, so replace it
        default = body["default"].replace("Z", "+00:00")
        definitely_now = body["definitely_now"].replace("Z", "+00:00")
        definitely_1970 = body["definitely_1970"].replace("Z", "+00:00")

        # Assume less than 5ms of execution time between grabbing timestamps, and
        # confirm that the default call gets a real timestamp from global activation
        default_time = datetime.datetime.fromisoformat(default)
        service_time = datetime.datetime.fromisoformat(definitely_now)
        diff = (service_time - default_time).total_seconds()
        assert diff < 0.005, diff

        # Assume less than 1 second of clock skew + execution time
        diff = (local_time - service_time).total_seconds()
        assert abs(diff) < 1, diff

        local_epoch_start = datetime.datetime.fromtimestamp(0, datetime.timezone.utc)
        service_epoch_start = datetime.datetime.fromisoformat(definitely_1970)
        assert local_epoch_start == service_epoch_start, service_epoch_start

    return network

@reqs.description("Test metrics logging")
def test_metrics_logging(network, args):
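    """Check that each JS request produces a metrics log line on the node,
    recording method, templated path, status, and execution time."""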
    primary, _ = network.find_nodes()

    # Add and test on a new node, so we can kill it to safely read its logs
    new_node = network.create_node("local://localhost")
    network.join_node(
        new_node,
        args.package,
        args,
    )
    network.trust_node(new_node, args)

    # Submit several requests
    assertions = []
    with new_node.client() as c:
        c.get("/app/echo")
        assertions.append({"Method": "GET", "Path": "/app/echo"})
        c.post("/app/echo")
        assertions.append({"Method": "POST", "Path": "/app/echo"})
        c.get("/app/echo/hello")
        assertions.append({"Method": "GET", "Path": "/app/echo/{foo}"})
        c.post("/app/echo/bar")
        assertions.append({"Method": "POST", "Path": "/app/echo/{foo}"})
        c.get("/app/fibonacci/10")
        assertions.append({"Method": "GET", "Path": "/app/fibonacci/{n}"})
        c.get("/app/fibonacci/20")
        assertions.append({"Method": "GET", "Path": "/app/fibonacci/{n}"})
        c.get("/app/fibonacci/30")
        assertions.append({"Method": "GET", "Path": "/app/fibonacci/{n}"})

    # Remove node
    network.retire_node(primary, new_node)
    new_node.stop()

    # Read the node's logs
    metrics_regex = re.compile(
        r".*\[js\].*\| JS execution complete: Method=(?P<Method>.*), Path=(?P<Path>.*), Status=(?P<Status>\d+), ExecMilliseconds=(?P<ExecMilliseconds>\d+)$"
    )
    out_path, _ = new_node.get_logs()
    with open(out_path, "r", encoding="utf-8") as f:
        for line in f:
            match = metrics_regex.match(line)
            if match is not None:
                expected_groups = assertions.pop(0)
                for k, v in expected_groups.items():
                    actual_match = match.group(k)
                    assert actual_match == v
                LOG.success(f"Found metrics logging line: {line}")
                LOG.info(f"Parsed to: {match.groups()}")
    assert len(assertions) == 0

    return network

def run_api(args):
    test_random_api(args)

    with infra.network.network(
        args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
    ) as network:
        network.start_and_open(args)
        network = test_request_object_api(network, args)
        network = test_datetime_api(network, args)
        network = test_metrics_logging(network, args)

def test_reused_interpreter_behaviour(network, args):
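    """Check interpreter caching: repeat requests to endpoints marked for reuse
    hit a cached interpreter (asserted on the deterministic wasCached flag, with
    timing logged as a soft signal), and the interpreter cache respects its LRU cap."""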
    primary, _ = network.find_nodes()

    def timed(fn):
        start = datetime.datetime.now()
        result = fn()
        end = datetime.datetime.now()
        duration = (end - start).total_seconds()
        LOG.debug(f"({duration:.2f}s)")
        return duration, result

    # Extremely crude "same order-of-magnitude" comparisons
    def much_smaller(a, b):
        return a < b / 2

    # Not actual assertions, because they'll often fail for unrelated reasons -
    # the JS execution got a caching benefit, but some scheduling unluckiness
    # caused the roundtrip time to be slow. Instead we assert on the
    # deterministic wasCached bool, but log errors if it doesn't correspond
    # with the expected run-time impact.
    def expect_much_smaller(a, b):
        if not (much_smaller(a, b)):
            LOG.error(
                f"Expected to complete much faster, but took {a:.4f} and {b:.4f} seconds"
            )

    def expect_similar(a, b):
        if much_smaller(a, b) or much_smaller(b, a):
            LOG.error(
                f"Expected similar execution times, but took {a:.4f} and {b:.4f} seconds"
            )

    def was_cached(response):
        return response.body.json()["wasCached"]

    fib_body = {"n": 25}

    with primary.client() as c:
        LOG.info("Testing with no caching benefit")
        baseline, res0 = timed(lambda: c.post("/fibonacci/reuse/none", fib_body))
        repeat1, res1 = timed(lambda: c.post("/fibonacci/reuse/none", fib_body))
        repeat2, res2 = timed(lambda: c.post("/fibonacci/reuse/none", fib_body))
        results = (res0, res1, res2)
        assert all(r.status_code == http.HTTPStatus.OK for r in results), results
        assert all(not was_cached(r) for r in results), results
        expect_similar(baseline, repeat1)
        expect_similar(baseline, repeat2)

        LOG.info("Testing cached interpreter benefit")
        baseline, res0 = timed(lambda: c.post("/fibonacci/reuse/a", fib_body))
        repeat1, res1 = timed(lambda: c.post("/fibonacci/reuse/a", fib_body))
        repeat2, res2 = timed(lambda: c.post("/fibonacci/reuse/a", fib_body))
        results = (res0, res1, res2)
        assert all(r.status_code == http.HTTPStatus.OK for r in results), results
        assert not was_cached(res0), res0
        assert was_cached(res1), res1
        assert was_cached(res2), res2
        expect_much_smaller(repeat1, baseline)
        expect_much_smaller(repeat2, baseline)

        LOG.info("Testing cached app behaviour")
        # For this app, a different key means re-execution, so the first request
        # gets no caching benefit
        baseline, res0 = timed(lambda: c.post("/fibonacci/reuse/a", {"n": 26}))
        repeat1, res1 = timed(lambda: c.post("/fibonacci/reuse/a", {"n": 26}))
        results = (res0, res1)
        assert all(r.status_code == http.HTTPStatus.OK for r in results), results
        assert not was_cached(res0), res0
        assert was_cached(res1), res1
        expect_much_smaller(repeat1, baseline)

        LOG.info("Testing behaviour of multiple interpreters")
        baseline, res0 = timed(lambda: c.post("/fibonacci/reuse/b", fib_body))
        repeat1, res1 = timed(lambda: c.post("/fibonacci/reuse/b", fib_body))
        repeat2, res2 = timed(lambda: c.post("/fibonacci/reuse/b", fib_body))
        results = (res0, res1, res2)
        assert all(r.status_code == http.HTTPStatus.OK for r in results), results
        assert not was_cached(res0), res0
        assert was_cached(res1), res1
        assert was_cached(res2), res2
        expect_much_smaller(repeat1, baseline)
        expect_much_smaller(repeat2, baseline)

        LOG.info("Testing cap on number of interpreters")
        # Call twice so we should definitely be cached, regardless of what
        # previous tests did
        c.post("/fibonacci/reuse/a", fib_body)
        c.post("/fibonacci/reuse/b", fib_body)
        c.post("/fibonacci/reuse/c", fib_body)
        resa = c.post("/fibonacci/reuse/a", fib_body)
        resb = c.post("/fibonacci/reuse/b", fib_body)
        resc = c.post("/fibonacci/reuse/c", fib_body)
        results = (resa, resb, resc)
        assert all(was_cached(res) for res in results), results

        # Get current metrics to pass existing/default values
        r = c.get("/node/js_metrics")
        body = r.body.json()
        default_max_heap_size = body["max_heap_size"]
        default_max_stack_size = body["max_stack_size"]
        default_max_execution_time = body["max_execution_time"]
        default_max_cached_interpreters = body["max_cached_interpreters"]

        network.consortium.set_js_runtime_options(
            primary,
            max_heap_bytes=default_max_heap_size,
            max_stack_bytes=default_max_stack_size,
            max_execution_time_ms=default_max_execution_time,
            max_cached_interpreters=2,
        )

        # If we round-robin through too many interpreters, we flush them from
        # the LRU cache
        c.post("/fibonacci/reuse/a", fib_body)
        c.post("/fibonacci/reuse/b", fib_body)
        c.post("/fibonacci/reuse/c", fib_body)
        resa = c.post("/fibonacci/reuse/a", fib_body)
        resb = c.post("/fibonacci/reuse/b", fib_body)
        resc = c.post("/fibonacci/reuse/c", fib_body)
        results = (resa, resb, resc)
        assert all(not was_cached(res) for res in results), results

        # But if we stay within the interpreter cap, then we get a cached interpreter
        resb = c.post("/fibonacci/reuse/b", fib_body)
        resc = c.post("/fibonacci/reuse/c", fib_body)
        results = (resb, resc)
        assert all(was_cached(res) for res in results), results

        # Restore the original cap
        network.consortium.set_js_runtime_options(
            primary,
            max_heap_bytes=default_max_heap_size,
            max_stack_bytes=default_max_stack_size,
            max_execution_time_ms=default_max_execution_time,
            max_cached_interpreters=default_max_cached_interpreters,
        )

        LOG.info("Testing Dependency Injection sample endpoint")
        baseline, res0 = timed(lambda: c.post("/app/di"))
        repeat1, res1 = timed(lambda: c.post("/app/di"))
        repeat2, res2 = timed(lambda: c.post("/app/di"))
        repeat3, res3 = timed(lambda: c.post("/app/di"))
        results = (res0, res1, res2, res3)
        assert all(r.status_code == http.HTTPStatus.OK for r in results), results
        expect_much_smaller(repeat1, baseline)
        expect_much_smaller(repeat2, baseline)
        expect_much_smaller(repeat3, baseline)

    return network

def test_caching_of_kv_handles(network, args):
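    """Check that cached KV handles and cached ccf globals keep working
    correctly across requests served by a reused interpreter."""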
    primary, _ = network.find_nodes()

    with primary.client() as c:
        LOG.info("Testing caching of KV handles")
        for _ in range(5):
            r = c.post("/app/increment")
            assert r.status_code == http.HTTPStatus.OK, r

        LOG.info("Testing caching of ccf JS globals")

        def make_body():
            return {str(uuid.uuid4()): str(uuid.uuid4())}

        for _ in range(3):
            body = make_body()
            r = c.post("/app/globals", body)
            assert r.status_code == http.HTTPStatus.OK, r
            assert r.body.json() == body

    return network

def test_caching_of_app_code(network, args):
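    """Check that interpreter reuse does not leak functions from a previous
    app version: after each app update, all nodes serve the new code."""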
    primary, backups = network.find_nodes()

    LOG.info(
        "Testing that interpreter reuse does not persist functions past app update"
    )

    def set_app_with_placeholder(new_val):
        LOG.info(f"Replacing placeholder with {new_val}")
        bundle = network.consortium.read_bundle_from_dir(args.js_app_bundle)
        # Replace the placeholder blindly, in the raw bundle JSON as a string
        s = json.dumps(bundle).replace("<func_caching_placeholder>", new_val)
        bundle = json.loads(s)
        return network.consortium.set_js_app_from_bundle(primary, bundle)

    for _ in range(5):
        v = str(uuid.uuid4())
        p = set_app_with_placeholder(v)
        for node in [primary, *backups]:
            with node.client() as c:
                infra.commit.wait_for_commit(client=c, view=p.view, seqno=p.seqno)
                r = c.get("/app/func_caching")
                assert r.status_code == http.HTTPStatus.OK, r
                assert r.body.text() == v

    return network

def run_interpreter_reuse(args):
    # The js_app_bundle arg includes TS and Node dependencies, so it must be
    # built here before deploying (we then deploy the produced /dist folder)
    js_src_dir = args.js_app_bundle
    LOG.info("Building mixed JS/TS app, with dependencies")
    subprocess.run(["npm", "install", "--no-package-lock"], cwd=js_src_dir, check=True)
    subprocess.run(["npm", "run", "build"], cwd=js_src_dir, check=True)
    args.js_app_bundle = os.path.join(js_src_dir, "dist")

    with infra.network.network(
        args.nodes, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
    ) as network:
        network.start_and_open(args)
        network = test_reused_interpreter_behaviour(network, args)
        network = test_caching_of_kv_handles(network, args)
        network = test_caching_of_app_code(network, args)

if __name__ == "__main__":
    cr = ConcurrentRunner()

    cr.add(
        "authz",
        run,
        nodes=infra.e2e_args.nodes(cr.args, 1),
        js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-custom-authorization"),
    )

    cr.add(
        "limits",
        run_limits,
        nodes=infra.e2e_args.nodes(cr.args, 1),
        js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-limits"),
    )

    cr.add(
        "authn",
        run_authn,
        nodes=infra.e2e_args.nodes(cr.args, 1),
        js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-authentication"),
        initial_user_count=4,
        initial_member_count=2,
    )

    cr.add(
        "content_types",
        run_content_types,
        nodes=infra.e2e_args.nodes(cr.args, 1),
        js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-content-types"),
    )

    cr.add(
        "api",
        run_api,
        nodes=infra.e2e_args.nodes(cr.args, 1),
        js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-api"),
    )

    cr.add(
        "interpreter_reuse",
        run_interpreter_reuse,
        nodes=infra.e2e_args.min_nodes(cr.args, f=1),
        js_app_bundle=os.path.join(cr.args.js_app_bundle, "js-interpreter-reuse"),
    )

    cr.run()