Backed out 3 changesets (bug 1625200) for breaking gecko decision task on a CLOSED TREE

Backed out changeset efffde76e782 (bug 1625200)
Backed out changeset 23bfe65864c1 (bug 1625200)
Backed out changeset 7d1a3690be7d (bug 1625200)

--HG--
extra : rebase_source : cbca1f001730c0973dbb3a2ee335a93301797c33
This commit is contained in:
Andreea Pavel 2020-04-15 18:49:02 +03:00
Родитель 8b5729e497
Коммит f7dc8eb0a5
5 изменённых файлов: 135 добавлений и 291 удалений

Просмотреть файл

@ -14,10 +14,8 @@ See ``taskcluster/docs/optimization.rst`` for more information.
from __future__ import absolute_import, print_function, unicode_literals from __future__ import absolute_import, print_function, unicode_literals
import logging import logging
from abc import ABCMeta, abstractmethod, abstractproperty
from collections import defaultdict from collections import defaultdict
import six
from slugid import nice as slugid from slugid import nice as slugid
from taskgraph.graph import Graph from taskgraph.graph import Graph
@ -33,8 +31,6 @@ def register_strategy(name, args=()):
def wrap(cls): def wrap(cls):
if name not in registry: if name not in registry:
registry[name] = cls(*args) registry[name] = cls(*args)
if not hasattr(registry[name], 'description'):
registry[name].description = name
return cls return cls
return wrap return wrap
@ -259,95 +255,44 @@ class OptimizationStrategy(object):
return False return False
@six.add_metaclass(ABCMeta) class Either(OptimizationStrategy):
class CompositeStrategy(OptimizationStrategy): """Given one or more optimization strategies, remove a task if any of them
says to, and replace with a task if any finds a replacement (preferring the
earliest). By default, each substrategy gets the same arg, but split_args
can return a list of args for each strategy, if desired."""
def __init__(self, *substrategies, **kwargs): def __init__(self, *substrategies, **kwargs):
self.substrategies = [] missing = set(substrategies) - set(registry.keys())
missing = set()
for sub in substrategies:
if isinstance(sub, six.text_type):
if sub not in registry.keys():
missing.add(sub)
continue
sub = registry[sub]
self.substrategies.append(sub)
if missing: if missing:
raise TypeError("substrategies aren't registered: {}".format( raise TypeError("substrategies aren't registered: {}".format(
", ".join(sorted(missing)))) ", ".join(sorted(missing))))
self.description = "-or-".join(substrategies)
self.substrategies = [registry[sub] for sub in substrategies]
self.split_args = kwargs.pop('split_args', None) self.split_args = kwargs.pop('split_args', None)
if not self.split_args: if not self.split_args:
self.split_args = lambda arg: [arg] * len(substrategies) self.split_args = lambda arg: [arg] * len(substrategies)
if kwargs: if kwargs:
raise TypeError("unexpected keyword args") raise TypeError("unexpected keyword args")
@abstractproperty def _for_substrategies(self, arg, fn):
def description(self):
"""A textual description of the combined substrategies."""
pass
@abstractmethod
def reduce(self, results):
"""Given all substrategy results as a generator, return the overall
result."""
pass
def _generate_results(self, fname, task, params, arg):
for sub, arg in zip(self.substrategies, self.split_args(arg)): for sub, arg in zip(self.substrategies, self.split_args(arg)):
yield getattr(sub, fname)(task, params, arg) rv = fn(sub, arg)
def should_remove_task(self, *args):
results = self._generate_results('should_remove_task', *args)
return self.reduce(results)
def should_replace_task(self, *args):
results = self._generate_results('should_replace_task', *args)
return self.reduce(results)
class Any(CompositeStrategy):
"""Given one or more optimization strategies, remove or replace a task if any of them
says to.
Replacement will use the value returned by the first strategy that says to replace.
"""
@property
def description(self):
return "-or-".join([s.description for s in self.substrategies])
@classmethod
def reduce(cls, results):
for rv in results:
if rv: if rv:
return rv return rv
return False return False
def should_remove_task(self, task, params, arg):
return self._for_substrategies(
arg,
lambda sub, arg: sub.should_remove_task(task, params, arg))
class All(CompositeStrategy): def should_replace_task(self, task, params, arg):
"""Given one or more optimization strategies, remove or replace a task if all of them return self._for_substrategies(
says to. arg,
lambda sub, arg: sub.should_replace_task(task, params, arg))
Replacement will use the value returned by the first strategy passed in.
Note the values used for replacement need not be the same, as long as they
all say to replace.
"""
@property
def description(self):
return "-and-".join([s.description for s in self.substrategies])
@classmethod
def reduce(cls, results):
rvs = list(results)
if all(rvs):
return rvs[0]
return False
class Alias(CompositeStrategy): class Alias(Either):
"""Provides an alias to an existing strategy. """Provides an alias to an existing strategy.
This can be useful to swap strategies in and out without needing to modify This can be useful to swap strategies in and out without needing to modify
@ -356,23 +301,16 @@ class Alias(CompositeStrategy):
def __init__(self, strategy): def __init__(self, strategy):
super(Alias, self).__init__(strategy) super(Alias, self).__init__(strategy)
@property
def description(self):
return self.substrategies[0].description
def reduce(self, results):
return next(results)
# Trigger registration in sibling modules. # Trigger registration in sibling modules.
import_sibling_modules() import_sibling_modules()
# Register composite strategies. # Register composite strategies.
register_strategy('test', args=(Any('skip-unless-schedules', 'seta'), 'backstop'))(All) register_strategy('test', args=('skip-unless-schedules', 'seta'))(Either)
register_strategy('test-inclusive', args=('skip-unless-schedules',))(Alias) register_strategy('test-inclusive', args=('skip-unless-schedules',))(Alias)
register_strategy('test-try', args=('skip-unless-schedules',))(Alias) register_strategy('test-try', args=('skip-unless-schedules',))(Alias)
register_strategy('fuzzing-builds', args=('skip-unless-schedules', 'seta'))(Any) register_strategy('fuzzing-builds', args=('skip-unless-schedules', 'seta'))(Either)
class experimental(object): class experimental(object):
@ -385,12 +323,12 @@ class experimental(object):
""" """
relevant_tests = { relevant_tests = {
'test': Any('skip-unless-schedules', 'skip-unless-has-relevant-tests'), 'test': Either('skip-unless-schedules', 'skip-unless-has-relevant-tests'),
} }
"""Runs task containing tests in the same directories as modified files.""" """Runs task containing tests in the same directories as modified files."""
seta = { seta = {
'test': Any('skip-unless-schedules', 'seta'), 'test': Either('skip-unless-schedules', 'seta'),
} }
"""Provides a stable history of SETA's performance in the event we make it """Provides a stable history of SETA's performance in the event we make it
non-default in the future. Only useful as a benchmark.""" non-default in the future. Only useful as a benchmark."""
@ -400,22 +338,22 @@ class experimental(object):
learning to determine which tasks to run.""" learning to determine which tasks to run."""
all = { all = {
'test': Any('skip-unless-schedules', 'bugbug-all'), 'test': Either('skip-unless-schedules', 'bugbug-all'),
} }
"""Doesn't limit platforms, medium confidence threshold.""" """Doesn't limit platforms, medium confidence threshold."""
all_low = { all_low = {
'test': Any('skip-unless-schedules', 'bugbug-all-low'), 'test': Either('skip-unless-schedules', 'bugbug-all-low'),
} }
"""Doesn't limit platforms, low confidence threshold.""" """Doesn't limit platforms, low confidence threshold."""
all_high = { all_high = {
'test': Any('skip-unless-schedules', 'bugbug-all-high'), 'test': Either('skip-unless-schedules', 'bugbug-all-high'),
} }
"""Doesn't limit platforms, high confidence threshold.""" """Doesn't limit platforms, high confidence threshold."""
debug = { debug = {
'test': Any('skip-unless-schedules', 'bugbug-debug'), 'test': Either('skip-unless-schedules', 'bugbug-debug'),
} }
"""Restricts tests to debug platforms.""" """Restricts tests to debug platforms."""

