Bug 1530965 - Combine Pulse listeners (#6122)

Merge both Pulse listeners into a single one, without switching to it yet.
We can switch to it in the future.
This commit is contained in:
Shubhank Saxena 2020-04-08 02:39:59 +05:30 коммит произвёл GitHub
Родитель d300d2424f
Коммит d2d54f8579
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 94 добавлений и 5 удалений

Просмотреть файл

@ -38,8 +38,10 @@ celery_scheduler: REMAP_SIGTERM=SIGQUIT newrelic-admin run-program celery beat -
# TODO: Merge these two listeners into one since they use so little CPU each (bug 1530965). # TODO: Merge these two listeners into one since they use so little CPU each (bug 1530965).
pulse_listener_pushes: newrelic-admin run-program ./manage.py pulse_listener_pushes pulse_listener_pushes: newrelic-admin run-program ./manage.py pulse_listener_pushes
pulse_listener_tasks: newrelic-admin run-program ./manage.py pulse_listener_tasks pulse_listener_tasks: newrelic-admin run-program ./manage.py pulse_listener_tasks
# Both Pulse listeners combined; for now, do not run at the same time as the other ones.
pulse_listener: newrelic-admin run-program ./manage.py pulse_listener
# Processes pushes/jobs from Pulse that were collected by `pulse_listener_{pushes,jobs)`. # Processes pushes/jobs from Pulse that were collected by `pulse_listener_{pushes,tasks}`.
worker_store_pulse_data: REMAP_SIGTERM=SIGQUIT newrelic-admin run-program celery worker -A treeherder --without-gossip --without-mingle --without-heartbeat -Q store_pulse_pushes,store_pulse_tasks --concurrency=3 worker_store_pulse_data: REMAP_SIGTERM=SIGQUIT newrelic-admin run-program celery worker -A treeherder --without-gossip --without-mingle --without-heartbeat -Q store_pulse_pushes,store_pulse_tasks --concurrency=3
# Handles the log parsing tasks scheduled by `worker_store_pulse_data` as part of job ingestion. # Handles the log parsing tasks scheduled by `worker_store_pulse_data` as part of job ingestion.

Просмотреть файл

@ -34,6 +34,9 @@
}, },
"HEROKU_REVIEW_APP": true, "HEROKU_REVIEW_APP": true,
"LOGGING_LEVEL": "INFO", "LOGGING_LEVEL": "INFO",
"PULSE_QUEUE_NAME": {
"generator": "secret"
},
"PULSE_RESULSETS_QUEUE_NAME": { "PULSE_RESULSETS_QUEUE_NAME": {
"generator": "secret" "generator": "secret"
}, },
@ -57,10 +60,14 @@
"size": "Standard-1X" "size": "Standard-1X"
}, },
"pulse_listener_pushes": { "pulse_listener_pushes": {
"quantity": 1, "quantity": 0,
"size": "Standard-1X" "size": "Standard-1X"
}, },
"pulse_listener_tasks": { "pulse_listener_tasks": {
"quantity": 0,
"size": "Standard-1X"
},
"pulse_listener": {
"quantity": 1, "quantity": 1,
"size": "Standard-1X" "size": "Standard-1X"
}, },

Просмотреть файл

@ -0,0 +1,39 @@
import environ
from django.core.management.base import BaseCommand
from treeherder.services.pulse import (JointConsumer,
prepare_joint_consumers)
env = environ.Env()
class Command(BaseCommand):
    """
    Management command that reads both tasks and pushes from a set of Pulse
    exchanges with a single listener.

    Pushes are queued onto the ``store_pulse_pushes`` celery queue and tasks
    onto the ``store_pulse_tasks`` celery queue; the workers consuming those
    queues do the actual storing in the database.
    """

    help = "Read tasks and pushes from a set of pulse exchanges and queue for ingestion"

    def handle(self, *args, **options):
        # Specifies the Pulse services from which Treeherder will ingest push
        # information. Sources can include properties `hgmo`, `github`, or both, to
        # listen to events from those sources. The value is a JSON array of the form
        # [{pulse_url: .., hgmo: true, root_url: ..}, ..]
        pulse_sources = env.json(
            "PULSE_SOURCES",
            default=[{"root_url": "https://firefox-ci-tc.services.mozilla.com",
                      "github": True,
                      "hgmo": True,
                      "pulse_url": env("PULSE_URL"),
                      "tasks": True}])

        # One routing-key builder per source: push bindings use "#.<key>",
        # task bindings use None (the binding's own key is kept as-is).
        listener_params = (JointConsumer, pulse_sources,
                           [lambda key: "#.{}".format(key), None])
        consumer = prepare_joint_consumers(listener_params)

        try:
            consumer.run()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the listener; exit cleanly.
            pass
        self.stdout.write("Pulse and Task listening stopped......")

