This commit is contained in:
Jonathan Eads 2013-04-26 14:34:20 -07:00
Parent eaccfecd00 1f4dae6807
Commit 1de79d9a2a
17 changed files: 1395 additions and 57 deletions

manage.py Normal file → Executable file

@@ -4,14 +4,16 @@ import sys
from django.core.management import call_command
import pytest
from django.conf import settings
def pytest_sessionstart(session):
"""
Set up the test environment.
Set up the test environment.
Set DJANGO_SETTINGS_MODULE and sets up a test database.
Set DJANGO_SETTINGS_MODULE and sets up a test database.
"""
"""
sys.path.append(dirname(dirname(__file__)))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "treeherder.settings")
from django.conf import settings
@@ -43,14 +45,14 @@ def pytest_sessionfinish(session):
def pytest_runtest_setup(item):
"""
Per-test setup.
Per-test setup.
Start a transaction and disable transaction methods for the duration of the
test. The transaction will be rolled back after the test. This prevents any
database changes made to Django ORM models from persisting between tests,
providing test isolation.
Start a transaction and disable transaction methods for the duration of the
test. The transaction will be rolled back after the test. This prevents any
database changes made to Django ORM models from persisting between tests,
providing test isolation.
"""
"""
from django.test.testcases import disable_transaction_methods
from django.db import transaction
@@ -63,11 +65,12 @@ providing test isolation.
def pytest_runtest_teardown(item):
"""
Per-test teardown.
Per-test teardown.
Roll back the Django ORM transaction and delete all the dbs created between tests
Roll back the Django ORM transaction and delete all the dbs created
between tests
"""
"""
from django.test.testcases import restore_transaction_methods
from django.db import transaction
from treeherder.model.models import Datasource
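For context, the isolation described in the setup/teardown docstrings above is the classic wrap-each-test-in-a-transaction pattern. A minimal standalone sketch of that pattern, using the same Django 1.x helpers these hooks import (simplified; the function names are hypothetical and this is not the exact hook body from the commit):

from django.db import transaction
from django.test.testcases import (disable_transaction_methods,
                                    restore_transaction_methods)

def begin_test_transaction():
    # open a managed transaction and block commit/rollback inside the test
    transaction.enter_transaction_management()
    transaction.managed(True)
    disable_transaction_methods()

def end_test_transaction():
    # re-enable the real methods and throw away everything the test wrote
    restore_transaction_methods()
    transaction.rollback()
    transaction.leave_transaction_management()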
@@ -93,11 +96,57 @@ def increment_cache_key_prefix():
cache.set(prefix_counter_cache_key, key_prefix_counter)
cache.key_prefix = "t{0}".format(key_prefix_counter)
@pytest.fixture(scope='session')
def sample_data():
"""Returns a SampleData() object"""
from sampledata import SampleData
return SampleData()
@pytest.fixture()
def jm():
""" Give a test access to a JobsModel instance. """
from django.conf import settings
from treeherder.model.derived.jobs import JobsModel
# return JobsModel.create(settings.DATABASES["default"]["TEST_NAME"])
from treeherder.model.models import Datasource
jds = Datasource.objects.create(
project=settings.DATABASES["default"]["TEST_NAME"],
dataset=1,
contenttype="jobs",
host=settings.DATABASES['default']['HOST'],
)
objstore = Datasource.objects.create(
project=settings.DATABASES["default"]["TEST_NAME"],
dataset=1,
contenttype="objectstore",
host=settings.DATABASES['default']['HOST'],
)
model = JobsModel(settings.DATABASES["default"]["TEST_NAME"])
# patch in additional test-only procs on the datasources
test_proc_file = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"objectstore_test.json",
)
add_test_procs_file(
model.get_dhub("objectstore"),
objstore.key,
"objectstore_test.json",
)
add_test_procs_file(
model.get_dhub("jobs"),
jds.key,
"jobs_test.json",
)
return model
def add_test_procs_file(dhub, key, filename):
"""Add an extra procs file in for testing purposes."""
test_proc_file = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
filename,
)
del dhub.procs[key]
proclist = dhub.data_sources[key]["procs"]
if not test_proc_file in proclist:
proclist.append(test_proc_file)
dhub.data_sources[key]["procs"] = proclist
dhub.load_procs(key)
@pytest.fixture()
def jobs_ds():
@@ -121,3 +170,9 @@ def objectstore_ds():
contenttype="objectstore",
host="localhost",
)
@pytest.fixture(scope='session')
def sample_data():
"""Returns a SampleData() object"""
from sampledata import SampleData
return SampleData()
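Taken together, the jm and sample_data fixtures above are meant to be consumed directly by tests; a small hypothetical example (not part of the commit) of what that usage looks like:

import json

def test_store_and_process_one_blob(jm, sample_data):
    # write a single sample blob into the objectstore...
    jm.store_job_data(json.dumps(sample_data.job_data[0]))
    # ...then pull it through into the jobs schema
    jm.process_objects(1)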

tests/jobs_test.json Normal file

@@ -0,0 +1,12 @@
{
"selects": {
"job": {
"sql": "SELECT * FROM `job` WHERE id = ?",
"host": "master_host"
},
"jobs": {
"sql": "SELECT * FROM `job`",
"host": "master_host"
}
}
}


@@ -147,22 +147,8 @@ def test_refdata_manager(refdata, params):
for k, v in params['expected'][i].items():
assert row[k] == v
# some tests just don't fit into the standard layout
# def test_repository_version_creation(refdata, repository_id):
# id = refdata.get_or_create_repository_version(
# repository_id,
# 'v1',
# 1366290144.07455)
# row_data = refdata.dhub.execute(
# proc=params[refdata_test.selects.test_repository_version],
# placeholders=[id]
# )[0]
# assert row[repository_id] == 1
# assert row[version] == 'v1'
# assert row[version_timestamp] == 1366290144
# assert row[active_status] == 'active'
assert row_data["symbol"] == 'fill me'
assert row_data["name"] == 'mygroup'
assert row_data["description"] == 'fill me'
assert row_data["active_status"] == 'active'


@@ -0,0 +1,227 @@
"""
Functions for flexible generation of sample input job JSON.
"""
import json
import os
import time
from datetime import timedelta
from treeherder.model import utils
def ref_data_json():
"""Return reference data json structure"""
filename = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"ref_data.json",
)
json_data = ""
with open(filename) as f:
json_data = f.read()
return json_data
def job_json(**kwargs):
return json.dumps(job_data(**kwargs))
def job_data(**kwargs):
jobs_obj = {
"sources": [
{
"commit_timestamp": 1365732271,
"push_timestamp": 1365732271,
"comments": "Bug 854583 - Use _pointer_ instead of...",
"repository": "mozilla-aurora",
"revision": "c91ee0e8a980"
}
],
"revision_hash": "24fd64b8251fac5cf60b54a915bffa7e51f636b5",
"jobs": [{
'build_platform': build_platform(**kwargs.pop("build_platform", {})),
'submit_timestamp': kwargs.pop("submit_timestamp", submit_timestamp()),
'start_timestamp': kwargs.pop("start_timestamp", start_timestamp()),
'name': kwargs.pop("name", u'mochitest-5'),
'option_collection': option_collection(
**kwargs.pop("option_collection", {})),
'log_references': log_references(kwargs.pop("log_references", [])),
'who': kwargs.pop("who", u'sendchange-unittest'),
'reason': kwargs.pop("reason", u'scheduler'),
'artifact': kwargs.pop("artifact", {}),
'machine_platform': machine_platform(
**kwargs.pop("machine_platform", {})),
'machine': kwargs.pop("machine", u'talos-r3-xp-088'),
'state': kwargs.pop("state", 'TODO'),
'result': kwargs.pop("result", 0),
'job_guid': kwargs.pop(
"job_guid", "f3e3a9e6526881c39a3b2b6ff98510f213b3d4ed"),
'product_name': kwargs.pop("product_name", u'firefox'),
'end_timestamp': kwargs.pop("end_timestamp", end_timestamp()),
}]
}
# defaults.update(kwargs)
return jobs_obj
def to_seconds(td):
return (td.microseconds +
(td.seconds + td.days * 24 * 3600) * 10 ** 6
) / 10 ** 6
def get_timestamp_days_ago(days_ago):
now = int(time.time())
return now - to_seconds(timedelta(int(days_ago)))
def submit_timestamp():
"""3 days ago"""
return get_timestamp_days_ago(3)
def start_timestamp():
"""2 days ago"""
return get_timestamp_days_ago(2)
def end_timestamp():
"""1 day ago"""
return get_timestamp_days_ago(1)
def option_collection(**kwargs):
"""
Return a sample data structure, with default values.
"""
defaults = {
'debug': True
}
defaults.update(kwargs)
return defaults
def log_references(log_refs=None):
if not log_refs:
log_refs = [
{
"url": "http://ftp.mozilla.org/pub/...",
"name": "unittest"
}
]
return log_refs
def build_platform(**kwargs):
"""
Return a sample data structure, with default values.
"""
defaults = {
'platform': 'WINNT5.1',
'os_name': 'win',
'architecture': 'x86',
'vm': False
}
defaults.update(kwargs)
return defaults
def machine_platform(**kwargs):
"""
Return a sample data structure, with default values.
"""
defaults = {
'platform': 'WINNT5.1',
'os_name': 'win',
'architecture': 'x86',
'vm': False
}
defaults.update(kwargs)
return defaults
def create_date_based_data(jm, monkeypatch, dates=None):
"""Store and process some good and some error blobs on specified dates"""
if not dates:
dates = [
get_timestamp_days_ago(5),
get_timestamp_days_ago(4),
get_timestamp_days_ago(3),
]
# 5 days ago
mocknow = dates[0]
def mock_now():
return mocknow
monkeypatch.setattr(utils, 'get_now_timestamp', mock_now)
# store the error blob
blob = job_json(
testrun={"date": dates[0]},
test_build={"name": "one"},
)
badblob = "{0}fooo".format(blob)
jm.store_test_data(badblob, error="badness")
# 4 days ago
mocknow = dates[1]
# store the good blobs
blobs = [
job_json(
testrun={"date": dates[1]},
name="one",
),
job_json(
testrun={"date": dates[1]},
name="three",
),
]
# 3 days ago
mocknow = dates[2]
# store another error blob
blob = job_json(
testrun={"date": dates[2]},
name="four",
)
badblob = "{0}fooo".format(blob)
jm.store_test_data(badblob, error="Malformed JSON")
for blob in blobs:
jm.store_test_data(blob)
# now process all of them
jm.process_objects(4)
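As a quick illustration of how the keyword overrides in job_data()/job_json() compose (hypothetical values; the relative import matches the tests that follow):

from .sample_data_generator import job_data, job_json

# override only the fields a test cares about; everything else falls back
# to the defaults defined above
job = job_data(
    name="xpcshell",
    result=1,
    build_platform={"os_name": "linux", "platform": "Ubuntu VM 12.04"},
)
blob = job_json(result=1)  # same structure, serialized with json.dumps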


@@ -0,0 +1,174 @@
import json
from .sample_data_generator import job_json
def test_unicode(jm):
"""Unicode representation of a ``JobModel`` is the project name."""
assert unicode(jm) == unicode(jm.project)
def xtest_disconnect(jm):
"""test that your model disconnects"""
# establish the connection to jobs.
jm._get_last_insert_id()
# establish the connection to objectstore
jm.retrieve_job_data(limit=1)
jm.disconnect()
for src in jm.sources.itervalues():
assert src.dhub.connection["master_host"]["con_obj"].open is False
def test_claim_objects(jm):
"""``claim_objects`` claims & returns unclaimed rows up to a limit."""
blobs = [
job_json(testrun={"date": "1330454755"}),
job_json(testrun={"date": "1330454756"}),
job_json(testrun={"date": "1330454757"}),
]
# import time
# time.sleep(30)
for blob in blobs:
jm.store_job_data(blob)
rows1 = jm.claim_objects(2)
# a separate worker with a separate connection
from treeherder.model.derived.jobs import JobsModel
jm2 = JobsModel(jm.project)
rows2 = jm2.claim_objects(2)
loading_rows = jm.get_dhub(jm.CT_OBJECTSTORE).execute(
proc="objectstore_test.counts.loading")[0]["loading_count"]
assert len(rows1) == 2
# second worker asked for two rows but only got one that was left
assert len(rows2) == 1
# all three blobs were fetched by one of the workers
assert set([r["json_blob"] for r in rows1 + rows2]) == set(blobs)
# the blobs are all marked as "loading" in the database
assert loading_rows == 3
def test_mark_object_complete(jm):
"""Marks claimed row complete and records run id."""
jm.store_job_data(job_json())
row_id = jm.claim_objects(1)[0]["id"]
job_id = 7 # any arbitrary number; no cross-db constraint checks
jm.mark_object_complete(row_id, job_id)
row_data = jm.get_dhub(jm.CT_OBJECTSTORE).execute(
proc="objectstore_test.selects.row", placeholders=[row_id])[0]
assert row_data["job_id"] == job_id
assert row_data["processed_state"] == "complete"
def test_process_objects(jm):
"""Claims and processes a chunk of unprocessed JSON jobs data blobs."""
# Load some rows into the objectstore
blobs = [
job_json(submit_timestamp="1330454755"),
job_json(submit_timestamp="1330454756"),
job_json(submit_timestamp="1330454757"),
]
for blob in blobs:
jm.store_job_data(blob)
# just process two rows
jm.process_objects(2)
test_run_rows = jm.get_dhub(jm.CT_JOBS).execute(
proc="jobs_test.selects.jobs")
date_set = set([r['submit_timestamp'] for r in test_run_rows])
expected_dates = set([1330454755, 1330454756, 1330454757])
complete_count = jm.get_dhub(jm.CT_OBJECTSTORE).execute(
proc="objectstore_test.counts.complete")[0]["complete_count"]
loading_count = jm.get_dhub(jm.CT_OBJECTSTORE).execute(
proc="objectstore_test.counts.loading")[0]["loading_count"]
assert complete_count == 2
assert loading_count == 0
assert date_set.issubset(expected_dates)
assert len(date_set) == 2
def test_process_objects_invalid_json(jm):
"""process_objects fail for invalid json"""
jm.store_job_data("invalid json")
row_id = jm._get_last_insert_id("objectstore")
jm.process_objects(1)
row_data = jm.get_dhub(jm.CT_OBJECTSTORE).execute(
proc="objectstore_test.selects.row", placeholders=[row_id])[0]
expected_error = "Malformed JSON: No JSON object could be decoded"
assert row_data['error'] == 'Y'
assert row_data['error_msg'] == expected_error
assert row_data['processed_state'] == 'ready'
def test_process_objects_unknown_error(jm, monkeypatch):
"""process_objects fail for unknown reason"""
jm.store_job_data("{}")
row_id = jm._get_last_insert_id("objectstore")
# force an unexpected error to occur
def raise_error(*args, **kwargs):
raise ValueError("Something blew up!")
monkeypatch.setattr(jm, "load_job_data", raise_error)
jm.process_objects(1)
row_data = jm.get_dhub(jm.CT_OBJECTSTORE).execute(
proc="objectstore_test.selects.row", placeholders=[row_id])[0]
expected_error_msg = "Unknown error: ValueError: Something blew up!"
assert row_data['error'] == 'Y'
assert row_data['error_msg'] == expected_error_msg
assert row_data['processed_state'] == 'ready'
def test_ingest_sample_data(jm, sample_data):
"""Process all job structures in the job_data.txt file"""
print "start test_ingest_sample_data"
for blob in sample_data.job_data[:250]:
# print blob
jm.store_job_data(json.dumps(blob))
#data_length = len(sample_data.job_data)
data_length = 250
# process 10 rows at a time
remaining = data_length
while remaining:
jm.process_objects(10)
remaining -= 10
job_rows = jm.get_jobs_dhub().execute(
proc="jobs_test.selects.jobs")
complete_count = jm.get_os_dhub().execute(
proc="objectstore_test.counts.complete")[0]["complete_count"]
loading_count = jm.get_os_dhub().execute(
proc="objectstore_test.counts.loading")[0]["loading_count"]
print "start test_ingest_sample_data"
assert complete_count == data_length
assert loading_count == 0
assert len(job_rows) == data_length


@@ -0,0 +1,28 @@
{
"counts": {
"loading": {
"sql": "SELECT COUNT(`id`) AS loading_count
FROM `objectstore`
WHERE processed_state = 'loading'
",
"host": "master_host"
},
"complete": {
"sql": "SELECT COUNT(`id`) AS complete_count
FROM `objectstore`
WHERE processed_state = 'complete'
",
"host": "master_host"
}
},
"selects": {
"all": {
"sql": "SELECT * FROM `objectstore`",
"host": "master_host"
},
"row": {
"sql": "SELECT * FROM `objectstore` WHERE id = ?",
"host": "master_host"
}
}
}


@@ -5,29 +5,69 @@ access.
"""
from django.conf import settings
from treeherder.model.sql.datasource import SQLDataSource
from treeherder.model.models import Datasource
from treeherder.model.derived.refdata import RefDataManager
class TreeherderModelBase(object):
"""Base model class for all TreeHerder models"""
"""
Base model class for all derived models
"""
def __init__(self, project):
"""Encapsulate the dataset access for this ``project`` """
self.project = project
self.sources = {}
for ct in self.CONTENT_TYPES:
self.sources[ct] = SQLDataSource(project, ct)
self.dhubs = {}
self.DEBUG = settings.DEBUG
self.refdata_model = RefDataManager()
def __unicode__(self):
"""Unicode representation is project name."""
return self.project
def disconnect(self):
"""Iterate over and disconnect all data sources."""
for src in self.sources.itervalues():
src.disconnect()
def get_dhub(self, contenttype, procs_file_name=None):
"""
The configured datahub for the given contenttype
def get_project_cache_key(self, str_data):
return "{0}_{1}".format(self.project, str_data)
"""
if not procs_file_name:
procs_file_name = "{0}.json".format(contenttype)
if not contenttype in self.dhubs.keys():
self.dhubs[contenttype] = self.get_datasource(
contenttype).dhub(procs_file_name)
return self.dhubs[contenttype]
def get_datasource(self, contenttype):
"""The datasource for this contenttype of the project."""
if not contenttype in self.sources.keys():
self.sources[contenttype] = self._get_datasource(contenttype)
return self.sources[contenttype]
def _get_datasource(self, contenttype):
"""Find the datasource for this contenttype in the cache."""
candidate_sources = []
for source in Datasource.objects.cached():
if (source.project == self.project and
source.contenttype == contenttype):
candidate_sources.append(source)
if not candidate_sources:
raise DatasetNotFoundError(
"No dataset found for project %r, contenttype %r."
% (self.project, contenttype)
)
candidate_sources.sort(key=lambda s: s.dataset, reverse=True)
return candidate_sources[0]
class DatasetNotFoundError(ValueError):
pass
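To make the lookup flow above concrete, a rough sketch of how a derived model exercises TreeherderModelBase (the project name is hypothetical):

from treeherder.model.derived.jobs import JobsModel

jm = JobsModel("mozilla-central")        # hypothetical existing project
jobs_hub = jm.get_dhub("jobs")           # built lazily and cached in self.dhubs
src = jm.get_datasource("objectstore")   # cached per contenttype; falls back to _get_datasource
jm.disconnect()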


@@ -0,0 +1,518 @@
import json
import MySQLdb
from warnings import filterwarnings, resetwarnings
from django.conf import settings
from treeherder.model.models import Datasource
from treeherder.model import utils
from .refdata import RefDataManager
from .base import TreeherderModelBase
class JobsModel(TreeherderModelBase):
"""
Represent a job repository with objectstore
content-types:
jobs
objectstore
"""
# content types that every project will have
CT_JOBS = "jobs"
CT_OBJECTSTORE = "objectstore"
CONTENT_TYPES = [CT_JOBS, CT_OBJECTSTORE]
@classmethod
def create(cls, project, hosts=None, types=None):
"""
Create all the datasource tables for this project.
"""
for ct in [cls.CT_JOBS, cls.CT_OBJECTSTORE]:
dataset = Datasource.get_latest_dataset(project, ct)
source = Datasource(
project=project,
contenttype=ct,
dataset=dataset or 1,
)
source.save()
return cls(project=project)
def get_jobs_dhub(self):
"""Get the dhub for jobs"""
return self.get_dhub(self.CT_JOBS)
def get_os_dhub(self):
"""Get the dhub for the objectstore"""
return self.get_dhub(self.CT_OBJECTSTORE)
##################
#
# Objectstore functionality
#
##################
def get_oauth_consumer_secret(self, key):
"""Consumer secret for oauth"""
ds = self.get_datasource(self.CT_OBJECTSTORE)
secret = ds.get_oauth_consumer_secret(key)
return secret
def _get_last_insert_id(self, contenttype=None):
"""Return last-inserted ID."""
if not contenttype:
contenttype = self.CT_JOBS
return self.get_dhub(contenttype).execute(
proc='generic.selects.get_last_insert_id',
debug_show=self.DEBUG,
return_type='iter',
).get_column_data('id')
def store_job_data(self, json_data, error=None):
"""Write the JSON to the objectstore to be queued for processing."""
loaded_timestamp = utils.get_now_timestamp()
error = "N" if error is None else "Y"
error_msg = error or ""
self.get_os_dhub().execute(
proc='objectstore.inserts.store_json',
placeholders=[loaded_timestamp, json_data, error, error_msg],
debug_show=self.DEBUG
)
return self._get_last_insert_id()
def retrieve_job_data(self, limit):
"""
Retrieve JSON blobs from the objectstore.
Does not claim rows for processing; should not be used for actually
processing JSON blobs into jobs schema.
Used only by the `transfer_data` management command.
"""
proc = "objectstore.selects.get_unprocessed"
json_blobs = self.get_os_dhub().execute(
proc=proc,
placeholders=[limit],
debug_show=self.DEBUG,
return_type='tuple'
)
return json_blobs
def load_job_data(self, data):
"""
Load JobData instance into jobs db, return job_id.
@@@: should I return the job_guid instead?
Example:
{
"sources": [
{
"commit_timestamp": 1365732271,
"push_timestamp": 1365732271,
"comments": "Bug 854583 - Use _pointer_ instead of...",
"repository": "mozilla-aurora",
"revision": "c91ee0e8a980"
}
],
"revision_hash": "24fd64b8251fac5cf60b54a915bffa7e51f636b5",
"jobs": [
{
"build_platform": {
"platform": "Ubuntu VM 12.04",
"os_name": "linux",
"architecture": "x86_64",
"vm": true
},
"submit_timestamp": 1365732271,
"start_timestamp": "20130411165317",
"name": "xpcshell",
"option_collection": {
"opt": true
},
"log_references": [
{
"url": "http://ftp.mozilla.org/pub/...",
"name": "unittest"
}
],
"who": "sendchange-unittest",
"reason": "scheduler",
"artifact": {
"type": "json | img | ...",
"name": "",
"log_urls": [],
"blob": ""
},
"machine_platform": {
"platform": "Ubuntu VM 12.04",
"os_name": "linux",
"architecture": "x86_64",
"vm": true
},
"machine": "tst-linux64-ec2-314",
"state": "TODO",
"result": 0,
"job_guid": "d19375ce775f0dc166de01daa5d2e8a73a8e8ebf",
"product_name": "firefox",
"end_timestamp": "1365733932"
}
]
}
"""
# @@@ sources
# Get/Set reference info, all inserts use ON DUPLICATE KEY
rdm = self.refdata_model
job_id = -1
for job in data["jobs"]:
build_platform_id = rdm.get_or_create_build_platform(
job["build_platform"]["os_name"],
job["build_platform"]["platform"],
job["build_platform"]["architecture"],
)
machine_platform_id = rdm.get_or_create_machine_platform(
job["machine_platform"]["os_name"],
job["machine_platform"]["platform"],
job["machine_platform"]["architecture"],
)
machine_id = rdm.get_or_create_machine(
job["machine"],
timestamp=max([
job["start_timestamp"],
job["submit_timestamp"],
job["end_timestamp"],
])
)
option_collection_id = rdm.get_or_create_option_collection(
[k for k, v in job["option_collection"].items() if v],
)
job_group, sep, job_name = job["name"].partition("-")
job_type_id = rdm.get_or_create_job_type(
job_name, job_group,
)
product_id = rdm.get_or_create_product(
job["product_name"],
)
result_set_id = self._set_result_set(data["revision_hash"])
job_id = self._set_job_data(
job,
result_set_id,
build_platform_id,
machine_platform_id,
machine_id,
option_collection_id,
job_type_id,
product_id,
)
for log_ref in job["log_references"]:
self._insert_job_log_url(
job_id,
log_ref["name"],
log_ref["url"]
)
try:
artifact = job["artifact"]
self._insert_job_artifact(
job_id,
artifact["name"],
artifact["type"],
artifact["blob"],
)
for log_ref in artifact["log_urls"]:
self._insert_job_log_url(
job_id,
log_ref["name"],
log_ref["url"]
)
except KeyError:
# it is ok to have an empty or missing artifact
pass
return job_id
def _set_result_set(self, revision_hash):
"""Set result set revision hash"""
result_set_id = self._insert_data_and_get_id(
'set_result_set',
[
revision_hash,
]
)
return result_set_id
def _set_job_data(self, data, result_set_id, build_platform_id,
machine_platform_id, machine_id, option_collection_id,
job_type_id, product_id):
"""Inserts job data into the db and returns job id."""
try:
job_guid = data["job_guid"]
# @@@ jeads: not sure about job_coalesced_to_guid.
# According to the sample data, this could be:
#
# coalesced: [
# "job_guid",
# ...
# ]
#
# I think I need an
# example of this in job_data.txt
job_coalesced_to_guid = ""
who = data["who"]
reason = data["reason"]
result = int(data["result"])
state = data["state"]
submit_timestamp = data["submit_timestamp"]
start_timestamp = data["start_timestamp"]
end_timestamp = data["end_timestamp"]
except ValueError as e:
e.__class__ = JobDataError
raise
job_id = self._insert_data_and_get_id(
'set_job_data',
[
job_guid,
job_coalesced_to_guid,
result_set_id,
build_platform_id,
machine_platform_id,
machine_id,
option_collection_id,
job_type_id,
product_id,
who,
reason,
result,
state,
submit_timestamp,
start_timestamp,
end_timestamp,
]
)
return job_id
def _insert_job_log_url(self, job_id, name, url):
"""Insert job log data"""
self._insert_data(
'set_job_log_url',
[
job_id, name, url
]
)
def _insert_job_artifact(self, job_id, name, artifact_type, blob):
"""Insert job artifact """
self._insert_data(
'set_job_artifact',
[
job_id, name, artifact_type, blob
]
)
def _insert_data(self, statement, placeholders, executemany=False):
"""Insert a set of data using the specified proc ``statement``."""
self.get_jobs_dhub().execute(
proc='jobs.inserts.' + statement,
debug_show=self.DEBUG,
placeholders=placeholders,
executemany=executemany,
)
def _insert_data_and_get_id(self, statement, placeholders):
"""Execute given insert statement, returning inserted ID."""
self._insert_data(statement, placeholders)
return self._get_last_insert_id()
def _get_last_insert_id(self, source="jobs"):
"""Return last-inserted ID."""
return self.get_dhub(source).execute(
proc='generic.selects.get_last_insert_id',
debug_show=self.DEBUG,
return_type='iter',
).get_column_data('id')
def process_objects(self, loadlimit):
"""Processes JSON blobs from the objectstore into jobs schema."""
rows = self.claim_objects(loadlimit)
job_ids_loaded = []
for row in rows:
row_id = int(row['id'])
try:
data = JobData.from_json(row['json_blob'])
job_id = self.load_job_data(data)
except JobDataError as e:
self.mark_object_error(row_id, str(e))
except Exception as e:
self.mark_object_error(
row_id,
u"Unknown error: {0}: {1}".format(
e.__class__.__name__, unicode(e))
)
else:
self.mark_object_complete(row_id, job_id)
job_ids_loaded.append(job_id)
return job_ids_loaded
def claim_objects(self, limit):
"""
Claim & return up to ``limit`` unprocessed blobs from the objectstore.
Returns a tuple of dictionaries with "json_blob" and "id" keys.
May return more than ``limit`` rows if there are existing orphaned rows
that were claimed by an earlier connection with the same connection ID
but never completed.
"""
proc_mark = 'objectstore.updates.mark_loading'
proc_get = 'objectstore.selects.get_claimed'
# Note: There is a bug in MySQL http://bugs.mysql.com/bug.php?id=42415
# that causes the following warning to be generated in the production
# environment:
#
# _mysql_exceptions.Warning: Unsafe statement written to the binary
# log using statement format since BINLOG_FORMAT = STATEMENT. The
# statement is unsafe because it uses a LIMIT clause. This is
# unsafe because the set of rows included cannot be predicted.
#
# I have been unable to generate the warning in the development
# environment because the warning is specific to the master/slave
replication environment which only exists in production. In the
# production environment the generation of this warning is causing
# the program to exit.
#
# The mark_loading SQL statement does execute an UPDATE/LIMIT but now
# implements an "ORDER BY id" clause making the UPDATE
deterministic/safe. I've been unsuccessful at capturing the specific
# warning generated without redirecting program flow control. To
resolve the problem in production, we're disabling MySQLdb.Warnings
# before executing mark_loading and then re-enabling warnings
# immediately after. If this bug is ever fixed in mysql this handling
# should be removed. Holy Hackery! -Jeads
filterwarnings('ignore', category=MySQLdb.Warning)
# Note: this claims rows for processing. Failure to call load_job_data
# on this data will result in some json blobs being stuck in limbo
# until another worker comes along with the same connection ID.
self.get_os_dhub().execute(
proc=proc_mark,
placeholders=[limit],
debug_show=self.DEBUG,
)
resetwarnings()
# Return all JSON blobs claimed by this connection ID (could possibly
# include orphaned rows from a previous run).
json_blobs = self.get_os_dhub().execute(
proc=proc_get,
debug_show=self.DEBUG,
return_type='tuple'
)
return json_blobs
def mark_object_complete(self, object_id, job_id):
""" Call to database to mark the task completed """
self.get_os_dhub().execute(
proc="objectstore.updates.mark_complete",
placeholders=[job_id, object_id],
debug_show=self.DEBUG
)
def mark_object_error(self, object_id, error):
""" Call to database to mark the task completed """
self.get_os_dhub().execute(
proc="objectstore.updates.mark_error",
placeholders=[error, object_id],
debug_show=self.DEBUG
)
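Pulling the objectstore pieces above together, one polling cycle looks roughly like this (essentially what process_objects already does, spelled out for clarity; jm is a JobsModel instance):

rows = jm.claim_objects(10)          # rows move to 'loading' for this connection
for row in rows:
    row_id = int(row["id"])
    try:
        job_id = jm.load_job_data(JobData.from_json(row["json_blob"]))
    except JobDataError as e:
        jm.mark_object_error(row_id, str(e))     # back to 'ready', error flagged
    else:
        jm.mark_object_complete(row_id, job_id)  # done, linked to the new job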
class JobDataError(ValueError):
pass
class JobData(dict):
"""
Encapsulates data access from incoming test data structure.
All missing-data errors raise ``JobDataError`` with a useful
message. Unlike regular nested dictionaries, ``JobData`` keeps track of
context, so errors contain not only the name of the immediately-missing
key, but the full parent-key context as well.
"""
def __init__(self, data, context=None):
"""Initialize ``JobData`` with a data dict and a context list."""
self.context = context or []
super(JobData, self).__init__(data)
@classmethod
def from_json(cls, json_blob):
"""Create ``JobData`` from a JSON string."""
try:
data = json.loads(json_blob)
except ValueError as e:
raise JobDataError("Malformed JSON: {0}".format(e))
return cls(data)
def __getitem__(self, name):
"""Get a data value, raising ``JobDataError`` if missing."""
full_context = list(self.context) + [name]
try:
value = super(JobData, self).__getitem__(name)
except KeyError:
raise JobDataError("Missing data: {0}.".format(
"".join(["['{0}']".format(c) for c in full_context])))
# Provide the same behavior recursively to nested dictionaries.
if isinstance(value, dict):
value = self.__class__(value, full_context)
return value
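A short, hedged example of the key-context behaviour the JobData docstring describes (not from the commit):

job = JobData.from_json('{"build_platform": {"os_name": "linux"}}')
job["build_platform"]["os_name"]          # -> "linux"
try:
    job["build_platform"]["architecture"]
except JobDataError as e:
    # the error names the full path, e.g.
    # Missing data: ['build_platform']['architecture'].
    print(e)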


@@ -2,7 +2,6 @@ import os
from django.conf import settings
from datasource.bases.BaseHub import BaseHub
from datasource.DataHub import DataHub
from .base import TreeherderModelBase
class RefDataManager(object):


@@ -41,7 +41,8 @@ Type 'yes' to continue, or 'no' to cancel: """ % connection.settings_dict['NAME'
if confirm == 'yes':
for sql_file in ('treeherder.sql.tmpl',
'treeherder_reference_1.sql.tmpl'):
'treeherder_reference_1.sql.tmpl',
):
sql = open(os.path.join(options['template_path'], sql_file)).read()
cursor = connection.cursor()


@@ -1,12 +1,16 @@
from __future__ import unicode_literals
import uuid
import subprocess
import os
from datasource.bases.BaseHub import BaseHub
from datasource.hubs.MySQL import MySQL
from django.conf import settings
from django.core.cache import cache
from django.db import models
from django.db.models import Max
from treeherder import path
@@ -154,13 +158,26 @@ class MachineNote(models.Model):
class DatasourceManager(models.Manager):
def cached(self):
"""Return all datasources, caching the results."""
"""
Return all datasources, caching the results.
"""
sources = cache.get(SOURCES_CACHE_KEY)
if not sources:
sources = list(self.all())
cache.set(SOURCES_CACHE_KEY, sources)
return sources
def latest(self, project, contenttype):
"""
@@@ TODO: this needs to use the cache, probably
"""
ds = Datasource.get_latest_dataset(project, contenttype)
return self.get(
project=project,
contenttype=contenttype,
dataset=ds)
class Datasource(models.Model):
id = models.IntegerField(primary_key=True)
@@ -189,6 +206,14 @@ class Datasource(models.Model):
cache.delete(SOURCES_CACHE_KEY)
cls.objects.cached()
@classmethod
def get_latest_dataset(cls, project, contenttype):
"""get the latest dataset"""
return cls.objects.filter(
project=project,
contenttype=contenttype,
).aggregate(Max("dataset"))["dataset__max"]
@property
def key(self):
"""Unique key for a data source is the project, contenttype, dataset."""
@@ -199,6 +224,31 @@ class Datasource(models.Model):
"""Unicode representation is the project's unique key."""
return unicode(self.key)
def create_next_dataset(self, schema_file=None):
"""
Create and return the next dataset for this project/contenttype.
The database for the new dataset will be located on the same host.
"""
dataset = Datasource.objects.filter(
project=self.project,
contenttype=self.contenttype
).order_by("-dataset")[0].dataset + 1
# @@@ should we store the schema file name used for the previous
# dataset in the db and use the same one again automatically? or should
# we actually copy the schema of an existing dataset rather than using
# a schema file at all?
return Datasource.objects.create(
project=self.project,
contenttype=self.contenttype,
dataset=dataset,
host=self.datasource.host,
db_type=self.datasource.type,
schema_file=schema_file,
)
def save(self, *args, **kwargs):
inserting = not self.pk
# in case you want to add a new datasource and provide


@@ -0,0 +1,21 @@
{
"selects":{
"get_last_insert_id":{
"sql":"SELECT LAST_INSERT_ID() AS `id`",
"host":"master_host"
},
"get_db_size":{
"sql":"SELECT table_schema as db_name,
round(sum( data_length + index_length ) / 1024 / 1024, 2) as size_mb
FROM information_schema.TABLES
WHERE TABLE_SCHEMA like ?
GROUP BY TABLE_SCHEMA
",
"host":"read_host"
}
}
}


@@ -0,0 +1,64 @@
{
"views":{
},
"inserts":{
"set_job_data":{
"sql":"INSERT INTO `job` (
`job_guid`,
`job_coalesced_to_guid`,
`result_set_id`,
`build_platform_id`,
`machine_platform_id`,
`machine_id`,
`option_collection_id`,
`job_type_id`,
`product_id`,
`who`,
`reason`,
`result`,
`state`,
`submit_timestamp`,
`start_timestamp`,
`end_timestamp`)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
"host":"master_host"
},
"set_result_set":{
"sql":"INSERT INTO `result_set` (
`revision_hash`)
VALUES (?)",
"host":"master_host"
},
"set_job_log_url":{
"sql":"INSERT INTO `job_log_url` (
`job_id`,
`name`,
`url`)
VALUES (?,?,?)",
"host":"master_host"
},
"set_job_artifact":{
"sql":"INSERT INTO `job_artifact` (
`job_id`,
`name`,
`type`,
`blob`)
VALUES (?,?,?,?)",
"host":"master_host"
}
},
"selects":{
}
}


@@ -0,0 +1,145 @@
{
"inserts":{
"store_json":{
"sql":"INSERT INTO `objectstore` (`loaded_timestamp`,
`json_blob`,
`error`,
`error_msg`)
VALUES (?, ?, ?, ?)
",
"host":"master_host"
}
},
"selects":{
"get_claimed":{
"sql":"SELECT `json_blob`, `id`
FROM `objectstore`
WHERE `worker_id` = CONNECTION_ID()
AND `processed_state` = 'loading'
AND `error` = 'N'",
"host":"master_host"
},
"get_unprocessed":{
"sql":"SELECT `json_blob`, `id`
FROM `objectstore`
WHERE `processed_state` = 'ready'
AND `error` = 'N'
LIMIT ?",
"host":"master_host"
},
"get_all_errors":{
"sql":"SELECT `json_blob`, `id`
FROM `objectstore`
WHERE `error` = 'Y'
AND loaded_timestamp BETWEEN ? AND ?",
"host":"read_host"
},
"get_error_metadata":{
"sql":"SELECT `id`, job_id, loaded_timestamp, processed_state, error_msg, worker_id
FROM `objectstore`
WHERE `error` = 'Y'
AND loaded_timestamp BETWEEN ? AND ?",
"host":"read_host"
},
"get_json_blob":{
"sql":"SELECT json_blob,
error_msg,
error,
processed_state,
loaded_timestamp,
job_id
FROM `objectstore` WHERE `id` = ?",
"host":"read_host"
},
"get_json_blob_for_test_run":{
"sql":"SELECT json_blob, error_msg, error
FROM `objectstore`
WHERE `job_id` IN (REP0)",
"host":"read_host"
},
"get_error_counts":{
"sql":"SELECT
(CASE
WHEN error_msg LIKE 'Malformed JSON%'
THEN 'Malformed JSON'
ELSE 'Other'
END) AS message, count(id) AS count
FROM `objectstore`
WHERE `error` = 'Y'
AND loaded_timestamp BETWEEN REP0 AND REP1
GROUP BY message",
"host":"read_host"
}
},
"updates":{
"mark_loading":{
"sql":"UPDATE `objectstore`
SET `processed_state` = 'loading',
`worker_id` = CONNECTION_ID()
WHERE `processed_state` = 'ready'
AND `error` = 'N'
ORDER BY `id`
LIMIT ?
",
"host":"master_host"
},
"mark_complete":{
"sql":"UPDATE `objectstore`
SET `processed_state` = 'complete', `job_id` = ?
WHERE `processed_state` = 'loading'
AND `id` = ?
AND `worker_id` = CONNECTION_ID()
",
"host":"master_host"
},
"mark_error":{
"sql":"UPDATE `objectstore`
SET `processed_state` = 'ready',
`worker_id` = NULL,
`error` = 'Y',
`error_msg` = ?
WHERE `processed_state` = 'loading'
AND `id` = ?
AND `worker_id` = CONNECTION_ID()
",
"host":"master_host"
}
}
}


@@ -46,7 +46,11 @@ class SQLDataSource(object):
def datasource(self):
"""The DataSource model object backing this SQLDataSource."""
if self._datasource is None:
self._datasource = self._get_datasource()
self._datasource = Datasource(
project=self.project,
contenttype=self.contenttype,
)
# self._datasource = self._get_datasource()
return self._datasource
@property
@@ -90,7 +94,7 @@ class SQLDataSource(object):
The database for the new dataset will be located on the same host.
"""
dataset = DataSource.objects.filter(
dataset = Datasource.objects.filter(
project=self.project,
contenttype=self.contenttype
).order_by("-dataset")[0].dataset + 1
@@ -156,7 +160,7 @@ class SQLDataSource(object):
oauth_consumer_key = uuid.uuid4()
oauth_consumer_secret = uuid.uuid4()
ds = DataSource.objects.create(
ds = Datasource.objects.create(
host=host,
project=project,
contenttype=contenttype,

treeherder/model/utils.py Normal file

@@ -0,0 +1,14 @@
import time
import datetime
import sys
def get_now_timestamp():
"""
Return a unix timestamp for the current time.
This is useful because it can be mocked out in unit tests.
"""
return int(time.time())
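Since the docstring above calls out mockability, a minimal sketch of overriding it in a pytest test (mirroring the monkeypatch use in create_date_based_data earlier in this commit):

from treeherder.model import utils

def test_uses_fixed_clock(monkeypatch):
    fixed_now = 1366290144
    monkeypatch.setattr(utils, "get_now_timestamp", lambda: fixed_now)
    assert utils.get_now_timestamp() == fixed_now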