Merge pull request #25 from mozilla/allow-objectstore-update

refactor objectstore to allow updates
This commit is contained in:
Mauro Doglio 2013-06-05 03:45:17 -07:00
Родитель 70bc7a0667 de7efd8c93
Коммит 10d678554f
9 изменённых файлов: 122 добавлений и 57 удалений

Просмотреть файл

@ -32,6 +32,10 @@
WHERE job.id = ?
",
"host": "master_host"
},
"row_by_guid": {
"sql": "SELECT * FROM `job` WHERE `job_guid` = ?",
"host": "master_host"
}
}
}

Просмотреть файл

@ -26,7 +26,11 @@ def ref_data_json():
def job_json(**kwargs):
return json.dumps(job_data(**kwargs))
"""
return a tuple containing a job guid and its json data
"""
data = job_data(**kwargs)
return json.dumps(data), data['job']['job_guid']
def job_data(**kwargs):

Просмотреть файл

@ -42,19 +42,17 @@ def do_job_ingestion(jm, job_data):
"""
for blob in job_data:
jm.store_job_data(json.dumps(blob))
jobs = jm.process_objects(1)
assert len(jobs) == 1
job_id = jobs[0]
jm.store_job_data(json.dumps(blob), blob['job']['job_guid'])
jm.process_objects(1)
# verify the job data
exp_job = clean_job_blob_dict(blob["job"])
act_job = JobDictBuilder(jm, job_id).as_dict()
act_job = JobDictBuilder(jm, blob['job']['job_guid']).as_dict()
assert exp_job == act_job, diff_dict(exp_job, act_job)
# verify the source data
exp_src = clean_source_blob_dict(blob["sources"][0])
act_src = SourceDictBuilder(jm, job_id).as_dict()
act_src = SourceDictBuilder(jm, blob['job']['job_guid']).as_dict()
assert exp_src == act_src, diff_dict(exp_src, act_src)
complete_count = jm.get_os_dhub().execute(
@ -106,15 +104,13 @@ def test_artifact_log_ingestion(jm, initial_data):
u"blob": ""
}
blob = job_data(artifact=artifact)
jm.store_job_data(json.dumps(blob))
job_ids = jm.process_objects(1)
jm.store_job_data(json.dumps(blob), blob['job']['job_guid'])
jm.process_objects(1)
assert get_objectstore_last_error(jm) == u"N"
job_id = job_ids[0]
exp_job = clean_job_blob_dict(blob["job"])
act_job = JobDictBuilder(jm, job_id).as_dict()
act_job = JobDictBuilder(jm, blob['job']['job_guid']).as_dict()
assert exp_job == act_job, diff_dict(exp_job, act_job)
@ -125,7 +121,7 @@ def test_bad_date_value_ingestion(jm, initial_data):
"""
blob = job_data(start_timestamp="foo")
jm.store_job_data(json.dumps(blob))
jm.store_job_data(json.dumps(blob), blob['job']['job_guid'])
job_ids = jm.process_objects(1)
assert get_objectstore_last_error(
@ -136,9 +132,15 @@ def test_bad_date_value_ingestion(jm, initial_data):
class SourceDictBuilder(object):
"""Given a ``job_guid``, rebuild the dictionary the source came from."""
def __init__(self, jm, job_id):
def __init__(self, jm, job_guid):
self.jm = jm
self.job_id = job_id
self.job_guid = job_guid
job_data = self.jm.get_jobs_dhub().execute(
proc="jobs_test.selects.row_by_guid",
placeholders=[self.job_guid],
return_type="iter"
).next()
self.job_id = job_data['id']
def as_dict(self):
source = self.jm.get_jobs_dhub().execute(
@ -164,9 +166,15 @@ class SourceDictBuilder(object):
class JobDictBuilder(object):
"""Given a ``job_guid``, rebuild the dictionary the job came from."""
def __init__(self, jm, job_id):
def __init__(self, jm, job_guid):
self.jm = jm
self.job_id = job_id
self.job_guid = job_guid
job_data = self.jm.get_jobs_dhub().execute(
proc="jobs_test.selects.row_by_guid",
placeholders=[self.job_guid],
return_type="iter"
).next()
self.job_id = job_data['id']
def as_dict(self):
job = self.jm.get_job(self.job_id)

Просмотреть файл

@ -9,9 +9,9 @@ slow = pytest.mark.slow
def test_claim_objects(jm, sample_data):
"""``claim_objects`` claims & returns unclaimed rows up to a limit."""
blobs = [json.dumps(job) for job in sample_data.job_data[:3]]
for blob in blobs:
jm.store_job_data(blob)
blobs = dict((job['job']['job_guid'], json.dumps(job)) for job in sample_data.job_data[:3])
for job_guid, blob in blobs.items():
jm.store_job_data(blob, job_guid)
rows1 = jm.claim_objects(2)
@ -29,7 +29,7 @@ def test_claim_objects(jm, sample_data):
assert len(rows2) == 1
# all three blobs were fetched by one of the workers
assert set([r["json_blob"] for r in rows1 + rows2]) == set(blobs)
assert set([r["json_blob"] for r in rows1 + rows2]) == set(blobs.values())
# the blobs are all marked as "loading" in the database
assert loading_rows == 3
@ -37,17 +37,16 @@ def test_claim_objects(jm, sample_data):
def test_mark_object_complete(jm):
"""Marks claimed row complete and records run id."""
jm.store_job_data(job_json())
jm.store_job_data(*job_json())
row_id = jm.claim_objects(1)[0]["id"]
job_id = 7 # any arbitrary number; no cross-db constraint checks
revision_hash = "fakehash"
jm.mark_object_complete(row_id, job_id, revision_hash)
jm.mark_object_complete(row_id, revision_hash)
row_data = jm.get_dhub(jm.CT_OBJECTSTORE).execute(
proc="objectstore_test.selects.row", placeholders=[row_id])[0]
assert row_data["job_id"] == job_id
assert row_data["revision_hash"] == revision_hash
assert row_data["processed_state"] == "complete"
@ -56,13 +55,13 @@ def test_process_objects(jm, initial_data):
"""Claims and processes a chunk of unprocessed JSON jobs data blobs."""
# Load some rows into the objectstore
blobs = [
job_json(submit_timestamp="1330454755"),
job_json(submit_timestamp="1330454756"),
job_json(submit_timestamp="1330454757"),
job_json(submit_timestamp="1330454755", job_guid="guid1"),
job_json(submit_timestamp="1330454756", job_guid="guid2"),
job_json(submit_timestamp="1330454757", job_guid="guid3"),
]
for blob in blobs:
jm.store_job_data(blob)
jm.store_job_data(*blob)
# just process two rows
jm.process_objects(2)
@ -85,7 +84,7 @@ def test_process_objects(jm, initial_data):
def test_process_objects_invalid_json(jm):
"""process_objects fail for invalid json"""
jm.store_job_data("invalid json")
jm.store_job_data("invalid json", "myguid")
row_id = jm._get_last_insert_id("objectstore")
jm.process_objects(1)
@ -102,7 +101,7 @@ def test_process_objects_invalid_json(jm):
def test_process_objects_unknown_error(jm, monkeypatch):
"""process_objects fail for unknown reason"""
jm.store_job_data("{}")
jm.store_job_data("{}", "myguid")
row_id = jm._get_last_insert_id("objectstore")
# force an unexpected error to occur
@ -148,3 +147,29 @@ def test_ingest_sample_data(jm, sample_data):
assert complete_count == data_length
assert loading_count == 0
assert len(job_rows) == data_length
def test_objectstore_update_content(jm, sample_data):
"""
Test updating an object of the objectstore.
"""
original_obj = sample_data.job_data[0]
jm.store_job_data(json.dumps(original_obj), original_obj["job"]["job_guid"])
obj_updated = original_obj.copy()
obj_updated["job"]["state"] = "new_state"
jm.store_job_data(json.dumps(obj_updated), obj_updated["job"]["job_guid"])
stored_objs = jm.get_os_dhub().execute(
proc="objectstore_test.selects.row_by_guid",
placeholders=[obj_updated["job"]["job_guid"]]
)
# check that it didn't create a new object
assert len(stored_objs) == 1
stored_blob = json.loads(stored_objs[0]["json_blob"])
# check that the blob was updated
assert stored_blob["job"]["state"] == "new_state"

Просмотреть файл

@ -23,6 +23,10 @@
"row": {
"sql": "SELECT * FROM `objectstore` WHERE id = ?",
"host": "master_host"
},
"row_by_guid": {
"sql": "SELECT * FROM `objectstore` WHERE `job_guid` = ?",
"host": "master_host"
}
}
}

