From ea8ee01686fa3119e24f1caa4a191a17c02fbbb3 Mon Sep 17 00:00:00 2001 From: Rob Wood Date: Thu, 26 Jan 2017 15:06:08 -0500 Subject: [PATCH] Bug 1333841 - Add seta timeout code to gecko decision task; r=jmaher MozReview-Commit-ID: IavIAHZ1zgQ --HG-- extra : rebase_source : e6ef642a76eabbd766a9465017f16a78ea5319e6 --- taskcluster/taskgraph/task/transform.py | 5 +- taskcluster/taskgraph/util/seta.py | 98 +++++++++++++++++++++++-- 2 files changed, 95 insertions(+), 8 deletions(-) diff --git a/taskcluster/taskgraph/task/transform.py b/taskcluster/taskgraph/task/transform.py index de5035f9a64b..68d423eb7a57 100644 --- a/taskcluster/taskgraph/task/transform.py +++ b/taskcluster/taskgraph/task/transform.py @@ -96,7 +96,10 @@ class TransformTask(base.Task): # we would like to return 'False, None' while it's high_value_task # and we wouldn't optimize it. Otherwise, it will return 'True, None' - if is_low_value_task(self.label, params.get('project'), params.get('pushlog_id')): + if is_low_value_task(self.label, + params.get('project'), + params.get('pushlog_id'), + params.get('pushdate')): # Always optimize away low-value tasks return True, None else: diff --git a/taskcluster/taskgraph/util/seta.py b/taskcluster/taskgraph/util/seta.py index 3b7f982752b7..e173c53f75b5 100644 --- a/taskcluster/taskgraph/util/seta.py +++ b/taskcluster/taskgraph/util/seta.py @@ -1,6 +1,7 @@ import json import logging import requests +from collections import defaultdict from redo import retry from requests import exceptions @@ -11,9 +12,11 @@ headers = { # It's a list of project name which SETA is useful on SETA_PROJECTS = ['mozilla-inbound', 'autoland'] -PROJECT_SCHEDULE_ALL_EVERY = {'mozilla-inbound': 5, 'autoland': 5} +PROJECT_SCHEDULE_ALL_EVERY_PUSHES = {'mozilla-inbound': 5, 'autoland': 5} +PROJECT_SCHEDULE_ALL_EVERY_MINUTES = {'mozilla-inbound': 60, 'autoland': 60} SETA_ENDPOINT = "https://seta.herokuapp.com/data/setadetails/?branch=%s" +PUSH_ENDPOINT = "https://hg.mozilla.org/integration/%s/json-pushes/?startID=%d&endID=%d" class SETA(object): @@ -24,14 +27,14 @@ class SETA(object): def __init__(self): # cached low value tasks, by project self.low_value_tasks = {} + # cached push dates by project + self.push_dates = defaultdict(dict) + # cached push_ids that failed to retrieve datetime for + self.failed_json_push_calls = [] def query_low_value_tasks(self, project): # Request the set of low value tasks from the SETA service. Low value tasks will be # optimized out of the task graph. - if project not in SETA_PROJECTS: - logger.debug("SETA is not enabled for project `{}`".format(project)) - return [] - logger.debug("Querying SETA service for low-value tasks on {}".format(project)) low_value_tasks = [] @@ -76,11 +79,92 @@ class SETA(object): return low_value_tasks - def is_low_value_task(self, label, project, pushlog_id): - schedule_all_every = PROJECT_SCHEDULE_ALL_EVERY.get(project, 5) + def minutes_between_pushes(self, project, cur_push_id, cur_push_date): + # figure out the minutes that have elapsed between the current push and previous one + # defaulting to max min so if we can't get value, defaults to run the task + min_between_pushes = PROJECT_SCHEDULE_ALL_EVERY_MINUTES.get(project, 60) + prev_push_id = cur_push_id - 1 + + # cache the pushdate for the current push so we can use it next time + self.push_dates[project].update({cur_push_id: cur_push_date}) + + # check if we already have the previous push id's datetime cached + prev_push_date = self.push_dates[project].get(prev_push_id, 0) + + # we have datetime of current and previous push, so return elapsed minutes and bail + if cur_push_date > 0 and prev_push_date > 0: + return (cur_push_date - prev_push_date) / 60 + + # datetime for previous pushid not cached, so must retrieve it + # if we already tried to retrieve the datetime for this pushid + # before and the json-push request failed, don't try it again + if prev_push_id in self.failed_json_push_calls: + return min_between_pushes + + url = PUSH_ENDPOINT % (project, cur_push_id - 2, prev_push_id) + + try: + logger.debug("Retrieving datetime of previous push") + response = retry(requests.get, attempts=2, sleeptime=10, + args=(url, ), + kwargs={'timeout': 5, 'headers': headers}) + prev_push_date = json.loads(response.content).get(str(prev_push_id), {}).get('date', 0) + + # cache it for next time + self.push_dates[project].update({prev_push_id: prev_push_date}) + + # now have datetime of current and previous push + if cur_push_date > 0 and prev_push_date > 0: + min_between_pushes = (cur_push_date - prev_push_date) / 60 + + # In the event of request times out, requests will raise a TimeoutError. + except exceptions.Timeout: + logger.warning("json-pushes timeout, treating task as high value") + self.failed_json_push_calls.append(prev_push_id) + + # In the event of a network problem (e.g. DNS failure, refused connection, etc), + # requests will raise a ConnectionError. + except exceptions.ConnectionError: + logger.warning("json-pushes connection error, treating task as high value") + self.failed_json_push_calls.append(prev_push_id) + + # In the event of the rare invalid HTTP response(e.g 404, 401), + # requests will raise an HTTPError exception + except exceptions.HTTPError: + logger.warning("Bad Http response, treating task as high value") + self.failed_json_push_calls.append(prev_push_id) + + # When we get invalid JSON (i.e. 500 error), it results in a ValueError (bug 1313426) + except ValueError as error: + logger.warning("Invalid JSON, possible server error: {}".format(error)) + self.failed_json_push_calls.append(prev_push_id) + + # We just print the error out as a debug message if we failed to catch the exception above + except exceptions.RequestException as error: + logger.warning(error) + self.failed_json_push_calls.append(prev_push_id) + + return min_between_pushes + + def is_low_value_task(self, label, project, pushlog_id, push_date): + # marking a task as low_value means it will be optimized out by tc + if project not in SETA_PROJECTS: + logger.debug("SETA is not enabled for project `{}`".format(project)) + return False + + schedule_all_every = PROJECT_SCHEDULE_ALL_EVERY_PUSHES.get(project, 5) # on every Nth push, want to run all tasks if int(pushlog_id) % schedule_all_every == 0: return False + + # Nth push, so time to call seta based on number of pushes; however + # we also want to ensure we run all tasks at least once per N minutes + if self.minutes_between_pushes( + project, + int(pushlog_id), + int(push_date)) >= PROJECT_SCHEDULE_ALL_EVERY_MINUTES.get(project, 60): + return False + # cache the low value tasks per project to avoid repeated SETA server queries if project not in self.low_value_tasks: self.low_value_tasks[project] = self.query_low_value_tasks(project)