refactor objectstore to allow updates

This commit is contained in:
mdoglio 2013-06-04 12:16:52 +01:00
Родитель 0d6e2bfe33
Коммит fc5e9f5d9e
8 изменённых файлов: 91 добавлений и 37 удалений

Просмотреть файл

@ -26,7 +26,11 @@ def ref_data_json():
def job_json(**kwargs): def job_json(**kwargs):
return json.dumps(job_data(**kwargs)) """
return a tuple containing a job guid and its json data
"""
data = job_data(**kwargs)
return json.dumps(data), data['job']['job_guid']
def job_data(**kwargs): def job_data(**kwargs):

Просмотреть файл

@ -42,7 +42,7 @@ def do_job_ingestion(jm, job_data):
""" """
for blob in job_data: for blob in job_data:
jm.store_job_data(json.dumps(blob)) jm.store_job_data(json.dumps(blob), blob['job']['job_guid'])
jobs = jm.process_objects(1) jobs = jm.process_objects(1)
assert len(jobs) == 1 assert len(jobs) == 1
job_id = jobs[0] job_id = jobs[0]
@ -106,7 +106,7 @@ def test_artifact_log_ingestion(jm, initial_data):
u"blob": "" u"blob": ""
} }
blob = job_data(artifact=artifact) blob = job_data(artifact=artifact)
jm.store_job_data(json.dumps(blob)) jm.store_job_data(json.dumps(blob), blob['job']['job_guid'])
job_ids = jm.process_objects(1) job_ids = jm.process_objects(1)
assert get_objectstore_last_error(jm) == u"N" assert get_objectstore_last_error(jm) == u"N"
@ -125,7 +125,7 @@ def test_bad_date_value_ingestion(jm, initial_data):
""" """
blob = job_data(start_timestamp="foo") blob = job_data(start_timestamp="foo")
jm.store_job_data(json.dumps(blob)) jm.store_job_data(json.dumps(blob), blob['job']['job_guid'])
job_ids = jm.process_objects(1) job_ids = jm.process_objects(1)
assert get_objectstore_last_error( assert get_objectstore_last_error(

Просмотреть файл

@ -9,9 +9,9 @@ slow = pytest.mark.slow
def test_claim_objects(jm, sample_data): def test_claim_objects(jm, sample_data):
"""``claim_objects`` claims & returns unclaimed rows up to a limit.""" """``claim_objects`` claims & returns unclaimed rows up to a limit."""
blobs = [json.dumps(job) for job in sample_data.job_data[:3]] blobs = dict((job['job']['job_guid'], json.dumps(job)) for job in sample_data.job_data[:3])
for blob in blobs: for job_guid, blob in blobs.items():
jm.store_job_data(blob) jm.store_job_data(blob, job_guid)
rows1 = jm.claim_objects(2) rows1 = jm.claim_objects(2)
@ -29,7 +29,7 @@ def test_claim_objects(jm, sample_data):
assert len(rows2) == 1 assert len(rows2) == 1
# all three blobs were fetched by one of the workers # all three blobs were fetched by one of the workers
assert set([r["json_blob"] for r in rows1 + rows2]) == set(blobs) assert set([r["json_blob"] for r in rows1 + rows2]) == set(blobs.values())
# the blobs are all marked as "loading" in the database # the blobs are all marked as "loading" in the database
assert loading_rows == 3 assert loading_rows == 3
@ -37,17 +37,16 @@ def test_claim_objects(jm, sample_data):
def test_mark_object_complete(jm): def test_mark_object_complete(jm):
"""Marks claimed row complete and records run id.""" """Marks claimed row complete and records run id."""
jm.store_job_data(job_json()) jm.store_job_data(*job_json())
row_id = jm.claim_objects(1)[0]["id"] row_id = jm.claim_objects(1)[0]["id"]
job_id = 7 # any arbitrary number; no cross-db constraint checks
revision_hash = "fakehash" revision_hash = "fakehash"
jm.mark_object_complete(row_id, job_id, revision_hash) jm.mark_object_complete(row_id, revision_hash)
row_data = jm.get_dhub(jm.CT_OBJECTSTORE).execute( row_data = jm.get_dhub(jm.CT_OBJECTSTORE).execute(
proc="objectstore_test.selects.row", placeholders=[row_id])[0] proc="objectstore_test.selects.row", placeholders=[row_id])[0]
assert row_data["job_id"] == job_id
assert row_data["revision_hash"] == revision_hash assert row_data["revision_hash"] == revision_hash
assert row_data["processed_state"] == "complete" assert row_data["processed_state"] == "complete"
@ -56,13 +55,13 @@ def test_process_objects(jm, initial_data):
"""Claims and processes a chunk of unprocessed JSON jobs data blobs.""" """Claims and processes a chunk of unprocessed JSON jobs data blobs."""
# Load some rows into the objectstore # Load some rows into the objectstore
blobs = [ blobs = [
job_json(submit_timestamp="1330454755"), job_json(submit_timestamp="1330454755", job_guid="guid1"),
job_json(submit_timestamp="1330454756"), job_json(submit_timestamp="1330454756", job_guid="guid2"),
job_json(submit_timestamp="1330454757"), job_json(submit_timestamp="1330454757", job_guid="guid3"),
] ]
for blob in blobs: for blob in blobs:
jm.store_job_data(blob) jm.store_job_data(*blob)
# just process two rows # just process two rows
jm.process_objects(2) jm.process_objects(2)
@ -85,7 +84,7 @@ def test_process_objects(jm, initial_data):
def test_process_objects_invalid_json(jm): def test_process_objects_invalid_json(jm):
"""process_objects fail for invalid json""" """process_objects fail for invalid json"""
jm.store_job_data("invalid json") jm.store_job_data("invalid json", "myguid")
row_id = jm._get_last_insert_id("objectstore") row_id = jm._get_last_insert_id("objectstore")
jm.process_objects(1) jm.process_objects(1)
@ -102,7 +101,7 @@ def test_process_objects_invalid_json(jm):
def test_process_objects_unknown_error(jm, monkeypatch): def test_process_objects_unknown_error(jm, monkeypatch):
"""process_objects fail for unknown reason""" """process_objects fail for unknown reason"""
jm.store_job_data("{}") jm.store_job_data("{}", "myguid")
row_id = jm._get_last_insert_id("objectstore") row_id = jm._get_last_insert_id("objectstore")
# force an unexpected error to occur # force an unexpected error to occur
@ -148,3 +147,29 @@ def test_ingest_sample_data(jm, sample_data):
assert complete_count == data_length assert complete_count == data_length
assert loading_count == 0 assert loading_count == 0
assert len(job_rows) == data_length assert len(job_rows) == data_length
def test_objectstore_update_content(jm, sample_data):
"""
Test updating an object of the objectstore.
"""
original_obj = sample_data.job_data[0]
jm.store_job_data(json.dumps(original_obj), original_obj["job"]["job_guid"])
obj_updated = original_obj.copy()
obj_updated["job"]["state"] = "new_state"
jm.store_job_data(json.dumps(obj_updated), obj_updated["job"]["job_guid"])
stored_objs = jm.get_os_dhub().execute(
proc="objectstore_test.selects.row_by_guid",
placeholders=[obj_updated["job"]["job_guid"]]
)
# check that it didn't create a new object
assert len(stored_objs) == 1
stored_blob = json.loads(stored_objs[0]["json_blob"])
# check that the blob was updated
assert stored_blob["job"]["state"] == "new_state"

Просмотреть файл

@ -23,6 +23,10 @@
"row": { "row": {
"sql": "SELECT * FROM `objectstore` WHERE id = ?", "sql": "SELECT * FROM `objectstore` WHERE id = ?",
"host": "master_host" "host": "master_host"
},
"row_by_guid": {
"sql": "SELECT * FROM `objectstore` WHERE `job_guid` = ?",
"host": "master_host"
} }
} }
} }

