diff --git a/taskcluster/docker/funsize-update-generator/requirements.txt b/taskcluster/docker/funsize-update-generator/requirements.txt
index 217bbd994272..f1647c454e70 100644
--- a/taskcluster/docker/funsize-update-generator/requirements.txt
+++ b/taskcluster/docker/funsize-update-generator/requirements.txt
@@ -2,6 +2,4 @@ mar==2.1.2
 backports.lzma==0.0.8
 datadog==0.17.0
 redo==1.6
-aiohttp==2.3.6
 awscli==1.14.10
-scriptworker==6.0.0
diff --git a/taskcluster/docker/funsize-update-generator/runme.sh b/taskcluster/docker/funsize-update-generator/runme.sh
index 992c4091df38..9adc671917e0 100644
--- a/taskcluster/docker/funsize-update-generator/runme.sh
+++ b/taskcluster/docker/funsize-update-generator/runme.sh
@@ -18,8 +18,7 @@ curl --location --retry 10 --retry-delay 10 -o /home/worker/task.json \
 S3_BUCKET_AND_PATH=$(jq -r '.scopes[] | select(contains ("auth:aws-s3"))' /home/worker/task.json | awk -F: '{print $4}')
 
 # Will be empty if there's no scope for AWS S3.
-if [ -n "${S3_BUCKET_AND_PATH}" ] && getent hosts taskcluster
-then
+if [ -n "${S3_BUCKET_AND_PATH}" ]; then
   # Does this parse as we expect?
   S3_PATH=${S3_BUCKET_AND_PATH#*/}
   AWS_BUCKET_NAME=${S3_BUCKET_AND_PATH%/${S3_PATH}*}
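Note: the scope string being parsed here has the shape `auth:aws-s3:<level>:<bucket>/<prefix>` (inferred from the parsing itself, not from a spec visible in this patch). An equivalent of the `awk`/parameter-expansion pipeline, as a Python sketch with a hypothetical scope value:

```python
# Equivalent of the shell parsing above, for a hypothetical scope.
scope = "auth:aws-s3:read-write:some-bucket/some/prefix"

s3_bucket_and_path = scope.split(":")[3]               # awk -F: '{print $4}'
s3_path = s3_bucket_and_path.split("/", 1)[1]          # ${S3_BUCKET_AND_PATH#*/}
aws_bucket_name = s3_bucket_and_path.split("/", 1)[0]  # ${S3_BUCKET_AND_PATH%/${S3_PATH}*}

assert aws_bucket_name == "some-bucket"
assert s3_path == "some/prefix"
```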
diff --git a/taskcluster/docker/funsize-update-generator/scripts/funsize.py b/taskcluster/docker/funsize-update-generator/scripts/funsize.py
index 64a04001c401..5fcc3b6019e6 100755
--- a/taskcluster/docker/funsize-update-generator/scripts/funsize.py
+++ b/taskcluster/docker/funsize-update-generator/scripts/funsize.py
@@ -5,8 +5,6 @@
 
 from __future__ import absolute_import, division, print_function
 
-import asyncio
-import aiohttp
 import configparser
 import argparse
 import hashlib
@@ -20,7 +18,6 @@
 import requests
 import sh
 
 import redo
-from scriptworker.utils import retry_async
 from mardor.reader import MarReader
 from mardor.signing import get_keysize
@@ -28,12 +25,6 @@
 from datadog import initialize, ThreadStats
 
 log = logging.getLogger(__name__)
-
-# Create this even when not sending metrics, so the context manager
-# statements work.
-ddstats = ThreadStats(namespace='releng.releases.partials')
-
-
 ALLOWED_URL_PREFIXES = [
     "http://download.cdn.mozilla.net/pub/mozilla.org/firefox/nightly/",
     "http://download.cdn.mozilla.net/pub/firefox/nightly/",
@@ -80,87 +71,48 @@ def get_secret(secret_name):
     return r.json().get('secret', {})
 
 
-async def retry_download(*args, **kwargs):
-    """Retry download() calls."""
-    await retry_async(
-        download,
-        retry_exceptions=(
-            aiohttp.ClientError
-        ),
-        args=args,
-        kwargs=kwargs
-    )
-
-
-async def download(url, dest, mode=None):  # noqa: E999
-    log.info("Downloading %s to %s", url, dest)
+@redo.retriable()
+def download(url, dest, mode=None):
+    log.debug("Downloading %s to %s", url, dest)
+    r = requests.get(url)
+    r.raise_for_status()
     bytes_downloaded = 0
+    with open(dest, 'wb') as fd:
+        for chunk in r.iter_content(4096):
+            fd.write(chunk)
+            bytes_downloaded += len(chunk)
 
-    async with aiohttp.ClientSession(raise_for_status=True) as session:
-        async with session.get(url) as resp:
-            with open(dest, 'wb') as fd:
-                while True:
-                    chunk = await resp.content.read(4096)
-                    if not chunk:
-                        break
-                    fd.write(chunk)
-                    bytes_downloaded += len(chunk)
+    log.debug('Downloaded %s bytes', bytes_downloaded)
+    if 'content-length' in r.headers:
+        log.debug('Content-Length: %s bytes', r.headers['content-length'])
+        if bytes_downloaded != int(r.headers['content-length']):
+            raise IOError('Unexpected number of bytes downloaded')
 
-    log.debug('Downloaded %s bytes', bytes_downloaded)
-    if 'content-length' in resp.headers:
-        log.debug('Content-Length: %s bytes', resp.headers['content-length'])
-        if bytes_downloaded != int(resp.headers['content-length']):
-            raise IOError('Unexpected number of bytes downloaded')
-
-    if mode:
-        log.debug("chmod %o %s", mode, dest)
-        os.chmod(dest, mode)
+    if mode:
+        log.debug("chmod %o %s", mode, dest)
+        os.chmod(dest, mode)
 
 
-async def run_command(cmd, cwd='/', env=None, label=None, silent=False):
-    if not env:
-        env = dict()
-    process = await asyncio.create_subprocess_shell(cmd,
-                                                    stdout=asyncio.subprocess.PIPE,
-                                                    stderr=asyncio.subprocess.STDOUT,
-                                                    cwd=cwd, env=env)
-    stdout, stderr = await process.communicate()
-
-    await process.wait()
-
-    if silent:
-        return
-
-    if not stderr:
-        stderr = ""
-    if not stdout:
-        stdout = ""
-
-    label = "{}: ".format(label)
-
-    for line in stdout.splitlines():
-        log.debug("%s%s", label, line.decode('utf-8'))
-    for line in stderr.splitlines():
-        log.warn("%s%s", label, line.decode('utf-8'))
-
-
-async def unpack(work_env, mar, dest_dir):
+def unpack(work_env, mar, dest_dir):
     os.mkdir(dest_dir)
+    unwrap_cmd = sh.Command(os.path.join(work_env.workdir,
+                                         "unwrap_full_update.pl"))
     log.debug("Unwrapping %s", mar)
     env = work_env.env
     if not is_lzma_compressed_mar(mar):
         env['MAR_OLD_FORMAT'] = '1'
     elif 'MAR_OLD_FORMAT' in env:
         del env['MAR_OLD_FORMAT']
-
-    cmd = "{} {}".format(work_env.paths['unwrap_full_update.pl'], mar)
-    await run_command(cmd, cwd=dest_dir, env=env, label=dest_dir)
+    out = unwrap_cmd(mar, _cwd=dest_dir, _env=env, _timeout=240,
+                     _err_to_out=True)
+    if out:
+        log.debug(out)
 
 
 def find_file(directory, filename):
     log.debug("Searching for %s in %s", filename, directory)
-    for root, _, files in os.walk(directory):
+    for root, dirs, files in os.walk(directory):
         if filename in files:
             f = os.path.join(root, filename)
             log.debug("Found %s", f)
             return f
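Note on the synchronous `download()` above: `redo.retriable()` forwards its arguments to `redo.retry()`, which (as I read redo 1.6, pinned in requirements.txt) retries the wrapped function on any `Exception` a handful of times with sleeps in between. One caveat in the hunk above is that `requests.get(url)` without `stream=True` buffers the whole body in memory before `iter_content()` re-chunks it. A streaming variant with tuned retry parameters would look roughly like this (parameter values illustrative):

```python
import redo
import requests

# retriable() wraps the function in redo.retry(): up to `attempts` calls,
# sleeping `sleeptime` seconds (scaled between tries) and only swallowing
# the listed retry_exceptions.
@redo.retriable(attempts=3, sleeptime=5,
                retry_exceptions=(requests.RequestException, IOError))
def fetch(url, dest):
    r = requests.get(url, stream=True, timeout=60)
    r.raise_for_status()  # HTTP 4xx/5xx raises here and triggers a retry
    with open(dest, 'wb') as fd:
        for chunk in r.iter_content(4096):
            fd.write(chunk)
```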
%s from %s/**/%s", section, option, directory, filename) f = find_file(directory, filename) config = configparser.ConfigParser() @@ -178,9 +130,9 @@ def get_option(directory, filename, section, option): return rv -async def generate_partial(work_env, from_dir, to_dir, dest_mar, channel_ids, - version, use_old_format): - log.info("Generating partial %s", dest_mar) +def generate_partial(work_env, from_dir, to_dir, dest_mar, channel_ids, + version, use_old_format): + log.debug("Generating partial %s", dest_mar) env = work_env.env env["MOZ_PRODUCT_VERSION"] = version env["MOZ_CHANNEL_ID"] = channel_ids @@ -190,9 +142,11 @@ async def generate_partial(work_env, from_dir, to_dir, dest_mar, channel_ids, del env['MAR_OLD_FORMAT'] make_incremental_update = os.path.join(work_env.workdir, "make_incremental_update.sh") - cmd = " ".join([make_incremental_update, dest_mar, from_dir, to_dir]) - - await run_command(cmd, cwd=work_env.workdir, env=env, label=dest_mar.split('/')[-1]) + out = sh.bash(make_incremental_update, dest_mar, from_dir, to_dir, + _cwd=work_env.workdir, _env=env, _timeout=900, + _err_to_out=True) + if out: + log.debug(out) def get_hash(path, hash_type="sha512"): @@ -206,42 +160,32 @@ class WorkEnv(object): def __init__(self): self.workdir = tempfile.mkdtemp() - self.paths = { - 'unwrap_full_update.pl': os.path.join(self.workdir, 'unwrap_full_update.pl'), - 'mar': os.path.join(self.workdir, 'mar'), - 'mbsdiff': os.path.join(self.workdir, 'mbsdiff') - } - async def setup(self): - await self.download_unwrap() - await self.download_martools() + def setup(self): + self.download_unwrap() + self.download_martools() - async def clone(self, workenv): - for path in workenv.paths: - if os.path.exists(self.paths[path]): - os.unlink(self.paths[path]) - os.link(workenv.paths[path], self.paths[path]) - - async def download_unwrap(self): + def download_unwrap(self): # unwrap_full_update.pl is not too sensitive to the revision url = "https://hg.mozilla.org/mozilla-central/raw-file/default/" \ "tools/update-packaging/unwrap_full_update.pl" - await retry_download(url, dest=self.paths['unwrap_full_update.pl'], mode=0o755) + download(url, dest=os.path.join(self.workdir, "unwrap_full_update.pl"), + mode=0o755) - async def download_buildsystem_bits(self, repo, revision): + def download_buildsystem_bits(self, repo, revision): prefix = "{repo}/raw-file/{revision}/tools/update-packaging" prefix = prefix.format(repo=repo, revision=revision) - for f in ('make_incremental_update.sh', 'common.sh'): + for f in ("make_incremental_update.sh", "common.sh"): url = "{prefix}/{f}".format(prefix=prefix, f=f) - await retry_download(url, dest=os.path.join(self.workdir, f), mode=0o755) + download(url, dest=os.path.join(self.workdir, f), mode=0o755) - async def download_martools(self): + def download_martools(self): # TODO: check if the tools have to be branch specific prefix = "https://ftp.mozilla.org/pub/mozilla.org/firefox/nightly/" \ "latest-mozilla-central/mar-tools/linux64" - for f in ('mar', 'mbsdiff'): + for f in ("mar", "mbsdiff"): url = "{prefix}/{f}".format(prefix=prefix, f=f) - await retry_download(url, dest=self.paths[f], mode=0o755) + download(url, dest=os.path.join(self.workdir, f), mode=0o755) def cleanup(self): shutil.rmtree(self.workdir) @@ -262,151 +206,6 @@ def verify_allowed_url(mar): )) -async def manage_partial(partial_def, work_env, filename_template, artifacts_dir, signing_certs): - """Manage the creation of partial mars based on payload.""" - for mar in (partial_def["from_mar"], partial_def["to_mar"]): 
@@ -206,42 +160,32 @@ class WorkEnv(object):
 
     def __init__(self):
         self.workdir = tempfile.mkdtemp()
-        self.paths = {
-            'unwrap_full_update.pl': os.path.join(self.workdir, 'unwrap_full_update.pl'),
-            'mar': os.path.join(self.workdir, 'mar'),
-            'mbsdiff': os.path.join(self.workdir, 'mbsdiff')
-        }
 
-    async def setup(self):
-        await self.download_unwrap()
-        await self.download_martools()
+    def setup(self):
+        self.download_unwrap()
+        self.download_martools()
 
-    async def clone(self, workenv):
-        for path in workenv.paths:
-            if os.path.exists(self.paths[path]):
-                os.unlink(self.paths[path])
-            os.link(workenv.paths[path], self.paths[path])
-
-    async def download_unwrap(self):
+    def download_unwrap(self):
         # unwrap_full_update.pl is not too sensitive to the revision
         url = "https://hg.mozilla.org/mozilla-central/raw-file/default/" \
              "tools/update-packaging/unwrap_full_update.pl"
-        await retry_download(url, dest=self.paths['unwrap_full_update.pl'], mode=0o755)
+        download(url, dest=os.path.join(self.workdir, "unwrap_full_update.pl"),
+                 mode=0o755)
 
-    async def download_buildsystem_bits(self, repo, revision):
+    def download_buildsystem_bits(self, repo, revision):
         prefix = "{repo}/raw-file/{revision}/tools/update-packaging"
         prefix = prefix.format(repo=repo, revision=revision)
-        for f in ('make_incremental_update.sh', 'common.sh'):
+        for f in ("make_incremental_update.sh", "common.sh"):
             url = "{prefix}/{f}".format(prefix=prefix, f=f)
-            await retry_download(url, dest=os.path.join(self.workdir, f), mode=0o755)
+            download(url, dest=os.path.join(self.workdir, f), mode=0o755)
 
-    async def download_martools(self):
+    def download_martools(self):
         # TODO: check if the tools have to be branch specific
         prefix = "https://ftp.mozilla.org/pub/mozilla.org/firefox/nightly/" \
                  "latest-mozilla-central/mar-tools/linux64"
-        for f in ('mar', 'mbsdiff'):
+        for f in ("mar", "mbsdiff"):
             url = "{prefix}/{f}".format(prefix=prefix, f=f)
-            await retry_download(url, dest=self.paths[f], mode=0o755)
+            download(url, dest=os.path.join(self.workdir, f), mode=0o755)
 
     def cleanup(self):
         shutil.rmtree(self.workdir)
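Usage-wise, a `WorkEnv` goes through create → `setup()` → work → `cleanup()`: `setup()` fetches `unwrap_full_update.pl` and the `mar`/`mbsdiff` binaries into a fresh temp dir. A minimal lifecycle sketch, using only names from this patch (paths are placeholders; the `env` attribute consumed by `unpack()`/`generate_partial()` is defined outside this diff):

```python
work_env = WorkEnv()
work_env.setup()
try:
    unpack(work_env, "/path/to/complete.mar", dest_dir="/tmp/unpacked")
finally:
    # Always remove the tempdir, even if unpacking fails.
    work_env.cleanup()
```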
- # "locale:{}".format(mar_data['locale']), - ] - with ddstats.timer('generate_partial.time', tags=metric_tags): - await generate_partial(work_env, from_path, to_path, dest_mar, - mar_data["ACCEPTED_MAR_CHANNEL_IDS"], - mar_data["version"], - use_old_format) - - mar_data["size"] = os.path.getsize(dest_mar) - - metric_tags.append("unit:bytes") - # Allows us to find out how many releases there were between the two, - # making buckets of the file sizes easier. - metric_tags.append("update_number:{}".format(mar_data.get('update_number', 0))) - ddstats.gauge('partial_mar_size', mar_data['size'], tags=metric_tags) - - mar_data["hash"] = get_hash(dest_mar) - - shutil.copy(dest_mar, artifacts_dir) - work_env.cleanup() - - return mar_data - - -async def async_main(args, signing_certs): - tasks = [] - - master_env = WorkEnv() - await master_env.setup() - - task = json.load(args.task_definition) - # TODO: verify task["extra"]["funsize"]["partials"] with jsonschema - for definition in task["extra"]["funsize"]["partials"]: - workenv = WorkEnv() - await workenv.clone(master_env) - tasks.append(asyncio.ensure_future(manage_partial( - partial_def=definition, - filename_template=args.filename_template, - artifacts_dir=args.artifacts_dir, - work_env=workenv, - signing_certs=signing_certs) - )) - - manifest = await asyncio.gather(*tasks) - master_env.cleanup() - return manifest - - def main(): start = time.time() @@ -428,6 +227,8 @@ def main(): logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") log.setLevel(args.log_level) + task = json.load(args.task_definition) + # TODO: verify task["extra"]["funsize"]["partials"] with jsonschema signing_certs = { 'sha1': open(args.sha1_signing_cert, 'rb').read(), @@ -443,6 +244,10 @@ def main(): if not dd_api_key and os.environ.get('DATADOG_API_SECRET'): dd_api_key = get_secret(os.environ.get('DATADOG_API_SECRET')).get('key') + # Create this even when not sending metrics, so the context manager + # statements work. 
@@ -428,6 +227,8 @@ def main():
     logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
     log.setLevel(args.log_level)
 
+    task = json.load(args.task_definition)
+    # TODO: verify task["extra"]["funsize"]["partials"] with jsonschema
 
     signing_certs = {
         'sha1': open(args.sha1_signing_cert, 'rb').read(),
@@ -443,6 +244,10 @@ def main():
     if not dd_api_key and os.environ.get('DATADOG_API_SECRET'):
         dd_api_key = get_secret(os.environ.get('DATADOG_API_SECRET')).get('key')
 
+    # Create this even when not sending metrics, so the context manager
+    # statements work.
+    ddstats = ThreadStats(namespace='releng.releases.partials')
+
     if dd_api_key:
         dd_options = {
             'api_key': dd_api_key,
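The unconditional `ThreadStats` construction is what keeps the `with ddstats.timer(...)` blocks below valid when no Datadog key is present: until `start()` is called, datapoints are buffered and never flushed (my reading of datadog 0.17.0; treat the no-op behaviour as an assumption). A self-contained sketch of the calls the loop relies on, with illustrative values:

```python
import time
from datadog import ThreadStats

ddstats = ThreadStats(namespace='releng.releases.partials')
start = time.time()

# Without initialize()/ddstats.start(), these record into internal buffers
# and send nothing, so they are safe when metrics are disabled.
with ddstats.timer('mar.download.time'):
    time.sleep(0.1)  # stand-in for download(f, dest)

ddstats.gauge('partial_mar_size', 12345678, tags=['unit:bytes'])  # bytes, illustrative
ddstats.timing('task_duration', time.time() - start, start)
```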
@@ -464,29 +269,133 @@ def main():
     except sh.ErrorReturnCode:
         log.warning("Freshclam failed, skipping DB update")
 
-    loop = asyncio.get_event_loop()
-    manifest = loop.run_until_complete(async_main(args, signing_certs))
-    loop.close()
+    manifest = []
+    for e in task["extra"]["funsize"]["partials"]:
+        for mar in (e["from_mar"], e["to_mar"]):
+            verify_allowed_url(mar)
+
+        work_env = WorkEnv()
+        # TODO: run setup once
+        work_env.setup()
+        complete_mars = {}
+        use_old_format = False
+        for mar_type, f in (("from", e["from_mar"]), ("to", e["to_mar"])):
+            dest = os.path.join(work_env.workdir, "{}.mar".format(mar_type))
+            unpack_dir = os.path.join(work_env.workdir, mar_type)
+            with ddstats.timer('mar.download.time'):
+                download(f, dest)
+            if not os.getenv("MOZ_DISABLE_MAR_CERT_VERIFICATION"):
+                verify_signature(dest, signing_certs)
+            complete_mars["%s_size" % mar_type] = os.path.getsize(dest)
+            complete_mars["%s_hash" % mar_type] = get_hash(dest)
+            with ddstats.timer('mar.unpack.time'):
+                unpack(work_env, dest, unpack_dir)
+            if mar_type == 'from':
+                version = get_option(unpack_dir, filename="application.ini",
+                                     section="App", option="Version")
+                major = int(version.split(".")[0])
+                # The updater for versions less than 56.0 requires BZ2
+                # compressed MAR files
+                if major < 56:
+                    use_old_format = True
+                    log.info("Forcing BZ2 compression for %s", f)
+            log.info("AV-scanning %s ...", unpack_dir)
+            metric_tags = [
+                "platform:{}".format(e['platform']),
+            ]
+            with ddstats.timer('mar.clamscan.time', tags=metric_tags):
+                sh.clamscan("-r", unpack_dir, _timeout=600, _err_to_out=True)
+            log.info("Done.")
+
+        path = os.path.join(work_env.workdir, "to")
+        from_path = os.path.join(work_env.workdir, "from")
+        mar_data = {
+            "ACCEPTED_MAR_CHANNEL_IDS": get_option(
+                path, filename="update-settings.ini", section="Settings",
+                option="ACCEPTED_MAR_CHANNEL_IDS"),
+            "version": get_option(path, filename="application.ini",
+                                  section="App", option="Version"),
+            "to_buildid": get_option(path, filename="application.ini",
+                                     section="App", option="BuildID"),
+            "from_buildid": get_option(from_path, filename="application.ini",
+                                       section="App", option="BuildID"),
+            "appName": get_option(from_path, filename="application.ini",
+                                  section="App", option="Name"),
+            # Use Gecko repo and rev from platform.ini, not application.ini
+            "repo": get_option(path, filename="platform.ini", section="Build",
+                               option="SourceRepository"),
+            "revision": get_option(path, filename="platform.ini",
+                                   section="Build", option="SourceStamp"),
+            "from_mar": e["from_mar"],
+            "to_mar": e["to_mar"],
+            "platform": e["platform"],
+            "locale": e["locale"],
+        }
+        # Override ACCEPTED_MAR_CHANNEL_IDS if needed
+        if "ACCEPTED_MAR_CHANNEL_IDS" in os.environ:
+            mar_data["ACCEPTED_MAR_CHANNEL_IDS"] = os.environ["ACCEPTED_MAR_CHANNEL_IDS"]
+        for field in ("update_number", "previousVersion",
+                      "previousBuildNumber", "toVersion",
+                      "toBuildNumber"):
+            if field in e:
+                mar_data[field] = e[field]
+        mar_data.update(complete_mars)
+        # if branch not set explicitly use repo-name
+        mar_data["branch"] = e.get("branch",
+                                   mar_data["repo"].rstrip("/").split("/")[-1])
+        if 'dest_mar' in e:
+            mar_name = e['dest_mar']
+        else:
+            # default to formatted name if not specified
+            mar_name = args.filename_template.format(**mar_data)
+        mar_data["mar"] = mar_name
+        dest_mar = os.path.join(work_env.workdir, mar_name)
+        # TODO: download these once
+        work_env.download_buildsystem_bits(repo=mar_data["repo"],
+                                           revision=mar_data["revision"])
+
+        metric_tags = [
+            "branch:{}".format(mar_data['branch']),
+            "platform:{}".format(mar_data['platform']),
+            # If required. Shouldn't add much useful info, but increases
+            # cardinality of metrics substantially, so avoided.
+            # "locale:{}".format(mar_data['locale']),
+        ]
+
+        with ddstats.timer('generate_partial.time', tags=metric_tags):
+            generate_partial(work_env, from_path, path, dest_mar,
+                             mar_data["ACCEPTED_MAR_CHANNEL_IDS"],
+                             mar_data["version"],
+                             use_old_format)
+
+        mar_data["size"] = os.path.getsize(dest_mar)
+        metric_tags.append("unit:bytes")
+        # Allows us to find out how many releases there were between the two,
+        # making buckets of the file sizes easier.
+        metric_tags.append("update_number:{}".format(mar_data.get('update_number', 0)))
+        ddstats.gauge('partial_mar_size', mar_data['size'], tags=metric_tags)
+
+        mar_data["hash"] = get_hash(dest_mar)
+
+        shutil.copy(dest_mar, args.artifacts_dir)
+        work_env.cleanup()
+        manifest.append(mar_data)
 
     manifest_file = os.path.join(args.artifacts_dir, "manifest.json")
     with open(manifest_file, "w") as fp:
         json.dump(manifest, fp, indent=2, sort_keys=True)
 
-    log.debug("{}".format(json.dumps(manifest, indent=2, sort_keys=True)))
-
     # Warning: Assumption that one partials task will always be for one branch.
     metric_tags = [
-        "branch:{}".format(manifest[0]['branch']),
+        "branch:{}".format(mar_data['branch']),
     ]
 
     ddstats.timing('task_duration', time.time() - start,
                    start, tags=metric_tags)
-
     # Wait for all the metrics to flush. If the program ends before
     # they've been sent, they'll be dropped.
     # Should be more than the flush_interval for the ThreadStats object
-    if dd_api_key:
-        time.sleep(10)
+    time.sleep(10)
 
 
 if __name__ == '__main__':
     main()
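After the loop, `manifest.json` serializes one `mar_data` dict per partial. From the keys assembled above, an entry has roughly this shape (every value below is a placeholder; the optional task fields appear only when present):

```python
# One manifest.json entry; the keys mirror mar_data above.
{
    "appName": "Firefox",
    "version": "59.0a1",
    "branch": "mozilla-central",
    "platform": "linux64",
    "locale": "en-US",
    "repo": "https://hg.mozilla.org/mozilla-central",
    "revision": "...",
    "from_mar": "...", "to_mar": "...",
    "from_buildid": "...", "to_buildid": "...",
    "from_size": 0, "from_hash": "...",  # stats of the input complete MARs
    "to_size": 0, "to_hash": "...",
    "ACCEPTED_MAR_CHANNEL_IDS": "...",
    "mar": "...",   # output filename (dest_mar or filename_template)
    "size": 0,      # partial MAR size in bytes
    "hash": "...",  # sha512 of the partial
}
```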