#!/usr/bin/python3 -u
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import argparse
import bz2
import concurrent.futures
import gzip
import hashlib
import json
import lzma
import multiprocessing
import os
import pathlib
import subprocess
import sys
import tempfile
import time
import urllib.request

try:
    import zstandard
except ImportError:
    zstandard = None


PUBLIC_ARTIFACT_URL = ('https://queue.taskcluster.net/v1/task/{task}/artifacts/'
                       '{artifact}')
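
# Requests to the bare ``taskcluster`` host are assumed to be routed through
# the Taskcluster proxy available to tasks, which attaches the credentials
# needed to fetch private artifacts.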
PRIVATE_ARTIFACT_URL = ('https://taskcluster/queue/v1/task/{task}/artifacts/'
                        '{artifact}')

CONCURRENCY = multiprocessing.cpu_count()


def log(msg):
    print(msg, file=sys.stderr)
    sys.stderr.flush()


class IntegrityError(Exception):
    """Represents an integrity error when downloading a URL."""


def stream_download(url, sha256=None, size=None):
    """Download a URL to a generator, optionally with content verification.

    If ``sha256`` or ``size`` are defined, the downloaded URL will be
    validated against those requirements and ``IntegrityError`` will be
    raised if expectations do not match.

    Because verification cannot occur until the file is completely downloaded
    it is recommended for consumers to not do anything meaningful with the
    data if content verification is being used. To securely handle retrieved
    content, it should be streamed to a file or memory and only operated
    on after the generator is exhausted without raising.
    """
    log('Downloading %s' % url)

    h = hashlib.sha256()
    length = 0

    t0 = time.time()
    with urllib.request.urlopen(url) as fh:
        if not url.endswith('.gz') and fh.info().get('Content-Encoding') == 'gzip':
            fh = gzip.GzipFile(fileobj=fh)

        while True:
            chunk = fh.read(65536)
            if not chunk:
                break

            h.update(chunk)
            length += len(chunk)
            yield chunk

    duration = time.time() - t0
    digest = h.hexdigest()
    log('%s resolved to %d bytes with sha256 %s in %.3fs' % (
        url, length, digest, duration))

    if size:
        if size == length:
            log('Verified size of %s' % url)
        else:
            raise IntegrityError('size mismatch on %s: wanted %d; got %d' % (
                url, size, length))

    if sha256:
        if digest == sha256:
            log('Verified sha256 integrity of %s' % url)
        else:
            raise IntegrityError('sha256 mismatch on %s: wanted %s; got %s' % (
                url, sha256, digest))
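
# Illustrative use (hypothetical names): because verification only happens
# once the generator is exhausted, buffer the content first, e.g.
#   data = b''.join(stream_download(url, sha256=expected_sha256, size=expected_size))
# and only act on ``data`` if no IntegrityError was raised.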


def download_to_path(url, path, sha256=None, size=None):
    """Download a URL to a filesystem path, possibly with verification."""

    # We download to a temporary file and rename at the end so there's
    # no chance of the final file being partially written or containing
    # bad data.
    try:
        path.unlink()
    except FileNotFoundError:
        pass

    tmp = path.with_name('%s.tmp' % path.name)

    log('Downloading %s to %s' % (url, tmp))

    try:
        with tmp.open('wb') as fh:
            for chunk in stream_download(url, sha256=sha256, size=size):
                fh.write(chunk)

        log('Renaming to %s' % path)
        tmp.rename(path)
    except IntegrityError:
        tmp.unlink()
        raise
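
# Illustrative use (hypothetical values):
#   download_to_path('https://example.com/archive.tar.xz',
#                    pathlib.Path('/builds/archive.tar.xz'),
#                    sha256='<expected hex digest>', size=12345)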


def gpg_verify_path(path: pathlib.Path, public_key_data: bytes,
                    signature_data: bytes):
    """Verify a filesystem path using GPG.

    Takes a Path defining a file to verify. ``public_key_data`` contains
    bytes with GPG public key data. ``signature_data`` contains a signed
    GPG document to use with ``gpg --verify``.
    """
    log('Validating GPG signature of %s' % path)
    log('GPG key data:\n%s' % public_key_data.decode('ascii'))

    with tempfile.TemporaryDirectory() as td:
        try:
            # --batch since we're running unattended.
            gpg_args = ['gpg', '--homedir', td, '--batch']

            log('Importing GPG key...')
            subprocess.run(gpg_args + ['--import'],
                           input=public_key_data,
                           check=True)
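
            # ``--verify -`` reads the signature document from stdin
            # (``input=signature_data`` below) and checks it against the
            # file at ``path``.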
            log('Verifying GPG signature...')
            subprocess.run(gpg_args + ['--verify', '-', '%s' % path],
                           input=signature_data,
                           check=True)

            log('GPG signature verified!')
        finally:
            # There is a race between the agent self-terminating and
            # shutil.rmtree() from the temporary directory cleanup that can
            # lead to exceptions. Kill the agent before cleanup to prevent this.
            env = dict(os.environ)
            env['GNUPGHOME'] = td
            subprocess.run(['gpgconf', '--kill', 'gpg-agent'], env=env)


def archive_type(path: pathlib.Path):
    """Attempt to identify a path as an extractable archive."""
    if path.suffixes[-2:-1] == ['.tar']:
        return 'tar'
    elif path.suffix == '.zip':
        return 'zip'
    else:
        return None
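
# Examples of the mapping implemented above:
#   archive_type(pathlib.Path('clang.tar.zst')) -> 'tar'
#   archive_type(pathlib.Path('sdk.zip'))       -> 'zip'
#   archive_type(pathlib.Path('notes.txt'))     -> None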


def extract_archive(path, dest_dir, typ):
    """Extract an archive to a destination directory."""
    # We pipe input to the decompressor program so that we can apply
    # custom decompressors that the program may not know about.
    if typ == 'tar':
        if path.suffix == '.bz2':
            ifh = bz2.open(str(path), 'rb')
        elif path.suffix == '.gz':
            ifh = gzip.open(str(path), 'rb')
        elif path.suffix == '.xz':
            ifh = lzma.open(str(path), 'rb')
        elif path.suffix == '.zst':
            if not zstandard:
                raise ValueError('zstandard Python package not available')

            dctx = zstandard.ZstdDecompressor()
            ifh = dctx.stream_reader(path.open('rb'))
        elif path.suffix == '.tar':
            ifh = path.open('rb')
        else:
            raise ValueError('unknown archive format for tar file: %s' % path)

        args = ['tar', 'xf', '-']
        pipe_stdin = True
    elif typ == 'zip':
        # unzip from stdin has wonky behavior. We don't use a pipe for it.
        ifh = open(os.devnull, 'rb')
        args = ['unzip', '-o', str(path)]
        pipe_stdin = False
    else:
        raise ValueError('unknown archive format: %s' % path)

    log('Extracting %s to %s using %r' % (path, dest_dir, args))
    t0 = time.time()

    with ifh, subprocess.Popen(args, cwd=str(dest_dir), bufsize=0,
                               stdin=subprocess.PIPE) as p:
        while True:
            if not pipe_stdin:
                break

            chunk = ifh.read(131072)
            if not chunk:
                break

            p.stdin.write(chunk)
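
    # Exiting the ``with`` block closes stdin and waits for the process to
    # exit, so ``p.returncode`` is populated by the time it is checked.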
    if p.returncode:
        raise Exception('%r exited %d' % (args, p.returncode))

    log('%s extracted in %.3fs' % (path, time.time() - t0))


def fetch_and_extract(url, dest_dir, extract=True, sha256=None, size=None):
    """Fetch a URL and extract it to a destination path.

    If the downloaded URL is an archive, it is extracted automatically
    and the archive is deleted. Otherwise the file remains in place in
    the destination directory.
    """
    basename = url.split('/')[-1]
    dest_path = dest_dir / basename

    download_to_path(url, dest_path, sha256=sha256, size=size)

    if not extract:
        return

    typ = archive_type(dest_path)
    if typ:
        extract_archive(dest_path, dest_dir, typ)
        log('Removing %s' % dest_path)
        dest_path.unlink()


def fetch_urls(downloads):
    """Fetch multiple (url, dest_dir, extract) entries concurrently."""
    with concurrent.futures.ThreadPoolExecutor(CONCURRENCY) as e:
        fs = []

        for download in downloads:
            fs.append(e.submit(fetch_and_extract, *download))

        for f in fs:
            f.result()
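
# Illustrative use (hypothetical values): each entry expands into a
# fetch_and_extract(url, dest_dir, extract) call, e.g.
#   fetch_urls([('https://example.com/a.tar.xz', pathlib.Path('/dest'), True)])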


def command_static_url(args):
    gpg_sig_url = args.gpg_sig_url
    gpg_env_key = args.gpg_key_env

    if bool(gpg_sig_url) != bool(gpg_env_key):
        print('--gpg-sig-url and --gpg-key-env must both be defined')
        return 1

    if gpg_sig_url:
        gpg_signature = b''.join(stream_download(gpg_sig_url))
        gpg_key = os.environb[gpg_env_key.encode('ascii')]

    dest = pathlib.Path(args.dest)
    dest.parent.mkdir(parents=True, exist_ok=True)

    try:
        download_to_path(args.url, dest, sha256=args.sha256, size=args.size)

        if gpg_sig_url:
            gpg_verify_path(dest, gpg_key, gpg_signature)
    except Exception:
        try:
            dest.unlink()
        except FileNotFoundError:
            pass

        raise


def command_task_artifacts(args):
    fetches = json.loads(os.environ['MOZ_FETCHES'])
    downloads = []
    for fetch in fetches:
        extdir = pathlib.Path(args.dest)
        if 'dest' in fetch:
            extdir = extdir.joinpath(fetch['dest'])
        extdir.mkdir(parents=True, exist_ok=True)

        if fetch['artifact'].startswith('public/'):
            url = PUBLIC_ARTIFACT_URL.format(task=fetch['task'],
                                             artifact=fetch['artifact'])
        else:
            url = PRIVATE_ARTIFACT_URL.format(task=fetch['task'],
                                              artifact=fetch['artifact'])
        downloads.append((url, extdir, fetch['extract']))

    fetch_urls(downloads)
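
# Illustrative MOZ_FETCHES payload (hypothetical task ID and artifact):
#   [{"task": "abc123", "artifact": "public/build/target.tar.xz",
#     "extract": true, "dest": "toolchains"}]
# 'task', 'artifact' and 'extract' are required by the code above; 'dest'
# optionally nests the download under the destination directory.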


def main():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(title='sub commands')

    url = subparsers.add_parser('static-url', help='Download a static URL')
    url.set_defaults(func=command_static_url)
    url.add_argument('--sha256', required=True,
                     help='SHA-256 of downloaded content')
    url.add_argument('--size', required=True, type=int,
                     help='Size of downloaded content, in bytes')
    url.add_argument('--gpg-sig-url',
                     help='URL containing signed GPG document validating '
                          'URL to fetch')
    url.add_argument('--gpg-key-env',
                     help='Environment variable containing GPG key to validate')
    url.add_argument('url', help='URL to fetch')
    url.add_argument('dest', help='Destination path')

    artifacts = subparsers.add_parser('task-artifacts',
                                      help='Fetch task artifacts')
    artifacts.set_defaults(func=command_task_artifacts)
    artifacts.add_argument('-d', '--dest',
                           default=os.environ.get('MOZ_FETCHES_DIR'),
                           help='Destination directory which will contain all '
                                'artifacts (defaults to $MOZ_FETCHES_DIR)')

    args = parser.parse_args()

    if not args.dest:
        parser.error('no destination directory specified, either pass in --dest '
                     'or set $MOZ_FETCHES_DIR')

    return args.func(args)


if __name__ == '__main__':
    sys.exit(main())