Просмотреть файл

@ -1,2 +1,3 @@
from .base import * from .base import *
from .refdata import * from .refdata import *
from .jobs import *

Просмотреть файл

@ -78,8 +78,11 @@ class JobsModel(TreeherderModelBase):
secret = ds.get_oauth_consumer_secret(key) secret = ds.get_oauth_consumer_secret(key)
return secret return secret
def store_job_data(self, json_data, error=None): def store_job_data(self, json_data, job_guid, error=None):
"""Write the JSON to the objectstore to be queued for processing.""" """
Write the JSON to the objectstore to be queued for processing.
job_guid is needed in order to decide whether the object exists or not
"""
loaded_timestamp = utils.get_now_timestamp() loaded_timestamp = utils.get_now_timestamp()
error = "N" if error is None else "Y" error = "N" if error is None else "Y"
@ -87,11 +90,15 @@ class JobsModel(TreeherderModelBase):
self.get_os_dhub().execute( self.get_os_dhub().execute(
proc='objectstore.inserts.store_json', proc='objectstore.inserts.store_json',
placeholders=[loaded_timestamp, json_data, error, error_msg], placeholders=[loaded_timestamp, job_guid, json_data, error, error_msg, job_guid],
debug_show=self.DEBUG debug_show=self.DEBUG
) )
return self._get_last_insert_id() self.get_os_dhub().execute(
proc='objectstore.updates.update_json',
placeholders=[loaded_timestamp, json_data, error, error_msg, job_guid],
debug_show=self.DEBUG
)
def retrieve_job_data(self, limit): def retrieve_job_data(self, limit):
""" """
@ -113,12 +120,11 @@ class JobsModel(TreeherderModelBase):
return json_blobs return json_blobs
def load_job_data(self, data): def load_job_data(self, data):
""" """
Load JobData instance into jobs db, return job_id. Load JobData instance into jobs db, return job_id.
@@@: should I return the job_guid instead?
Example: Example:
{ {
"sources": [ "sources": [
@ -432,7 +438,7 @@ class JobsModel(TreeherderModelBase):
e.__class__.__name__, unicode(e)) e.__class__.__name__, unicode(e))
) )
else: else:
self.mark_object_complete(row_id, job_id, revision_hash) self.mark_object_complete(row_id, revision_hash)
job_ids_loaded.append(job_id) job_ids_loaded.append(job_id)
return job_ids_loaded return job_ids_loaded
@ -497,11 +503,11 @@ class JobsModel(TreeherderModelBase):
return json_blobs return json_blobs
def mark_object_complete(self, object_id, job_id, revision_hash): def mark_object_complete(self, object_id, revision_hash):
""" Call to database to mark the task completed """ """ Call to database to mark the task completed """
self.get_os_dhub().execute( self.get_os_dhub().execute(
proc="objectstore.updates.mark_complete", proc="objectstore.updates.mark_complete",
placeholders=[job_id, revision_hash, object_id], placeholders=[revision_hash, object_id],
debug_show=self.DEBUG debug_show=self.DEBUG
) )

