Modify scheduler_analysis recipe to get data from shadow_schedulers before attempting to generate locally

This commit is contained in:
Andrew Halberstadt 2019-08-29 16:51:15 -04:00
Parent 311cbbcf43
Commit 788ea3dfbd
3 changed files: 119 additions and 45 deletions

View file

@ -10,6 +10,8 @@ from adr.util.memoize import memoize, memoized_property
from loguru import logger
# URL templates for fetching push/task metadata from hg.mozilla.org and taskcluster.
HGMO_JSON_URL = "https://hg.mozilla.org/integration/{branch}/rev/{rev}?style=json"
TASKGRAPH_ARTIFACT_URL = "https://index.taskcluster.net/v1/task/gecko.v2.autoland.revision.{rev}.taskgraph.decision/artifacts/public/{artifact}"
# NOTE(review): artifact filename corrected from "optimzied_tasks.list" to
# "optimized_tasks.list" for consistency with _shadow_scheduler_artifacts,
# which matches artifacts named 'optimized_tasks.list'. Confirm against the
# live index in case the misspelled name is what the server actually serves.
SHADOW_SCHEDULER_ARTIFACT_URL = "https://tools.taskcluster.net/index/gecko.v2.try.revision.{rev}.source/shadow-scheduler-{name}/artifacts/public/shadow-scheduler/optimized_tasks.list"
class Status(Enum):
@ -179,7 +181,7 @@ class Push:
set: A set of task labels (str).
"""
tasks = self._get_decision_artifact('task-graph.json').values()
return set([t['label'] for t in tasks])
return {t['label'] for t in tasks}
@property
def unscheduled_task_labels(self):
@ -320,14 +322,35 @@ class Push:
"""
return set([label for label, count in self.regressions.items() if count == 0])
@memoized_property
def _decision_artifact_urls(self):
"""All artifact urls from the Decision task of this push.
@memoize
def get_shadow_scheduler_tasks(self, name):
    """Returns all tasks the given shadow scheduler would have scheduled,
    or None if the given scheduler didn't run.

    Args:
        name (str): The name of the shadow scheduler to query.

    Returns:
        set: All task labels that would have been scheduled, or None.
    """
    # Single lookup instead of a membership test followed by indexing.
    url = self._shadow_scheduler_artifacts.get(name)
    if url is None:
        return None

    # The artifact is a plain-text file with one task label per line.
    r = requests.get(url)
    return set(r.text.splitlines())
@memoize
def _get_artifact_urls_from_label(self, label):
    """All artifact urls from any task whose label contains ``label``.

    Args:
        label (str): Substring to filter task labels by.

    Returns:
        list: A list of urls.
    """
    # An unreachable duplicate return (a stale 'decision_artifacts' query
    # left over from a merge) was removed; the label-filtered query below
    # is the one the docstring describes.
    return run_query('label_artifacts', Namespace(rev=self.rev, label=label))['data']
@memoize
def _get_decision_artifact(self, name):
    """Get an artifact from the Decision task of this push.

    Args:
        name (str): Name of the artifact to fetch (e.g. 'task-graph.json').

    Returns:
        dict: JSON representation of the artifact, or [] when the decision
        task or artifact does not exist.
    """
    # An unreachable second body (a stale loop over the removed
    # `_decision_artifact_urls` property, left over from a merge) was
    # dropped; the artifact is fetched directly from the taskcluster index.
    url = TASKGRAPH_ARTIFACT_URL.format(rev=self.rev, artifact=name)
    r = requests.get(url)
    if r.status_code != 200:
        logger.warning(f"No decision task with artifact {name} on {self.rev}.")
        return []
    return r.json()
@memoized_property
def _shadow_scheduler_artifacts(self):
    """Map each shadow scheduler name to its 'optimized_tasks.list' artifact url.

    Returns:
        dict: A mapping of {<shadow scheduler name>: <artifact url>}.
    """
    prefix = 'shadow-scheduler-'
    artifacts = {}
    for task in self._get_artifact_urls_from_label('shadow-scheduler'):
        label = task['label']
        index = label.find(prefix)
        if index == -1:
            # Label matched the query but isn't a shadow scheduler task.
            continue
        name = label[index + len(prefix):]

        for url in task['artifacts']:
            if url.rsplit('/', 1)[1] == 'optimized_tasks.list':
                # Only record tasks that actually produced the artifact;
                # storing None here would make get_shadow_scheduler_tasks
                # call requests.get(None) and crash.
                artifacts[name] = url
                break
    return artifacts
@memoized_property
def _hgmo(self):

View file

@ -1,11 +1,16 @@
from: task
select:
- {name: id, value: task.id}
- {name: label, value: run.name}
- {name: artifacts, value: task.artifacts.url}
where:
and:
- prefix: {repo.changeset.id: {$eval: rev}}
- eq: {repo.branch.name: "autoland"}
- eq: {task.kind: "decision-task"}
- $if: label == "decision-task"
then:
eq: {task.kind: "decision-task"}
else:
find: {run.name: {$eval: label}}
format: list
limit: 100

View file

@ -18,6 +18,7 @@ from pathlib import Path
from adr import config
from adr.errors import MissingDataError
from adr.query import run_query
from adr.util.memoize import memoized_property
from icecream import ic
from loguru import logger
@ -41,6 +42,11 @@ RUN_CONTEXTS = [
"default": False,
"help": "Clone Gecko if the specified path does not exist.",
},
"strategies": {
"nargs": "+",
"default": [],
"help": "Strategy names to analyze.",
}
}
]
@ -67,23 +73,47 @@ class Score:
class Scheduler:
def __init__(self, path):
self.path = Path(path)
self.name = self.path.stem
def __init__(self, name):
    """Create a scheduler identified by ``name``.

    Args:
        name (str): Name of the scheduling strategy — a module stem under
            the 'strategies' directory, a shadow scheduler name, or
            "baseline".
    """
    self.name = name
    # Accumulates analysis results for this scheduler across pushes.
    self.score = Score()
def get_target_tasks(self, push):
if self.name == "baseline":
return push.target_task_labels
@memoized_property
def path(self):
    """Path to this strategy's module under ./strategies, if any.

    Returns:
        pathlib.Path: The strategy file whose stem matches ``self.name``,
        or None when no local strategy exists (e.g. for names that are
        only shadow schedulers).
    """
    # Dropped an unused `cwd = os.getcwd()` local and a stale comment
    # copied from run().
    strategy_dir = here / "strategies"
    return next((s for s in strategy_dir.glob("*.py") if s.stem == self.name), None)
with open(self.path, "r") as fh:
scheduler_hash = config.cache._hash(fh.read())
def get_tasks(self, push):
key = f"scheduler.{push.rev}.{self.name}"
if self.path:
with open(self.path, "r") as fh:
scheduler_hash = config.cache._hash(fh.read())
key += f".{scheduler_hash}"
key = f"scheduler.{push.rev}.{scheduler_hash}"
if config.cache.has(key):
logger.debug(f"Loading target tasks from cache")
return config.cache.get(key)
# If we're baseline simply use the scheduled_task_labels.
if self.name == "baseline":
tasks = push.scheduled_task_labels
config.cache.put(key, tasks, 43200) # keep results for 30 days
return tasks
# Next check if a shadow scheduler matching our name ran on the push.
tasks = push.get_shadow_scheduler_tasks(self.name)
if tasks is not None:
config.cache.put(key, tasks, 43200) # keep results for 30 days
return tasks
# Finally fallback to generating the tasks locally.
if not GECKO or not self.path:
logger.error(f"error: shadow scheduler '{self.name}' not found!")
sys.exit(1)
logger.debug(f"Generating target tasks")
cmd = ["./mach", "taskgraph", "optimized", "--fast"]
env = os.environ.copy()
@ -96,17 +126,17 @@ class Scheduler:
output = subprocess.check_output(
cmd, env=env, cwd=GECKO, stderr=subprocess.DEVNULL
).decode("utf8")
target_tasks = set(output.splitlines())
tasks = set(output.splitlines())
config.cache.put(key, target_tasks, 43200) # keep results for 30 days
return target_tasks
config.cache.put(key, tasks, 43200) # keep results for 30 days
return tasks
def analyze(self, push):
target_tasks = self.get_target_tasks(push)
self.score.tasks += len(target_tasks)
tasks = self.get_tasks(push)
self.score.tasks += len(tasks)
if push.backedout:
if push.likely_regressions & target_tasks:
if push.likely_regressions & tasks:
self.score.primary_backouts += 1
else:
self.score.secondary_backouts += 1
@ -145,27 +175,20 @@ def clone_gecko():
def run(args):
global GECKO, logger
GECKO = args.gecko_path
if not GECKO:
logger.error("Must specify --gecko-path.")
sys.exit(1)
if args.gecko_path:
GECKO = args.gecko_path
if not Path(GECKO).is_dir():
if GECKO and not Path(GECKO).is_dir():
if args.clone:
clone_gecko()
else:
logger.error(f"Gecko path '{GECKO}' does not exist! Pass --clone to clone it to this location.")
sys.exit(1)
# initialize schedulers to analyze
cwd = os.getcwd()
strategy_dir = here / "strategies"
strategy_paths = [s for s in strategy_dir.glob("*.py") if s.name != "__init__.py"]
schedulers = []
for path in strategy_paths:
logger.debug(f"Creating scheduler using strategy from {path.relative_to(cwd)}")
schedulers.append(Scheduler(path))
for s in args.strategies:
logger.debug(f"Creating scheduler using strategy {s}")
schedulers.append(Scheduler(s))
# use what was actually scheduled as a baseline comparison
schedulers.append(Scheduler("baseline"))
@ -174,14 +197,17 @@ def run(args):
pushes = make_push_objects(
from_date=args.from_date, to_date=args.to_date, branch=args.branch
)
orig_rev = hg(["log", "-r", ".", "-T", "{node}"])
logger.debug(f"Found previous revision: {orig_rev}")
if GECKO:
orig_rev = hg(["log", "-r", ".", "-T", "{node}"])
logger.debug(f"Found previous revision: {orig_rev}")
try:
for i, push in enumerate(pushes):
logger.info(f"Analyzing https://treeherder.mozilla.org/#/jobs?repo=autoland&revision={push.rev} ({i+1}/{len(pushes)})") # noqa
hg(["update", push.rev])
if GECKO:
hg(["update", push.rev])
for scheduler in schedulers:
logger.opt(ansi=True).debug(f"<cyan>Scheduler {scheduler.name}</cyan>")
@ -191,8 +217,9 @@ def run(args):
logger.warning(f"MissingDataError: Skipping {push.rev}")
finally:
logger.debug("restoring repo")
hg(["update", orig_rev])
if GECKO:
logger.debug("restoring repo")
hg(["update", orig_rev])
header = [
"Scheduler",