Просмотреть файл

@ -1,128 +0,0 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, print_function, unicode_literals
import logging
from collections import defaultdict
import requests
from redo import retry
from taskgraph.optimize import OptimizationStrategy, register_strategy
logger = logging.getLogger(__name__)
PUSH_ENDPOINT = "{head_repository}/json-pushes/?startID={push_id_start}&endID={push_id_end}"
@register_strategy('backstop', args=(10, 60))
class Backstop(OptimizationStrategy):
"""Ensures that no task gets left behind.
Will schedule all tasks either every Nth push, or M minutes.
Args:
push_interval (int): Number of pushes
"""
def __init__(self, push_interval, time_interval):
self.push_interval = push_interval
self.time_interval = time_interval
# cached push dates by project
self.push_dates = defaultdict(dict)
# cached push_ids that failed to retrieve datetime for
self.failed_json_push_calls = []
def should_remove_task(self, task, params, _):
project = params['project']
pushid = int(params['pushlog_id'])
pushdate = int(params['pushdate'])
# Only enable the backstop on autoland since we always want the *real*
# optimized tasks on try and release branches.
if project != 'autoland':
return True
# On every Nth push, want to run all tasks.
if pushid % self.push_interval == 0:
return False
# We also want to ensure we run all tasks at least once per N minutes.
if self.minutes_between_pushes(
params["head_repository"],
project,
pushid,
pushdate) >= self.time_interval:
return False
return True
def minutes_between_pushes(self, repository, project, cur_push_id, cur_push_date):
# figure out the minutes that have elapsed between the current push and previous one
# defaulting to max min so if we can't get value, defaults to run the task
min_between_pushes = self.time_interval
prev_push_id = cur_push_id - 1
# cache the pushdate for the current push so we can use it next time
self.push_dates[project].update({cur_push_id: cur_push_date})
# check if we already have the previous push id's datetime cached
prev_push_date = self.push_dates[project].get(prev_push_id, 0)
# we have datetime of current and previous push, so return elapsed minutes and bail
if cur_push_date > 0 and prev_push_date > 0:
return (cur_push_date - prev_push_date) / 60
# datetime for previous pushid not cached, so must retrieve it
# if we already tried to retrieve the datetime for this pushid
# before and the json-push request failed, don't try it again
if prev_push_id in self.failed_json_push_calls:
return min_between_pushes
url = PUSH_ENDPOINT.format(
head_repository=repository,
push_id_start=prev_push_id - 1,
push_id_end=prev_push_id,
)
try:
response = retry(requests.get, attempts=2, sleeptime=10,
args=(url, ),
kwargs={'timeout': 60, 'headers': {'User-Agent': 'TaskCluster'}})
prev_push_date = response.json().get(str(prev_push_id), {}).get('date', 0)
# cache it for next time
self.push_dates[project].update({prev_push_id: prev_push_date})
# now have datetime of current and previous push
if cur_push_date > 0 and prev_push_date > 0:
min_between_pushes = (cur_push_date - prev_push_date) / 60
# In the event of request times out, requests will raise a TimeoutError.
except requests.exceptions.Timeout:
logger.warning("json-pushes timeout, enabling backstop")
self.failed_json_push_calls.append(prev_push_id)
# In the event of a network problem (e.g. DNS failure, refused connection, etc),
# requests will raise a ConnectionError.
except requests.exceptions.ConnectionError:
logger.warning("json-pushes connection error, enabling backstop")
self.failed_json_push_calls.append(prev_push_id)
# In the event of the rare invalid HTTP response(e.g 404, 401),
# requests will raise an HTTPError exception
except requests.exceptions.HTTPError:
logger.warning("Bad Http response, enabling backstop")
self.failed_json_push_calls.append(prev_push_id)
# When we get invalid JSON (i.e. 500 error), it results in a ValueError (bug 1313426)
except ValueError as error:
logger.warning("Invalid JSON, possible server error: {}".format(error))
self.failed_json_push_calls.append(prev_push_id)
# We just print the error out as a debug message if we failed to catch the exception above
except requests.exceptions.RequestException as error:
logger.warning(error)
self.failed_json_push_calls.append(prev_push_id)
return min_between_pushes

