Support mozilla-central pulse messages in events (#1439)

* Base support for mozilla-central pulse messages

* Prevent awaiting a non declared queue
This commit is contained in:
Valentin Rigal 2023-01-12 13:55:16 +01:00 коммит произвёл GitHub
Родитель ec0d381978
Коммит d08be08c24
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 57 добавлений и 22 удалений

Просмотреть файл

@ -146,11 +146,12 @@ events:
# Each project has its own Sentry environment
SENTRY_DSN: https://xxx:yyy@sentry.com
# Does the system need to listen for autoland payloads on pulse
# This is needed to ingest autoland issues in the backend
# Does the system need to listen for autoland or mozilla-central payloads on pulse
# This is needed to ingest issues in the backend
autoland_enabled: true
mozilla_central_enabled: true
# Pulse authentication to get messages for the autoland triggers
# Pulse authentication to get messages for the autoland and mozilla-central triggers
pulse_user: xxx
pulse_password: yyy

Просмотреть файл

@ -19,3 +19,4 @@ QUEUE_PULSE_BUGBUG_TEST_SELECT = "pulse:bugbug_test_select"
QUEUE_BUGBUG = "bugbug"
QUEUE_BUGBUG_TRY_PUSH = "bugbug:try_push"
QUEUE_PULSE_AUTOLAND = "pulse:autoland"
QUEUE_PULSE_MOZILLA_CENTRAL = "pulse:mozilla_central"

Просмотреть файл

@ -53,6 +53,7 @@ def main():
repositories=[],
user_blacklist=[],
autoland_enabled=False,
mozilla_central_enabled=False,
skippable_files=[],
),
local_secrets=yaml.safe_load(args.configuration)

Просмотреть файл

