Bug 1519472 - [taskgraph] Factor logic for adding a cache in job.common to a new function, r=tomprince

We add caches at various places in common.py. This consolidates the logic into
a reusable function. This is in preparation for adding generic-worker cache
support.

This also adds a test. The test is not terribly useful, but I've been looking
for an excuse to lay some groundwork for further tests in the 'job' submodule.
This will do.

Differential Revision: https://phabricator.services.mozilla.com/D17689

--HG--
extra : moz-landing-system : lando
This commit is contained in:
Andrew Halberstadt 2019-02-11 22:19:52 +00:00
Parent d3aa5063fc
Commit 5012f75394
3 changed files: 150 additions and 41 deletions

View file

@ -15,6 +15,7 @@ skip-if = python == 3
[test_target_tasks.py]
[test_taskgraph.py]
[test_transforms_base.py]
[test_transforms_job.py]
[test_try_option_syntax.py]
[test_util_attributes.py]
[test_util_docker.py]

View file

@ -0,0 +1,93 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Tests for the 'job' transform subsystem.
"""
from __future__ import absolute_import, print_function, unicode_literals
import os
from copy import deepcopy
import pytest
from mozunit import main
from taskgraph import GECKO
from taskgraph.config import load_graph_config
from taskgraph.transforms import job
from taskgraph.transforms.base import TransformConfig
from taskgraph.transforms.job.common import add_cache
from taskgraph.transforms.task import payload_builders
from taskgraph.util.schema import Schema, validate_schema
from taskgraph.util.workertypes import worker_type_implementation
# Absolute path of the directory containing this test file; passed to
# TransformConfig below as the kind path.
here = os.path.abspath(os.path.dirname(__file__))

# Minimal task definition; each test's input dict is merged on top of this
# to form a complete job description for the transforms.
TASK_DEFAULTS = {
    'description': 'fake description',
    'label': 'fake-task-label',
    'run': {
        'using': 'run-task',
    },
}
@pytest.fixture(scope='module')
def config():
    """Return a ``TransformConfig`` backed by the in-tree graph config.

    Module-scoped so the (relatively expensive) graph config load happens
    only once per test module.
    """
    ci_path = os.path.join(GECKO, 'taskcluster', 'ci')
    return TransformConfig('job_test', here, {}, {}, [], load_graph_config(ci_path))
@pytest.fixture()
def transform(monkeypatch, config):
    """Run the job transforms on the specified task but return the inputs to
    `configure_taskdesc_for_run` without executing it.

    This gives test functions an easy way to generate the inputs required for
    many of the `run_using` subsystems.
    """
    def run_transforms(task_input):
        task = deepcopy(TASK_DEFAULTS)
        task.update(task_input)

        captured = []

        def fake_configure(*args):
            captured.extend(args)

        # Stub out the final configuration step so its inputs can be
        # inspected instead of acted upon.
        monkeypatch.setattr(job, 'configure_taskdesc_for_run', fake_configure)

        for transform_func in job.transforms._transforms:
            task = list(transform_func(config, [task]))[0]

        return captured

    return run_transforms
@pytest.mark.parametrize('task', [
    {'worker-type': 'aws-provisioner-v1/gecko-1-b-linux'},
    {'worker-type': 'releng-hardware/gecko-t-win10-64-hw'},
], ids=lambda t: worker_type_implementation(t['worker-type'])[0])
def test_worker_caches(task, transform):
    """Caches added via ``add_cache`` end up in the task description and
    validate against the worker implementation's payload schema."""
    _config, job, taskdesc, impl = transform(task)
    add_cache(job, taskdesc, 'cache1', '/cache1')
    add_cache(job, taskdesc, 'cache2', '/cache2', skip_untrusted=True)

    if impl != 'docker-worker':
        pytest.xfail("caches not implemented for '{}'".format(impl))

    assert 'caches' in taskdesc['worker']
    caches = taskdesc['worker']['caches']
    assert len(caches) == 2

    # Create a new schema object with just the part relevant to caches.
    partial_schema = Schema(payload_builders[impl].schema.schema.schema['caches'])
    validate_schema(partial_schema, caches, "validation error")
# Allow running this test file directly (mozunit dispatches to pytest).
if __name__ == '__main__':
    main()

