Enable COSE Sign1 authentication for governance endpoints (#4392)

This commit is contained in:
Amaury Chamayou 2022-10-26 16:39:26 +01:00 коммит произвёл GitHub
Родитель d7aadbed0c
Коммит 86375602ad
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
19 изменённых файлов: 950 добавлений и 231 удалений

Просмотреть файл

@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [3.0.0-rc0]
### Experimental
- Governance endpoints now support [COSE Sign1](https://www.rfc-editor.org/rfc/rfc8152#page-18) input, as well as signed HTTP requests (#4392).
### Removed
- The functions `starts_with`, `ends_with`, `remove_prefix`, and `remove_suffix`, and the type `remove_cvref` have been removed from `nonstd::`. The C++20 equivalents should be used instead.

Просмотреть файл

@ -632,8 +632,15 @@ if(BUILD_TESTS)
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py
CONSENSUS cft
LABEL suite
ADDITIONAL_ARGS --test-duration 150 --test-suite rekey_recovery
--test-suite membership_recovery
ADDITIONAL_ARGS
--test-duration
150
--test-suite
rekey_recovery
--test-suite
membership_recovery
--jinja-templates-path
${CMAKE_SOURCE_DIR}/samples/templates
)
add_e2e_test(
@ -641,7 +648,9 @@ if(BUILD_TESTS)
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py
CONSENSUS cft
LABEL suite
ADDITIONAL_ARGS --test-duration 200 --test-suite reconfiguration
ADDITIONAL_ARGS
--test-duration 200 --test-suite reconfiguration --jinja-templates-path
${CMAKE_SOURCE_DIR}/samples/templates
)
add_e2e_test(
@ -658,6 +667,8 @@ if(BUILD_TESTS)
200
--test-suite
all
--jinja-templates-path
${CMAKE_SOURCE_DIR}/samples/templates
)
add_e2e_test(

Просмотреть файл

@ -365,7 +365,7 @@ Service constitution: JavaScript module, exporting ``validate()``, ``resolve()``
``history``
~~~~~~~~~~~
Governance history of the service, captures all governance requests submitted by members.
Governance history of the service, captures signed governance requests submitted by members.
**Key** Member ID: SHA-256 fingerprint of the member certificate, represented as a hex-encoded string.
@ -373,6 +373,15 @@ Governance history of the service, captures all governance requests submitted by
See :cpp:struct:`ccf::SignedReq`
``cose_history``
~~~~~~~~~~~~~~~~
Governance history of the service, captures all COSE Sign 1 governance requests submitted by members.
**Key** Member ID: SHA-256 fingerprint of the member certificate, represented as a hex-encoded string.
**Value** COSE Sign1
``public:ccf.internal.``
------------------------

Просмотреть файл

@ -306,7 +306,7 @@
"type": "http"
},
"member_cose_sign1": {
"description": "Request payload must be a COSE Sign1 document, with expected protected headers.Signer must be a member identity registered with this service.",
"description": "Request payload must be a COSE Sign1 document, with expected protected headers. Signer must be a member identity registered with this service.",
"scheme": "cose_sign1",
"type": "http"
},

Просмотреть файл

@ -463,6 +463,11 @@
}
},
"securitySchemes": {
"member_cose_sign1": {
"description": "Request payload must be a COSE Sign1 document, with expected protected headers. Signer must be a member identity registered with this service.",
"scheme": "cose_sign1",
"type": "http"
},
"member_signature": {
"description": "Request must be signed according to the HTTP Signature scheme. The signer must be a member identity registered with this service.",
"scheme": "signature",
@ -487,7 +492,7 @@
"info": {
"description": "This API is used to submit and query proposals which affect CCF's public governance tables.",
"title": "CCF Governance API",
"version": "2.9.8"
"version": "2.12.0"
},
"openapi": "3.0.0",
"paths": {
@ -514,6 +519,9 @@
"security": [
{
"member_signature": []
},
{
"member_cose_sign1": []
}
],
"x-ccf-forwarding": {
@ -541,6 +549,9 @@
"security": [
{
"member_signature": []
},
{
"member_cose_sign1": []
}
],
"x-ccf-forwarding": {
@ -729,11 +740,6 @@
"$ref": "#/components/responses/default"
}
},
"security": [
{
"member_signature": []
}
],
"x-ccf-forwarding": {
"$ref": "#/components/x-ccf-forwarding/sometimes"
}
@ -767,6 +773,9 @@
"security": [
{
"member_signature": []
},
{
"member_cose_sign1": []
}
],
"x-ccf-forwarding": {
@ -791,11 +800,6 @@
"$ref": "#/components/responses/default"
}
},
"security": [
{
"member_signature": []
}
],
"x-ccf-forwarding": {
"$ref": "#/components/x-ccf-forwarding/sometimes"
}
@ -828,11 +832,6 @@
"$ref": "#/components/responses/default"
}
},
"security": [
{
"member_signature": []
}
],
"x-ccf-forwarding": {
"$ref": "#/components/x-ccf-forwarding/sometimes"
}
@ -888,6 +887,9 @@
"security": [
{
"member_signature": []
},
{
"member_cose_sign1": []
}
],
"x-ccf-forwarding": {
@ -912,11 +914,6 @@
"$ref": "#/components/responses/default"
}
},
"security": [
{
"member_signature": []
}
],
"x-ccf-forwarding": {
"$ref": "#/components/x-ccf-forwarding/sometimes"
}
@ -970,6 +967,9 @@
"security": [
{
"member_signature": []
},
{
"member_cose_sign1": []
}
],
"x-ccf-forwarding": {
@ -1029,6 +1029,9 @@
"security": [
{
"member_signature": []
},
{
"member_cose_sign1": []
}
],
"x-ccf-forwarding": {
@ -1064,6 +1067,9 @@
"security": [
{
"member_signature": []
},
{
"member_cose_sign1": []
}
],
"x-ccf-forwarding": {

Просмотреть файл

@ -109,6 +109,9 @@ namespace ccf
/// Signed request containing the last state digest.
std::optional<SignedReq> signed_req = std::nullopt;
/// COSE Sign1 containing the last state digest
std::optional<std::vector<uint8_t>> cose_sign1_req = std::nullopt;
MemberAck() {}
MemberAck(const crypto::Sha256Hash& root) : StateDigest(root) {}
@ -117,6 +120,13 @@ namespace ccf
StateDigest(root),
signed_req(signed_req_)
{}
MemberAck(
const crypto::Sha256Hash& root,
const std::vector<uint8_t>& cose_sign1_req_) :
StateDigest(root),
cose_sign1_req(cose_sign1_req_)
{}
};
DECLARE_JSON_TYPE_WITH_BASE_AND_OPTIONAL_FIELDS(MemberAck, StateDigest)
#pragma clang diagnostic push
@ -124,7 +134,7 @@ namespace ccf
#pragma clang diagnostic ignored "-Wgnu-zero-variadic-macro-arguments"
DECLARE_JSON_REQUIRED_FIELDS(MemberAck)
#pragma clang diagnostic pop
DECLARE_JSON_OPTIONAL_FIELDS(MemberAck, signed_req)
DECLARE_JSON_OPTIONAL_FIELDS(MemberAck, signed_req, cose_sign1_req)
using MemberAcks = ServiceMap<MemberId, MemberAck>;
namespace Tables
{

Просмотреть файл

@ -240,6 +240,6 @@ namespace ccf
{"scheme", "cose_sign1"},
{"description",
"Request payload must be a COSE Sign1 document, with expected "
"protected headers."
"protected headers. "
"Signer must be a member identity registered with this service."}});
}

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -63,6 +63,7 @@ namespace ccf
SnpHostDataMap host_data;
MemberAcks member_acks;
GovernanceHistory governance_history;
COSEGovernanceHistory cose_governance_history;
RecoveryShares shares;
EncryptedLedgerSecretsInfo encrypted_ledger_secrets;
EncryptedSubmittedShares encrypted_submitted_shares;
@ -121,6 +122,7 @@ namespace ccf
host_data(Tables::HOST_DATA),
member_acks(Tables::MEMBER_ACKS),
governance_history(Tables::GOV_HISTORY),
cose_governance_history(Tables::COSE_GOV_HISTORY),
shares(Tables::SHARES),
encrypted_ledger_secrets(Tables::ENCRYPTED_PAST_LEDGER_SECRET),
encrypted_submitted_shares(Tables::ENCRYPTED_SUBMITTED_SHARES),

