# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
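
"""Create tasks from a task graph, submitting each one to the TaskCluster queue."""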

from __future__ import absolute_import, print_function, unicode_literals

import concurrent.futures as futures
import json
import logging
import sys

import six

from slugid import nice as slugid
from taskgraph.util.parameterization import resolve_timestamps
from taskgraph.util.taskcluster import get_session, CONCURRENCY
from taskgraph.util.time import current_json_time

logger = logging.getLogger(__name__)

# this is set to True for `mach taskgraph action-callback --test`
testing = False

def create_tasks(graph_config, taskgraph, label_to_taskid, params, decision_task_id):
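    """Submit every task in `taskgraph` to the TaskCluster queue, in
    dependency order.

    `label_to_taskid` maps each task's label to its pre-assigned taskId,
    e.g. (illustrative) {'build-linux64/opt': '<22-char slugid>'}.
    Every created task joins the task group `decision_task_id`, and any task
    with no dependency inside the graph gains a dependency on the decision
    task, so that none of the created tasks run if submission fails partway
    through.
    """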
    taskid_to_label = {t: l for l, t in six.iteritems(label_to_taskid)}

    # when running as an actual decision task, we use the decision task's
    # taskId as the taskGroupId. The process that created the decision task
    # helpfully placed it in this same taskGroup. If there is no $TASK_ID,
    # fall back to a slugid
    scheduler_id = '{}-level-{}'.format(graph_config['trust-domain'], params['level'])

    # Add the taskGroupId, schedulerId and optionally the decision task
    # dependency
    for task_id in taskgraph.graph.nodes:
        task_def = taskgraph.tasks[task_id].task

        # If this task has no dependencies *within* this taskgraph, make it
        # depend on this decision task. If it has another dependency within
        # the taskgraph, then it already implicitly depends on the decision
        # task. The result is that tasks do not start immediately. If this
        # loop fails halfway through, none of the already-created tasks run.
        if not any(t in taskgraph.tasks for t in task_def.get('dependencies', [])):
            task_def.setdefault('dependencies', []).append(decision_task_id)

        task_def['taskGroupId'] = decision_task_id
        task_def['schedulerId'] = scheduler_id

    # If `testing` is True, then run without parallelization
    concurrency = CONCURRENCY if not testing else 1
    session = get_session()
    with futures.ThreadPoolExecutor(concurrency) as e:
        fs = {}

        # We can't submit a task until its dependencies have been submitted.
        # So our strategy is to walk the graph and submit tasks once all
        # their dependencies have been submitted.
        tasklist = set(taskgraph.graph.visit_postorder())
        alltasks = tasklist.copy()

        def schedule_tasks():
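            """Submit every task whose in-graph dependencies have already
            been submitted, then recurse as each new submission completes."""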
            # bail out early if any futures have failed
            if any(f.done() and f.exception() for f in fs.values()):
                return

            to_remove = set()
            new = set()

            def submit(task_id, label, task_def):
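                # track the future in `new` (this round's submissions) and in
                # `fs` (used for dependency checks and final error reporting)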
                fut = e.submit(create_task, session, task_id, label, task_def)
                new.add(fut)
                fs[task_id] = fut

            for task_id in tasklist:
                task_def = taskgraph.tasks[task_id].task
                # If we haven't finished submitting all our dependencies yet,
                # come back to this later. Some dependencies aren't in our
                # graph, so make sure to filter those out.
                deps = set(task_def.get('dependencies', [])) & alltasks
                if any((d not in fs or not fs[d].done()) for d in deps):
                    continue

                submit(task_id, taskid_to_label[task_id], task_def)
                to_remove.add(task_id)

                # Schedule tasks as many times as task_duplicates indicates
                attributes = taskgraph.tasks[task_id].attributes
                for _ in range(1, attributes.get('task_duplicates', 1)):
                    # We use slugid() since we want a distinct task id
                    submit(slugid().decode("ascii"), taskid_to_label[task_id], task_def)
            tasklist.difference_update(to_remove)

            # as each of those futures completes, try to schedule more tasks
            for f in futures.as_completed(new):
                schedule_tasks()

        # start scheduling tasks and run until everything is scheduled
        schedule_tasks()

        # check the result of each future, raising an exception if it failed
        for f in futures.as_completed(fs.values()):
            f.result()


def create_task(session, task_id, label, task_def):
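    """Create a single task in the queue, or just print the task definition
    to stdout when `testing` is set."""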
    # create the task using 'http://taskcluster/queue', which is proxied to
    # the queue service with credentials appropriate to this job.

    # resolve relative timestamps in the task definition to absolute times
    now = current_json_time(datetime_format=True)
    task_def = resolve_timestamps(now, task_def)

    if testing:
        json.dump([task_id, task_def], sys.stdout,
                  sort_keys=True, indent=4, separators=(',', ': '))
        # add a newline
        print("")
        return

    logger.debug("Creating task with taskId {} for {}".format(task_id, label))
    res = session.put('http://taskcluster/queue/v1/task/{}'.format(task_id),
                      data=json.dumps(task_def))
    if res.status_code != 200:
        # surface the queue's error message if the response body is JSON
        try:
            logger.error(res.json()['message'])
        except Exception:
            logger.error(res.text)
        res.raise_for_status()