This commit is contained in:
mdoglio 2013-07-23 17:39:47 +01:00
Родитель ec40464ee5
Коммит 6ec69ebf51
20 изменённых файлов: 283 добавлений и 83 удалений

Просмотреть файл

@ -1,4 +1,5 @@
from django.core.urlresolvers import reverse
from django.template import Context, Template
import pytest
from webtest.app import TestApp
import simplejson as json
@ -44,9 +45,13 @@ def running_jobs():
@pytest.fixture
def completed_jobs(sample_data):
"""returns a list of pulse completed jobs"""
return json.loads(open(
base_dir = os.path.dirname(__file__)
content = open(
os.path.join(os.path.dirname(__file__),"finished.json")
).read())
).read()
t = Template(content)
c = Context({"base_dir":base_dir})
return json.loads(t.render(c))
@pytest.fixture

Просмотреть файл

@ -33,11 +33,11 @@
"name": "xpcshell",
"log_references": [
{
"url": "http://ftp.mozilla.org/pub/mozilla.org/mobile/tinderbox-builds/mozilla-inbound-android/1370454517/mozilla-inbound_tegra_android_test-xpcshell-bm22-tests1-tegra-build690.txt.gz",
"url": "file://{{base_dir}}/mozilla-inbound_panda_android_test-crashtest-bm43-tests1-panda-build591.txt.gz",
"name": "unittest"
}
],
"machine": "tegra-132",
"state": "TODO",
"state": "finished",
"product_name": "mobile"
}, "revision_hash": "0686a4d3fa477cb0415c9ca590177e4b03919b64"}

Двоичный файл не отображается.

Просмотреть файл

@ -16,12 +16,8 @@
"option_collection": {
"opt": true
},
"log_references": [
{
"url": null,
"name": "unittest"
}
],
"who": "unknown",
"log_references": [],
"job_guid": "808f4f1372895eda5ecd65f2371ebe67a2a9af9b",
"machine_platform": {
"platform": "2.2",

Просмотреть файл

@ -16,12 +16,8 @@
"option_collection": {
"opt": true
},
"log_references": [
{
"url": null,
"name": "unittest"
}
],
"who": "unknown",
"log_references": [],
"job_guid": "808f4f1372895eda5ecd65f2371ebe67a2a9af9b",
"machine_platform": {
"platform": "2.2",

Просмотреть файл

@ -1,5 +1,6 @@
from django.core.urlresolvers import reverse
from webtest import TestApp
from treeherder.model.derived import JobsModel
from treeherder.webapp.wsgi import application
import logging
@ -7,43 +8,59 @@ logging.basicConfig(filename="test.log", level=logging.DEBUG)
logger = logging.getLogger()
def test_pending_job_available(datasource_created, pending_jobs_loaded):
def test_pending_job_available(initial_data, datasource_created, pending_jobs_loaded):
webapp = TestApp(application)
resp = webapp.get(
reverse("jobs-list", kwargs={"project": "mozilla-inbound"})
)
jobs = resp.json
print jobs
assert len(jobs) ==1
assert jobs[0]['status'] == 'pending'
assert jobs[0]['state'] == 'pending'
def test_running_job_available(datasource_created, running_jobs_loaded):
def test_running_job_available(initial_data, datasource_created, running_jobs_loaded):
webapp = TestApp(application)
resp = webapp.get(
reverse("jobs-list", kwargs={"project": "mozilla-inbound"})
)
jobs = resp.json
print jobs
jm = JobsModel("mozilla-inbound")
assert len(jobs) ==1
assert jobs[0]['status'] == 'running'
assert jobs[0]['state'] == 'running'
def test_completed_job_available(datasource_created, completed_jobs_loaded):
def test_completed_job_available(initial_data, datasource_created, completed_jobs_loaded):
webapp = TestApp(application)
resp = webapp.get(
reverse("jobs-list", kwargs={"project": "mozilla-inbound"})
)
jobs = resp.json
print jobs
jm = JobsModel("mozilla-inbound")
assert len(jobs) ==1
assert len(jobs) == 1
assert jobs[0]['state'] == 'finished'
assert jobs[0]['status'] == 'finished'
def test_pending_stored_to_running_loaded(initial_data, datasource_created, pending_jobs_stored, running_jobs_loaded):
"""
tests a job transition from pending to running
given a pending job loaded in the objects store
if I store and load the same job with status running,
the latter is shown in the jobs endpoint
"""
webapp = TestApp(application)
resp = webapp.get(
reverse("jobs-list", kwargs={"project": "mozilla-inbound"})
)
jobs = resp.json
jm = JobsModel("mozilla-inbound")
assert len(jobs) == 1
assert jobs[0]['state'] == 'running'

Просмотреть файл

@ -2,7 +2,6 @@ import logging
import urllib2
import simplejson as json
from django.conf import settings
from . import buildbot
from .common import (get_revision_hash, get_job_guid,
@ -68,7 +67,8 @@ class TreeherderBuildapiAdapter(TreeherderDataAdapter):
),
}
treeherder_data['sources'].append({
'repository': branch,
# repository name is always lowercase
'repository': branch.lower(),
'revision': rev,
})
@ -92,17 +92,13 @@ class TreeherderBuildapiAdapter(TreeherderDataAdapter):
'architecture': platform_info['arch'],
'vm': platform_info['vm']
},
'who': 'unknown',
'option_collection': {
# build_type contains an option name, eg. PGO
buildbot.extract_build_type(job['buildername']): True
},
'log_references': [{
'url': None,
#using the jobtype as a name for now, the name allows us
#to have different log types with their own processing
'name': buildbot.extract_job_type(job['buildername'])
}]
'log_references': []
}
treeherder_data['job'] = job
@ -127,7 +123,8 @@ class TreeherderBuildapiAdapter(TreeherderDataAdapter):
),
}
treeherder_data['sources'].append({
'repository': branch,
# repository name is always lowercase
'repository': branch.lower(),
'revision': rev,
})
@ -154,17 +151,13 @@ class TreeherderBuildapiAdapter(TreeherderDataAdapter):
'architecture': platform_info['arch'],
'vm': platform_info['vm']
},
'who': 'unknown',
'option_collection': {
# build_type contains an option name, eg. PGO
buildbot.extract_build_type(job['buildername']): True
},
'log_references': [{
'url': None,
#using the jobtype as a name for now, the name allows us
#to have different log types with their own processing
'name': buildbot.extract_job_type(job['buildername'])
}]
'log_references': []
}
treeherder_data['job'] = job

