moz_kinto_publisher: extract subject and spki hash from the certificate when they are not in the remote settings record

This commit is contained in:
John M. Schanck 2024-11-01 10:02:49 -07:00 коммит произвёл John Schanck
Родитель 006b178cb3
Коммит d84238ee5f
1 изменённых файлов: 30 добавлений и 11 удалений

Просмотреть файл

@ -16,6 +16,7 @@ import requests
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from kinto_http import Client from kinto_http import Client
from kinto_http.exceptions import KintoException from kinto_http.exceptions import KintoException
from kinto_http.patch_type import BasicPatch from kinto_http.patch_type import BasicPatch
@ -300,33 +301,37 @@ class Intermediate:
[ [
"derHash", "derHash",
"id", "id",
"pubKeyHash",
"subject",
], ],
kwargs, kwargs,
): ):
raise parseError raise parseError
try: try:
self.pubKeyHash = base64.b64decode( if "pubKeyHash" in kwargs:
kwargs["pubKeyHash"], altchars="-_", validate=True self.pubKeyHash = base64.b64decode(
) # sha256 of the SPKI kwargs["pubKeyHash"], altchars="-_", validate=True
) # sha256 of the SPKI
else:
self.pubKeyHash = None
except base64.binascii.Error:
raise parseError
if self.pubKeyHash and len(self.pubKeyHash) != 32:
raise IntermediateRecordError(f"Invalid pubkey hash: {kwargs}")
try:
if "derHash" in kwargs: if "derHash" in kwargs:
self.derHash = base64.b64decode( self.derHash = base64.b64decode(
kwargs["derHash"], altchars="-_", validate=True kwargs["derHash"], altchars="-_", validate=True
) )
else:
self.derHash = None
except base64.binascii.Error: except base64.binascii.Error:
raise parseError raise parseError
if len(self.pubKeyHash) != 32:
raise IntermediateRecordError(f"Invalid pubkey hash: {kwargs}")
if self.derHash and len(self.derHash) != 32: if self.derHash and len(self.derHash) != 32:
raise IntermediateRecordError(f"Invalid DER hash. {kwargs}") raise IntermediateRecordError(f"Invalid DER hash. {kwargs}")
self.subject = kwargs["subject"]
if "pem" in kwargs: if "pem" in kwargs:
self.set_pem(kwargs["pem"]) self.set_pem(kwargs["pem"])
@ -376,7 +381,6 @@ class Intermediate:
self.pemData = pem_data self.pemData = pem_data
self.pemHash = hashlib.sha256(pem_data.encode("utf-8")).hexdigest() self.pemHash = hashlib.sha256(pem_data.encode("utf-8")).hexdigest()
derCert = asciiPemToBinaryDer(pem_data) derCert = asciiPemToBinaryDer(pem_data)
self.derHash = hashlib.sha256(derCert).digest()
try: try:
self.cert = x509.load_pem_x509_certificate( self.cert = x509.load_pem_x509_certificate(
pem_data.encode("utf-8"), default_backend() pem_data.encode("utf-8"), default_backend()
@ -384,6 +388,21 @@ class Intermediate:
except Exception as e: except Exception as e:
raise IntermediateRecordError("Cannot parse PEM data: {}".format(e)) raise IntermediateRecordError("Cannot parse PEM data: {}".format(e))
derHash = hashlib.sha256(self.cert.public_bytes(Encoding.DER)).digest()
if self.derHash and self.derHash != derHash:
raise IntermediateRecordError("DER hash does not match")
self.derHash = derHash
self.subject = self.cert.subject.rfc4514_string()
derSpki = self.cert.public_key().public_bytes(
encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo
)
spkiHash = hashlib.sha256(derSpki).digest()
if self.pubKeyHash and self.pubKeyHash != spkiHash:
raise IntermediateRecordError("SPKI hash does not match")
self.pubKeyHash = spkiHash
def download_pem(self, kinto_client): def download_pem(self, kinto_client):
if not self.pemAttachment: if not self.pemAttachment:
raise Exception("pemAttachment not set") raise Exception("pemAttachment not set")