From 68e0569caca88f8d69e3d721801a491a2a03d868 Mon Sep 17 00:00:00 2001
From: Andrew Halberstadt
Date: Tue, 15 Oct 2024 15:36:14 -0400
Subject: [PATCH] Bug 1922641 - Update pulse handling to account for new pulse
 events around automatic retries (#8240)

---
 treeherder/etl/taskcluster_pulse/handler.py | 35 ++++++---------------
 1 file changed, 10 insertions(+), 25 deletions(-)

diff --git a/treeherder/etl/taskcluster_pulse/handler.py b/treeherder/etl/taskcluster_pulse/handler.py
index afc0d945d..c8b35b186 100644
--- a/treeherder/etl/taskcluster_pulse/handler.py
+++ b/treeherder/etl/taskcluster_pulse/handler.py
@@ -188,14 +188,6 @@ async def handle_message(message, task_definition=None):
 
     task_type = EXCHANGE_EVENT_MAP.get(message["exchange"])
 
-    # Originally this code was only within the "pending" case, however, in order to support
-    # ingesting all tasks at once which might not have "pending" case
-    # If the job is an automatic rerun we mark the previous run as "retry"
-    # This will only work if the previous run has not yet been processed by Treeherder
-    # since _remove_existing_jobs() will prevent it
-    if message["payload"]["runId"] > 0:
-        jobs.append(await handle_task_rerun(parsed_route, task, message, session))
-
     if not task_type:
         raise Exception("Unknown exchange: {exchange}".format(exchange=message["exchange"]))
     elif task_type == "pending":
@@ -292,21 +284,6 @@ def handle_task_pending(push_info, task, message):
     return build_message(push_info, task, payload["runId"], payload)
 
 
-async def handle_task_rerun(push_info, task, message, session):
-    payload = message["payload"]
-    job = build_message(push_info, task, payload["runId"] - 1, payload)
-    job["state"] = "completed"
-    job["result"] = "fail"
-    job["isRetried"] = True
-    # reruns often have no logs, so in the interest of not linking to a 404'ing artifact,
-    # don't include a link
-    job["logs"] = []
-    job = await add_artifact_uploaded_links(
-        message["root_url"], payload["status"]["taskId"], payload["runId"] - 1, job, session
-    )
-    return job
-
-
 def handle_task_running(push_info, task, message):
     payload = message["payload"]
     job = build_message(push_info, task, payload["runId"], payload)
@@ -332,13 +309,21 @@ async def handle_task_completed(push_info, task, message, session):
 
 
 async def handle_task_exception(push_info, task, message, session):
     payload = message["payload"]
-    job_run = payload["status"]["runs"][payload["runId"]]
+    runs = payload["status"]["runs"]
+    run_id = payload["runId"]
+    job_run = runs[run_id]
     # Do not report runs that were created as an exception. Such cases
     # are deadline-exceeded
     if job_run["reasonCreated"] == "exception":
         return
     job = build_message(push_info, task, payload["runId"], payload)
+
+    # Check if the next run was an automatic retry.
+    if len(runs) > run_id + 1:
+        next_run = runs[run_id + 1]
+        job["isRetried"] = next_run["reasonCreated"] in ("retry", "task-retry")
+
     # Jobs that get cancelled before running don't have a started time
     if job_run.get("started"):
         job["timeStarted"] = job_run["started"]
@@ -347,7 +332,7 @@
     # don't include a link
     job["logs"] = []
     job = await add_artifact_uploaded_links(
-        message["root_url"], payload["status"]["taskId"], payload["runId"], job, session
+        message["root_url"], payload["status"]["taskId"], run_id, job, session
     )
     return job
 
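
For illustration only (not part of the patch): a minimal, self-contained sketch of the retry check that handle_task_exception now performs. The detect_retry helper and the example payload shape are assumptions for this example; the real code reads the same fields from the Taskcluster pulse message.

    # Sketch of the patch's retry detection: an exception run is marked
    # isRetried when the *next* run exists and was created automatically
    # by the retry machinery ("retry" or "task-retry").
    def detect_retry(payload):  # hypothetical helper, not in handler.py
        runs = payload["status"]["runs"]
        run_id = payload["runId"]
        if len(runs) > run_id + 1:
            next_run = runs[run_id + 1]
            return next_run["reasonCreated"] in ("retry", "task-retry")
        return False

    # Assumed example payload: run 0 hit an exception and run 1 is its
    # automatic retry, so run 0 should be reported with isRetried=True.
    example_payload = {
        "runId": 0,
        "status": {
            "taskId": "abc123",  # made-up task id
            "runs": [
                {"reasonCreated": "scheduled", "reasonResolved": "claim-expired"},
                {"reasonCreated": "retry"},
            ],
        },
    }

    assert detect_retry(example_payload) is True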