diff --git a/taskcluster/taskgraph/optimize/__init__.py b/taskcluster/taskgraph/optimize/__init__.py
index 15caf860b6f5..30cf8aaafb37 100644
--- a/taskcluster/taskgraph/optimize/__init__.py
+++ b/taskcluster/taskgraph/optimize/__init__.py
@@ -14,10 +14,8 @@ See ``taskcluster/docs/optimization.rst`` for more information.
 from __future__ import absolute_import, print_function, unicode_literals
 
 import logging
-from abc import ABCMeta, abstractmethod, abstractproperty
 from collections import defaultdict
 
-import six
 from slugid import nice as slugid
 
 from taskgraph.graph import Graph
@@ -33,8 +31,6 @@ def register_strategy(name, args=()):
     def wrap(cls):
         if name not in registry:
             registry[name] = cls(*args)
-            if not hasattr(registry[name], 'description'):
-                registry[name].description = name
         return cls
     return wrap
 
@@ -259,95 +255,44 @@ class OptimizationStrategy(object):
         return False
 
 
-@six.add_metaclass(ABCMeta)
-class CompositeStrategy(OptimizationStrategy):
-
+class Either(OptimizationStrategy):
+    """Given one or more optimization strategies, remove a task if any of them
+    says to, and replace a task if any of them finds a replacement (preferring
+    the earliest). By default, each substrategy gets the same arg, but
+    split_args can return a list of args for each strategy, if desired."""
     def __init__(self, *substrategies, **kwargs):
-        self.substrategies = []
-        missing = set()
-        for sub in substrategies:
-            if isinstance(sub, six.text_type):
-                if sub not in registry.keys():
-                    missing.add(sub)
-                    continue
-                sub = registry[sub]
-
-            self.substrategies.append(sub)
-
+        missing = set(substrategies) - set(registry.keys())
         if missing:
             raise TypeError("substrategies aren't registered: {}".format(
                 ", ".join(sorted(missing))))
 
+        self.description = "-or-".join(substrategies)
+        self.substrategies = [registry[sub] for sub in substrategies]
         self.split_args = kwargs.pop('split_args', None)
        if not self.split_args:
             self.split_args = lambda arg: [arg] * len(substrategies)
 
         if kwargs:
             raise TypeError("unexpected keyword args")
 
-    @abstractproperty
-    def description(self):
-        """A textual description of the combined substrategies."""
-        pass
-
-    @abstractmethod
-    def reduce(self, results):
-        """Given all substrategy results as a generator, return the overall
-        result."""
-        pass
-
-    def _generate_results(self, fname, task, params, arg):
+    def _for_substrategies(self, arg, fn):
         for sub, arg in zip(self.substrategies, self.split_args(arg)):
-            yield getattr(sub, fname)(task, params, arg)
-
-    def should_remove_task(self, *args):
-        results = self._generate_results('should_remove_task', *args)
-        return self.reduce(results)
-
-    def should_replace_task(self, *args):
-        results = self._generate_results('should_replace_task', *args)
-        return self.reduce(results)
-
-
-class Any(CompositeStrategy):
-    """Given one or more optimization strategies, remove or replace a task if any of them
-    says to.
-
-    Replacement will use the value returned by the first strategy that says to replace.
-    """
-
-    @property
-    def description(self):
-        return "-or-".join([s.description for s in self.substrategies])
-
-    @classmethod
-    def reduce(cls, results):
-        for rv in results:
+            rv = fn(sub, arg)
             if rv:
                 return rv
         return False
 
+    def should_remove_task(self, task, params, arg):
+        return self._for_substrategies(
+            arg,
+            lambda sub, arg: sub.should_remove_task(task, params, arg))
 
-class All(CompositeStrategy):
-    """Given one or more optimization strategies, remove or replace a task if all of them
-    says to.
-
-    Replacement will use the value returned by the first strategy passed in.
-    Note the values used for replacement need not be the same, as long as they
-    all say to replace.
-    """
-    @property
-    def description(self):
-        return "-and-".join([s.description for s in self.substrategies])
-
-    @classmethod
-    def reduce(cls, results):
-        rvs = list(results)
-        if all(rvs):
-            return rvs[0]
-        return False
+    def should_replace_task(self, task, params, arg):
+        return self._for_substrategies(
+            arg,
+            lambda sub, arg: sub.should_replace_task(task, params, arg))
 
 
-class Alias(CompositeStrategy):
+class Alias(Either):
     """Provides an alias to an existing strategy.
 
     This can be useful to swap strategies in and out without needing to modify
@@ -356,23 +301,16 @@ class Alias(CompositeStrategy):
     def __init__(self, strategy):
         super(Alias, self).__init__(strategy)
 
-    @property
-    def description(self):
-        return self.substrategies[0].description
-
-    def reduce(self, results):
-        return next(results)
-
 
 # Trigger registration in sibling modules.
 import_sibling_modules()
 
 
 # Register composite strategies.
-register_strategy('test', args=(Any('skip-unless-schedules', 'seta'), 'backstop'))(All)
+register_strategy('test', args=('skip-unless-schedules', 'seta'))(Either)
 register_strategy('test-inclusive', args=('skip-unless-schedules',))(Alias)
 register_strategy('test-try', args=('skip-unless-schedules',))(Alias)
-register_strategy('fuzzing-builds', args=('skip-unless-schedules', 'seta'))(Any)
+register_strategy('fuzzing-builds', args=('skip-unless-schedules', 'seta'))(Either)
 
 
 class experimental(object):
@@ -385,12 +323,12 @@ class experimental(object):
     """
 
     relevant_tests = {
-        'test': Any('skip-unless-schedules', 'skip-unless-has-relevant-tests'),
+        'test': Either('skip-unless-schedules', 'skip-unless-has-relevant-tests'),
     }
     """Runs task containing tests in the same directories as modified files."""
 
     seta = {
-        'test': Any('skip-unless-schedules', 'seta'),
+        'test': Either('skip-unless-schedules', 'seta'),
     }
     """Provides a stable history of SETA's performance in the event we make it
     non-default in the future. Only useful as a benchmark."""
 
@@ -400,22 +338,22 @@ class experimental(object):
     learning to determine which tasks to run."""
 
     all = {
-        'test': Any('skip-unless-schedules', 'bugbug-all'),
+        'test': Either('skip-unless-schedules', 'bugbug-all'),
     }
     """Doesn't limit platforms, medium confidence threshold."""
 
     all_low = {
-        'test': Any('skip-unless-schedules', 'bugbug-all-low'),
+        'test': Either('skip-unless-schedules', 'bugbug-all-low'),
     }
     """Doesn't limit platforms, low confidence threshold."""
 
     all_high = {
-        'test': Any('skip-unless-schedules', 'bugbug-all-high'),
+        'test': Either('skip-unless-schedules', 'bugbug-all-high'),
     }
     """Doesn't limit platforms, high confidence threshold."""
 
     debug = {
-        'test': Any('skip-unless-schedules', 'bugbug-debug'),
+        'test': Either('skip-unless-schedules', 'bugbug-debug'),
     }
     """Restricts tests to debug platforms."""
 
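A note on the composite behaviour being restored above: Either short-circuits on the first substrategy that returns a truthy value, so the order of the substrategy names in args matters ('skip-unless-schedules' is consulted before 'seta'). The sketch below mirrors that evaluation order outside of taskgraph; the AlwaysKeep, AlwaysRemove and EitherSketch names are invented for illustration and are not part of this patch.

# Standalone sketch of the short-circuit evaluation used by Either above.
# It deliberately avoids importing taskgraph; all names here are illustrative.
class AlwaysKeep(object):
    def should_remove_task(self, task, params, arg):
        return False  # never optimize the task away


class AlwaysRemove(object):
    def should_remove_task(self, task, params, arg):
        return True  # always optimize the task away


class EitherSketch(object):
    """The first truthy answer wins, mirroring Either._for_substrategies."""

    def __init__(self, *substrategies):
        self.substrategies = substrategies

    def should_remove_task(self, task, params, arg):
        for sub in self.substrategies:
            rv = sub.should_remove_task(task, params, arg)
            if rv:
                return rv
        return False


if __name__ == '__main__':
    combined = EitherSketch(AlwaysKeep(), AlwaysRemove())
    # Prints True: the second substrategy says to remove, so the task is optimized.
    print(combined.should_remove_task(task=None, params={}, arg=None))
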
diff --git a/taskcluster/taskgraph/optimize/backstop.py b/taskcluster/taskgraph/optimize/backstop.py
deleted file mode 100644
index 96d08d7f711b..000000000000
--- a/taskcluster/taskgraph/optimize/backstop.py
+++ /dev/null
@@ -1,128 +0,0 @@
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0. If a copy of the MPL was not distributed with this
-# file, You can obtain one at http://mozilla.org/MPL/2.0/.
-
-from __future__ import absolute_import, print_function, unicode_literals
-
-import logging
-from collections import defaultdict
-
-import requests
-from redo import retry
-
-from taskgraph.optimize import OptimizationStrategy, register_strategy
-
-logger = logging.getLogger(__name__)
-PUSH_ENDPOINT = "{head_repository}/json-pushes/?startID={push_id_start}&endID={push_id_end}"
-
-
-@register_strategy('backstop', args=(10, 60))
-class Backstop(OptimizationStrategy):
-    """Ensures that no task gets left behind.
-
-    Will schedule all tasks either every Nth push, or M minutes.
-
-    Args:
-        push_interval (int): Number of pushes
-    """
-    def __init__(self, push_interval, time_interval):
-        self.push_interval = push_interval
-        self.time_interval = time_interval
-
-        # cached push dates by project
-        self.push_dates = defaultdict(dict)
-        # cached push_ids that failed to retrieve datetime for
-        self.failed_json_push_calls = []
-
-    def should_remove_task(self, task, params, _):
-        project = params['project']
-        pushid = int(params['pushlog_id'])
-        pushdate = int(params['pushdate'])
-
-        # Only enable the backstop on autoland since we always want the *real*
-        # optimized tasks on try and release branches.
-        if project != 'autoland':
-            return True
-
-        # On every Nth push, want to run all tasks.
-        if pushid % self.push_interval == 0:
-            return False
-
-        # We also want to ensure we run all tasks at least once per N minutes.
-        if self.minutes_between_pushes(
-                params["head_repository"],
-                project,
-                pushid,
-                pushdate) >= self.time_interval:
-            return False
-        return True
-
-    def minutes_between_pushes(self, repository, project, cur_push_id, cur_push_date):
-        # figure out the minutes that have elapsed between the current push and previous one
-        # defaulting to max min so if we can't get value, defaults to run the task
-        min_between_pushes = self.time_interval
-        prev_push_id = cur_push_id - 1
-
-        # cache the pushdate for the current push so we can use it next time
-        self.push_dates[project].update({cur_push_id: cur_push_date})
-
-        # check if we already have the previous push id's datetime cached
-        prev_push_date = self.push_dates[project].get(prev_push_id, 0)
-
-        # we have datetime of current and previous push, so return elapsed minutes and bail
-        if cur_push_date > 0 and prev_push_date > 0:
-            return (cur_push_date - prev_push_date) / 60
-
-        # datetime for previous pushid not cached, so must retrieve it
-        # if we already tried to retrieve the datetime for this pushid
-        # before and the json-push request failed, don't try it again
-        if prev_push_id in self.failed_json_push_calls:
-            return min_between_pushes
-
-        url = PUSH_ENDPOINT.format(
-            head_repository=repository,
-            push_id_start=prev_push_id - 1,
-            push_id_end=prev_push_id,
-        )
-
-        try:
-            response = retry(requests.get, attempts=2, sleeptime=10,
-                             args=(url, ),
-                             kwargs={'timeout': 60, 'headers': {'User-Agent': 'TaskCluster'}})
-            prev_push_date = response.json().get(str(prev_push_id), {}).get('date', 0)
-
-            # cache it for next time
-            self.push_dates[project].update({prev_push_id: prev_push_date})
-
-            # now have datetime of current and previous push
-            if cur_push_date > 0 and prev_push_date > 0:
-                min_between_pushes = (cur_push_date - prev_push_date) / 60
-
-        # In the event of request times out, requests will raise a TimeoutError.
-        except requests.exceptions.Timeout:
-            logger.warning("json-pushes timeout, enabling backstop")
-            self.failed_json_push_calls.append(prev_push_id)
-
-        # In the event of a network problem (e.g. DNS failure, refused connection, etc),
-        # requests will raise a ConnectionError.
-        except requests.exceptions.ConnectionError:
-            logger.warning("json-pushes connection error, enabling backstop")
-            self.failed_json_push_calls.append(prev_push_id)
-
-        # In the event of the rare invalid HTTP response(e.g 404, 401),
-        # requests will raise an HTTPError exception
-        except requests.exceptions.HTTPError:
-            logger.warning("Bad Http response, enabling backstop")
-            self.failed_json_push_calls.append(prev_push_id)
-
-        # When we get invalid JSON (i.e. 500 error), it results in a ValueError (bug 1313426)
-        except ValueError as error:
-            logger.warning("Invalid JSON, possible server error: {}".format(error))
-            self.failed_json_push_calls.append(prev_push_id)
-
-        # We just print the error out as a debug message if we failed to catch the exception above
-        except requests.exceptions.RequestException as error:
-            logger.warning(error)
-            self.failed_json_push_calls.append(prev_push_id)
-
-        return min_between_pushes
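The scheduling rule that backstop.py implemented, and that the seta.py changes below fold into the SETA strategy, reduces to a small predicate: run everything on every Nth push, or whenever at least N minutes have passed since the previous push. A minimal sketch of that rule follows; run_all_tasks is an invented name, and the 10/60 defaults simply echo the args used by the patch.

# Minimal sketch of the "every Nth push or every N minutes" backstop rule.
# Not the patch's implementation; push ids are assumed to be sequential integers.
def run_all_tasks(pushlog_id, minutes_since_previous_push,
                  push_interval=10, time_interval=60):
    """Return True when this push should schedule the full task graph."""
    # Every Nth push gets all tasks.
    if pushlog_id % push_interval == 0:
        return True
    # Otherwise, still run everything once enough wall-clock time has passed.
    return minutes_since_previous_push >= time_interval


if __name__ == '__main__':
    print(run_all_tasks(20, 5))   # True: push 20 is a multiple of 10
    print(run_all_tasks(21, 75))  # True: more than 60 minutes since the last push
    print(run_all_tasks(21, 5))   # False: defer to SETA's low-value list
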
diff --git a/taskcluster/taskgraph/optimize/seta.py b/taskcluster/taskgraph/optimize/seta.py
index f1442d8b0b2f..01dce91a92cc 100644
--- a/taskcluster/taskgraph/optimize/seta.py
+++ b/taskcluster/taskgraph/optimize/seta.py
@@ -6,9 +6,10 @@ from __future__ import absolute_import, print_function, unicode_literals
 
 import json
 import logging
+import requests
+from collections import defaultdict
 
 import attr
-import requests
 from redo import retry
 from requests import exceptions
 
@@ -23,6 +24,7 @@ SETA_LOW_PRIORITY = 5
 
 SETA_ENDPOINT = "https://treeherder.mozilla.org/api/project/%s/seta/" \
                 "job-priorities/?build_system_type=%s&priority=%s"
 
+PUSH_ENDPOINT = "https://hg.mozilla.org/integration/%s/json-pushes/?startID=%d&endID=%d"
 
 @attr.s(frozen=True)
@@ -35,6 +37,10 @@ class SETA(object):
     # cached low value tasks, by project
     low_value_tasks = attr.ib(factory=dict, init=False)
     low_value_bb_tasks = attr.ib(factory=dict, init=False)
+    # cached push dates by project
+    push_dates = attr.ib(factory=lambda: defaultdict(dict), init=False)
+    # cached push_ids that failed to retrieve datetime for
+    failed_json_push_calls = attr.ib(factory=list, init=False)
 
     def _get_task_string(self, task_tuple):
         # convert task tuple to single task string, so the task label sent in can match
@@ -156,13 +162,96 @@ class SETA(object):
 
         return low_value_tasks
 
-    def is_low_value_task(self, label, project):
+    def minutes_between_pushes(self, project, cur_push_id, cur_push_date, time_interval):
+        # figure out the minutes that have elapsed between the current push and previous one
+        # default to the maximum so that if we can't get a value, the task still runs
+        min_between_pushes = time_interval
+        prev_push_id = cur_push_id - 1
+
+        # cache the pushdate for the current push so we can use it next time
+        self.push_dates[project].update({cur_push_id: cur_push_date})
+
+        # check if we already have the previous push id's datetime cached
+        prev_push_date = self.push_dates[project].get(prev_push_id, 0)
+
+        # we have datetime of current and previous push, so return elapsed minutes and bail
+        if cur_push_date > 0 and prev_push_date > 0:
+            return (cur_push_date - prev_push_date) / 60
+
+        # datetime for previous pushid not cached, so must retrieve it
+        # if we already tried to retrieve the datetime for this pushid
+        # before and the json-push request failed, don't try it again
+        if prev_push_id in self.failed_json_push_calls:
+            return min_between_pushes
+
+        url = PUSH_ENDPOINT % (project, cur_push_id - 2, prev_push_id)
+
+        try:
+            response = retry(requests.get, attempts=2, sleeptime=10,
+                             args=(url, ),
+                             kwargs={'timeout': 60, 'headers': {'User-Agent': 'TaskCluster'}})
+            prev_push_date = json.loads(response.content).get(str(prev_push_id), {}).get('date', 0)
+
+            # cache it for next time
+            self.push_dates[project].update({prev_push_id: prev_push_date})
+
+            # now have datetime of current and previous push
+            if cur_push_date > 0 and prev_push_date > 0:
+                min_between_pushes = (cur_push_date - prev_push_date) / 60
+
+        # If the request times out, requests will raise a Timeout exception.
+        except exceptions.Timeout:
+            logger.warning("json-pushes timeout, treating task as high value")
+            self.failed_json_push_calls.append(prev_push_id)
+
+        # In the event of a network problem (e.g. DNS failure, refused connection, etc),
+        # requests will raise a ConnectionError.
+        except exceptions.ConnectionError:
+            logger.warning("json-pushes connection error, treating task as high value")
+            self.failed_json_push_calls.append(prev_push_id)
+
+        # In the event of a rare invalid HTTP response (e.g. 404, 401),
+        # requests will raise an HTTPError exception.
+        except exceptions.HTTPError:
+            logger.warning("Bad Http response, treating task as high value")
+            self.failed_json_push_calls.append(prev_push_id)
+
+        # When we get invalid JSON (i.e. 500 error), it results in a ValueError (bug 1313426)
+        except ValueError as error:
+            logger.warning("Invalid JSON, possible server error: {}".format(error))
+            self.failed_json_push_calls.append(prev_push_id)
+
+        # Log any other request failure as a warning if it wasn't caught by the handlers above
+        except exceptions.RequestException as error:
+            logger.warning(error)
+            self.failed_json_push_calls.append(prev_push_id)
+
+        return min_between_pushes
+
+    def is_low_value_task(self, label, project, pushlog_id, push_date,
+                          push_interval, time_interval):
         # marking a task as low_value means it will be optimized out by tc
         if project not in SETA_PROJECTS:
             return False
 
-        # The SETA service has a superficial check preventing try, so spoof autoland
-        project = 'autoland'
+        # Disable the "run all tasks" feature if we're on try (e.g. pushed via `mach try auto`)
+        if project != 'try':
+            # on every Nth push, we want to run all tasks
+            if int(pushlog_id) % push_interval == 0:
+                return False
+
+            # not an Nth push, so normally we would defer to SETA; however, we
+            # also want to ensure we run all tasks at least once per N minutes
+            if self.minutes_between_pushes(
+                    project,
+                    int(pushlog_id),
+                    int(push_date),
+                    time_interval) >= time_interval:
+                return False
+
+        else:
+            # The SETA service has a superficial check preventing try, so spoof autoland
+            project = 'autoland'
 
         # cache the low value tasks per project to avoid repeated SETA server queries
         if project not in self.low_value_tasks:
@@ -175,9 +264,17 @@ class SETA(object):
 is_low_value_task = SETA().is_low_value_task
 
 
-@register_strategy('seta')
+@register_strategy('seta', args=(10, 60))
 class SkipLowValue(OptimizationStrategy):
 
+    def __init__(self, push_interval, time_interval):
+        self.push_interval = push_interval
+        self.time_interval = time_interval
+
     def should_remove_task(self, task, params, _):
         # Return True to optimize a low value task.
-        return is_low_value_task(task.label, params['project'])
+        return is_low_value_task(task.label, params.get('project'),
+                                 params.get('pushlog_id'),
+                                 params.get('pushdate'),
+                                 self.push_interval,
+                                 self.time_interval)
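The elapsed-minutes calculation added to seta.py above assumes the json-pushes payload maps push ids to objects carrying a 'date' field in epoch seconds, which is what the .get(str(prev_push_id), {}).get('date', 0) chain expects. The offline sketch below reproduces just that arithmetic against a hard-coded payload; minutes_between and the fake response are invented for illustration.

# Offline sketch of the minutes-between-pushes arithmetic; the payload is
# hard-coded instead of being fetched from json-pushes.
import json

FAKE_JSON_PUSHES_RESPONSE = json.dumps({
    "41": {"date": 1500000000.0},  # previous push id mapped to its epoch date
})


def minutes_between(cur_push_date, prev_push_id, raw_response):
    prev_push_date = json.loads(raw_response).get(str(prev_push_id), {}).get('date', 0)
    if cur_push_date > 0 and prev_push_date > 0:
        return (cur_push_date - prev_push_date) / 60
    # Mirror the patch's fallback: if a date is missing, report an interval
    # large enough that the caller schedules all tasks.
    return float('inf')


if __name__ == '__main__':
    # The current push is one hour after the previous one, so this prints 60.0.
    print(minutes_between(1500003600.0, 41, FAKE_JSON_PUSHES_RESPONSE))
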
diff --git a/taskcluster/taskgraph/test/test_optimize.py b/taskcluster/taskgraph/test/test_optimize.py
index 2bf9f668516d..e32dc3dc4997 100644
--- a/taskcluster/taskgraph/test/test_optimize.py
+++ b/taskcluster/taskgraph/test/test_optimize.py
@@ -6,20 +6,14 @@ from __future__ import absolute_import, print_function, unicode_literals
 
 import unittest
 
-import pytest
 from taskgraph import graph, optimize
-from taskgraph.optimize import OptimizationStrategy, All, Any
+from taskgraph.optimize import OptimizationStrategy
 from taskgraph.taskgraph import TaskGraph
 from taskgraph.task import Task
 from mozunit import main
 from slugid import nice as slugid
 
 
-@pytest.fixture
-def set_monkeypatch(request, monkeypatch):
-    request.cls.monkeypatch = monkeypatch
-
-
 class Remove(OptimizationStrategy):
 
     def should_remove_task(self, task, params, arg):
@@ -32,7 +26,6 @@ class Replace(OptimizationStrategy):
         return taskid
 
 
-@pytest.mark.usefixtures("set_monkeypatch")
 class TestOptimize(unittest.TestCase):
 
     strategies = {
@@ -76,11 +69,11 @@ class TestOptimize(unittest.TestCase):
             ('t3', 't1', 'dep2'),
             ('t2', 't1', 'dep'))
 
-    def assert_remove_tasks(self, graph, exp_removed, do_not_optimize=set(), strategies=None):
-        strategies = strategies or self.strategies
+    def assert_remove_tasks(self, graph, exp_removed, do_not_optimize=set()):
+        optimize.registry = self.strategies
         got_removed = optimize.remove_tasks(
             target_task_graph=graph,
-            optimizations=optimize._get_optimizations(graph, strategies),
+            optimizations=optimize._get_optimizations(graph, self.strategies),
             params={},
             do_not_optimize=do_not_optimize)
         self.assertEqual(got_removed, exp_removed)
@@ -98,29 +91,6 @@ class TestOptimize(unittest.TestCase):
             t3={'remove': None})
         self.assert_remove_tasks(graph, {'t1', 't2', 't3'})
 
-    def test_composite_strategies_any(self):
-        self.monkeypatch.setattr(optimize, 'registry', self.strategies)
-        strategies = self.strategies.copy()
-        strategies['any'] = Any('never', 'remove')
-
-        graph = self.make_triangle(
-            t1={'any': None},
-            t2={'any': None},
-            t3={'any': None})
-
-        self.assert_remove_tasks(graph, {'t1', 't2', 't3'}, strategies=strategies)
-
-    def test_composite_strategies_all(self):
-        self.monkeypatch.setattr(optimize, 'registry', self.strategies)
-        strategies = self.strategies.copy()
-        strategies['all'] = All('never', 'remove')
-
-        graph = self.make_triangle(
-            t1={'all': None},
-            t2={'all': None},
-            t3={'all': None})
-        self.assert_remove_tasks(graph, set(), strategies=strategies)
-
     def test_remove_tasks_blocked(self):
         "Removable tasks that are depended on by non-removable tasks are not removed"
         graph = self.make_triangle(
diff --git a/taskcluster/taskgraph/test/test_optimize_strategies.py b/taskcluster/taskgraph/test/test_optimize_strategies.py
index c57450744164..4199a5f92310 100644
--- a/taskcluster/taskgraph/test/test_optimize_strategies.py
+++ b/taskcluster/taskgraph/test/test_optimize_strategies.py
@@ -6,11 +6,8 @@ from __future__ import absolute_import
 
 import time
 
 import pytest
-from datetime import datetime
 from mozunit import main
-from time import mktime
 
-from taskgraph.optimize.backstop import Backstop
 from taskgraph.optimize.bugbug import BugBugPushSchedules, BugbugTimeoutException, platform
 from taskgraph.task import Task
 
@@ -18,12 +15,9 @@
 
 
 @pytest.fixture(scope='module')
 def params():
     return {
-        'branch': 'integration/autoland',
         'head_repository': 'https://hg.mozilla.org/integration/autoland',
         'head_rev': 'abcdef',
-        'project': 'autoland',
-        'pushlog_id': 1,
-        'pushdate': mktime(datetime.now().timetuple()),
+        'branch': 'integration/autoland',
     }
 
 
@@ -157,32 +151,5 @@ def test_bugbug_timeout(monkeypatch, responses, params, tasks):
         opt.should_remove_task(tasks[0], params, None)
 
 
-def test_backstop(params, tasks):
-    all_labels = {t.label for t in tasks}
-    opt = Backstop(10, 60)  # every 10th push or 1 hour
-
-    # If there's no previous push date, run tasks
-    params['pushlog_id'] = 8
-    scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
-    assert scheduled == all_labels
-
-    # Only multiples of 10 schedule tasks. Pushdate from push 8 was cached.
-    params['pushlog_id'] = 9
-    params['pushdate'] += 3599
-    scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
-    assert scheduled == set()
-
-    params['pushlog_id'] = 10
-    params['pushdate'] += 1
-    scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
-    assert scheduled == all_labels
-
-    # Tasks are also scheduled if an hour has passed.
-    params['pushlog_id'] = 11
-    params['pushdate'] += 3600
-    scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
-    assert scheduled == all_labels
-
-
 if __name__ == '__main__':
     main()
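Since the dedicated Backstop strategy and its test are gone, it is worth remembering how the remaining pieces are wired together: register_strategy instantiates the class with its args at registration time, so 'seta' now resolves to a single SkipLowValue(10, 60) instance shared by every composite that names it. The sketch below reimplements just that registration mechanism so it can run standalone; the 'seta-sketch' name and SkipLowValueSketch class are illustrative, not part of taskgraph.

# Standalone sketch of the register_strategy mechanics shown in __init__.py.
from __future__ import print_function

registry = {}


def register_strategy(name, args=()):
    def wrap(cls):
        if name not in registry:
            registry[name] = cls(*args)  # instantiated once, at registration time
        return cls
    return wrap


@register_strategy('seta-sketch', args=(10, 60))
class SkipLowValueSketch(object):
    def __init__(self, push_interval, time_interval):
        self.push_interval = push_interval
        self.time_interval = time_interval


if __name__ == '__main__':
    strategy = registry['seta-sketch']
    # Prints "10 60": the registered instance carries the push/time intervals.
    print(strategy.push_interval, strategy.time_interval)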