Merge pull request #15 from mozilla/pulse-consumer

Pulse consumer
This commit is contained in:
jeads 2013-04-18 18:50:41 -07:00
Parents 9948ad1c80 cc45a27fac
Commit 882ce71b0f
16 changed files with 2323 additions and 16 deletions

@@ -1,5 +1,6 @@
oauth2==1.5.211
South==0.7.6
python-memcached==1.48
mozillapulse==0.61
git+git://github.com/jeads/datasource@97ce9bb710

0 tests/__init__.py Normal file
@@ -92,3 +92,9 @@ def increment_cache_key_prefix():
key_prefix_counter = 0
cache.set(prefix_counter_cache_key, key_prefix_counter)
cache.key_prefix = "t{0}".format(key_prefix_counter)
@pytest.fixture(scope='session')
def sample_data():
"""Returns a SampleData() object"""
from sampledata import SampleData
return SampleData()

@@ -0,0 +1,38 @@
import pytest
from treeherder.pulse_consumer.consumer import PulseDataAdapter
def test_process_data(sample_data):
"""
Test the ability of PulseDataAdapter.process_data() to process the
raw data available in sample_data without missing any attributes.
"""
pda = PulseDataAdapter(
durable=False,
logdir='logs',
rawdata=False,
outfile=None
)
msg = Message()
for data in sample_data.raw_pulse_data:
data = pda.process_data(data, msg)
missing_attributes = pda.required_attributes.difference(
set( data.keys() )
)
assert set() == missing_attributes
class Message(object):
"""Class that mimics the message object interface from
mozilla pulse"""
def __init__(self):
pass
def ack(self):
pass

The file diff is not shown because of its large size.

File diffs are hidden because one or more lines are too long.

30 tests/sampledata.py Normal file
@@ -0,0 +1,30 @@
import json
import os
class SampleData(object):
def __init__(self):
self.job_data_file = "{0}/sample_data/job_data.txt".format(
os.path.dirname(__file__)
)
self.raw_pulse_data_file = "{0}/sample_data/raw_pulse_data.txt".format(
os.path.dirname(__file__)
)
self.job_data = []
self.raw_pulse_data = []
self.initialize_data()
def initialize_data(self):
with open(self.job_data_file) as f:
for line in f.readlines():
self.job_data.append( json.loads( line.strip() ) )
with open(self.raw_pulse_data_file) as f:
for line in f.readlines():
line = str(line)
self.raw_pulse_data.append( json.loads( line.strip() ) )

@@ -8,20 +8,42 @@
* The revision_hash is used for associating job data asynchronously
* with a single entry in project_jobs_1.result_set.
**************/
sources: {
gecko: "revision",
gaia: "revision",
mozharness: "revision,
...
sources: [
{
repository:"gecko",
"revision":"",
"comments":"",
"push_timestamp":"",
"commit_timestamp":""
},
{
repository:"gaia",
"revision":"",
"comments":"",
"push_timestamp":"",
"commit_timestamp":""
},
{
repository:"mozharness",
"revision":"",
"comments":"",
"push_timestamp":"",
"commit_timestamp":""
},
...
],
revision_hash:"",
jobs: [
{
#Stored in project_jobs_1.job.job_guid
job_guid:"",
#Stored in treeherder_reference_1.job_type.name
name:"build | mochitest_one | ...",
name:"build | mochitest_one | ...",
#Stored in treeherder_reference_1.product
product_name:"Firefox | Firefox OS | Thunderbird | ...",
@@ -48,20 +70,20 @@
end_timestamp:"",
#Stored in treeherder_reference_1.machine.name
machine:"",
machine:"",
#Stored in:
# treeherder_reference_1.build_platform.os_name,
# treeherder_reference_1.build_platform.platform,
# treeherder_reference_1.build_platform.architecture,
build_platform:{ os_name:"", platform:"", architecture:"" },
#Stored in:
# treeherder_reference_1.machine_platform.os_name,
# treeherder_reference_1.machine_platform.platform,
# treeherder_reference_1.machine_platform.architecture,
machine_platform:{ os_name:"", platform:"", architecture:"" },
#Stored in treeherder_reference_1.option_collection and
#treeherder_reference_1.option
option_collection: {
@@ -73,11 +95,13 @@
#Stored in project_jobs_1.job_log_url
log_references: [
{ url:"",
name:"" },
{ url:"http://ftp.mozilla.org/pub/mozilla.org/firefox/try-builds/mfowler@mozilla.com-64620dd73baa/try-macosx64-debug/try_snowleopard-debug_test-marionette-bm06-tests1-macosx-build1293.txt.gz",
#associating a name with the log allows for different types of
#processing to be applied to different types of logs
name:"unittest" },
...
],
#Stored in project_jobs_1.job_artifact
artifact:"{
type:" json | img | ...",

@@ -0,0 +1,901 @@
import json
import re
import time
import datetime
import sys
import hashlib
import socket
import signal
from django.conf import settings
from mozillapulse import consumers
from treeherder.pulse_consumer.daemon import Daemon
####
# The following variables were taken from util.py
#
# PLATFORMS_BUILDERNAME, BUILD_TYPE_BUILDERNAME, JOB_TYPE_BUILDERNAME
#
# http://mxr.mozilla.org/build/source/buildapi/buildapi/model/util.py
#
# TODO: Once these attributes are available as build properties in the
# pulse stream these structures can be removed.
####
PLATFORMS_BUILDERNAME = {
'linux-mock': {
'regexes': [
re.compile('^b2g .+_armv7a.+', re.IGNORECASE),
re.compile('^b2g linux32_gecko .+', re.IGNORECASE),
re.compile('^b2g_((?!(test|talos)).)+$', re.IGNORECASE),
re.compile('^Android (?!(?:Tegra|Armv6 Tegra|no-ionmonkey Tegra 250|4.0 Panda)).+'),
re.compile('.*linux.*', re.IGNORECASE),
],
'attributes': {
'os': 'linux',
'os_platform': 'gecko',
'arch': 'ARMv7',
'vm': False
}
},
'fedora': {
'regexes': [
re.compile('^Rev3 Fedora 12 .+'),
re.compile('jetpack-.*-fedora(?!64)'),
re.compile('^b2g_.+(opt|debug) test.+', re.IGNORECASE)
],
'attributes': {
'os': 'linux',
'os_platform': 'Fedora 12',
'arch':'x86',
'vm': False
}
},
'fedora64': {
'regexes': [
re.compile('Rev3 Fedora 12x64 .+'),
re.compile('jetpack-.*-fedora64'),
],
'attributes': {
'os': 'linux',
'os_platform': 'Fedora 12',
'arch': 'x86_64',
'vm': False
}
},
'ubuntu32_vm': {
'regexes':[
re.compile('Ubuntu VM 12.04 (?!x64).+'),
re.compile('jetpack-.*-ubuntu32(?:_vm)?'),
],
'attributes': {
'os': 'linux',
'os_platform': 'Ubuntu 12.04',
'arch': 'x86',
'vm': True
}
},
'ubuntu64_vm': {
'regexes':[
re.compile('Ubuntu VM 12.04 x64 .+'),
re.compile('jetpack-.*-ubuntu64(?:_vm)?'),
],
'attributes': {
'os': 'linux',
'os_platform': 'Ubuntu VM 12.04',
'arch': 'x86_64',
'vm': True
}
},
'ubuntu32_hw': {
'regexes':[
re.compile('Ubuntu HW 12.04 (?!x64).+'),
],
'attributes': {
'os': 'linux',
'os_platform': 'Ubuntu HW 12.04',
'arch': 'x86',
'vm': False
}
},
'ubuntu64_hw': {
'regexes':[
re.compile('Ubuntu HW 12.04 x64 .+'),
],
'attributes': {
'os': 'linux',
'os_platform': 'Ubuntu HW 12.04',
'arch': 'x86_64',
'vm': False
}
},
'leopard': {
'regexes':[
re.compile('^OS X 10\.5.+'),
re.compile('^Rev3 MacOSX Leopard 10\.5.+'),
re.compile('.*macosx(?!64).*'),
re.compile('jetpack-.*-leopard'),
],
'attributes': {
'os': 'mac',
'os_platform': 'OS X 10.5',
'arch': 'x86',
'vm': False
}
},
'snowleopard': {
'regexes':[
re.compile('^OS X 10\.6.+'),
re.compile('^Rev3 MacOSX Snow Leopard 10\.6.+'),
re.compile('.*macosx64.*'),
re.compile('jetpack-.*-snowleopard'),
],
'attributes': {
'os': 'mac',
'os_platform': 'OS X 10.6',
'arch': 'x86_64',
'vm': False
}
},
'snowleopard-r4': {
'regexes':[
re.compile('^Rev4 MacOSX Snow Leopard 10\.6.+'),
],
'attributes': {
'os': 'mac',
'os_platform': 'OS X 10.6',
'arch': 'x86_64',
'vm': False
}
},
'lion': {
'regexes':[
re.compile('^OS X 10\.7.+'),
re.compile('^Rev4 MacOSX Lion 10\.7.+'),
re.compile('jetpack-.*-lion'),
],
'attributes': {
'os': 'mac',
'os_platform': 'OS X 10.7',
'arch': 'x86_64',
'vm': False
}
},
'mountainlion': {
'regexes':[
re.compile('^Rev5 MacOSX Mountain Lion 10\.8+'),
re.compile('jetpack-.*-mountainlion'),
],
'attributes': {
'os': 'mac',
'os_platform': 'OS X 10.8',
'arch': 'x86_64',
'vm': False
}
},
'xp': {
'regexes':[
re.compile('^Rev3 WINNT 5\.1 .+'),
re.compile('jetpack-.*-xp'),
],
'attributes': {
'os': 'win',
'os_platform': 'WINNT 5.1',
'arch': 'x86',
'vm': False
}
},
'win2k3': {
'regexes':[
re.compile('^WINNT 5\.2 .+'),
re.compile('.*win32.*'),
],
'attributes': {
'os': 'win',
'os_platform': 'WINNT 5.2',
'arch': 'x86',
'vm': False
}
},
'win64': {
'regexes':[
re.compile('^WINNT 6\.1 .+'),
re.compile('.*win64.*'),
],
'attributes': {
'os': 'win',
'os_platform': 'WINNT 6.1',
'arch': 'x86_64',
'vm': False
}
},
'win7': {
'regexes':[
re.compile('^Rev3 WINNT 6\.1 '),
re.compile('jetpack-.*-win7'),
],
'attributes': {
'os': 'win',
'os_platform': 'Rev3 WINNT 6.1',
'arch': 'x86_64',
'vm': False
}
},
'win764': {
'regexes':[
re.compile('^Rev3 WINNT 6\.1 x64 .+'),
re.compile('jetpack-.*-w764'),
],
'attributes': {
'os': 'win',
'os_platform': 'Rev3 WINNT 6.1',
'arch': 'x86_64',
'vm': False
}
},
'win8': {
'regexes':[
re.compile('.*WINNT 6\.2 '),
],
'attributes': {
'os': 'win',
'os_platform': 'WINNT 6.2',
'arch': 'x86_64',
'vm': False
}
},
'tegra': {
'regexes':[
re.compile('^Android Tegra 250 .+'),
re.compile('^Android Armv6 Tegra 250 .+'),
re.compile('^Android no-ionmonkey Tegra 250 .+'),
],
'attributes': {
'os': 'android',
'os_platform': '2.2',
'arch': 'ARMv7',
'vm': False
}
},
'panda-android': {
'regexes':[
re.compile('^Android 4.0 Panda .+'),
],
'attributes': {
'os': 'android',
'os_platform': '4.0',
'arch': 'x86',
'vm': False
}
}
}
BUILD_TYPE_BUILDERNAME = {
'opt': [
re.compile('.+ opt .+'),
re.compile('.+(?<!leak test) build'),
re.compile('.+ talos .+'), # all talos are made only for opt
re.compile('.+ nightly$'), # all nightly builds are opt
re.compile('.+ xulrunner$'), # nightly
re.compile('.+ code coverage$'), # nightly
],
'debug': [
re.compile('.+ debug .+'),
re.compile('.+ leak test build'),
],
}
JOB_TYPE_BUILDERNAME = {
'build': [
re.compile('.+ build'),
re.compile('.+(?<!l10n) nightly$'), # all 'nightly' builders are builds
re.compile('.+ xulrunner$'), # nightly
re.compile('.+ code coverage$'), # nightly
],
'unittest': [re.compile('.+(?<!leak) test .+')],
'talos': [re.compile('.+ talos .+')],
'repack': [re.compile('.+ l10n .+')],
}
class PulseDataAdapter(object):
"""Base class for adapting the pulse stream to a consumable data structure"""
def __init__(
self, rawdata=None, outfile=None, durable=False,
context='dataadapter', logdir='logs'):
self.data = {}
####
#TODO: Put appropriate data in logdir
####
self.logdir = logdir
self.context = context
self.durable = durable
self.rawdata = rawdata
#Set the output stream to write to
self.outstream = None
if outfile:
if outfile == 'stdout':
outfile = sys.stdout
else:
outfile = open(outfile, 'w')
self.outstream = outfile
#Setup signal handler
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
"""
data_attributes description
key - set of '.' delimited keys in the raw pulse stream
processor - function reference called with the data
structure specified in the key.
Ex: processor(attr_table, pulse_data, data)
attr_table - List of attributes to process in the data
structure specified by the key.
attr - The attribute name in the raw pulse stream.
attr_test - A list of strings to match against the
attribute in the raw pulse stream.
cb - function reference that's called instead of
executing the default behavior. Use this when
special processing of the raw data for an attribute
is required.
Ex: cb(attr, pulse_value, data)
"""
self.data_attributes = {
'_meta': {
'processor':self.process_raw_data_dict,
'attr_table':[
{ 'attr':'routing_key',
'cb':self.get_routing_key_data
}
]
},
'payload.build': {
'processor':self.process_raw_data_dict,
'attr_table':[
{ 'attr':'results' },
{ 'attr':'slave' },
{ 'attr':'times', 'cb':self.get_times_data },
{ 'attr':'blame' },
{ 'attr':'reason' },
]
},
'payload.build.sourceStamp.changes': {
'processor':self.process_sourcestamp_changes_list,
'attr_table':[
{ 'attr':'who' },
{ 'attr':'when' },
{ 'attr':'comments' },
]
},
'payload.build.properties': {
'processor':self.process_property_list,
'attr_table':[
{ 'attr':'revision' },
{ 'attr':'product' },
{ 'attr':'branch' },
{ 'attr':'platform' },
{ 'attr':'buildid' },
{ 'attr':'log_url' },
{ 'attr':'buildername',
'cb':self.get_buildername_data
},
{ 'attr':'slavename' },
{ 'attr':'request_ids' },
{ 'attr':'request_times' },
{
'attr':'buildurl',
'attr_test':['packageUrl', 'build_url', 'fileURL']
},
],
}
}
#Build list of required attributes for data validation
self.required_attributes = set(
#These are attributes set outside of the attr_table's in
#self.data_attributes
[ 'os', 'os_platform', 'arch', 'vm', 'buildtype', 'test_name' ]
)
for key in self.data_attributes:
for attr_dict in self.data_attributes[key]['attr_table']:
self.required_attributes.add(attr_dict['attr'])
#
# TODO: This list of routing key strings was excluded from
# processing in the current PulseBuildbotTranslator. Confirm
# if we need to exclude any of these and then use or remove
#self.exclude_routing_key_regex = re.compile(
# r'[schedulers|tag|submitter|final_verification|fuzzer|source|repack|jetpack|finished]'
# )
#set pulse consumer labels
app_label_base = 'pulse-{0}-consumer-{1}-{2}'
self.buildapp_label = app_label_base.format(
'build', self.context, socket.gethostname()
)
#initialize consumers
self.pulse = consumers.BuildConsumer(
applabel=self.buildapp_label
)
#configure consumers
self.pulse.configure(
#####
#TODO: Register a specialized adapter for #.finished
# to record the heartbeat of the push. This will
# require adding the request_ids and request_times
# to the .finished data structure.
#
#topic=['#.finished', '#.log_uploaded'],
#####
topic=['#.log_uploaded'],
callback=self.process_data,
durable=self.durable
)
def start(self):
"""Start the pulse listener"""
self.pulse.listen()
def signal_handler(self, signal, frame):
"""POSIX signal handler"""
#close outstream if we have one
if self.outstream:
self.outstream.close()
sys.exit(0)
def process_data(self, raw_data, message):
"""Process the raw data from the pulse stream using the
processor and attributes specified in the data_attributes
structure."""
message.ack()
data = {}
for attr_key in self.data_attributes:
#retrieve raw_data reference by the attr_key
pulse_data_target = self._get_data(attr_key, raw_data)
#call the data processor
self.data_attributes[attr_key]['processor'](
self.data_attributes[attr_key]['attr_table'],
pulse_data_target, data
)
#Validate data
missing_attributes = self.required_attributes.difference(
set( data.keys() )
)
if missing_attributes:
####
#TODO: We will need to develop a logging strategy here
# not exactly sure what it should be. Need to get
# more of the required data into the pulse stream
# before we can determine what should be logged.
# Will need to be mindful of where errors are raised
# when running as a daemon since stderr is sent to
# /dev/null, program will die silently in this conditional.
#
#raise PulseMissingAttributesError(
# missing_attributes, data, raw_data
# )
pass
else:
#Carry out data processing that requires all of the
#attributes being populated
data = self.adapt_data(data)
if self.outstream:
self.outstream.write(json.dumps(data) + "\n")
self.outstream.flush()
if self.rawdata:
self.outstream.write(json.dumps(raw_data) + "\n")
self.outstream.flush()
return data
def process_raw_data_dict(self, attr_table, pulse_data, data):
"""Process a pulse stream dictionary"""
for attr_data in attr_table:
attr = attr_data.get('attr', None)
cb = attr_data.get('cb', None)
pulse_value = pulse_data.get(attr, None)
if cb:
cb(attr, pulse_value, data)
else:
if (type( pulse_value ) == list) and (len(pulse_value) > 0):
data[attr] = pulse_value[0]
else:
data[attr] = pulse_value
def process_property_list(self, attr_table, pulse_data, data):
"""Process the pulse stream property list"""
for datum in pulse_data:
for attr_data in attr_table:
attr = attr_data.get('attr', None)
attr_test = attr_data.get('attr_test', None)
if ( attr_test and (datum[0] in attr_test) ) or \
( attr and (attr in datum[0]) ):
cb = attr_data.get('cb', None)
if cb:
cb(datum[0], datum[1], data)
else:
data[attr] = datum[1]
def process_sourcestamp_changes_list(self, attr_table, pulse_data, data):
"""Process sourcestamp changes list"""
if (type( pulse_data ) == list) and (len(pulse_data) > 0):
self.process_raw_data_dict(attr_table, pulse_data[0], data)
def adapt_data(self, data):
"""Execute any required post processing steps and return the
updated data structure. This is an interface function for
derived classes to use to adapt the data in different ways."""
return JobData(data)
def get_buildername_data(self, attr, value, data):
"""Callback function for the buildername property in the pulse stream"""
#set buildername
data[attr] = value
#extend data with platform attributes
for platform_name in PLATFORMS_BUILDERNAME:
for regex in PLATFORMS_BUILDERNAME[platform_name]['regexes']:
if regex.search(value):
data.update(
PLATFORMS_BUILDERNAME[platform_name]['attributes']
)
data['platform_name'] = platform_name
break
#extend data with build type attributes
for build_type in BUILD_TYPE_BUILDERNAME:
for regex in BUILD_TYPE_BUILDERNAME[build_type]:
if regex.search(value):
data['buildtype'] = build_type
break
if 'buildtype' not in data:
data['buildtype'] = 'opt'
#extend data with job type data
for job_type in JOB_TYPE_BUILDERNAME:
for regex in JOB_TYPE_BUILDERNAME[job_type]:
if regex.search(value):
data['jobtype'] = job_type
break
buildername_fields = value.split()
data['test_name'] = buildername_fields[-1]
def get_times_data(self, attr, value, data):
"""Callback function for the build.times property in the pulse stream"""
data['times'] = {
'start_timestamp':time.mktime(
datetime.datetime.strptime(
value[0], "%Y-%m-%dT%H:%M:%S+0000").timetuple()
),
'end_timestamp':time.mktime(
datetime.datetime.strptime(
value[1], "%Y-%m-%dT%H:%M:%S+0000").timetuple()
)
}
def get_routing_key_data(self, attr, value, data):
"""Callback function for the routing_key property"""
#set buildername
data[attr] = value
def _get_data(self, attribute_key, raw_data):
"""Uses the attribute key to return the target data structure
in raw data. The attribute key should be a set of strings
delimited by '.'s, where each string is an entry in the raw
data dict provided."""
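#Ex: _get_data('payload.build.properties', raw_data) returns
#raw_data['payload']['build']['properties']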
fields = attribute_key.split('.')
target_struct = None
for idx, f in enumerate(fields):
try:
if idx == 0:
target_struct = raw_data[f]
else:
target_struct = target_struct[f]
except KeyError:
msg = "In {0} not found in pulse data.".format(attribute_key)
raise PulseDataAttributeError(f, msg)
return target_struct
class TreeherderDataAdapter(PulseDataAdapter):
"""Data adapter class that converts the PulseDataAdapter
structure into the data structure accepted by treeherder."""
def __init__(self, **kwargs):
super(TreeherderDataAdapter, self).__init__(**kwargs)
def get_revision_hash(self, revisions):
"""Builds the revision hash for a set of revisions"""
sh = hashlib.sha1()
sh.update(
''.join( map( str, revisions ) )
)
return sh.hexdigest()
def get_job_guid(self, request_id, request_time):
"""Converts a request_id and request_time into a guid"""
sh = hashlib.sha1()
sh.update( str( request_id ) )
sh.update( str( request_time ) )
return sh.hexdigest()
def adapt_data(self, data):
"""Adapts the PulseDataAdapter into the treeherder input data structure"""
treeherder_data = {
'sources': [],
#Include branch so revision hash with the same revision is still
#unique across branches
'revision_hash': self.get_revision_hash(
[ data['revision'], data['branch'] ]
),
'jobs': []
}
####
#TODO: This is a temporary fix, this data will not be located
# in the sourceStamp in the pulse stream. It will likely
# be in other build properties but for now this will work.
# Once the new properties are added they need to be incorporated
# here.
####
treeherder_data['sources'].append(
{ 'repository':data['branch'],
'revision':data['revision'],
'push_timestamp':data['when'],
'commit_timestamp':data['when'],
'comments':data['comments'] }
)
request_id = data['request_ids'][0]
job = {
'job_guid': self.get_job_guid(
#The keys in this dict are unicode but the values in
#request_ids are not, this explicit cast could cause
#problems if the data added to the pulse stream is
#modified
request_id, data['request_times'][ unicode(request_id) ]
),
'name':data['test_name'],
'product_name':data['product'],
'state':'TODO',
#Do we need to map this to the strings in the sample structure?
'result':data['results'],
'reason':data['reason'],
#There is both a who and blame that appear to be identical in the
#pulse stream, is who the way to go?
'who':data['who'],
#This assumes the 0 element in request_ids is the id for the
#job which is not always true if there are coalesced jobs. This will need
#to be updated when https://bugzilla.mozilla.org/show_bug.cgi?id=862633
#is resolved.
'submit_timestamp': data['request_times'][ unicode(request_id) ],
'start_timestamp': data['times']['start_timestamp'],
'end_timestamp': str( int( time.time() ) ),
'machine': data['slave'],
'build_platform': {
'os_name': data['os'],
'platform': data['os_platform'],
'architecture': data['arch'],
'vm': data['vm']
},
#where are we going to get this data from?
'machine_platform': {
'os_name': data['os'],
'platform': data['os_platform'],
'architecture': data['arch'],
'vm': data['vm']
},
'option_collection': {
data['buildtype']:True
},
'log_references':[
{ 'url':data['log_url'],
#using the jobtype as a name for now, the name allows us
#to have different log types with their own processing
'name':data['jobtype'] },
],
'artifact':{}
}
treeherder_data['jobs'].append(job)
return JobData(treeherder_data)
class PulseMessageError(Exception):
"""Error base class for pulse messages"""
def __init__(self, key, error):
self.key = key
self.error = error
def __str__(self):
return "%s, key: %s" % (self.error, self.key)
class PulseDataAttributeError(PulseMessageError): pass
class PulseMissingAttributesError(PulseMessageError):
def __init__(self, missing_attributes, data, raw_data):
self.missing_attributes = missing_attributes
self.data = data
self.raw_data = raw_data
def __str__(self):
msg = "The following attributes were not found: {0} in routing_key:{1}\nbuildername:{2}\n{3}\n{4}".format(
','.join(self.missing_attributes), self.data['routing_key'],
self.data['buildername'], self.data, self.raw_data
)
return msg
class JobDataError(ValueError): pass
class JobData(dict):
"""
Encapsulates data access from incoming test data structure.
All missing-data errors raise ``JobDataError`` with a useful
message. Unlike regular nested dictionaries, ``JobData`` keeps track of
context, so errors contain not only the name of the immediately-missing
key, but the full parent-key context as well.
"""
def __init__(self, data, context=None):
"""Initialize ``JobData`` with a data dict and a context list."""
self.context = context or []
super(JobData, self).__init__(data)
@classmethod
def from_json(cls, json_blob):
"""Create ``JobData`` from a JSON string."""
try:
data = json.loads(json_blob)
except ValueError as e:
raise JobDataError("Malformed JSON: {0}".format(e))
return cls(data)
def __getitem__(self, name):
"""Get a data value, raising ``JobDataError`` if missing."""
full_context = list(self.context) + [name]
try:
value = super(JobData, self).__getitem__(name)
except KeyError:
raise JobDataError("Missing data: {0}.".format(
"".join(["['{0}']".format(c) for c in full_context])))
# Provide the same behavior recursively to nested dictionaries.
if isinstance(value, dict):
value = self.__class__(value, full_context)
return value
class TreeherderPulseDaemon(Daemon):
def __init__(
self,
pidfile,
treeherder_data_adapter=None,
stdin='/dev/null',
stdout='/dev/null',
stderr='/dev/null'):
#Instantiating the adapter as a default argument would create it
#(and open a pulse connection) at import time, so build the
#default lazily instead
self.tda = treeherder_data_adapter or TreeherderDataAdapter(
durable=False,
logdir='logs',
rawdata=False,
outfile=None
)
super(TreeherderPulseDaemon, self).__init__(
pidfile, stdin=stdin, stdout=stdout, stderr=stderr)
def run(self):
self.tda.start()
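
A quick illustration of the context tracking JobData provides; the dict contents are made up, and only the JobData and JobDataError classes come from the file above:

from treeherder.pulse_consumer.consumer import JobData, JobDataError

#Illustrative only: JobData reports the full key path on a miss
job = JobData({'times': {'start_timestamp': 1366332828}})
print job['times']['start_timestamp']    # -> 1366332828
try:
    job['times']['end_timestamp']
except JobDataError as e:
    print e    # -> Missing data: ['times']['end_timestamp'].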

@@ -0,0 +1,134 @@
import sys, os, time, atexit
from signal import SIGTERM
"""
Python daemon for Python 2.x obtained from
http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/
"""
class Daemon(object):
"""
A generic daemon class.
Usage: subclass the Daemon class and override the run() method
"""
def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.pidfile = pidfile
def daemonize(self):
"""
do the UNIX double-fork magic, see Stevens' "Advanced
Programming in the UNIX Environment" for details (ISBN 0201563177)
http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
"""
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# decouple from parent environment
os.chdir("/")
os.setsid()
os.umask(0)
# do second fork
try:
pid = os.fork()
if pid > 0:
# exit from second parent
sys.exit(0)
except OSError, e:
sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = file(self.stdin, 'r')
so = file(self.stdout, 'a+')
se = file(self.stderr, 'a+', 0)
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
# write pidfile
atexit.register(self.delpid)
pid = str(os.getpid())
file(self.pidfile,'w+').write("%s\n" % pid)
def delpid(self):
try:
os.remove(self.pidfile)
except OSError, e:
pass
def start(self):
"""
Start the daemon
"""
# Check for a pidfile to see if the daemon already runs
try:
pf = file(self.pidfile,'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if pid:
message = "pidfile %s already exist. Daemon already running?\n"
sys.stderr.write(message % self.pidfile)
sys.exit(1)
# Start the daemon
self.daemonize()
self.run()
def stop(self):
"""
Stop the daemon
"""
# Get the pid from the pidfile
try:
pf = file(self.pidfile,'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
if not pid:
message = "pidfile %s does not exist. Daemon not running?\n"
sys.stderr.write(message % self.pidfile)
return # not an error in a restart
# Try killing the daemon process
try:
while 1:
os.kill(pid, SIGTERM)
time.sleep(0.1)
except OSError, err:
err = str(err)
if err.find("No such process") > 0:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
print str(err)
sys.exit(1)
def restart(self):
"""
Restart the daemon
"""
self.stop()
self.start()
def run(self):
"""
You should override this method when you subclass Daemon. It will be called after the process has been
daemonized by start() or restart().
"""

@@ -0,0 +1,117 @@
import os
from optparse import make_option
from django.core.management.base import BaseCommand
from treeherder.pulse_consumer.consumer import TreeherderDataAdapter, TreeherderPulseDaemon
class Command(BaseCommand):
"""Management command to run mozilla pulse consumer."""
help = (
"Manages mozilla pulse consumer daemon to listen for product build "
"and test data events.\n\n"
"Example: Write job data structures to stdout\n"
"manage.py start_pulse_consumer --start --outfile 'stdout'"
)
option_list = BaseCommand.option_list + (
make_option('--start',
action='store_true',
dest='start',
help="Start the daemon." ),
make_option('--stop',
action='store_true',
dest='stop',
help=("Stop the daemon. If no pidfile is supplied "
"pulse_consumer.pid is used." )),
make_option('--restart',
action='store_true',
dest='restart',
help="Restart the daemon." ),
make_option('--daemon',
action='store_true',
dest='daemon',
help='Run as daemon (posix only). Requires sudo.'),
make_option('--pidfile',
action='store',
dest='pidfile',
default='{0}/pulse_consumer.pid'.format(os.getcwd()),
help='Path to the file the pid is logged to.'),
make_option('--durable',
action='store_true',
dest='durable',
help=("Should only be used in production. Causes pulse "
"to store data for consumer when disconnected.")),
make_option('--logdir',
action='store',
dest='logdir',
help=("Directory to write log files to.")),
make_option('--rawdata',
action='store_true',
dest='rawdata',
help=("Log the raw data and also write it to the "
"outfile if one is specified.")),
make_option('--outfile',
action='store',
dest='outfile',
help=("Write treeherder json data to file specified in"
" outfile, quick way to test data structures. Use"
" the string stdout to write to standard output.")),
)
def handle(self, *args, **options):
start = options.get("start")
restart = options.get("restart")
stop = options.get("stop")
daemon = options.get("daemon")
pidfile = options.get("pidfile")
durable = options.get("durable")
logdir = options.get("logdir")
rawdata = options.get("rawdata")
outfile = options.get("outfile")
tda = TreeherderDataAdapter(
durable=durable,
logdir=logdir,
rawdata=rawdata,
outfile=outfile
)
if start:
if daemon:
th_daemon = TreeherderPulseDaemon(
pidfile, treeherder_data_adapter=tda, stdin='/dev/null',
stdout='/dev/null', stderr='/dev/null'
)
th_daemon.start()
else:
#Run the pulse consumer without becoming
#a daemon
tda.start()
else:
th_daemon = TreeherderPulseDaemon(
pidfile, treeherder_data_adapter=tda, stdin='/dev/null',
stdout='/dev/null', stderr='/dev/null'
)
if restart:
th_daemon.restart()
elif stop:
th_daemon.stop()
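
The command can also be driven programmatically with Django's standard call_command, which is a quick way to smoke-test the consumer in the foreground; the option values here are examples:

from django.core.management import call_command

#Equivalent to: manage.py start_pulse_consumer --start --outfile 'stdout'
call_command('start_pulse_consumer', start=True, outfile='stdout')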

@@ -70,6 +70,7 @@ INSTALLED_APPS = [
'django.contrib.admin',
'treeherder.model',
'treeherder.webapp',
'treeherder.pulse_consumer',
'south',
]