@ -27,6 +27,7 @@ from code_review_events import QUEUE_MONITORING_COMMUNITY
from code_review_events import QUEUE_PHABRICATOR_RESULTS
from code_review_events import QUEUE_PULSE_AUTOLAND
from code_review_events import QUEUE_PULSE_BUGBUG_TEST_SELECT
from code_review_events import QUEUE_PULSE_MOZILLA_CENTRAL
from code_review_events import QUEUE_PULSE_TRY_TASK_END
from code_review_events import QUEUE_WEB_BUILDS
from code_review_events import community_taskcluster_config
@ -308,25 +309,34 @@ class CodeReview(PhabricatorActions):
else:
logger.warning("Unsupported publication", mode=mode, build=build)
async def trigger_autoland(self, payload: dict):
"""
Trigger a code review autoland ingestion task
If the task is an autoland decision task
"""
async def trigger_repository(self, payload: dict):
"""Trigger a code review from the ingestion task of a repository (all tasks are resolved)"""
logger.info(f"TRIGGER : {payload}")
assert (
payload["routing"]["exchange"] == PULSE_TASK_GROUP_RESOLVED
), "Not an autoland message"
), "Message was not published to task-group-resolved"
try:
# Load first task in task group, check if it's an autoland
# Load first task in task group, check if it's on autoland or mozilla-central
queue = taskcluster_config.get_service("queue")
logger.info(f"QUEUE : {queue}")
task_group_id = payload["body"]["taskGroupId"]
logger.debug("Checking autoland task", task_group_id=task_group_id)
logger.info(f"GROUP_ID: {task_group_id}")
logger.debug(
"Checking repository for the task group", task_group_id=task_group_id
)
task = queue.task(task_group_id)
repo = task["payload"]["env"].get("GECKO_HEAD_REPOSITORY")
if repo != "https://hg.mozilla.org/integration/autoland":
logger.debug("Not an autoland task", task=task_group_id)
repo_url = task["payload"]["env"].get("GECKO_HEAD_REPOSITORY")
logger.info(f"REPO_URL: {repo_url}")
if repo_url == "https://hg.mozilla.org/integration/autoland":
group_key = "AUTOLAND_TASK_GROUP_ID"
elif repo_url == "https://hg.mozilla.org/mozilla-central":
group_key = "MOZILLA_CENTRAL_TASK_GROUP_ID"
else:
logger.debug(
f"Repository {repo_url} is not supported", task=task_group_id
)
return
# Trigger the autoland ingestion task
@ -335,13 +345,15 @@ class CodeReview(PhabricatorActions):
task = hooks.triggerHook(
"project-relman",
f"code-review-{env}",
{"AUTOLAND_TASK_GROUP_ID": task_group_id},
{group_key: task_group_id},
)
task_id = task["status"]["taskId"]
logger.info("Triggered a new autoland ingestion task", id=task_id)
logger.info(f"Triggered a new ingestion task from {repo_url}", id=task_id)
except Exception as e:
logger.warn(
"Autoland trigger failure", key=payload["routing"]["key"], error=str(e)
"Repository trigger failure",
key=payload["routing"]["key"],
error=str(e),
)
@ -372,7 +384,7 @@ class Events(object):
self.webserver = WebServer(QUEUE_WEB_BUILDS)
self.webserver.register(self.bus)
# Create pulse listener
# Create pulse listeners
exchanges = {}
if taskcluster_config.secrets["autoland_enabled"]:
logger.info("Autoland ingestion is enabled")
@ -380,6 +392,12 @@ class Events(object):
exchanges[QUEUE_PULSE_AUTOLAND] = [
(PULSE_TASK_GROUP_RESOLVED, ["#.gecko-level-3.#"])
]
if taskcluster_config.secrets["mozilla_central_enabled"]:
logger.info("Mozilla-central ingestion is enabled")
# autoland ingestion
exchanges[QUEUE_PULSE_MOZILLA_CENTRAL] = [
(PULSE_TASK_GROUP_RESOLVED, ["#.gecko-level-3.#"])
]
# Create pulse listeners for bugbug test selection task and unit test failures.
if community_config is not None and test_selection_enabled:
@ -421,7 +439,10 @@ class Events(object):
)
# Manually register to set queue as redis
self.pulse.bus = self.bus
self.bus.add_queue(QUEUE_PULSE_AUTOLAND, redis=True)
if taskcluster_config.secrets["autoland_enabled"]:
self.bus.add_queue(QUEUE_PULSE_AUTOLAND, redis=True)
if taskcluster_config.secrets["mozilla_central_enabled"]:
self.bus.add_queue(QUEUE_PULSE_MOZILLA_CENTRAL, redis=True)
else:
self.pulse = None
@ -434,6 +455,7 @@ class Events(object):
# Register queues for workers
self.bus.add_queue(QUEUE_PULSE_AUTOLAND, redis=True)
self.bus.add_queue(QUEUE_PULSE_MOZILLA_CENTRAL, redis=True)
self.bus.add_queue(QUEUE_PULSE_BUGBUG_TEST_SELECT, redis=True)
self.bus.add_queue(QUEUE_PULSE_TRY_TASK_END, redis=True)
self.bus.add_queue(QUEUE_WEB_BUILDS, redis=True)
@ -513,14 +535,24 @@ class Events(object):
self.bus.run(self.workflow.process_build, QUEUE_WEB_BUILDS),
# Publish results on Phabricator
self.bus.run(self.workflow.publish_results, QUEUE_PHABRICATOR_RESULTS),
# Trigger autoland tasks
self.bus.run(self.workflow.trigger_autoland, QUEUE_PULSE_AUTOLAND),
# Send to phabricator results publication for normal processing and to bugbug for further analysis
self.bus.dispatch(
QUEUE_MERCURIAL_APPLIED,
[QUEUE_PHABRICATOR_RESULTS, QUEUE_BUGBUG_TRY_PUSH],
),
]
if taskcluster_config.secrets["autoland_enabled"]:
# Trigger autoland tasks
consumers.append(
self.bus.run(self.workflow.trigger_repository, QUEUE_PULSE_AUTOLAND)
)
if taskcluster_config.secrets["mozilla_central_enabled"]:
# Trigger mozilla-central tasks
consumers.append(
self.bus.run(
self.workflow.trigger_repository, QUEUE_PULSE_MOZILLA_CENTRAL
)
)
if self.bugbug_utils:
consumers += [