Просмотреть файл

@ -1,6 +1,17 @@
import re
RESULT_DICT = {
0: "success",
1: "testfailed",
2: "busted",
3: "skipped",
4: "exception",
5: "retry",
6: "usercancel"
}
####
# The following variables were taken from util.py
#
@ -341,6 +352,14 @@ def extract_platform_info(source_string):
PLATFORMS_BUILDERNAME[platform_name]['attributes']
)
break
if not 'platform_name' in output:
output['platform_name'] = 'unknown'
output.update({
'os': 'unknown',
'os_platform': 'unknown',
'arch': 'unknown',
'vm': False
})
return output

Просмотреть файл

@ -8,6 +8,9 @@ import simplejson as json
from django.core.urlresolvers import reverse
from django.conf import settings
import logging
logging.basicConfig(filename="pulse_consumer.log", level=logging.DEBUG)
logger = logging.getLogger()
class JobDataError(ValueError):
pass

Просмотреть файл

@ -85,7 +85,8 @@ class Command(BaseCommand):
durable=durable,
logdir=logdir,
rawdata=rawdata,
outfile=outfile
outfile=outfile,
loaddata=True
)
if start:

Просмотреть файл

@ -359,16 +359,14 @@ class TreeherderPulseDataAdapter(PulseDataAdapter, TreeherderDataAdapter):
def adapt_data(self, data):
"""Adapts the PulseDataAdapter into the treeherder input data structure"""
treeherder_data = {
'sources': {},
'sources': [],
#Include branch so revision hash with the same revision is still
#unique across branches
'revision_hash': get_revision_hash(
[data['revision'], data['branch']]
),
)
}
treeherder_data['sources'] = []
####
#TODO: This is a temporary fix, this data will not be located
# in the sourceStamp in the pulse stream. It will likely
@ -385,7 +383,6 @@ class TreeherderPulseDataAdapter(PulseDataAdapter, TreeherderDataAdapter):
})
request_id = data['request_ids'][0]
job = {
'job_guid': get_job_guid(
#The keys in this dict are unicode but the values in
@ -396,10 +393,10 @@ class TreeherderPulseDataAdapter(PulseDataAdapter, TreeherderDataAdapter):
),
'name': data['test_name'],
'product_name': data['product'],
'state': 'TODO',
'state': 'finished',
#Do we need to map this to the strings in the sample structure?
'result': data['results'],
'result': buildbot.RESULT_DICT.get(int(data['results']),'unknown'),
'reason': data['reason'],
#There is both a who and blame that appear to be identical in the
@ -454,7 +451,7 @@ class TreeherderPulseDataAdapter(PulseDataAdapter, TreeherderDataAdapter):
TreeherderPulseDataAdapter,
self
).process_data(raw_data, message)
# load transformed data into the restful api
if data and self.loaddata:
self.load([data])