Просмотреть файл

@ -6,9 +6,10 @@ from __future__ import absolute_import, print_function, unicode_literals
import json import json
import logging import logging
import requests
from collections import defaultdict
import attr import attr
import requests
from redo import retry from redo import retry
from requests import exceptions from requests import exceptions
@ -23,6 +24,7 @@ SETA_LOW_PRIORITY = 5
SETA_ENDPOINT = "https://treeherder.mozilla.org/api/project/%s/seta/" \ SETA_ENDPOINT = "https://treeherder.mozilla.org/api/project/%s/seta/" \
"job-priorities/?build_system_type=%s&priority=%s" "job-priorities/?build_system_type=%s&priority=%s"
PUSH_ENDPOINT = "https://hg.mozilla.org/integration/%s/json-pushes/?startID=%d&endID=%d"
@attr.s(frozen=True) @attr.s(frozen=True)
@ -35,6 +37,10 @@ class SETA(object):
# cached low value tasks, by project # cached low value tasks, by project
low_value_tasks = attr.ib(factory=dict, init=False) low_value_tasks = attr.ib(factory=dict, init=False)
low_value_bb_tasks = attr.ib(factory=dict, init=False) low_value_bb_tasks = attr.ib(factory=dict, init=False)
# cached push dates by project
push_dates = attr.ib(factory=lambda: defaultdict(dict), init=False)
# cached push_ids that failed to retrieve datetime for
failed_json_push_calls = attr.ib(factory=list, init=False)
def _get_task_string(self, task_tuple): def _get_task_string(self, task_tuple):
# convert task tuple to single task string, so the task label sent in can match # convert task tuple to single task string, so the task label sent in can match
@ -156,13 +162,96 @@ class SETA(object):
return low_value_tasks return low_value_tasks
def is_low_value_task(self, label, project): def minutes_between_pushes(self, project, cur_push_id, cur_push_date, time_interval):
# figure out the minutes that have elapsed between the current push and previous one
# defaulting to max min so if we can't get value, defaults to run the task
min_between_pushes = time_interval
prev_push_id = cur_push_id - 1
# cache the pushdate for the current push so we can use it next time
self.push_dates[project].update({cur_push_id: cur_push_date})
# check if we already have the previous push id's datetime cached
prev_push_date = self.push_dates[project].get(prev_push_id, 0)
# we have datetime of current and previous push, so return elapsed minutes and bail
if cur_push_date > 0 and prev_push_date > 0:
return (cur_push_date - prev_push_date) / 60
# datetime for previous pushid not cached, so must retrieve it
# if we already tried to retrieve the datetime for this pushid
# before and the json-push request failed, don't try it again
if prev_push_id in self.failed_json_push_calls:
return min_between_pushes
url = PUSH_ENDPOINT % (project, cur_push_id - 2, prev_push_id)
try:
response = retry(requests.get, attempts=2, sleeptime=10,
args=(url, ),
kwargs={'timeout': 60, 'headers': {'User-Agent': 'TaskCluster'}})
prev_push_date = json.loads(response.content).get(str(prev_push_id), {}).get('date', 0)
# cache it for next time
self.push_dates[project].update({prev_push_id: prev_push_date})
# now have datetime of current and previous push
if cur_push_date > 0 and prev_push_date > 0:
min_between_pushes = (cur_push_date - prev_push_date) / 60
# In the event of request times out, requests will raise a TimeoutError.
except exceptions.Timeout:
logger.warning("json-pushes timeout, treating task as high value")
self.failed_json_push_calls.append(prev_push_id)
# In the event of a network problem (e.g. DNS failure, refused connection, etc),
# requests will raise a ConnectionError.
except exceptions.ConnectionError:
logger.warning("json-pushes connection error, treating task as high value")
self.failed_json_push_calls.append(prev_push_id)
# In the event of the rare invalid HTTP response(e.g 404, 401),
# requests will raise an HTTPError exception
except exceptions.HTTPError:
logger.warning("Bad Http response, treating task as high value")
self.failed_json_push_calls.append(prev_push_id)
# When we get invalid JSON (i.e. 500 error), it results in a ValueError (bug 1313426)
except ValueError as error:
logger.warning("Invalid JSON, possible server error: {}".format(error))
self.failed_json_push_calls.append(prev_push_id)
# We just print the error out as a debug message if we failed to catch the exception above
except exceptions.RequestException as error:
logger.warning(error)
self.failed_json_push_calls.append(prev_push_id)
return min_between_pushes
def is_low_value_task(self, label, project, pushlog_id, push_date,
push_interval, time_interval):
# marking a task as low_value means it will be optimized out by tc # marking a task as low_value means it will be optimized out by tc
if project not in SETA_PROJECTS: if project not in SETA_PROJECTS:
return False return False
# The SETA service has a superficial check preventing try, so spoof autoland # Disable the "run all tasks" feature if we're on try (e.g pushed via `mach try auto`)
project = 'autoland' if project != 'try':
# on every Nth push, want to run all tasks
if int(pushlog_id) % push_interval == 0:
return False
# Nth push, so time to call seta based on number of pushes; however
# we also want to ensure we run all tasks at least once per N minutes
if self.minutes_between_pushes(
project,
int(pushlog_id),
int(push_date),
time_interval) >= time_interval:
return False
else:
# The SETA service has a superficial check preventing try, so spoof autoland
project = 'autoland'
# cache the low value tasks per project to avoid repeated SETA server queries # cache the low value tasks per project to avoid repeated SETA server queries
if project not in self.low_value_tasks: if project not in self.low_value_tasks:
@ -175,9 +264,17 @@ class SETA(object):
is_low_value_task = SETA().is_low_value_task is_low_value_task = SETA().is_low_value_task
@register_strategy('seta') @register_strategy('seta', args=(10, 60))
class SkipLowValue(OptimizationStrategy): class SkipLowValue(OptimizationStrategy):
def __init__(self, push_interval, time_interval):
self.push_interval = push_interval
self.time_interval = time_interval
def should_remove_task(self, task, params, _): def should_remove_task(self, task, params, _):
# Return True to optimize a low value task. # Return True to optimize a low value task.
return is_low_value_task(task.label, params['project']) return is_low_value_task(task.label, params.get('project'),
params.get('pushlog_id'),
params.get('pushdate'),
self.push_interval,
self.time_interval)