Просмотреть файл

@ -13,4 +13,9 @@ namespace ccf
{
static constexpr auto GOV_HISTORY = "public:ccf.gov.history";
}
using COSEGovernanceHistory = ServiceMap<MemberId, std::vector<uint8_t>>;
namespace Tables
{
static constexpr auto COSE_GOV_HISTORY = "public:ccf.gov.cose_history";
}
}

Просмотреть файл

@ -217,6 +217,11 @@ if __name__ == "__main__":
help="If set, tests execution is skipped",
default=False,
)
parser.add_argument(
"--jinja-templates-path",
help="Path to directory containing sample Jinja templates",
required=True,
)
args = infra.e2e_args.cli_args(add)
args.package = "samples/apps/logging/liblogging"

Просмотреть файл

@ -627,6 +627,165 @@ def test_cose_auth(network, args):
headers={"content-type": "application/cose"},
)
assert r.status_code == 401
return network
@reqs.description("Test COSE ack")
def test_cose_ack(network, args):
primary, _ = network.find_primary()
identity = network.identity("member0")
signed_statement = signing.create_cose_sign1(
b"",
open(identity.key, encoding="utf-8").read(),
open(identity.cert, encoding="utf-8").read(),
{"ccf.gov.msg.type": "state_digest"},
)
with primary.client() as c:
r = c.post(
"/gov/ack/update_state_digest",
body=signed_statement,
headers={"content-type": "application/cose"},
)
assert r.status_code == 200
signed_state_digest = signing.create_cose_sign1(
r.body.data(),
open(identity.key, encoding="utf-8").read(),
open(identity.cert, encoding="utf-8").read(),
{"ccf.gov.msg.type": "ack"},
)
with primary.client() as c:
r = c.post(
"/gov/ack",
body=signed_state_digest,
headers={"content-type": "application/cose"},
)
assert r.status_code == 204
return network
@reqs.description("Test COSE proposal")
def test_cose_proposal(network, args):
primary, _ = network.find_primary()
identity = network.identity("member0")
other_identity = network.identity("member1")
new_user_local_id = "alice"
new_user = network.create_user(new_user_local_id, args.participants_curve)
template_loader = jinja2.ChoiceLoader(
[
jinja2.FileSystemLoader(args.jinja_templates_path),
jinja2.FileSystemLoader(os.path.dirname(new_user.cert_path)),
]
)
template_env = jinja2.Environment(
loader=template_loader, undefined=jinja2.StrictUndefined
)
proposal_template = template_env.get_template("set_user_proposal.json.jinja")
proposal_body = proposal_template.render(cert=os.path.basename(new_user.cert_path))
signed_statement = signing.create_cose_sign1(
proposal_body.encode(),
open(identity.key, encoding="utf-8").read(),
open(identity.cert, encoding="utf-8").read(),
{"ccf.gov.msg.type": "proposal"},
)
with primary.client() as c:
r = c.post(
"/gov/proposals",
body=signed_statement,
headers={"content-type": "application/cose"},
)
assert r.status_code == 200
proposal_id = r.body.json()["proposal_id"]
def vote(body):
return {"ballot": f"export function vote (proposal, proposer_id) {{ {body} }}"}
ballot_yes = vote("return true")
signed_ballot0 = signing.create_cose_sign1(
json.dumps(ballot_yes).encode(),
open(identity.key, encoding="utf-8").read(),
open(identity.cert, encoding="utf-8").read(),
{"ccf.gov.msg.type": "ballot", "ccf.gov.msg.proposal_id": proposal_id},
)
with primary.client() as c:
r = c.post(
f"/gov/proposals/{proposal_id}/ballots",
body=signed_ballot0,
headers={"content-type": "application/cose"},
)
assert r.status_code == 200
signed_ballot1 = signing.create_cose_sign1(
json.dumps(ballot_yes).encode(),
open(other_identity.key, encoding="utf-8").read(),
open(other_identity.cert, encoding="utf-8").read(),
{"ccf.gov.msg.type": "ballot", "ccf.gov.msg.proposal_id": proposal_id},
)
with primary.client() as c:
r = c.post(
f"/gov/proposals/{proposal_id}/ballots",
body=signed_ballot1,
headers={"content-type": "application/cose"},
)
assert r.status_code == 200
assert r.body.json()["state"] == "Accepted"
return network
@reqs.description("Test COSE withdraw")
def test_cose_withdrawal(network, args):
primary, _ = network.find_primary()
identity = network.identity("member0")
new_user_local_id = "alice"
new_user = network.create_user(new_user_local_id, args.participants_curve)
template_loader = jinja2.ChoiceLoader(
[
jinja2.FileSystemLoader(args.jinja_templates_path),
jinja2.FileSystemLoader(os.path.dirname(new_user.cert_path)),
]
)
template_env = jinja2.Environment(
loader=template_loader, undefined=jinja2.StrictUndefined
)
proposal_template = template_env.get_template("set_user_proposal.json.jinja")
proposal_body = proposal_template.render(cert=os.path.basename(new_user.cert_path))
signed_statement = signing.create_cose_sign1(
proposal_body.encode(),
open(identity.key, encoding="utf-8").read(),
open(identity.cert, encoding="utf-8").read(),
{"ccf.gov.msg.type": "proposal"},
)
with primary.client() as c:
r = c.post(
"/gov/proposals",
body=signed_statement,
headers={"content-type": "application/cose"},
)
assert r.status_code == 200
proposal_id = r.body.json()["proposal_id"]
signed_withdrawal = signing.create_cose_sign1(
b"",
open(identity.key, encoding="utf-8").read(),
open(identity.cert, encoding="utf-8").read(),
{"ccf.gov.msg.type": "withdrawal", "ccf.gov.msg.proposal_id": proposal_id},
)
with primary.client() as c:
r = c.post(
f"/gov/proposals/{proposal_id}/withdraw",
body=signed_withdrawal,
headers={"content-type": "application/cose"},
)
assert r.status_code == 200
return network
def gov(args):
@ -654,7 +813,11 @@ def gov(args):
test_all_nodes_cert_renewal(network, args)
test_service_cert_renewal(network, args)
test_service_cert_renewal_extended(network, args)
test_cose_auth(network, args)
if args.authenticate_session != "COSE":
test_cose_auth(network, args)
test_cose_ack(network, args)
test_cose_proposal(network, args)
test_cose_withdrawal(network, args)
def js_gov(args):
@ -689,6 +852,15 @@ if __name__ == "__main__":
infra.log_capture.COLORS = False
cr.add(
"session_coseauth",
gov,
package="samples/apps/logging/liblogging",
nodes=infra.e2e_args.max_nodes(cr.args, f=0),
initial_user_count=3,
authenticate_session="COSE",
)
cr.add(
"session_auth",
gov,

Просмотреть файл

@ -16,6 +16,7 @@ from loguru import logger as LOG
import suite.test_requirements as reqs
import ccf.read_ledger
import infra.logging_app as app
import governance
def check_operations(ledger, operations):
@ -205,6 +206,8 @@ def run(args):
(new_member_proposal.proposal_id, member.service_id, "withdraw")
)
governance.test_cose_proposal(network, args)
# Force ledger flush of all transactions so far
network.get_latest_ledger_public_state()
ledger = ccf.ledger.Ledger(ledger_directories)

