merge from jobs-derived-models

Cameron Dawson 2013-04-26 14:48:03 -07:00
Parent 5843d2beac 25aee931be
Commit 356c2fab9a
6 changed files with 143 additions and 188 deletions

View file

@@ -4,8 +4,14 @@ import sys
from django.core.management import call_command
import pytest
from django.conf import settings
from django.core.management import call_command
def pytest_addoption(parser):
parser.addoption(
"--runslow",
action="store_true",
help="run slow tests",
)
def pytest_sessionstart(session):
"""
@@ -62,6 +68,9 @@ def pytest_runtest_setup(item):
increment_cache_key_prefix()
if 'slow' in item.keywords and not item.config.getoption("--runslow"):
pytest.skip("need --runslow option to run")
def pytest_runtest_teardown(item):
"""
@@ -141,6 +150,7 @@ def jm():
return model
def add_test_procs_file(dhub, key, filename):
"""Add an extra procs file in for testing purposes."""
test_proc_file = os.path.join(

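The conftest change above gates slow tests behind a new --runslow option: any test carrying the slow marker is skipped unless the suite is invoked with that flag. A minimal sketch of how a test module opts in, reusing the module-level alias that test_objectstore.py defines below (the test name and body here are hypothetical):

import time
import pytest

slow = pytest.mark.slow  # module-level alias, as in test_objectstore.py

@slow
def test_expensive_ingestion():
    """Skipped by default; runs only with `py.test --runslow`."""
    time.sleep(1)  # stand-in for an expensive ingestion pass
    assert True
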
View file

@@ -41,7 +41,7 @@ def job_data(**kwargs):
}
],
"revision_hash": "24fd64b8251fac5cf60b54a915bffa7e51f636b5",
"jobs": [{
"job": {
'build_platform': build_platform(**kwargs.pop("build_platform", {})),
@@ -77,7 +77,7 @@ def job_data(**kwargs):
'product_name': kwargs.pop("product_name", u'firefox'),
'end_timestamp': kwargs.pop("end_timestamp", end_timestamp()),
}]
}
}
# defaults.update(kwargs)
@@ -168,60 +168,3 @@ def machine_platform(**kwargs):
defaults.update(kwargs)
return defaults
def create_date_based_data(jm, monkeypatch, dates=None):
"""Store and process some good and some error blobs on specified dates"""
if not dates:
dates = [
get_timestamp_days_ago(5),
get_timestamp_days_ago(4),
get_timestamp_days_ago(3),
]
# 5 days ago
mocknow = dates[0]
def mock_now():
return mocknow
monkeypatch.setattr(utils, 'get_now_timestamp', mock_now)
# store the error blob
blob = job_json(
testrun={"date": dates[0]},
test_build={"name": "one"},
)
badblob = "{0}fooo".format(blob)
jm.store_test_data(badblob, error="badness")
# 4 days ago
mocknow = dates[1]
# store the good blobs
blobs = [
job_json(
testrun={"date": dates[1]},
name="one",
),
job_json(
testrun={"date": dates[1]},
name="three",
),
]
# 3 days ago
mocknow = dates[2]
# store another error blob
blob = job_json(
testrun={"date": dates[2]},
name="four",
)
badblob = "{0}fooo".format(blob)
jm.store_test_data(badblob, error="Malformed JSON")
for blob in blobs:
jm.store_test_data(blob)
# now process all of them
jm.process_objects(4)

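With this change job_data() nests a single job dict under the "job" key instead of a one-element "jobs" list, nested defaults stay overridable through keyword arguments, and the date-based helper create_date_based_data is dropped. A hedged usage sketch (the tests.sample_data_generator import path is assumed, and job_json is taken to be the JSON-serialising wrapper around job_data used by the tests):

from tests.sample_data_generator import job_data, job_json

# Override one nested default; everything else keeps its generated value.
blob = job_data(build_platform={"platform": "Ubuntu VM 12.04"})
assert "job" in blob       # new single-job shape
assert "jobs" not in blob  # the old list key is gone

payload = job_json(product_name=u"firefox")  # same structure, serialised to JSON
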
View file

@@ -1,20 +1,14 @@
import json
import pytest
from .sample_data_generator import job_json
slow = pytest.mark.slow
def test_claim_objects(jm):
"""``claim_objects`` claims & returns unclaimed rows up to a limit."""
blobs = [
job_json(testrun={"date": "1330454755"}),
job_json(testrun={"date": "1330454756"}),
job_json(testrun={"date": "1330454757"}),
]
# import time
# time.sleep(30)
blobs = [json.dumps(job) for job in sample_data.job_data[:3]]
for blob in blobs:
jm.store_job_data(blob)
@@ -45,13 +39,15 @@ def test_mark_object_complete(jm):
jm.store_job_data(job_json())
row_id = jm.claim_objects(1)[0]["id"]
job_id = 7 # any arbitrary number; no cross-db constraint checks
revision_hash = "fakehash"
jm.mark_object_complete(row_id, job_id)
jm.mark_object_complete(row_id, job_id, revision_hash)
row_data = jm.get_dhub(jm.CT_OBJECTSTORE).execute(
proc="objectstore_test.selects.row", placeholders=[row_id])[0]
assert row_data["job_id"] == job_id
assert row_data["revision_hash"] == revision_hash
assert row_data["processed_state"] == "complete"
@@ -125,14 +121,14 @@ def test_process_objects_unknown_error(jm, monkeypatch):
assert row_data['processed_state'] == 'ready'
@slow
def test_ingest_sample_data(jm, sample_data):
"""Process all job structures in the job_data.txt file"""
job_data = sample_data.job_data[:250]
for blob in job_data:
for blob in sample_data.job_data:
# print blob
jm.store_job_data(json.dumps(blob))
data_length = len(job_data)
data_length = len(sample_data.job_data)
# process 10 rows at a time
remaining = data_length

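The ingestion test above now stores every blob in sample_data.job_data and drains the objectstore ten rows at a time. A hedged sketch of that store-then-process loop, written as a pytest test against the jm and sample_data fixtures (the test name and the 20-blob slice are hypothetical):

import json

def test_store_and_process_batch(jm, sample_data):
    """Store a handful of blobs, then drain the objectstore in batches."""
    job_data = sample_data.job_data[:20]
    for blob in job_data:
        jm.store_job_data(json.dumps(blob))

    remaining = len(job_data)
    while remaining > 0:
        jm.process_objects(10)  # claim, load, and mark complete up to 10 rows
        remaining -= 10
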
View file

@@ -127,49 +127,48 @@ class JobsModel(TreeherderModelBase):
}
],
"revision_hash": "24fd64b8251fac5cf60b54a915bffa7e51f636b5",
"jobs": [
{
"build_platform": {
"platform": "Ubuntu VM 12.04",
"os_name": "linux",
"architecture": "x86_64",
"vm": true
},
"submit_timestamp": 1365732271,
"start_timestamp": "20130411165317",
"name": "xpcshell",
"option_collection": {
"opt": true
},
"log_references": [
{
"url": "http://ftp.mozilla.org/pub/...",
"name": "unittest"
}
],
"who": "sendchange-unittest",
"reason": "scheduler",
artifact:{
type:" json | img | ...",
name:"",
log_urls:[
]
blob:""
},
"machine_platform": {
"platform": "Ubuntu VM 12.04",
"os_name": "linux",
"architecture": "x86_64",
"vm": true
},
"machine": "tst-linux64-ec2-314",
"state": "TODO",
"result": 0,
"job_guid": "d19375ce775f0dc166de01daa5d2e8a73a8e8ebf",
"product_name": "firefox",
"end_timestamp": "1365733932"
}
]
"job": {
"build_platform": {
"platform": "Ubuntu VM 12.04",
"os_name": "linux",
"architecture": "x86_64",
"vm": true
},
"submit_timestamp": 1365732271,
"start_timestamp": "20130411165317",
"name": "xpcshell",
"option_collection": {
"opt": true
},
"log_references": [
{
"url": "http://ftp.mozilla.org/pub/...",
"name": "unittest"
}
],
"who": "sendchange-unittest",
"reason": "scheduler",
artifact:{
type:" json | img | ...",
name:"",
log_urls:[
]
blob:""
},
"machine_platform": {
"platform": "Ubuntu VM 12.04",
"os_name": "linux",
"architecture": "x86_64",
"vm": true
},
"machine": "tst-linux64-ec2-314",
"state": "TODO",
"result": 0,
"job_guid": "d19375ce775f0dc166de01daa5d2e8a73a8e8ebf",
"product_name": "firefox",
"end_timestamp": "1365733932"
}
}
"""
@@ -181,85 +180,87 @@ class JobsModel(TreeherderModelBase):
for src in data["sources"]:
revision_id = self._insert_revision(src)
self._insert_revision_map(revision_id, result_set_id)
# Get/Set reference info
# set Job data
rdm = self.refdata_model
job_id = -1
for job in data["jobs"]:
job = data["jobs"]
build_platform_id = rdm.get_or_create_build_platform(
job["build_platform"]["os_name"],
job["build_platform"]["platform"],
job["build_platform"]["architecture"],
build_platform_id = rdm.get_or_create_build_platform(
job["build_platform"]["os_name"],
job["build_platform"]["platform"],
job["build_platform"]["architecture"],
)
machine_platform_id = rdm.get_or_create_machine_platform(
job["machine_platform"]["os_name"],
job["machine_platform"]["platform"],
job["machine_platform"]["architecture"],
)
machine_id = rdm.get_or_create_machine(
job["machine"],
timestamp=max([
job["start_timestamp"],
job["submit_timestamp"],
job["end_timestamp"],
])
)
option_collection_id = rdm.get_or_create_option_collection(
[k for k, v in job["option_collection"].items() if v],
)
job_group, sep, job_name = job["name"].partition("-")
job_type_id = rdm.get_or_create_job_type(
job_name, job_group,
)
product_id = rdm.get_or_create_product(
job["product_name"],
)
result_set_id = self._set_result_set(data["revision_hash"])
job_id = self._set_job_data(
job,
result_set_id,
build_platform_id,
machine_platform_id,
machine_id,
option_collection_id,
job_type_id,
product_id,
)
for log_ref in job["log_references"]:
self._insert_job_log_url(
job_id,
log_ref["name"],
log_ref["url"]
)
machine_platform_id = rdm.get_or_create_machine_platform(
job["machine_platform"]["os_name"],
job["machine_platform"]["platform"],
job["machine_platform"]["architecture"],
try:
artifact = job["artifact"]
self._insert_job_artifact(
job_id,
artifact["name"],
artifact["type"],
artifact["blob"],
)
machine_id = rdm.get_or_create_machine(
job["machine"],
timestamp=max([
job["start_timestamp"],
job["submit_timestamp"],
job["end_timestamp"],
])
)
option_collection_id = rdm.get_or_create_option_collection(
[k for k, v in job["option_collection"].items() if v],
)
job_group, sep, job_name = job["name"].partition("-")
job_type_id = rdm.get_or_create_job_type(
job_name, job_group,
)
product_id = rdm.get_or_create_product(
job["product_name"],
)
job_id = self._set_job_data(
job,
result_set_id,
build_platform_id,
machine_platform_id,
machine_id,
option_collection_id,
job_type_id,
product_id,
)
for log_ref in job["log_references"]:
for log_ref in artifact["log_urls"]:
self._insert_job_log_url(
job_id,
log_ref["name"],
log_ref["url"]
)
try:
artifact = job["artifact"]
self._insert_job_artifact(
job_id,
artifact["name"],
artifact["type"],
artifact["blob"],
)
for log_ref in artifact["log_urls"]:
self._insert_job_log_url(
job_id,
log_ref["name"],
log_ref["url"]
)
except KeyError:
# it is ok to have an empty or missing artifact
pass
except KeyError:
# it is ok to have an empty or missing artifact
pass
return job_id
@@ -423,6 +424,7 @@ class JobsModel(TreeherderModelBase):
try:
data = JobData.from_json(row['json_blob'])
job_id = self.load_job_data(data)
revision_hash = data["revision_hash"]
except JobDataError as e:
self.mark_object_error(row_id, str(e))
except Exception as e:
@@ -432,7 +434,7 @@ class JobsModel(TreeherderModelBase):
e.__class__.__name__, unicode(e))
)
else:
self.mark_object_complete(row_id, job_id)
self.mark_object_complete(row_id, job_id, revision_hash)
job_ids_loaded.append(job_id)
return job_ids_loaded
@@ -468,9 +470,9 @@ class JobsModel(TreeherderModelBase):
#
# The mark_loading SQL statement does execute an UPDATE/LIMIT but now
# implements an "ORDER BY id" clause making the UPDATE
# deterministic/safe. I've been unsuccessfull capturing the specific
# deterministic/safe. I've been unsuccessful capturing the specific
# warning generated without redirecting program flow control. To
# ressolve the problem in production, we're disabling MySQLdb.Warnings
# resolve the problem in production, we're disabling MySQLdb.Warnings
# before executing mark_loading and then re-enabling warnings
# immediately after. If this bug is ever fixed in mysql this handling
# should be removed. Holy Hackery! -Jeads
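
The comment above describes suppressing the MySQLdb warning just for the mark_loading UPDATE and re-enabling warnings straight afterwards. A hedged sketch of that pattern, with run_mark_loading_update() as a hypothetical stand-in for the actual UPDATE/LIMIT call:

import warnings
import MySQLdb

def claim_with_warning_suppressed(run_mark_loading_update):
    # Silence the spurious UPDATE/LIMIT warning for this statement only...
    warnings.filterwarnings("ignore", category=MySQLdb.Warning)
    try:
        run_mark_loading_update()
    finally:
        # ...then restore normal warning behaviour immediately after.
        warnings.filterwarnings("default", category=MySQLdb.Warning)
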
@@ -497,11 +499,11 @@ class JobsModel(TreeherderModelBase):
return json_blobs
def mark_object_complete(self, object_id, job_id):
def mark_object_complete(self, object_id, job_id, revision_hash):
""" Call to database to mark the task completed """
self.get_os_dhub().execute(
proc="objectstore.updates.mark_complete",
placeholders=[job_id, object_id],
placeholders=[job_id, revision_hash, object_id],
debug_show=self.DEBUG
)
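
Taken together, process_objects now reads the revision_hash out of the stored payload and threads it through to mark_object_complete, so the hash ends up on the objectstore row alongside the job_id. A hedged sketch of one claim-load-complete cycle (using plain json.loads in place of JobData.from_json, and assuming the row dict returned by claim_objects carries "id" and "json_blob" keys as in the tests above):

import json

def process_one(jm):
    """One claim -> load -> complete cycle that keeps the revision_hash."""
    row = jm.claim_objects(1)[0]          # moves one ready row to 'loading'
    data = json.loads(row["json_blob"])   # the stored job payload
    job_id = jm.load_job_data(data)       # writes job and reference data
    jm.mark_object_complete(row["id"], job_id, data["revision_hash"])
    return job_id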

View file

@@ -115,7 +115,9 @@
"mark_complete":{
"sql":"UPDATE `objectstore`
SET `processed_state` = 'complete', `job_id` = ?
SET `processed_state` = 'complete',
`job_id` = ?,
`revision_hash` = ?
WHERE `processed_state` = 'loading'
AND `id` = ?
AND `worker_id` = CONNECTION_ID()

View file

@@ -31,7 +31,7 @@ DROP TABLE IF EXISTS `objectstore`;
*
* An object store for the incoming JSON structures described in
* sample_data/job_data.json.sample. These structures are transfered to
* project_jobs_1.sql.tmpl and treeherder_reference_1.sql.tmpl by a
* project_jobs_1.sql.tmpl and treeherder_reference_1.sql.tmpl by a
* scheduled job.
*
* Population Method: dynamic from incoming data
@@ -39,6 +39,7 @@ DROP TABLE IF EXISTS `objectstore`;
* Example Data:
*
* job_id - Referenced project_jobs_1.job.id
* revision_hash - Hash of any number of revisions associated with the result set.
* loaded_timestamp - Timestamp when the structure was first loaded.
* processed_state - ready | loading | complete
* ready - Object ready for processing
@@ -53,6 +54,7 @@ DROP TABLE IF EXISTS `objectstore`;
CREATE TABLE `objectstore` (
`id` bigint(11) unsigned NOT NULL AUTO_INCREMENT,
`job_id` bigint(11) unsigned DEFAULT NULL,
`revision_hash` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`loaded_timestamp` int(11) unsigned NOT NULL,
`processed_state` enum('ready','loading','complete') COLLATE utf8_bin DEFAULT 'ready',
`error` enum('N','Y') COLLATE utf8_bin DEFAULT 'N',