Просмотреть файл

@ -1,2 +1,3 @@
from .base import *
from .refdata import *
from .jobs import *

Просмотреть файл

@ -97,20 +97,32 @@ class JobsModel(TreeherderModelBase):
secret = ds.get_oauth_consumer_secret(key)
return secret
def store_job_data(self, json_data, error=None):
"""Write the JSON to the objectstore to be queued for processing."""
def store_job_data(self, json_data, job_guid, error=None):
"""
Write the JSON to the objectstore to be queued for processing.
job_guid is needed in order to decide whether the object exists or not
"""
loaded_timestamp = utils.get_now_timestamp()
error = "N" if error is None else "Y"
error_msg = error or ""
# this query inserts the object if its guid is not present,
# otherwise it does nothing
self.get_os_dhub().execute(
proc='objectstore.inserts.store_json',
placeholders=[loaded_timestamp, json_data, error, error_msg],
placeholders=[loaded_timestamp, job_guid, json_data, error, error_msg, job_guid],
debug_show=self.DEBUG
)
return self._get_last_insert_id()
# this update is needed in case the object was already stored,
# otherwise it's redundant.
# TODO: find a way to do a conditional update
self.get_os_dhub().execute(
proc='objectstore.updates.update_json',
placeholders=[loaded_timestamp, json_data, error, error_msg, job_guid],
debug_show=self.DEBUG
)
def retrieve_job_data(self, limit):
"""
@ -132,12 +144,11 @@ class JobsModel(TreeherderModelBase):
return json_blobs
def load_job_data(self, data):
"""
Load JobData instance into jobs db, return job_id.
TODO: decide whether this should return the job_guid instead.
Example:
{
"sources": [
@ -278,8 +289,6 @@ class JobsModel(TreeherderModelBase):
# it is ok to have an empty or missing artifact
pass
return job_id
def _set_result_set(self, revision_hash):
"""Set result set revision hash"""
@ -433,14 +442,13 @@ class JobsModel(TreeherderModelBase):
def process_objects(self, loadlimit):
"""Processes JSON blobs from the objectstore into jobs schema."""
rows = self.claim_objects(loadlimit)
job_ids_loaded = []
for row in rows:
row_id = int(row['id'])
import traceback
try:
data = JobData.from_json(row['json_blob'])
job_id = self.load_job_data(data)
self.load_job_data(data)
revision_hash = data["revision_hash"]
except JobDataError as e:
self.mark_object_error(row_id, str(e))
@ -451,10 +459,7 @@ class JobsModel(TreeherderModelBase):
e.__class__.__name__, unicode(e))
)
else:
self.mark_object_complete(row_id, job_id, revision_hash)
job_ids_loaded.append(job_id)
return job_ids_loaded
self.mark_object_complete(row_id, revision_hash)
def claim_objects(self, limit):
"""
@ -516,11 +521,11 @@ class JobsModel(TreeherderModelBase):
return json_blobs
def mark_object_complete(self, object_id, job_id, revision_hash):
def mark_object_complete(self, object_id, revision_hash):
""" Call to database to mark the task completed """
self.get_os_dhub().execute(
proc="objectstore.updates.mark_complete",
placeholders=[job_id, revision_hash, object_id],
placeholders=[revision_hash, object_id],
debug_show=self.DEBUG
)

Просмотреть файл

@ -3,12 +3,16 @@
"store_json":{
"sql":"INSERT INTO `objectstore` (`loaded_timestamp`,
`job_guid`,
`json_blob`,
`error`,
`error_msg`)
VALUES (?, ?, ?, ?)
",
SELECT ?, ?, ?, ?, ?
FROM DUAL
WHERE NOT EXISTS (
SELECT `job_guid` from `objectstore`
WHERE `job_guid` = ?
)",
"host":"master_host"
}
},
@ -48,7 +52,7 @@
"get_error_metadata":{
"sql":"SELECT `id`, job_id, loaded_timestamp, processed_state, error_msg, worker_id
"sql":"SELECT `id`, job_guid, loaded_timestamp, processed_state, error_msg, worker_id
FROM `objectstore`
WHERE `error` = 'Y'
AND loaded_timestamp BETWEEN ? AND ?",
@ -63,7 +67,7 @@
error,
processed_state,
loaded_timestamp,
job_id
job_guid
FROM `objectstore` WHERE `id` = ?",
"host":"read_host"
@ -73,7 +77,7 @@
"sql":"SELECT json_blob, error_msg, error
FROM `objectstore`
WHERE `job_id` IN (REP0)",
WHERE `job_guid` IN (REP0)",
"host":"read_host"
},
@ -116,7 +120,6 @@
"sql":"UPDATE `objectstore`
SET `processed_state` = 'complete',
`job_id` = ?,
`revision_hash` = ?
WHERE `processed_state` = 'loading'
AND `id` = ?
@ -141,6 +144,17 @@
"host":"master_host"
},
"update_json":{
"sql":"UPDATE `objectstore`
SET `loaded_timestamp` = ?,
`json_blob` = ?,
`error` = ?,
`error_msg` = ?
WHERE `job_guid` = ?",
"host":"master_host"
}
}
}

Просмотреть файл

@ -38,7 +38,7 @@ DROP TABLE IF EXISTS `objectstore`;
*
* Example Data:
*
* job_id - Referenced project_jobs_1.job.id
* job_guid - Referenced project_jobs_1.job.guid
* revision_hash - Hash of any number of revisions associated with the result set.
* loaded_timestamp - Timestamp when the structure was first loaded.
* processed_state - ready | loading | complete
@ -53,7 +53,7 @@ DROP TABLE IF EXISTS `objectstore`;
**************************/
CREATE TABLE `objectstore` (
`id` bigint(11) unsigned NOT NULL AUTO_INCREMENT,
`job_id` bigint(11) unsigned DEFAULT NULL,
`job_guid` varchar(50) COLLATE utf8_bin NOT NULL,
`revision_hash` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`loaded_timestamp` int(11) unsigned NOT NULL,
`processed_state` enum('ready','loading','complete') COLLATE utf8_bin DEFAULT 'ready',
@ -62,7 +62,7 @@ CREATE TABLE `objectstore` (
`json_blob` mediumblob,
`worker_id` int(11) unsigned DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `idx_job_id` (`job_id`),
KEY `idx_job_id` (`job_guid`),
KEY `idx_processed_state` (`processed_state`),
KEY `idx_error` (`error`),
KEY `idx_worker_id` (`worker_id`)