Просмотреть файл

@ -28,6 +28,7 @@ from loguru import logger as LOG # type: ignore
import infra.commit
from infra.log_capture import flush_info
import infra.signing
class HttpSig(httpx.Auth):
@ -87,6 +88,7 @@ DEFAULT_COMMIT_TIMEOUT_SEC = 3
CONTENT_TYPE_TEXT = "text/plain"
CONTENT_TYPE_JSON = "application/json"
CONTENT_TYPE_BINARY = "application/octet-stream"
CONTENT_TYPE_COSE = "application/cose"
@dataclass
@ -304,6 +306,7 @@ class CurlClient:
ca=None,
session_auth=None,
signing_auth=None,
cose_signing_auth=None,
common_headers=None,
**kwargs,
):
@ -425,6 +428,7 @@ class HttpxClient:
ca: str,
session_auth: Optional[Identity] = None,
signing_auth: Optional[Identity] = None,
cose_signing_auth: Optional[Identity] = None,
common_headers: Optional[dict] = None,
**kwargs,
):
@ -432,6 +436,7 @@ class HttpxClient:
self.ca = ca
self.session_auth = session_auth
self.signing_auth = signing_auth
self.cose_signing_auth = cose_signing_auth
self.common_headers = common_headers
self.key_id = None
cert = None
@ -442,8 +447,9 @@ class HttpxClient:
self.protocol = kwargs.get("protocol")
kwargs.pop("protocol")
self.session = httpx.Client(verify=self.ca, cert=cert, **kwargs)
if self.signing_auth:
with open(self.signing_auth.cert, encoding="utf-8") as cert_file:
sig_auth = signing_auth or cose_signing_auth
if sig_auth:
with open(sig_auth.cert, encoding="utf-8") as cert_file:
self.key_id = (
x509.load_pem_x509_certificate(
cert_file.read().encode(), default_backend()
@ -503,6 +509,29 @@ class HttpxClient:
if not "content-type" in request.headers and len(request.body) > 0:
extra_headers["content-type"] = content_type
if self.cose_signing_auth is not None:
key = open(self.cose_signing_auth.key, encoding="utf-8").read()
cert = open(self.cose_signing_auth.cert, encoding="utf-8").read()
phdr = {}
if request.path.endswith("gov/ack/update_state_digest"):
phdr["ccf.gov.msg.type"] = "state_digest"
elif request.path.endswith("gov/ack"):
phdr["ccf.gov.msg.type"] = "ack"
elif request.path.endswith("gov/proposals"):
phdr["ccf.gov.msg.type"] = "proposal"
elif request.path.endswith("/ballots"):
pid = request.path.split("/")[-2]
phdr["ccf.gov.msg.type"] = "ballot"
phdr["ccf.gov.msg.proposal_id"] = pid
elif request.path.endswith("/withdraw"):
pid = request.path.split("/")[-2]
phdr["ccf.gov.msg.type"] = "withdrawal"
phdr["ccf.gov.msg.proposal_id"] = pid
request_body = infra.signing.create_cose_sign1(
request_body or b"", key, cert, phdr
)
extra_headers["content-type"] = CONTENT_TYPE_COSE
try:
response = self.session.request(
request.http_verb,
@ -571,6 +600,7 @@ class CCFClient:
ca: str,
session_auth: Optional[Identity] = None,
signing_auth: Optional[Identity] = None,
cose_signing_auth: Optional[Identity] = None,
connection_timeout: int = DEFAULT_CONNECTION_TIMEOUT_SEC,
description: Optional[str] = None,
curl: bool = False,
@ -584,6 +614,7 @@ class CCFClient:
self.is_connected = False
self.auth = bool(session_auth)
self.sign = bool(signing_auth)
self.cose = bool(cose_signing_auth)
impl_type = CCFClient.default_impl_type
@ -591,7 +622,13 @@ class CCFClient:
impl_type = CurlClient
self.client_impl = impl_type(
self.hostname, ca, session_auth, signing_auth, common_headers, **kwargs
self.hostname,
ca,
session_auth,
signing_auth,
cose_signing_auth,
common_headers,
**kwargs,
)
def _response(self, response: Response) -> Response:

Просмотреть файл

@ -112,6 +112,8 @@ class Member:
LOG.info(f"Member {self.local_id} created: {self.service_id}")
def auth(self, write=False):
if self.authenticate_session == "COSE":
return (None, None, self.local_id)
if self.authenticate_session:
if write:
return (self.local_id, self.local_id)

Просмотреть файл

@ -552,6 +552,9 @@ class Node:
def signing_auth(self, name=None):
return {"signing_auth": self.identity(name)}
def cose_signing_auth(self, name=None):
return {"cose_signing_auth": self.identity(name)}
def get_public_rpc_host(
self, interface_name=infra.interfaces.PRIMARY_RPC_INTERFACE
):
@ -597,6 +600,7 @@ class Node:
self,
identity=None,
signing_identity=None,
cose_signing_identity=None,
interface_name=infra.interfaces.PRIMARY_RPC_INTERFACE,
verify_ca=True,
**kwargs,
@ -627,9 +631,10 @@ class Node:
akwargs["http2"] = True
akwargs.update(self.session_auth(identity))
akwargs.update(self.signing_auth(signing_identity))
akwargs.update(self.cose_signing_auth(cose_signing_identity))
akwargs[
"description"
] = f"[{self.local_node_id}|{identity or ''}|{signing_identity or ''}]"
] = f"[{self.local_node_id}|{identity or ''}|{signing_identity or ''}|{cose_signing_identity or ''}]"
akwargs.update(kwargs)
if self.curl:

198
tests/infra/signing.py Normal file
Просмотреть файл

@ -0,0 +1,198 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.
from typing import Optional
import cbor2
import cose.headers
from cose.keys.ec2 import EC2Key
from cose.keys.curves import P256, P384, P521
from cose.keys.keyparam import EC2KpCurve, EC2KpX, EC2KpY, EC2KpD
from cose.messages import Sign1Message
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.ec import (
EllipticCurvePrivateKey,
EllipticCurvePublicKey,
)
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import load_pem_private_key
Pem = str
def from_cryptography_eckey_obj(ext_key) -> EC2Key:
"""
Returns an initialized COSE Key object of type EC2Key.
:param ext_key: Python cryptography key.
:return: an initialized EC key
"""
if hasattr(ext_key, "private_numbers"):
priv_nums = ext_key.private_numbers()
pub_nums = priv_nums.public_numbers
else:
priv_nums = None
pub_nums = ext_key.public_numbers()
if pub_nums.curve.name == "secp256r1":
curve = P256
elif pub_nums.curve.name == "secp384r1":
curve = P384
elif pub_nums.curve.name == "secp521r1":
curve = P521
else:
raise NotImplementedError("unsupported curve")
cose_key = {}
if pub_nums:
cose_key.update(
{
EC2KpCurve: curve,
EC2KpX: pub_nums.x.to_bytes(curve.size, "big"),
EC2KpY: pub_nums.y.to_bytes(curve.size, "big"),
}
)
if priv_nums:
cose_key.update(
{
EC2KpD: priv_nums.private_value.to_bytes(curve.size, "big"),
}
)
return EC2Key.from_dict(cose_key)
def default_algorithm_for_key(key) -> str:
"""
Get the default algorithm for a given key, based on its
type and parameters.
"""
if isinstance(key, EllipticCurvePublicKey):
if isinstance(key.curve, ec.SECP256R1):
return "ES256"
elif isinstance(key.curve, ec.SECP384R1):
return "ES384"
elif isinstance(key.curve, ec.SECP521R1):
return "ES512"
else:
raise NotImplementedError("unsupported curve")
else:
raise NotImplementedError("unsupported key type")
def get_priv_key_type(priv_pem: str) -> str:
key = load_pem_private_key(priv_pem.encode("ascii"), None, default_backend())
if isinstance(key, EllipticCurvePrivateKey):
return "ec"
raise NotImplementedError("unsupported key type")
def cert_fingerprint(cert_pem: Pem):
cert = load_pem_x509_certificate(cert_pem.encode("ascii"), default_backend())
return cert.fingerprint(hashes.SHA256()).hex().encode("utf-8")
def create_cose_sign1(
payload: bytes,
key_priv_pem: Pem,
cert_pem: Pem,
additional_headers: Optional[dict] = None,
) -> bytes:
key_type = get_priv_key_type(key_priv_pem)
cert = load_pem_x509_certificate(cert_pem.encode("ascii"), default_backend())
alg = default_algorithm_for_key(cert.public_key())
kid = cert_fingerprint(cert_pem)
headers = {cose.headers.Algorithm: alg, cose.headers.KID: kid}
headers.update(additional_headers or {})
msg = Sign1Message(phdr=headers, payload=payload)
key = load_pem_private_key(key_priv_pem.encode("ascii"), None, default_backend())
if key_type == "ec":
cose_key = from_cryptography_eckey_obj(key)
else:
raise NotImplementedError("unsupported key type")
msg.key = cose_key
return msg.encode()
def get_cert_key_type(cert_pem: str) -> str:
cert = load_pem_x509_certificate(cert_pem.encode("ascii"), default_backend())
if isinstance(cert.public_key(), EllipticCurvePublicKey):
return "ec"
raise NotImplementedError("unsupported key type")
def verify_cose_sign1(buf: bytes, cert_pem: str):
key_type = get_cert_key_type(cert_pem)
cert = load_pem_x509_certificate(cert_pem.encode("ascii"), default_backend())
key = cert.public_key()
if key_type == "ec":
cose_key = from_cryptography_eckey_obj(key)
else:
raise NotImplementedError("unsupported key type")
msg = Sign1Message.decode(buf)
msg.key = cose_key
if not msg.verify_signature():
raise ValueError("signature is invalid")
return msg
def detach_content(msg: bytes):
m = cbor2.loads(msg)
content = m.value[2]
m.value[2] = None
return content, cbor2.dumps(m)
def attach_content(content, detached_envelope):
m = cbor2.loads(detached_envelope)
m.value[2] = content
return cbor2.dumps(m)
PRIV = """-----BEGIN EC PARAMETERS-----
BgUrgQQAIg==
-----END EC PARAMETERS-----
-----BEGIN EC PRIVATE KEY-----
MIGkAgEBBDDMwIszb3ZmKpeHq/vPoz6qnxheI89T2IZpKFQHJwQrvuaFFLDUKK9Z
jKRMshAeALagBwYFK4EEACKhZANiAAQ38JreTF2uKVaTKBd7fAkIy2bg5U6T0O+H
wcxJOLgqK+fwidnVlPG+GQUwIj6ik7Xp/0Ig7RVSAyAjcpYWL4dHU5gJ/g9PruHz
cnmFtP88dARPH2EKy0n/iGh9yXD3bXw=
-----END EC PRIVATE KEY-----
"""
PUB = """-----BEGIN CERTIFICATE-----
MIIBtjCCATygAwIBAgIUJCUauYlNsJ76zOUomey4cF7F+pUwCgYIKoZIzj0EAwMw
EjEQMA4GA1UEAwwHbWVtYmVyMDAeFw0yMjA5MDYxMzQ2NDlaFw0yMzA5MDYxMzQ2
NDlaMBIxEDAOBgNVBAMMB21lbWJlcjAwdjAQBgcqhkjOPQIBBgUrgQQAIgNiAAQ3
8JreTF2uKVaTKBd7fAkIy2bg5U6T0O+HwcxJOLgqK+fwidnVlPG+GQUwIj6ik7Xp
/0Ig7RVSAyAjcpYWL4dHU5gJ/g9PruHzcnmFtP88dARPH2EKy0n/iGh9yXD3bXyj
UzBRMB0GA1UdDgQWBBTpme2NGI1y3OY8XYT5XwcuuvG55jAfBgNVHSMEGDAWgBTp
me2NGI1y3OY8XYT5XwcuuvG55jAPBgNVHRMBAf8EBTADAQH/MAoGCCqGSM49BAMD
A2gAMGUCMDg1QddcE5YFrcHmFvyXW2s7LaV0NYx24lwImrgWXQTOv7iNXAfrogzP
CQxyHqkSxgIxANmkmLCojf5NCvwxI5tf37i6zGQ0c9zR0P9b4FtcznEtrbzmXfdJ
b2H04E57XZmVdg==
-----END CERTIFICATE-----
"""
if __name__ == "__main__":
signed_statement = create_cose_sign1(
b"governance js here", PRIV, PUB, {"ccf_governance_action": "proposal"}
)
msg = verify_cose_sign1(signed_statement, PUB)
assert msg.phdr[cose.headers.KID] == cert_fingerprint(PUB), (
msg.phdr[cose.headers.KID],
cert_fingerprint(PUB),
)
content, detached_envelope = detach_content(signed_statement)
signed_statement = attach_content(content, detached_envelope)
msg = verify_cose_sign1(signed_statement, PUB)
signed_statement = create_cose_sign1(
b"governance js here",
PRIV,
PUB,
{"ccf_governance_action": "proposal", "ccf_governance_proposal_id": "12345"},
)

Просмотреть файл

@ -24,17 +24,18 @@ def test_missing_signature_header(network, args):
www_auth = "www-authenticate"
assert www_auth in r.headers, r.headers
auth_header = r.headers[www_auth]
assert auth_header.startswith("Signature"), auth_header
elements = {
e[0].strip(): e[1]
for e in (element.split("=") for element in auth_header.split(","))
}
assert "headers" in elements, elements
required_headers = elements["headers"]
assert required_headers.startswith('"'), required_headers
assert required_headers.endswith('"'), required_headers
assert "(request-target)" in required_headers, required_headers
assert "digest" in required_headers, required_headers
if auth_header != 'COSE-SIGN1 realm="Signed request access"':
assert auth_header.startswith("Signature"), auth_header
elements = {
e[0].strip(): e[1]
for e in (element.split("=") for element in auth_header.split(","))
}
assert "headers" in elements, elements
required_headers = elements["headers"]
assert required_headers.startswith('"'), required_headers
assert required_headers.endswith('"'), required_headers
assert "(request-target)" in required_headers, required_headers
assert "digest" in required_headers, required_headers
return network

Просмотреть файл

@ -123,6 +123,8 @@ all_tests_suite = [
# governance
governance.test_each_node_cert_renewal,
governance.test_service_cert_renewal,
governance.test_cose_ack,
governance.test_cose_withdrawal,
# e2e_operations:
e2e_operations.test_forced_ledger_chunk,
e2e_operations.test_forced_snapshot,