Bug 1922641 - Update pulse handling to account for new pulse events around automatic retries (#8240)

Andrew Halberstadt 2024-10-15 15:36:14 -04:00 committed by GitHub
Parent ae3da6eb19
Commit 68e0569cac
No key matching this signature was found
GPG key ID: B5690EEEBB952194
1 changed file with 10 additions and 25 deletions


@@ -188,14 +188,6 @@ async def handle_message(message, task_definition=None):
         task_type = EXCHANGE_EVENT_MAP.get(message["exchange"])
         # Originally this code was only within the "pending" case, however, in order to support
         # ingesting all tasks at once which might not have "pending" case
-        # If the job is an automatic rerun we mark the previous run as "retry"
-        # This will only work if the previous run has not yet been processed by Treeherder
-        # since _remove_existing_jobs() will prevent it
-        if message["payload"]["runId"] > 0:
-            jobs.append(await handle_task_rerun(parsed_route, task, message, session))
         if not task_type:
             raise Exception("Unknown exchange: {exchange}".format(exchange=message["exchange"]))
         elif task_type == "pending":
@@ -292,21 +284,6 @@
     return build_message(push_info, task, payload["runId"], payload)
-async def handle_task_rerun(push_info, task, message, session):
-    payload = message["payload"]
-    job = build_message(push_info, task, payload["runId"] - 1, payload)
-    job["state"] = "completed"
-    job["result"] = "fail"
-    job["isRetried"] = True
-    # reruns often have no logs, so in the interest of not linking to a 404'ing artifact,
-    # don't include a link
-    job["logs"] = []
-    job = await add_artifact_uploaded_links(
-        message["root_url"], payload["status"]["taskId"], payload["runId"] - 1, job, session
-    )
-    return job
 def handle_task_running(push_info, task, message):
     payload = message["payload"]
     job = build_message(push_info, task, payload["runId"], payload)
@@ -332,13 +309,21 @@ async def handle_task_completed(push_info, task, message, session):
 async def handle_task_exception(push_info, task, message, session):
     payload = message["payload"]
-    job_run = payload["status"]["runs"][payload["runId"]]
+    runs = payload["status"]["runs"]
+    run_id = payload["runId"]
+    job_run = runs[run_id]
     # Do not report runs that were created as an exception. Such cases
     # are deadline-exceeded
     if job_run["reasonCreated"] == "exception":
         return
     job = build_message(push_info, task, payload["runId"], payload)
+    # Check if the next run was an automatic retry.
+    if len(runs) > run_id + 1:
+        next_run = runs[run_id + 1]
+        job["isRetried"] = next_run["reasonCreated"] in ("retry", "task-retry")
     # Jobs that get cancelled before running don't have a started time
     if job_run.get("started"):
         job["timeStarted"] = job_run["started"]
@@ -347,7 +332,7 @@ async def handle_task_exception(push_info, task, message, session):
     # don't include a link
     job["logs"] = []
     job = await add_artifact_uploaded_links(
-        message["root_url"], payload["status"]["taskId"], payload["runId"], job, session
+        message["root_url"], payload["status"]["taskId"], run_id, job, session
     )
     return job
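
For readers who do not have the Taskcluster payload shape in mind, here is a minimal sketch of the retry check that the updated handle_task_exception now performs. The mark_retried helper and the sample runs list below are illustrative assumptions, not part of this commit or of Treeherder; they only mirror the added lines above.

# Minimal, self-contained sketch (hypothetical helper, not part of the commit):
# given the list of runs from a Taskcluster task status and the index of the run
# being ingested, report whether a later run was created as an automatic retry.
def mark_retried(runs, run_id):
    if len(runs) > run_id + 1:
        next_run = runs[run_id + 1]
        return next_run["reasonCreated"] in ("retry", "task-retry")
    return False

# Example (abridged, assumed payload shape): run 0 ended in an exception and
# Taskcluster created run 1 automatically, so run 0 gets flagged as retried.
runs = [
    {"runId": 0, "state": "exception", "reasonCreated": "scheduled"},
    {"runId": 1, "state": "pending", "reasonCreated": "retry"},
]
assert mark_retried(runs, 0) is True   # a later automatic retry exists
assert mark_retried(runs, 1) is False  # last run, nothing follows it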