25
treeherder/etl/tasks.py Normal file
Просмотреть файл

@ -0,0 +1,25 @@
"""
This module contains
"""
from celery import task
from treeherder.etl.buildapi import TreeherderBuildapiAdapter
@task(name='fetch-buildapi-pending')
def fetch_buildapi_pending(url):
    """
    Fetch the buildapi pending-jobs API at *url* and load the results
    into the objectstore ingestion endpoint.
    """
    TreeherderBuildapiAdapter().process_pending_jobs(url)
@task(name='fetch-buildapi-running')
def fetch_buildapi_running(url):
    """
    Fetch the buildapi running-jobs API at *url* and load the results
    into the objectstore ingestion endpoint.

    NOTE: this function was previously (mis)named ``fetch_buildapi_pending``,
    which redefined — and therefore shadowed — the pending task above at
    module level. Renamed to match its celery task name
    ('fetch-buildapi-running'); celery callers dispatch by that registered
    string name, so they are unaffected.
    """
    adapter = TreeherderBuildapiAdapter()
    adapter.process_running_jobs(url)

Просмотреть файл

@ -37,9 +37,9 @@ class TreeherderModelBase(object):
procs_file_name = "{0}.json".format(contenttype)
if not contenttype in self.dhubs.keys():
self.dhubs[contenttype] = self.get_datasource(
contenttype).dhub(procs_file_name)
datasource = self.get_datasource(contenttype)
self.dhubs[contenttype] = datasource.dhub(procs_file_name)
return self.dhubs[contenttype]
def get_datasource(self, contenttype):
@ -92,6 +92,11 @@ class TreeherderModelBase(object):
procedure=proc,
)
def disconnect(self):
"""Iterate over and disconnect all data sources."""
for src in self.sources.itervalues():
src.disconnect()
class DatasetNotFoundError(ValueError):
pass

Просмотреть файл