Просмотреть файл

@ -1,10 +1,14 @@
from .consumers import (PushConsumer, from .consumers import (JointConsumer,
PushConsumer,
TaskConsumer, TaskConsumer,
prepare_consumers) prepare_consumers,
prepare_joint_consumers)
__all__ = [ __all__ = [
"JointConsumer",
"PushConsumer", "PushConsumer",
"TaskConsumer", "TaskConsumer",
"prepare_consumers", "prepare_consumers",
"prepare_joint_consumers",
"pulse_conn", "pulse_conn",
] ]

Просмотреть файл

@ -187,12 +187,43 @@ class PushConsumer(PulseConsumer):
message.ack() message.ack()
class JointConsumer(PulseConsumer):
    """
    A single Pulse consumer that subscribes to both push exchanges
    (hg.mozilla.org and/or GitHub) and Taskcluster task exchanges, and
    dispatches each incoming message to the matching celery queue based on
    the exchange it arrived from.
    """
    # Suffix used to build this listener's Pulse queue name; configurable so
    # review apps / environments don't collide on one shared queue.
    queue_suffix = env("PULSE_QUEUE_NAME", default="queue")

    def bindings(self):
        """Return the exchange bindings enabled by this consumer's source config."""
        rv = []
        if self.source.get('hgmo'):
            rv += HGMO_PUSH_BINDINGS
        if self.source.get('github'):
            rv += GITHUB_PUSH_BINDINGS
        if self.source.get('tasks'):
            rv += TASKCLUSTER_TASK_BINDINGS
        return rv

    @newrelic.agent.background_task(name='pulse-joint-listener.on_message', group='Pulse Listener')
    def on_message(self, body, message):
        exchange = message.delivery_info['exchange']
        routing_key = message.delivery_info['routing_key']
        logger.debug('received job message from %s#%s', exchange, routing_key)
        # Task messages always arrive via the taskcluster-queue exchanges;
        # everything else bound above is a push (hgmo or GitHub) message.
        if exchange.startswith('exchange/taskcluster-queue/v1/'):
            store_pulse_tasks.apply_async(
                args=[body, exchange, routing_key, self.root_url],
                queue='store_pulse_tasks'
            )
        else:
            store_pulse_pushes.apply_async(
                args=[body, exchange, routing_key, self.root_url],
                queue='store_pulse_pushes'
            )
        message.ack()
class Consumers:
def __init__(self, consumers): def __init__(self, consumers):
self.consumers = consumers self.consumers = consumers
@ -209,3 +240,9 @@ class Consumers:
def prepare_consumers(consumer_cls, sources, build_routing_key=None):
    """Build one consumer per source and wrap them in a Consumers collection."""
    built = []
    for source in sources:
        built.append(consumer_cls(source, build_routing_key))
    return Consumers(built)
def prepare_joint_consumers(listening_params):
    """
    Build a Consumers collection of joint consumers.

    `listening_params` is a (consumer_class, sources, routing_keys) triple.
    Each source is paired with the routing key at the same position; if the
    two sequences differ in length, the extras are ignored (zip semantics).
    """
    # Direct tuple unpacking replaces the previous identity `unpacker` helper.
    consumer_class, sources, keys = listening_params
    return Consumers([consumer_class(source, key) for source, key in zip(sources, keys)])