Publish to pulse using neat utils

This commit is contained in:
Jonas Finnemann Jensen 2014-10-02 14:38:48 -07:00
Родитель 6fab3a0890
Коммит 4b724fdf32
4 изменённых файлов: 303 добавлений и 10 удалений

Просмотреть файл

@ -0,0 +1,25 @@
{
"id": "https://treeherder.mozilla.org/schemas/v1/resultset-message.json#",
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "New ResultSet Message",
"description": "Pulse message sent whenever a new result-set is created.",
"type": "object",
"properties": {
"version": {
"title": "Message-format version",
"enum": [1]
},
"project": {
"title": "Project Name",
"description": "Identifier for treeherder project, like `try` or `mozilla-central`.",
"type": "string"
},
"revision_hash": {
"title": "Revision Hash Identifier",
"description": "Identifier for the result-set that was created.",
"type": "string"
}
},
"additionalProperties": true,
"required": ["version", "revision_hash"]
}

Просмотреть файл

@ -0,0 +1,29 @@
from pulse_publisher import PulsePublisher, Exchange, Key
class TreeherderPublisher(PulsePublisher):
title = "TreeHerder Exchanges"
description = """
Exchanges for services that wants to know what shows up on TreeHerder.
"""
exchange_prefix = 'v1/'
new_result_set = Exchange(
exchange = 'new-result-set',
title = "New Result-Set Messages",
description = """
Whenever a new result-set is created a message featuring the
`revision_hash` is published on this exchange.
""",
routing_keys = [
Key(
name = 'project',
summary = "Project (or branch) that this result-set concerns"
),
Key(
name = 'revision_hash',
summary = "result-set identifier for the message"
)
],
schema = "https://treeherder.mozilla.org/schemas/v1/resultset-message.json#"
)

Просмотреть файл

@ -0,0 +1,193 @@
import kombu, re, json, os, jsonschema
def toCamelCase(input):
def replace(match):
return match.group(1).upper()
return re.sub(r'_(.)', replace, input)
def load_schemas(folder):
""" Load JSON schemas from folder """
schemas = {}
# List files in folder
for filename in os.listdir(folder):
# Skip non-json files
if not filename.endswith('.json'):
continue
# Read file and insert into schemas
with open(os.path.join(folder, filename)) as f:
data = json.load(f)
assert data.has_key('id'), "JSON schemas must have an 'id' property"
schemas[data['id']] = data
# Return schemas loaded
return schemas
class Exchange(object):
"""
Exchange declaration that can be used as property on a subclass of
PulsePublisher.
"""
def __init__(self, exchange, title, description, routing_keys, schema):
"""
Create exchange instance
"""
self.exchange = exchange
self.title = title
self.description = description
self.routing_keys = routing_keys
self.schema = schema
def message(self, message, **keys):
""" Construct message """
return message
def routing(self, message, **keys):
""" Construct routing key """
return '.'.join([key.build(**keys) for key in self.routing_keys])
def reference(self, name):
""" Construct reference entry with given name """
return {
'type': 'topic-exchange',
'exchange': self.exchange,
'name': toCamelCase(name),
'title': self.title,
'description': self.description,
'routingKey': [key.reference() for key in self.routing_keys],
'schema': self.schema
}
class Key(object):
""" Routing key entry """
def __init__(self, name, summary, required = True, multiple_words = False):
self.name = name
self.summary = summary
self.required = required
self.multiple_words = multiple_words
def build(self, **keys):
""" Build routing key entry """
key = keys.get(self.name)
# Ensure the key is present if required
if self.required and key is None:
raise ValueError("Key %s is required" % self.name)
key = key or '_'
# Check if has multiple words
if '.' in key and not self.multiple_words:
raise ValueError("Key %s cannot contain dots" % self.name)
# Return constructed key
return key
def reference(self):
""" Construct reference entry for this routing key entry """
return {
'name': toCamelCase(self.name),
'summary': self.summary,
'multipleWords': self.multiple_words,
'required': self.required
}
class PulsePublisher(object):
"""
Base class for pulse publishers.
All subclasses of this class must define the properties:
* title
* description
* exchange_prefix
Additional properties of type `Exchange` will be declared as exchanges.
"""
def __init__(self, client_id, access_token, schemas, namespace = None):
"""
Create publisher, requires a connection_string and a mapping from
JSON schema uris to JSON schemas.
"""
# Validate properties
assert hasattr(self, 'title'), "Title is required"
assert hasattr(self, 'description'), "description is required"
assert hasattr(self, 'exchange_prefix'), "exchange_prefix is required"
# Set attributes
self.client_id = client_id
self.access_token = access_token
self.schemas = schemas
self.namespace = namespace or client_id
self.exchanges = []
self.connection = kombu.Connection(
userid = client_id,
password = access_token,
hostname = 'pulse.mozilla.org',
virtual_host = '/',
port = 5671,
ssl = True,
transport = 'amqp',
# This should work, but it doesn't... Basically, docs either lie,
# or everything python is broken as usual. At this point the best
# work around to call publisher.connection.release() after
# publishing something. It doesn't provide the same safety, but it's
# better than closing process to early.
transport_options = {
'confirm_publish': True,
'block_for_ack': True
}
)
# Find exchanges
for name in dir(self):
exchange = getattr(self, name)
if isinstance(exchange, Exchange):
self.exchanges += ((name, exchange),)
# Wrap exchanges in functions
for name, exchange in self.exchanges:
# Create producer for the exchange
exchange_path = "exchange/%s/%s%s" % (
self.namespace,
self.exchange_prefix,
exchange.exchange
)
producer = kombu.Producer(
channel = self.connection,
exchange = kombu.Exchange(
name = exchange_path,
type = 'topic',
durable = True,
delivery_mode = 'persistent'
),
auto_declare = True
)
publish_message = self.connection.ensure(
producer, producer.publish, max_retries = 3
)
# Create publication method for the exchange
def publish(**kwargs):
message = exchange.message(**kwargs)
jsonschema.validate(message, self.schemas[exchange.schema])
publish_message(
body = json.dumps(message),
routing_key = exchange.routing(**kwargs),
content_type = 'application/json'
)
setattr(self, name, publish)
def error(self, error, exchange, routing_key, message):
print routing_key
print err
def reference(self):
""" Construct reference for this publisher"""
return {
'version': '0.2.0',
'title': self.title,
'description': self.description,
'exchangePrefix': "exchange/%s/%s" % (
self.namespace,
self.exchange_prefix
),
'entries': [ex.reference(name) for name, ex in self.exchanges]
}