@ -26,6 +26,16 @@ class JobsModel(TreeherderModelBase):
CONTENT_TYPES = [CT_JOBS, CT_OBJECTSTORE]
STATES = ["pending", "running", "completed", "coalesced"]
# this dict contains a matrix of state changes with the values defining
# if the change is allowed or not
STATE_CHANGES = {
'pending': {'coalesced': True, 'completed': True, 'running': True},
'running': {'coalesced': True, 'completed': True, 'pending': False},
'completed': {'coalesced': False, 'pending': False, 'running': False},
'coalesced': {'completed': False, 'pending': False, 'running': False}
}
@classmethod
def create(cls, project, host=None):
"""
@ -65,6 +75,16 @@ class JobsModel(TreeherderModelBase):
"""Return the job row for this ``job_id``"""
return self.get_row_by_id(self.CT_JOBS, "job", job_id)
def get_job_id_by_guid(self, job_guid):
"""Return the job id for this ``job_guid``"""
id_iter= self.get_jobs_dhub().execute(
proc="jobs.selects.get_job_id_by_guid",
placeholders=[job_guid],
debug_show=self.DEBUG,
return_type='iter'
)
return id_iter.get_column_data('id')
def get_job_list(self, page, limit):
"""
Retrieve a list of jobs.
@ -233,7 +253,6 @@ class JobsModel(TreeherderModelBase):
}
"""
result_set_id = self._set_result_set(data["revision_hash"])
rdm = self.refdata_model
@ -259,10 +278,13 @@ class JobsModel(TreeherderModelBase):
job["machine_platform"]["architecture"],
)
machine_id = rdm.get_or_create_machine(
job["machine"],
timestamp=long(job["end_timestamp"]),
)
if "machine" in job:
machine_id = rdm.get_or_create_machine(
job["machine"],
timestamp=long(job["end_timestamp"]),
)
else:
machine_id = None
option_collection_hash = rdm.get_or_create_option_collection(
[k for k, v in job["option_collection"].items() if v],
@ -274,9 +296,12 @@ class JobsModel(TreeherderModelBase):
job_name, job_group,
)
product_id = rdm.get_or_create_product(
job["product_name"],
)
if "product_name" in job:
product_id = rdm.get_or_create_product(
job["product_name"],
)
else:
product_id = None
job_id = self._set_job_data(
job,
@ -356,9 +381,9 @@ class JobsModel(TreeherderModelBase):
[
src["revision"],
author,
src["comments"],
long(src["push_timestamp"]),
long(src["commit_timestamp"]),
src.get("comments",""),
long(src.get("push_timestamp",0)),
long(src.get("commit_timestamp", 0)),
repository_id,
]
)
@ -394,19 +419,21 @@ class JobsModel(TreeherderModelBase):
job_coalesced_to_guid = ""
who = data["who"]
reason = data["reason"]
result = int(data["result"])
# TODO: fix who and reason for pending/running jobs
who = data.get("who","unknown")
reason = data.get("reason","unknown")
result = data.get("result","unknown")
state = data["state"]
submit_timestamp = long(data["submit_timestamp"])
start_timestamp = long(data["start_timestamp"])
end_timestamp = long(data["end_timestamp"])
start_timestamp = long(data.get("start_timestamp",0)) or None
end_timestamp = long(data.get("end_timestamp",0)) or None
except ValueError as e:
raise JobDataError(e.message)
job_id = self._insert_data_and_get_id(
'set_job_data',
# try to insert a new row
self._insert_data(
'create_job_data',
[
job_guid,
job_coalesced_to_guid,
@ -424,11 +451,35 @@ class JobsModel(TreeherderModelBase):
submit_timestamp,
start_timestamp,
end_timestamp,
job_guid
]
)
job_id = self.get_job_id_by_guid(job_guid)
self._update_data(
'update_job_data',
[
job_coalesced_to_guid,
result_set_id,
machine_id,
option_collection_hash,
job_type_id,
product_id,
who,
reason,
result,
state,
start_timestamp,
end_timestamp,
job_id
]
)
return job_id
def _insert_job_log_url(self, job_id, name, url):
"""Insert job log data"""
@ -458,6 +509,15 @@ class JobsModel(TreeherderModelBase):
executemany=executemany,
)
def _update_data(self, statement, placeholders):
"""Update a set of data using the specified proc ``statement``."""
self.get_jobs_dhub().execute(
proc='jobs.updates.' + statement,
debug_show=self.DEBUG,
placeholders=placeholders,
executemany=False,
)
def _insert_data_and_get_id(self, statement, placeholders):
"""Execute given insert statement, returning inserted ID."""
self._insert_data(statement, placeholders)
@ -471,7 +531,7 @@ class JobsModel(TreeherderModelBase):
return_type='iter',
).get_column_data('id')
def process_objects(self, loadlimit):
def process_objects(self, loadlimit, raise_errors=False):
"""Processes JSON blobs from the objectstore into jobs schema."""
rows = self.claim_objects(loadlimit)
@ -484,12 +544,16 @@ class JobsModel(TreeherderModelBase):
revision_hash = data["revision_hash"]
except JobDataError as e:
self.mark_object_error(row_id, str(e))
if raise_errors:
raise e
except Exception as e:
self.mark_object_error(
row_id,
u"Unknown error: {0}: {1}".format(
e.__class__.__name__, unicode(e))
)
row_id,
u"Unknown error: {0}: {1}".format(
e.__class__.__name__, unicode(e))
)
if raise_errors:
raise e
else:
self.mark_object_complete(row_id, revision_hash)

Просмотреть файл

@ -557,5 +557,31 @@
"repository_group": 1,
"description": ""
}
},
{
"pk": 44,
"model": "model.repository",
"fields": {
"dvcs_type": "hg",
"name": "mozilla-b2g18_v1_1_0_hd",
"url": "https://hg.mozilla.org/releases/mozilla-b2g18_v1_1_0_hd/",
"active_status": "active",
"codebase": "gecko",
"repository_group": 2,
"description": ""
}
},
{
"pk": 45,
"model": "model.repository",
"fields": {
"dvcs_type": "hg",
"name": "unknown",
"url": "",
"active_status": "active",
"codebase": "",
"repository_group": 1,
"description": ""
}
}
]

Просмотреть файл

@ -1,11 +1,25 @@
from optparse import make_option
from django.core.management.base import BaseCommand
from treeherder.model.models import Datasource, Repository
class Command(BaseCommand):
help = ("Populate the datasource table and"
"create the connected databases")
option_list = BaseCommand.option_list + (
make_option('--host',
action='store',
dest='host',
default='localhost',
help='Host to associate the datasource to'),
make_option('--readonly-host',
action='store',
dest='readonly_host',
default='localhost',
help='Readonly host to associate the datasource to'),
)
def handle(self, *args, **options):
projects = Repository.objects.all().values_list('name',flat=True)
for project in projects:
@ -13,5 +27,8 @@ class Command(BaseCommand):
Datasource.objects.get_or_create(
contenttype=contenttype,
dataset=1,
project=project
project=project,
host=options['host'],
read_only_host=options['readonly_host']
)
Datasource.reset_cache()

Просмотреть файл

@ -3,6 +3,7 @@ from __future__ import unicode_literals
import uuid
import subprocess
import os
import MySQLdb
from datasource.bases.BaseHub import BaseHub
from datasource.hubs.MySQL import MySQL
@ -10,6 +11,7 @@ from django.conf import settings
from django.core.cache import cache
from django.db import models
from django.db.models import Max
from warnings import filterwarnings, resetwarnings
from treeherder import path
@ -360,9 +362,11 @@ class Datasource(models.Model):
user=DB_USER,
passwd=DB_PASS,
)
filterwarnings('ignore', category=MySQLdb.Warning)
cur = conn.cursor()
cur.execute("CREATE DATABASE {0}".format(self.name))
cur.execute("CREATE DATABASE IF NOT EXISTS {0}".format(self.name))
conn.close()
resetwarnings()
# MySQLdb provides no way to execute an entire SQL file in bulk, so we
# have to shell out to the commandline client.

Просмотреть файл

@ -1,6 +1,6 @@
{
"inserts":{
"set_job_data":{
"create_job_data":{
"sql":"INSERT INTO `job` (
`job_guid`,
@ -19,7 +19,13 @@
`submit_timestamp`,
`start_timestamp`,
`end_timestamp`)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
SELECT ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?
FROM DUAL WHERE NOT EXISTS (
SELECT `job_guid`
FROM `job`
WHERE `job_guid` = ?
)",
"host":"master_host"
},
@ -86,6 +92,24 @@
WHERE `id` = ?",
"host":"master_host"
},
"update_job_data":{
"sql":"UPDATE `job`
SET
`job_coalesced_to_guid` = ?,
`result_set_id` = ?,
`machine_id` = ?,
`option_collection_hash` = ?,
`job_type_id` = ?,
`product_id` = ?,
`who` = ?,
`reason` = ?,
`result` = ?,
`state` = ?,
`start_timestamp` = ?,
`end_timestamp` = ?
WHERE `id` = ?",
"host":"master_host"
}
},
"selects":{
@ -105,6 +129,12 @@
WHERE job_id = ?",
"host":"read_host"
},
"get_job_id_by_guid":{
"sql":"SELECT `id`
FROM `job`
WHERE `job_guid` = ?",
"host": "read_host"
}
}
}

Просмотреть файл

@ -98,10 +98,10 @@ CREATE TABLE `job` (
`result_set_id` bigint(20) unsigned NOT NULL,
`build_platform_id` int(10) unsigned NOT NULL,
`machine_platform_id` int(10) unsigned NOT NULL,
`machine_id` int(10) unsigned NOT NULL,
`machine_id` int(10) unsigned DEFAULT NULL,
`option_collection_hash` varchar(64) COLLATE utf8_bin DEFAULT NULL,
`job_type_id` int(10) unsigned NOT NULL,
`product_id` int(10) unsigned NOT NULL,
`product_id` int(10) unsigned DEFAULT NULL,
`who` varchar(50) COLLATE utf8_bin NOT NULL,
`reason` varchar(125) COLLATE utf8_bin NOT NULL,
`result` varchar(25) COLLATE utf8_bin DEFAULT NULL,

Просмотреть файл

@ -137,6 +137,8 @@ CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
# rest-framework settings
REST_FRAMEWORK = {
'DEFAULT_PARSER_CLASSES': (