Просмотреть файл

@ -3,12 +3,16 @@
"store_json":{ "store_json":{
"sql":"INSERT INTO `objectstore` (`loaded_timestamp`, "sql":"INSERT INTO `objectstore` (`loaded_timestamp`,
`job_guid`,
`json_blob`, `json_blob`,
`error`, `error`,
`error_msg`) `error_msg`)
VALUES (?, ?, ?, ?) SELECT ?, ?, ?, ?, ?
", FROM DUAL
WHERE NOT EXISTS (
SELECT `job_guid` from `objectstore`
WHERE `job_guid` = ?
)",
"host":"master_host" "host":"master_host"
} }
}, },
@ -48,7 +52,7 @@
"get_error_metadata":{ "get_error_metadata":{
"sql":"SELECT `id`, job_id, loaded_timestamp, processed_state, error_msg, worker_id "sql":"SELECT `id`, job_guid, loaded_timestamp, processed_state, error_msg, worker_id
FROM `objectstore` FROM `objectstore`
WHERE `error` = 'Y' WHERE `error` = 'Y'
AND loaded_timestamp BETWEEN ? AND ?", AND loaded_timestamp BETWEEN ? AND ?",
@ -63,7 +67,7 @@
error, error,
processed_state, processed_state,
loaded_timestamp, loaded_timestamp,
job_id job_guid
FROM `objectstore` WHERE `id` = ?", FROM `objectstore` WHERE `id` = ?",
"host":"read_host" "host":"read_host"
@ -73,7 +77,7 @@
"sql":"SELECT json_blob, error_msg, error "sql":"SELECT json_blob, error_msg, error
FROM `objectstore` FROM `objectstore`
WHERE `job_id` IN (REP0)", WHERE `job_guid` IN (REP0)",
"host":"read_host" "host":"read_host"
}, },
@ -116,7 +120,6 @@
"sql":"UPDATE `objectstore` "sql":"UPDATE `objectstore`
SET `processed_state` = 'complete', SET `processed_state` = 'complete',
`job_id` = ?,
`revision_hash` = ? `revision_hash` = ?
WHERE `processed_state` = 'loading' WHERE `processed_state` = 'loading'
AND `id` = ? AND `id` = ?
@ -141,6 +144,17 @@
"host":"master_host" "host":"master_host"
},
"update_json":{
"sql":"UPDATE `objectstore`
SET `loaded_timestamp` = ?,
`json_blob` = ?,
`error` = ?,
`error_msg` = ?
WHERE `job_guid` = ?",
"host":"master_host"
} }
} }
} }

