Bug 1346359 - Script to submit transparency certs to CT r=rail

MozReview-Commit-ID: F8uPgoJTFr

--HG--
extra : rebase_source : d45e3db6ff3836057cb3934d2e8d2956f715c8c8
This commit is contained in:
Richard Barnes 2017-03-10 17:48:37 -05:00
Родитель 81ad69be7e
Коммит b698283963
3 изменённых файлов: 242 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,130 @@
#!/usr/bin/env python
import struct
import base64
class SignedCertificateTimestamp:
"""
Represents a Signed Certificate Timestamp from a Certificate Transparency
log, which is how the log indicates that it has seen and logged a
certificate. The format for SCTs in RFC 6962 is as follows:
struct {
Version sct_version;
LogID id;
uint64 timestamp;
CtExtensions extensions;
digitally-signed struct {
Version sct_version;
SignatureType signature_type = certificate_timestamp;
uint64 timestamp;
LogEntryType entry_type;
select(entry_type) {
case x509_entry: ASN.1Cert;
case precert_entry: PreCert;
} signed_entry;
CtExtensions extensions;
};
} SignedCertificateTimestamp;
Here, the "digitally-signed" is just a fixed struct encoding the algorithm
and signature:
struct {
SignatureAndHashAlgorithm algorithm;
opaque signature<0..2^16-1>;
} DigitallySigned;
In other words the whole serialized SCT comprises:
- 1 octet of version = v1 = resp["sct_version"]
- 32 octets of LogID = resp["id"]
- 8 octets of timestamp = resp["timestamp"]
- 2 octets of extensions length + resp["extensions"]
- 2+2+N octets of signature
These are built from RFC 6962 API responses, which are encoded in JSON
object of the following form:
{
"sct_version": 0,
"id": "...",
"timestamp": ...,
"extensions": "",
"signature": "...",
}
The "signature" field contains the whole DigitallySigned struct.
"""
# We only support SCTs from RFC 6962 logs
SCT_VERSION = 0
def __init__(self, response_json=None):
self.version = SignedCertificateTimestamp.SCT_VERSION
if response_json is not None:
if response_json['sct_version'] is not SignedCertificateTimestamp.SCT_VERSION:
raise Exception('Incorrect version for SCT')
self.id = base64.b64decode(response_json['id'])
self.timestamp = response_json['timestamp']
self.signature = base64.b64decode(response_json['signature'])
self.extensions = b''
if 'extensions' in response_json:
self.extensions = base64.b64decode(response_json['extensions'])
@staticmethod
def from_rfc6962(serialized):
start = 0
read = 1 + 32 + 8
if len(serialized) < start + read:
raise Exception('SCT too short for version, log ID, and timestamp')
version, = struct.unpack('B', serialized[0])
log_id = serialized[1:1+32]
timestamp, = struct.unpack('!Q', serialized[1+32:1+32+8])
start += read
if version is not SignedCertificateTimestamp.SCT_VERSION:
raise Exception('Incorrect version for SCT')
read = 2
if len(serialized) < start + read:
raise Exception('SCT too short for extension length')
ext_len, = struct.unpack('!H', serialized[start:start+read])
start += read
read = ext_len
if len(serialized) < start + read:
raise Exception('SCT too short for extensions')
extensions = serialized[start:read]
start += read
read = 4
if len(serialized) < start + read:
raise Exception('SCT too short for signature header')
alg, sig_len, = struct.unpack('!HH', serialized[start:start+read])
start += read
read = sig_len
if len(serialized) < start + read:
raise Exception('SCT too short for signature')
sig = serialized[start:start+read]
sct = SignedCertificateTimestamp()
sct.id = log_id
sct.timestamp = timestamp
sct.extensions = extensions
sct.signature = struct.pack('!HH', alg, sig_len) + sig
return sct
def to_rfc6962(self):
version = struct.pack("B", self.version)
timestamp = struct.pack("!Q", self.timestamp)
ext_len = struct.pack("!H", len(self.extensions))
return version + self.id + timestamp + \
ext_len + self.extensions + self.signature

Просмотреть файл

@ -0,0 +1,81 @@
import os
import sys
import base64
from OpenSSL import crypto
sys.path.insert(1, os.path.dirname(os.path.dirname(sys.path[0])))
from mozharness.base.script import BaseScript
from mozharness.base.python import VirtualenvMixin, virtualenv_config_options
from mozharness.mozilla.signed_certificate_timestamp import SignedCertificateTimestamp
class CTSubmitter(BaseScript, VirtualenvMixin):
config_options = virtualenv_config_options
config_options = [
[["--chain"], {
"dest": "chain",
"help": "URL from which to download the cert chain to be submitted to CT (in PEM format)"
}],
[["--log"], {
"dest": "log",
"help": "URL for the log to which the chain should be submitted"
}],
[["--sct"], {
"dest": "sct",
"help": "File where the SCT from the log should be written"
}],
]
def __init__(self):
BaseScript.__init__(self,
config_options=self.config_options,
config={
"virtualenv_modules": [
"pem",
"redo",
"requests",
],
"virtualenv_path": "venv",
},
require_config_file=False,
all_actions=["add-chain"],
default_actions=["add-chain"],
)
self.chain_url = self.config["chain"]
self.log_url = self.config["log"]
self.sct_filename = self.config["sct"]
def add_chain(self):
from redo import retry
import requests
import pem
def get_chain():
r = requests.get(self.chain_url)
r.raise_for_status()
return r.text
chain = retry(get_chain)
req = { "chain": [] }
chain = pem.parse(chain)
for i in range(len(chain)):
cert = crypto.load_certificate(crypto.FILETYPE_PEM, str(chain[i]))
der = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
req["chain"].append(base64.b64encode(der))
def post_chain():
r = requests.post(self.log_url + '/ct/v1/add-chain', json=req)
r.raise_for_status()
return r.json()
resp = retry(post_chain)
sct = SignedCertificateTimestamp(resp)
self.write_to_file(self.sct_filename, sct.to_rfc6962())
if __name__ == "__main__":
myScript = CTSubmitter()
myScript.run_and_exit()

Просмотреть файл

@ -0,0 +1,31 @@
import unittest
import struct
from mozharness.mozilla.signed_certificate_timestamp import SignedCertificateTimestamp
log_id = 'pLkJkLQYWBSHuxOizGdwCjw1mAT5G9+443fNDsgN3BA='.decode('base64')
timestamp = 1483206164907
signature = 'BAMARzBFAiEAsyJov/LF1DIxurR+6xkxP/ZJzb3whHQ+1+PrJNuXfnoCIG28p1XRxkQqRprnCIDDBniKbJngig/NQnIEQ5VZOYG+'.decode('base64')
json_sct = {
'sct_version': 0,
'id': log_id.encode('base64'),
'timestamp': timestamp,
'signature': signature.encode('base64'),
}
hex_timestamp = struct.pack('!Q', timestamp).encode('hex')
hex_sct = '00' + log_id.encode('hex') + hex_timestamp + '0000' + signature.encode('hex')
binary_sct = hex_sct.decode('hex')
class TestSignedCertificateTimestamp(unittest.TestCase):
def testEncode(self):
sct = SignedCertificateTimestamp(json_sct)
self.assertEquals(sct.to_rfc6962(), binary_sct)
def testDecode(self):
sct = SignedCertificateTimestamp.from_rfc6962(binary_sct)
self.assertEquals(sct.version, 0)
self.assertEquals(sct.id, log_id)
self.assertEquals(sct.timestamp, timestamp)
self.assertEquals(sct.signature, signature)