Просмотреть файл

@ -78,20 +78,66 @@ def populate_performance_series(project, series_type, series_data):
)
jm.disconnect()
from exchanges import TreeherderPublisher
from pulse_publisher import load_schemas
import os
# Load schemas for validation of messages published on pulse
source_folder = os.path.dirname(os.path.realpath(__file__))
schema_folder = os.path.join(source_folder, '..', '..', 'schemas')
schemas = load_schemas(schema_folder)
# Find an appropriate namespace to publish to under, this will be removed when
# pulse guardian has multi-user, then we'll just not provide the namespace
# property and the settings property will provide client_id and access_token
# from which client_id will be the namespace.
namespace = 'treeherder-local'
if 'treeherder.allizom.org' in settings.SITE_URL:
namespace = 'treeherder-staging'
elif 'treeherder.mozilla.org' in settings.SITE_URL:
namespace = 'treeherder'
publisher = TreeherderPublisher(
client_id = 'public',
access_token = 'public',
schemas = schemas,
namespace = namespace
)
@task(name='publish-to-pulse')
def publish_to_pulse(project, ids, data_type):
jm = JobsModel(project)
# Get appropriate data for data_type
# using the ids provided
data = []
# Publish messages with new result-sets
if data_type == 'result_set':
data = jm.get_result_set_list_by_ids(ids)
# Get appropriate data for data_type
# using the ids provided
for entry in jm.get_result_set_list_by_ids(ids):
# Don't expose these properties, they are internal, at least that's
# what I think without documentation I have no clue... what any of
# this is
del entry['revisions'] # Not really internal, but too big
del entry['repository_id']
# Set required properties
entry['version'] = 1
entry['project'] = project
# Property revision_hash should already be there, I suspect it is the
# result-set identifier...
# publish the data to pulse
publisher.new_result_set(
message = entry,
revision_hash = entry['revision_hash'],
project = project
)
# Basically, I have no idea what context this runs and was inherently
# unable to make kombu with or without pyamqp, etc. confirm-publish,
# so we're stuck with this super ugly hack where we just close the
# connection so that if the process context is destroyed then at least
# messages will still get published... Well, assuming nothing goes
# wrong, because we're not using confirm channels for publishing...
publisher.connection.release()
jm.disconnect()
# TODO: publish the data to pulse