View file

@ -14,27 +14,54 @@ from taskgraph.util.taskcluster import get_artifact_prefix
SECRET_SCOPE = 'secrets:get:project/releng/gecko/{}/level-{}/{}'
def add_cache(job, taskdesc, name, mount_point, skip_untrusted=False):
    """Adds a cache based on the worker's implementation.

    Args:
        job (dict): Task's job description.
        taskdesc (dict): Target task description to modify.
        name (str): Name of the cache.
        mount_point (path): Path on the host to mount the cache.
        skip_untrusted (bool): Whether cache is used in untrusted environments
            (default: False). Only applies to docker-worker.
    """
    impl = job['worker']['implementation']
    if impl not in ('docker-worker', 'docker-engine'):
        # Caches are not implemented for other worker types (yet).
        return

    caches = taskdesc['worker'].setdefault('caches', [])
    caches.append({
        'type': 'persistent',
        'name': name,
        'mount-point': mount_point,
        'skip-untrusted': skip_untrusted,
    })
def docker_worker_add_workspace_cache(config, job, taskdesc, extra=None):
    """Add the workspace cache.

    Args:
        config (TransformConfig): Transform configuration object.
        job (dict): Task's job description.
        taskdesc (dict): Target task description to modify.
        extra (str): Optional context passed in that supports extending the cache
            key name to avoid undesired conflicts with other caches.
    """
    cache_name = 'level-{}-{}-build-{}-{}-workspace'.format(
        config.params['level'], config.params['project'],
        taskdesc['attributes']['build_platform'],
        taskdesc['attributes']['build_type'],
    )
    if extra:
        cache_name = '{}-{}'.format(cache_name, extra)

    mount_point = "{workdir}/workspace".format(**job['run'])

    # Don't enable the workspace cache when we can't guarantee its
    # behavior, like on Try.
    add_cache(job, taskdesc, cache_name, mount_point, skip_untrusted=True)
def add_artifacts(config, job, taskdesc, path):
@ -83,27 +110,19 @@ def support_vcs_checkout(config, job, taskdesc, sparse=False):
geckodir = '{}/gecko'.format(checkoutdir)
hgstore = '{}/hg-store'.format(checkoutdir)
level = config.params['level']
# native-engine and generic-worker do not support caches (yet), so we just
# do a full clone every time :(
if worker['implementation'] in ('docker-worker', 'docker-engine'):
name = 'level-%s-checkouts' % level
cache_name = 'level-{}-checkouts'.format(config.params['level'])
# comm-central checkouts need their own cache, because clobber won't
# remove the comm-central checkout
if job['run'].get('comm-checkout', False):
name += '-comm'
# comm-central checkouts need their own cache, because clobber won't
# remove the comm-central checkout
if job['run'].get('comm-checkout', False):
cache_name += '-comm'
# Sparse checkouts need their own cache because they can interfere
# with clients that aren't sparse aware.
if sparse:
name += '-sparse'
# Sparse checkouts need their own cache because they can interfere
# with clients that aren't sparse aware.
if sparse:
cache_name += '-sparse'
taskdesc['worker'].setdefault('caches', []).append({
'type': 'persistent',
'name': name,
'mount-point': checkoutdir,
})
add_cache(job, taskdesc, cache_name, checkoutdir)
taskdesc['worker'].setdefault('env', {}).update({
'GECKO_BASE_REPOSITORY': config.params['base_repository'],
@ -190,12 +209,8 @@ def docker_worker_add_tooltool(config, job, taskdesc, internal=False):
assert job['worker']['implementation'] in ('docker-worker', 'docker-engine')
level = config.params['level']
taskdesc['worker'].setdefault('caches', []).append({
'type': 'persistent',
'name': 'level-%s-tooltool-cache' % level,
'mount-point': '{workdir}/tooltool-cache'.format(**job['run']),
})
add_cache(job, taskdesc, 'level-{}-tooltool-cache'.format(level),
'{workdir}/tooltool-cache'.format(**job['run']))
taskdesc['worker'].setdefault('env', {}).update({
'TOOLTOOL_CACHE': '{workdir}/tooltool-cache'.format(**job['run']),