Просмотреть файл

@ -38,7 +38,7 @@ DROP TABLE IF EXISTS `objectstore`;
* *
* Example Data: * Example Data:
* *
* job_id - Referenced project_jobs_1.job.id * job_guid - Referenced project_jobs_1.job.guid
* revision_hash - Hash of any number of revisions associated with the result set. * revision_hash - Hash of any number of revisions associated with the result set.
* loaded_timestamp - Timestamp when the structure was first loaded. * loaded_timestamp - Timestamp when the structure was first loaded.
* processed_state - ready | loading | complete * processed_state - ready | loading | complete
@ -53,7 +53,7 @@ DROP TABLE IF EXISTS `objectstore`;
**************************/ **************************/
CREATE TABLE `objectstore` ( CREATE TABLE `objectstore` (
`id` bigint(11) unsigned NOT NULL AUTO_INCREMENT, `id` bigint(11) unsigned NOT NULL AUTO_INCREMENT,
`job_id` bigint(11) unsigned DEFAULT NULL, `job_guid` varchar(50) COLLATE utf8_bin NOT NULL,
`revision_hash` varchar(50) COLLATE utf8_bin DEFAULT NULL, `revision_hash` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`loaded_timestamp` int(11) unsigned NOT NULL, `loaded_timestamp` int(11) unsigned NOT NULL,
`processed_state` enum('ready','loading','complete') COLLATE utf8_bin DEFAULT 'ready', `processed_state` enum('ready','loading','complete') COLLATE utf8_bin DEFAULT 'ready',
@ -62,7 +62,7 @@ CREATE TABLE `objectstore` (
`json_blob` mediumblob, `json_blob` mediumblob,
`worker_id` int(11) unsigned DEFAULT NULL, `worker_id` int(11) unsigned DEFAULT NULL,
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
KEY `idx_job_id` (`job_id`), KEY `idx_job_id` (`job_guid`),
KEY `idx_processed_state` (`processed_state`), KEY `idx_processed_state` (`processed_state`),
KEY `idx_error` (`error`), KEY `idx_error` (`error`),
KEY `idx_worker_id` (`worker_id`) KEY `idx_worker_id` (`worker_id`)