2019-04-26 15:56:49 +03:00
|
|
|
#!/usr/bin/env python
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
#
|
|
|
|
# Copyright 2019 Mozilla
|
|
|
|
#
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
|
# You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
|
|
|
|
|
|
|
"""
|
|
|
|
This script triggers the data pipeline for the bugbug project
|
|
|
|
"""
|
|
|
|
|
|
|
|
import argparse
|
|
|
|
import os
|
|
|
|
import sys
|
|
|
|
|
|
|
|
import jsone
|
|
|
|
import requests.packages.urllib3
|
|
|
|
import taskcluster
|
|
|
|
import yaml
|
|
|
|
|
|
|
|
requests.packages.urllib3.disable_warnings()
|
|
|
|
|
2019-11-09 00:13:10 +03:00
|
|
|
TASKCLUSTER_DEFAULT_URL = "https://community-tc.services.mozilla.com"
|
2019-04-26 15:56:49 +03:00
|
|
|
|
|
|
|
|
|
|
|
def get_taskcluster_options():
    """
    Build the Taskcluster client options for the current environment.

    Works both locally and inside a Taskcluster task: when the in-task
    proxy is available (TASKCLUSTER_PROXY_URL) it takes precedence as the
    root URL; otherwise a root URL is guaranteed by falling back to the
    default community deployment.
    """
    options = taskcluster.optionsFromEnvironment()
    proxy_url = os.environ.get("TASKCLUSTER_PROXY_URL")

    if proxy_url is not None:
        # Route all API calls through the proxy whenever one is exposed
        options["rootUrl"] = proxy_url

    # Ensure a root url is always present without clobbering the proxy
    options.setdefault("rootUrl", TASKCLUSTER_DEFAULT_URL)

    return options
|
|
|
|
|
|
|
|
|
|
|
|
def main():
    """
    Render the bugbug data-pipeline task definitions and submit them to
    the Taskcluster queue.

    Reads a YAML pipeline description (path given as the single positional
    CLI argument), renders it with JSON-e using the TAG environment
    variable as the image version, rewrites the pipeline's internal task
    IDs into fresh slugIds while resolving cross-task dependencies, then
    creates every task in a single task group.

    Raises:
        ValueError: if two tasks in the pipeline share the same internal ID.
        KeyError: if a task depends on an internal ID that was not defined
            earlier in the pipeline.
        taskcluster.exceptions.TaskclusterAuthFailure: re-raised after
            logging when task creation is not authorized.
    """
    parser = argparse.ArgumentParser(description="Spawn tasks for bugbug data pipeline")
    parser.add_argument("data_pipeline_json")

    args = parser.parse_args()
    decision_task_id = os.environ.get("TASK_ID")
    options = get_taskcluster_options()
    add_self = False
    if decision_task_id:
        # Running inside Taskcluster: attach the spawned tasks to the
        # decision task's group and make them depend on it.
        add_self = True
        task_group_id = decision_task_id
    else:
        task_group_id = taskcluster.utils.slugId()
    keys = {"taskGroupId": task_group_id}

    # Maps the pipeline's internal task IDs to the generated slugIds so
    # that dependencies can be rewritten.
    id_mapping = {}

    # First pass, do the template rendering and dependencies resolution
    tasks = []

    with open(args.data_pipeline_json) as pipeline_file:
        raw_tasks = yaml.safe_load(pipeline_file.read())

    version = os.getenv("TAG", "latest")
    context = {"version": version}
    rendered = jsone.render(raw_tasks, context)

    for task in rendered["tasks"]:
        # We need to generate new unique task ids for taskcluster to be happy
        # but need to identify dependencies across tasks. So we create a
        # mapping between an internal ID and the generated ID
        task_id = taskcluster.utils.slugId()
        task_internal_id = task.pop("ID")

        if task_internal_id in id_mapping:
            raise ValueError(f"Conflicting IDs {task_internal_id}")

        id_mapping[task_internal_id] = task_id

        for key, value in keys.items():
            task[key] = value

        # Forward the docker image tag to the task through its environment,
        # merging with any environment the pipeline already defines (the
        # $merge operator is resolved later by JSON-e on the worker side).
        task_payload = task["payload"]

        if "env" in task_payload and task_payload["env"]:
            if "$merge" not in task_payload["env"]:
                task_payload["env"] = {"$merge": [task_payload["env"]]}

            task_payload["env"]["$merge"].append({"TAG": version})
        else:
            task_payload["env"] = {
                "TAG": version,
            }

        # Process the dependencies
        new_dependencies = []
        for dependency in task.get("dependencies", []):
            new_dependencies.append(id_mapping[dependency])

        if add_self:
            new_dependencies.append(decision_task_id)

        task["dependencies"] = new_dependencies

        tasks.append((task_id, task))

    # Now sends them
    queue = taskcluster.Queue(options)
    try:
        # `task_definition` (not `task_payload`) to avoid reusing the name
        # that denoted the env payload in the loop above.
        for task_id, task_definition in tasks:
            queue.createTask(task_id, task_definition)

        # Reuse the module-level constant instead of duplicating the
        # deployment URL inline (same rendered output).
        print(f"{TASKCLUSTER_DEFAULT_URL}/tasks/groups/{task_group_id}")
    except taskcluster.exceptions.TaskclusterAuthFailure as e:
        print(f"TaskclusterAuthFailure: {e.body}", file=sys.stderr)
        raise
|
|
|
|
|
|
|
|
|
|
|
|
# Allow the module to be executed directly as a script.
if __name__ == "__main__":
    main()
|