Bug 1492664 - move list_task_group to taskgraph.taskcluster.util; r=bstack

--HG--
extra : rebase_source : 2ceef629af09aa6264e30977426a527b234c43db
extra : source : 870c8cec99e5f81814a8064a785c4c96f5f1aeca
This commit is contained in:
Dustin J. Mitchell 2018-10-02 14:19:11 +00:00
Родитель 8f32bbe7eb
Коммит bc68c63ae0
2 изменённых файлов: 21 добавлений и 24 удалений

Просмотреть файл

@ -10,33 +10,15 @@ import concurrent.futures as futures
import logging
import os
from taskgraph.util.taskcluster import get_session, cancel_task
from taskgraph.util.taskcluster import list_task_group, cancel_task
from .registry import register_callback_action
# the maximum number of parallel cancelTask calls to make
CONCURRENCY = 50
base_url = 'https://queue.taskcluster.net/v1/{}'
logger = logging.getLogger(__name__)
def list_group(task_group_id, session):
params = {}
while True:
url = base_url.format('task-group/{}/list'.format(task_group_id))
response = session.get(url, stream=True, params=params)
response.raise_for_status()
response = response.json()
for task in [t['status'] for t in response['tasks']]:
if task['state'] in ['running', 'pending', 'unscheduled']:
yield task['taskId']
if response.get('continuationToken'):
params = {'continuationToken': response.get('continuationToken')}
else:
break
@register_callback_action(
title='Cancel All',
name='cancel-all',
@ -51,12 +33,11 @@ def list_group(task_group_id, session):
context=[]
)
def cancel_all_action(parameters, graph_config, input, task_group_id, task_id, task):
session = get_session()
own_task_id = os.environ.get('TASK_ID', '')
with futures.ThreadPoolExecutor(CONCURRENCY) as e:
cancels_jobs = [
e.submit(cancel_task, t, use_proxy=True)
for t in list_group(task_group_id, session) if t != own_task_id
for t in list_task_group(task_group_id) if t != own_task_id
]
for job in cancels_jobs:
job.result()

Просмотреть файл

@ -55,9 +55,9 @@ def get_session():
return session
def _do_request(url, **kwargs):
def _do_request(url, force_get=False, **kwargs):
session = get_session()
if kwargs:
if kwargs and not force_get:
response = session.post(url, **kwargs)
else:
response = session.get(url, stream=True)
@ -259,10 +259,26 @@ def send_email(address, subject, content, link, use_proxy=False):
# Until bug 1460015 is finished, use the old baseUrl style of proxy URL
url = os.environ['TASKCLUSTER_PROXY_URL'] + '/notify/v1/email'
else:
url = liburls.api(os.environ['TASKCLUSTER_ROOT_URL'], 'notify', 'v1', 'email')
url = liburls.api(get_root_url(), 'notify', 'v1', 'email')
_do_request(url, json={
'address': address,
'subject': subject,
'content': content,
'link': link,
})
def list_task_group(task_group_id):
"""Generate the tasks in a task group"""
params = {}
while True:
url = liburls.api(get_root_url(), 'queue', 'v1',
'task-group/{}/list'.format(task_group_id))
resp = _do_request(url, force_get=True, params=params).json()
for task in [t['status'] for t in resp['tasks']]:
if task['state'] in ['running', 'pending', 'unscheduled']:
yield task['taskId']
if resp.get('continuationToken'):
params = {'continuationToken': resp.get('continuationToken')}
else:
break