Просмотреть файл

@ -6,20 +6,14 @@ from __future__ import absolute_import, print_function, unicode_literals
import unittest import unittest
import pytest
from taskgraph import graph, optimize from taskgraph import graph, optimize
from taskgraph.optimize import OptimizationStrategy, All, Any from taskgraph.optimize import OptimizationStrategy
from taskgraph.taskgraph import TaskGraph from taskgraph.taskgraph import TaskGraph
from taskgraph.task import Task from taskgraph.task import Task
from mozunit import main from mozunit import main
from slugid import nice as slugid from slugid import nice as slugid
@pytest.fixture
def set_monkeypatch(request, monkeypatch):
request.cls.monkeypatch = monkeypatch
class Remove(OptimizationStrategy): class Remove(OptimizationStrategy):
def should_remove_task(self, task, params, arg): def should_remove_task(self, task, params, arg):
@ -32,7 +26,6 @@ class Replace(OptimizationStrategy):
return taskid return taskid
@pytest.mark.usefixtures("set_monkeypatch")
class TestOptimize(unittest.TestCase): class TestOptimize(unittest.TestCase):
strategies = { strategies = {
@ -76,11 +69,11 @@ class TestOptimize(unittest.TestCase):
('t3', 't1', 'dep2'), ('t3', 't1', 'dep2'),
('t2', 't1', 'dep')) ('t2', 't1', 'dep'))
def assert_remove_tasks(self, graph, exp_removed, do_not_optimize=set(), strategies=None): def assert_remove_tasks(self, graph, exp_removed, do_not_optimize=set()):
strategies = strategies or self.strategies optimize.registry = self.strategies
got_removed = optimize.remove_tasks( got_removed = optimize.remove_tasks(
target_task_graph=graph, target_task_graph=graph,
optimizations=optimize._get_optimizations(graph, strategies), optimizations=optimize._get_optimizations(graph, self.strategies),
params={}, params={},
do_not_optimize=do_not_optimize) do_not_optimize=do_not_optimize)
self.assertEqual(got_removed, exp_removed) self.assertEqual(got_removed, exp_removed)
@ -98,29 +91,6 @@ class TestOptimize(unittest.TestCase):
t3={'remove': None}) t3={'remove': None})
self.assert_remove_tasks(graph, {'t1', 't2', 't3'}) self.assert_remove_tasks(graph, {'t1', 't2', 't3'})
def test_composite_strategies_any(self):
self.monkeypatch.setattr(optimize, 'registry', self.strategies)
strategies = self.strategies.copy()
strategies['any'] = Any('never', 'remove')
graph = self.make_triangle(
t1={'any': None},
t2={'any': None},
t3={'any': None})
self.assert_remove_tasks(graph, {'t1', 't2', 't3'}, strategies=strategies)
def test_composite_strategies_all(self):
self.monkeypatch.setattr(optimize, 'registry', self.strategies)
strategies = self.strategies.copy()
strategies['all'] = All('never', 'remove')
graph = self.make_triangle(
t1={'all': None},
t2={'all': None},
t3={'all': None})
self.assert_remove_tasks(graph, set(), strategies=strategies)
def test_remove_tasks_blocked(self): def test_remove_tasks_blocked(self):
"Removable tasks that are depended on by non-removable tasks are not removed" "Removable tasks that are depended on by non-removable tasks are not removed"
graph = self.make_triangle( graph = self.make_triangle(

Просмотреть файл

@ -6,11 +6,8 @@ from __future__ import absolute_import
import time import time
import pytest import pytest
from datetime import datetime
from mozunit import main from mozunit import main
from time import mktime
from taskgraph.optimize.backstop import Backstop
from taskgraph.optimize.bugbug import BugBugPushSchedules, BugbugTimeoutException, platform from taskgraph.optimize.bugbug import BugBugPushSchedules, BugbugTimeoutException, platform
from taskgraph.task import Task from taskgraph.task import Task
@ -18,12 +15,9 @@ from taskgraph.task import Task
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def params(): def params():
return { return {
'branch': 'integration/autoland',
'head_repository': 'https://hg.mozilla.org/integration/autoland', 'head_repository': 'https://hg.mozilla.org/integration/autoland',
'head_rev': 'abcdef', 'head_rev': 'abcdef',
'project': 'autoland', 'branch': 'integration/autoland',
'pushlog_id': 1,
'pushdate': mktime(datetime.now().timetuple()),
} }
@ -157,32 +151,5 @@ def test_bugbug_timeout(monkeypatch, responses, params, tasks):
opt.should_remove_task(tasks[0], params, None) opt.should_remove_task(tasks[0], params, None)
def test_backstop(params, tasks):
all_labels = {t.label for t in tasks}
opt = Backstop(10, 60) # every 10th push or 1 hour
# If there's no previous push date, run tasks
params['pushlog_id'] = 8
scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
assert scheduled == all_labels
# Only multiples of 10 schedule tasks. Pushdate from push 8 was cached.
params['pushlog_id'] = 9
params['pushdate'] += 3599
scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
assert scheduled == set()
params['pushlog_id'] = 10
params['pushdate'] += 1
scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
assert scheduled == all_labels
# Tasks are also scheduled if an hour has passed.
params['pushlog_id'] = 11
params['pushdate'] += 3600
scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
assert scheduled == all_labels
if __name__ == '__main__': if __name__ == '__main__':
main() main()