Bug 1140349 - Remove the objectstore code

After the previous commit, the Objectstore is effectively "dead code".
This commit removes all of that dead code, now that anything left over
in the Objectstore has been drained and added to the DB.
Cameron Dawson 2015-06-18 12:50:50 -07:00
Parent 45c3fe6e68
Commit 00cfe6643d
48 changed files with 306 additions and 1461 deletions

View file

@ -8,6 +8,6 @@ worker_pushlog: newrelic-admin run-program celery -A treeherder worker -Q pushlo
worker_buildapi_pending: newrelic-admin run-program celery -A treeherder worker -Q buildapi_pending --maxtasksperchild=20 --concurrency=5
worker_buildapi_running: newrelic-admin run-program celery -A treeherder worker -Q buildapi_running --maxtasksperchild=20 --concurrency=5
worker_buildapi_4hr: newrelic-admin run-program celery -A treeherder worker -Q buildapi_4hr --maxtasksperchild=20 --concurrency=1
worker_default: newrelic-admin run-program celery -A treeherder worker -Q default,process_objects,cycle_data,calculate_eta,populate_performance_series,fetch_bugs --maxtasksperchild=50 --concurrency=3
worker_default: newrelic-admin run-program celery -A treeherder worker -Q default,cycle_data,calculate_eta,populate_performance_series,fetch_bugs --maxtasksperchild=50 --concurrency=3
worker_hp: newrelic-admin run-program celery -A treeherder worker -Q classification_mirroring,publish_to_pulse --maxtasksperchild=50 --concurrency=1
worker_log_parser: newrelic-admin run-program celery -A treeherder worker -Q log_parser_fail,log_parser,log_parser_hp,log_parser_json --maxtasksperchild=50 --concurrency=5

View file

@ -23,6 +23,6 @@ if [ ! -f $LOGFILE ]; then
fi
exec $NEWRELIC_ADMIN celery -A treeherder worker -c 3 \
-Q default,process_objects,cycle_data,calculate_eta,populate_performance_series,fetch_bugs \
-Q default,cycle_data,calculate_eta,populate_performance_series,fetch_bugs \
-E --maxtasksperchild=500 \
--logfile=$LOGFILE -l INFO -n default.%h

View file

@ -18,12 +18,7 @@ Here is a brief description of what each periodic task will do for you:
*fetch-buildapi-build4h*
Same as before, but it collects all the jobs completed in the last 4 hours.
*process-objects*
As the name says, processes job objects from the objectstore to the jobs store.
Once a job is processed, it becomes available in the restful interface for consumption.
See the `dataflow diagram`_ for more info
Follows a data flow diagram which can help to understand better how these tasks are used by treeherder
The following is a data flow diagram which can help to understand better how these tasks are used by treeherder
.. image:: https://cacoo.com/diagrams/870thliGfT89pLZc-B5E80.png
:width: 800px

View file

@ -226,14 +226,6 @@ structures to send, do something like this:
from thclient import (TreeherderAuth, TreeherderClient, TreeherderClientError,
TreeherderJobCollection)
#####
# TreeherderJobCollection() takes a 'type' parameter that can be set to 'update'
# if the job objects are being used for updating status (status = 'running' | 'pending') and
# don't contain a full data payload. If type is not set, the job object go to the
# objectstore (status = 'completed'). If the collection is passed a type like so,
# TreeherderJobCollection(type='update') the status of the object will be updated in
# the RDBS schema
#####
tjc = TreeherderJobCollection()
for data in dataset:
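With the 'type' parameter gone, building the collection is the same for every job state. The following is a rough sketch of how the truncated loop above might continue ('dataset' and the submission step are assumptions; get_job and add come from this commit's test fixtures):

from thclient import TreeherderJobCollection

# 'dataset' is assumed to be an iterable of job dicts (pending, running or
# completed); there is no longer a separate 'update' collection type.
tjc = TreeherderJobCollection()
for data in dataset:
    tj = tjc.get_job(data)  # wrap the raw job dict in a TreeherderJob
    tjc.add(tj)             # queue it for submission

# The collection is then posted to the project's jobs endpoint with the
# project's OAuth credentials, the same way as before this commit.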

View file

@ -128,13 +128,8 @@ def jm(request):
# patch in additional test-only procs on the datasources
add_test_procs_file(
model.get_dhub("objectstore"),
model.get_datasource("objectstore").key,
"objectstore_test.json",
)
add_test_procs_file(
model.get_dhub("jobs"),
model.get_datasource("jobs").key,
model.get_dhub(),
model.get_datasource().key,
"jobs_test.json",
)
@ -166,17 +161,8 @@ def jobs_ds():
from treeherder.model.models import Datasource
return Datasource.objects.create(
project=settings.DATABASES["default"]["TEST_NAME"],
contenttype="jobs",
)
@pytest.fixture()
def objectstore_ds():
from django.conf import settings
from treeherder.model.models import Datasource
return Datasource.objects.create(
project=settings.DATABASES["default"]["TEST_NAME"],
contenttype="objectstore",
# TODO: remove contenttype from the database/model
contenttype="jobs"
)
@ -324,12 +310,11 @@ def resultset_with_three_jobs(jm, sample_data, sample_resultset):
# Store and process the jobs so they are present in the tables.
jm.store_job_data(blobs)
jm.process_objects(num_jobs, raise_errors=True)
return resultset_creation['inserted_result_set_ids'][0]
@pytest.fixture
def eleven_jobs_stored(jm, sample_data, sample_resultset):
def eleven_jobs_stored(jm, sample_data, sample_resultset, mock_log_parser):
"""stores a list of 11 job samples"""
jm.store_result_set_data(sample_resultset)
@ -359,12 +344,6 @@ def eleven_jobs_stored(jm, sample_data, sample_resultset):
jm.store_job_data(blobs)
@pytest.fixture
def eleven_jobs_processed(jm, mock_log_parser, eleven_jobs_stored):
"""stores and processes list of 11 job samples"""
jm.process_objects(11, raise_errors=True)
@pytest.fixture
def set_oauth_credentials():
OAuthCredentials.set_credentials(SampleData.get_credentials())

View file

@ -48,7 +48,7 @@ def pending_jobs_stored(
pending_jobs.update(result_set_stored[0])
tjc = TreeherderJobCollection(job_type='update')
tjc = TreeherderJobCollection()
tj = tjc.get_job(pending_jobs)
tjc.add(tj)
@ -59,11 +59,11 @@ def pending_jobs_stored(
def running_jobs_stored(
jm, running_jobs, result_set_stored):
"""
stores a list of buildapi running jobs into the objectstore
stores a list of buildapi running jobs
"""
running_jobs.update(result_set_stored[0])
tjc = TreeherderJobCollection(job_type='update')
tjc = TreeherderJobCollection()
tj = tjc.get_job(running_jobs)
tjc.add(tj)
@ -74,7 +74,7 @@ def running_jobs_stored(
def completed_jobs_stored(
jm, completed_jobs, result_set_stored, mock_post_json):
"""
stores a list of buildapi completed jobs into the objectstore
stores a list of buildapi completed jobs
"""
completed_jobs['revision_hash'] = result_set_stored[0]['revision_hash']
@ -83,10 +83,3 @@ def completed_jobs_stored(
tjc.add(tj)
test_utils.post_collection(jm.project, tjc)
@pytest.fixture
def completed_jobs_loaded(jm, completed_jobs_stored):
jm.process_objects(1, raise_errors=True)
jm.disconnect()

View file

@ -31,7 +31,7 @@ def test_running_job_available(jm, initial_data, running_jobs_stored):
assert jobs['results'][0]['state'] == 'running'
def test_completed_job_available(jm, initial_data, completed_jobs_loaded):
def test_completed_job_available(jm, initial_data, completed_jobs_stored):
webapp = TestApp(application)
resp = webapp.get(
reverse("jobs-list", kwargs={"project": jm.project})
@ -59,7 +59,7 @@ def test_pending_stored_to_running_loaded(jm, initial_data, pending_jobs_stored,
assert jobs['results'][0]['state'] == 'running'
def test_finished_job_to_running(jm, initial_data, completed_jobs_loaded, running_jobs_stored):
def test_finished_job_to_running(jm, initial_data, completed_jobs_stored, running_jobs_stored):
"""
tests that a job finished cannot change state
"""

View file

@ -114,7 +114,7 @@ def test_ingest_pending_jobs(jm, initial_data,
etl_process = PendingJobsProcess()
etl_process.run()
stored_obj = jm.get_jobs_dhub().execute(
stored_obj = jm.get_dhub().execute(
proc="jobs_test.selects.jobs")
jm.disconnect()
@ -135,7 +135,7 @@ def test_ingest_running_jobs(jm, initial_data,
etl_process = RunningJobsProcess()
etl_process.run()
stored_obj = jm.get_jobs_dhub().execute(
stored_obj = jm.get_dhub().execute(
proc="jobs_test.selects.jobs")
jm.disconnect()
@ -156,7 +156,7 @@ def test_ingest_builds4h_jobs(jm, initial_data,
etl_process = Builds4hJobsProcess()
etl_process.run()
stored_obj = jm.get_jobs_dhub().execute(
stored_obj = jm.get_dhub().execute(
proc="jobs_test.selects.jobs")
jm.disconnect()
@ -181,7 +181,7 @@ def test_ingest_running_to_complete_job(jm, initial_data,
etl_process = RunningJobsProcess()
etl_process.run()
stored_running = jm.get_jobs_dhub().execute(
stored_running = jm.get_dhub().execute(
proc="jobs_test.selects.jobs")
assert len(stored_running) == 1
@ -191,7 +191,7 @@ def test_ingest_running_to_complete_job(jm, initial_data,
etl_process = Builds4hJobsProcess()
etl_process.run()
stored_obj = jm.get_jobs_dhub().execute(
stored_obj = jm.get_dhub().execute(
proc="jobs_test.selects.jobs")
jm.disconnect()
@ -217,7 +217,7 @@ def test_ingest_running_job_fields(jm, initial_data,
etl_process = RunningJobsProcess()
etl_process.run()
stored_obj = jm.get_jobs_dhub().execute(
stored_obj = jm.get_dhub().execute(
proc="jobs_test.selects.jobs")
jm.disconnect()
@ -278,7 +278,7 @@ def test_ingest_builds4h_jobs_missing_branch(jm, initial_data,
etl_process.run()
stored_obj = jm.get_jobs_dhub().execute(
stored_obj = jm.get_dhub().execute(
proc="jobs_test.selects.jobs")
assert len(stored_obj) == 0
@ -309,12 +309,12 @@ def _do_missing_resultset_test(jm, etl_process):
etl_process.run()
stored_obj = jm.get_jobs_dhub().execute(
stored_obj = jm.get_dhub().execute(
proc="jobs_test.selects.jobs")
assert len(stored_obj) == 1
revisions_stored = jm.get_jobs_dhub().execute(
revisions_stored = jm.get_dhub().execute(
proc="jobs_test.selects.revision_ids",
return_type='tuple'
)

View file

@ -11,7 +11,7 @@ from treeherder.etl.classification_mirroring import ElasticsearchDocRequest, Bug
from treeherder.model.derived import ArtifactsModel
def test_elasticsearch_doc_request_body(test_project, eleven_jobs_processed):
def test_elasticsearch_doc_request_body(test_project, eleven_jobs_stored):
"""
Test the request body is created correctly
"""
@ -54,7 +54,7 @@ def test_elasticsearch_doc_request_body(test_project, eleven_jobs_processed):
assert req.body == expected, diff(expected, req.body)
def test_bugzilla_comment_request_body(test_project, eleven_jobs_processed):
def test_bugzilla_comment_request_body(test_project, eleven_jobs_stored):
"""
Test the request body is created correctly
"""
@ -93,7 +93,7 @@ def test_bugzilla_comment_request_body(test_project, eleven_jobs_processed):
assert req.body == expected
def test_bugzilla_comment_length_capped(test_project, eleven_jobs_processed):
def test_bugzilla_comment_length_capped(test_project, eleven_jobs_stored):
"""
Test that the total number of characters in the comment is capped correctly.
"""

View file

@ -27,7 +27,7 @@ def test_ingest_hg_pushlog(jm, initial_data, test_base_dir,
process.run(pushlog_fake_url, jm.project)
pushes_stored = jm.get_jobs_dhub().execute(
pushes_stored = jm.get_dhub().execute(
proc="jobs_test.selects.result_set_ids",
return_type='tuple'
)
@ -47,7 +47,7 @@ def test_ingest_hg_pushlog(jm, initial_data, test_base_dir,
# Ensure we don't match the same revision twice...
rev_to_push.remove(content['revision'])
revisions_stored = jm.get_jobs_dhub().execute(
revisions_stored = jm.get_dhub().execute(
proc="jobs_test.selects.revision_ids",
return_type='tuple'
)
@ -80,7 +80,7 @@ def test_ingest_hg_pushlog_already_stored(jm, initial_data, test_base_dir,
process = HgPushlogProcess()
process.run(pushlog_fake_url, jm.project)
pushes_stored = jm.get_jobs_dhub().execute(
pushes_stored = jm.get_dhub().execute(
proc="jobs_test.selects.result_set_ids",
return_type='tuple'
)
@ -103,7 +103,7 @@ def test_ingest_hg_pushlog_already_stored(jm, initial_data, test_base_dir,
process.run(pushlog_fake_url, jm.project)
pushes_stored = jm.get_jobs_dhub().execute(
pushes_stored = jm.get_dhub().execute(
proc="jobs_test.selects.result_set_ids",
return_type='tuple'
)
@ -128,7 +128,7 @@ def test_ingest_hg_pushlog_not_found_in_json_pushes(jm, initial_data, test_base_
process.run(pushlog_fake_url, jm.project, "123456789012")
pushes_stored = jm.get_jobs_dhub().execute(
pushes_stored = jm.get_dhub().execute(
proc="jobs_test.selects.result_sets",
return_type='tuple'
)
@ -136,7 +136,7 @@ def test_ingest_hg_pushlog_not_found_in_json_pushes(jm, initial_data, test_base_
assert len(pushes_stored) == 1
assert pushes_stored[0]['active_status'] == "onhold"
revisions_stored = jm.get_jobs_dhub().execute(
revisions_stored = jm.get_dhub().execute(
proc="jobs_test.selects.revision_ids",
return_type='tuple'
)

View file

@ -91,14 +91,13 @@ def test_parse_log(jm, initial_data, jobs_with_local_log, sample_resultset,
job['revision_hash'] = sample_resultset[0]['revision_hash']
jm.store_job_data(jobs)
jm.process_objects(1, raise_errors=True)
job_id = jm.get_jobs_dhub().execute(
job_id = jm.get_dhub().execute(
proc="jobs_test.selects.row_by_guid",
placeholders=[jobs[0]['job']['job_guid']]
)[0]['id']
job_artifacts = jm.get_jobs_dhub().execute(
job_artifacts = jm.get_dhub().execute(
proc="jobs_test.selects.job_artifact",
placeholders=[job_id]
)
@ -131,14 +130,13 @@ def test_parse_mozlog_log(jm, initial_data, jobs_with_local_mozlog_log,
job['revision_hash'] = sample_resultset[0]['revision_hash']
jm.store_job_data(jobs)
jm.process_objects(1, raise_errors=True)
job_id = jm.get_jobs_dhub().execute(
job_id = jm.get_dhub().execute(
proc="jobs_test.selects.row_by_guid",
placeholders=[jobs[0]['job']['job_guid']]
)[0]['id']
job_artifacts = jm.get_jobs_dhub().execute(
job_artifacts = jm.get_dhub().execute(
proc="jobs_test.selects.job_artifact",
placeholders=[job_id]
)
@ -170,7 +168,6 @@ def test_parse_talos_log(jm, test_project, initial_data, jobs_with_local_talos_l
jobs = jobs_with_local_talos_log
jm.store_job_data(jobs)
jm.process_objects(1, raise_errors=True)
with ArtifactsModel(test_project) as artifacts_model:
artifact_list = artifacts_model.get_performance_artifact_list(0, 10)
@ -194,14 +191,13 @@ def test_bug_suggestions_artifact(jm, initial_data, jobs_with_local_log,
job['revision_hash'] = sample_resultset[0]['revision_hash']
jm.store_job_data(jobs)
jm.process_objects(1, raise_errors=True)
job_id = jm.get_jobs_dhub().execute(
job_id = jm.get_dhub().execute(
proc="jobs_test.selects.row_by_guid",
placeholders=[jobs[0]['job']['job_guid']]
)[0]['id']
job_artifacts = jm.get_jobs_dhub().execute(
job_artifacts = jm.get_dhub().execute(
proc="jobs_test.selects.job_artifact",
placeholders=[job_id]
)

View file

@ -28,7 +28,7 @@ def test_init_datasources(repository):
count_before = Datasource.objects.all().count()
call_command("init_datasources")
count_after = Datasource.objects.all().count()
assert count_after == count_before + 2
assert count_after == count_before + 1
def test_init_datasources_no_repo():

View file

@ -8,7 +8,7 @@ xfail = pytest.mark.xfail
def test_load_single_artifact(
test_project, eleven_jobs_processed,
test_project, eleven_jobs_stored,
mock_post_json, mock_error_summary,
sample_data):
"""
@ -46,7 +46,7 @@ def test_load_single_artifact(
def test_load_artifact_second_time_fails(
test_project, eleven_jobs_processed,
test_project, eleven_jobs_stored,
mock_post_json, mock_error_summary,
sample_data):
"""

View file

@ -12,7 +12,6 @@ import zlib
from django.conf import settings
from django.core.management import call_command
from treeherder.model.derived.base import DatasetNotFoundError
from treeherder.model.derived import ArtifactsModel
from treeherder.model.derived.jobs import JobsModel
from tests.sample_data_generator import job_data, result_set
@ -48,20 +47,9 @@ def test_disconnect(jm):
# establish the connection to jobs.
jm._get_last_insert_id()
# establish the connection to objectstore
jm.retrieve_job_data(limit=1)
jm.disconnect()
assert not jm.get_os_dhub().connection["master_host"]["con_obj"].open
assert not jm.get_jobs_dhub().connection["master_host"]["con_obj"].open
def test_bad_contenttype(jm):
"""Test trying to get an invalid contenttype"""
with pytest.raises(DatasetNotFoundError):
jm.get_dhub("foo")
jm.disconnect()
assert not jm.get_dhub().connection["master_host"]["con_obj"].open
def test_ingest_single_sample_job(jm, refdata, sample_data, initial_data,
@ -128,7 +116,7 @@ def test_ingest_running_to_retry_sample_job(jm, refdata, sample_data, initial_da
# for pending and running jobs, we call this directly, just like
# the web api does.
jm.load_job_data(job_data)
jm.store_job_data(job_data)
jl = jm.get_job_list(0, 1)
initial_job_id = jl[0]["id"]
@ -136,12 +124,10 @@ def test_ingest_running_to_retry_sample_job(jm, refdata, sample_data, initial_da
# now we simulate the complete version of the job coming in
job['state'] = 'completed'
job['result'] = 'retry'
# convert the job_guid to what it would be on a retry from objectstore
# convert the job_guid to what it would be on a retry
job['job_guid'] = job['job_guid'] + "_" + str(job['end_timestamp'])[-5:]
jm.store_job_data(job_data)
jm.process_objects(10, raise_errors=True)
jl = jm.get_job_list(0, 10)
jm.disconnect()
@ -164,7 +150,7 @@ def test_ingest_running_to_retry_to_success_sample_job(jm, refdata, sample_data,
job['state'] = 'running'
job['result'] = 'unknown'
jm.load_job_data(job_data)
jm.store_job_data(job_data)
jl = jm.get_job_list(0, 1)
initial_job_id = jl[0]["id"]
@ -172,11 +158,10 @@ def test_ingest_running_to_retry_to_success_sample_job(jm, refdata, sample_data,
# now we simulate the complete RETRY version of the job coming in
job['state'] = 'completed'
job['result'] = 'retry'
# convert the job_guid to what it would be on a retry from objectstore
# convert the job_guid to what it would be on a retry
job['job_guid'] = job_guid_root + "_" + str(job['end_timestamp'])[-5:]
jm.store_job_data(job_data)
jm.process_objects(10, raise_errors=True)
# now we simulate the complete SUCCESS version of the job coming in
job['state'] = 'completed'
@ -185,7 +170,6 @@ def test_ingest_running_to_retry_to_success_sample_job(jm, refdata, sample_data,
job['job_guid'] = job_guid_root
jm.store_job_data(job_data)
jm.process_objects(10, raise_errors=True)
jl = jm.get_job_list(0, 10)
@ -210,11 +194,10 @@ def test_ingest_retry_sample_job_no_running(jm, refdata, sample_data, initial_da
# complete version of the job coming in
job['state'] = 'completed'
job['result'] = 'retry'
# convert the job_guid to what it would be on a retry from objectstore
# convert the job_guid to what it would be on a retry
job['job_guid'] = job['job_guid'] + "_" + str(job['end_timestamp'])[-5:]
jm.store_job_data(job_data)
jm.process_objects(10, raise_errors=True)
jl = jm.get_job_list(0, 10)
@ -236,21 +219,21 @@ def test_cycle_all_data(jm, refdata, sample_data, initial_data,
time_now = time.time()
cycle_date_ts = time_now - 7 * 24 * 3600
jm.jobs_execute(
jm.execute(
proc="jobs_test.updates.set_result_sets_push_timestamp",
placeholders=[cycle_date_ts]
)
jobs_to_be_deleted = jm.jobs_execute(
jobs_to_be_deleted = jm.execute(
proc="jobs_test.selects.get_jobs_for_cycling",
placeholders=[time_now - 24 * 3600]
)
jobs_before = jm.jobs_execute(proc="jobs_test.selects.jobs")
jobs_before = jm.execute(proc="jobs_test.selects.jobs")
call_command('cycle_data', sleep_time=0, cycle_interval=1)
jobs_after = jm.jobs_execute(proc="jobs_test.selects.jobs")
jobs_after = jm.execute(proc="jobs_test.selects.jobs")
assert len(jobs_after) == len(jobs_before) - len(jobs_to_be_deleted)
@ -271,30 +254,30 @@ def test_cycle_one_job(jm, refdata, sample_data, initial_data,
time_now = time.time()
cycle_date_ts = int(time_now - 7 * 24 * 3600)
jm.jobs_execute(
jm.execute(
proc="jobs_test.updates.set_result_sets_push_timestamp",
placeholders=[time_now]
)
jm.jobs_execute(
jm.execute(
proc="jobs_test.updates.set_one_result_set_push_timestamp",
placeholders=[cycle_date_ts]
)
jobs_to_be_deleted = jm.jobs_execute(
jobs_to_be_deleted = jm.execute(
proc="jobs_test.selects.get_result_set_jobs",
placeholders=[1]
)
jobs_before = jm.jobs_execute(proc="jobs_test.selects.jobs")
jobs_before = jm.execute(proc="jobs_test.selects.jobs")
call_command('cycle_data', sleep_time=0, cycle_interval=1, debug=True)
jobs_after = jm.jobs_execute(proc="jobs_test.selects.jobs")
jobs_after = jm.execute(proc="jobs_test.selects.jobs")
# Confirm that the target result set has no jobs in the
# jobs table
jobs_to_be_deleted_after = jm.jobs_execute(
jobs_to_be_deleted_after = jm.execute(
proc="jobs_test.selects.get_result_set_jobs",
placeholders=[1]
)
@ -316,21 +299,21 @@ def test_cycle_all_data_in_chunks(jm, refdata, sample_data, initial_data,
time_now = time.time()
cycle_date_ts = int(time_now - 7 * 24 * 3600)
jm.jobs_execute(
jm.execute(
proc="jobs_test.updates.set_result_sets_push_timestamp",
placeholders=[cycle_date_ts]
)
jobs_to_be_deleted = jm.jobs_execute(
jobs_to_be_deleted = jm.execute(
proc="jobs_test.selects.get_jobs_for_cycling",
placeholders=[time_now - 24 * 3600]
)
jobs_before = jm.jobs_execute(proc="jobs_test.selects.jobs")
jobs_before = jm.execute(proc="jobs_test.selects.jobs")
call_command('cycle_data', sleep_time=0, cycle_interval=1, chunk_size=3)
jobs_after = jm.jobs_execute(proc="jobs_test.selects.jobs")
jobs_after = jm.execute(proc="jobs_test.selects.jobs")
assert len(jobs_after) == len(jobs_before) - len(jobs_to_be_deleted)
@ -347,42 +330,21 @@ def test_bad_date_value_ingestion(jm, initial_data, mock_log_parser):
blob = job_data(start_timestamp="foo",
revision_hash=rs['revision_hash'])
jm.store_job_data([blob])
jm.store_result_set_data([rs])
jm.process_objects(1)
# Confirm that we don't get a ValueError when casting a non-number
last_error = get_objectstore_last_error(
jm) == u"invalid literal for long() with base 10: 'foo'"
jm.disconnect()
assert not last_error
def get_objectstore_last_error(jm):
row_id = jm._get_last_insert_id("objectstore")
row_data = jm.get_dhub(jm.CT_OBJECTSTORE).execute(
proc="objectstore_test.selects.row", placeholders=[row_id])[0]
jm.disconnect()
return row_data['error_msg']
jm.store_job_data([blob])
# if no exception, we are good.
def test_store_result_set_data(jm, initial_data, sample_resultset):
data = jm.store_result_set_data(sample_resultset)
result_set_ids = jm.get_dhub(jm.CT_JOBS).execute(
result_set_ids = jm.get_dhub().execute(
proc="jobs_test.selects.result_set_ids",
key_column='revision_hash',
return_type='dict'
)
revision_ids = jm.get_dhub(jm.CT_JOBS).execute(
revision_ids = jm.get_dhub().execute(
proc="jobs_test.selects.revision_ids",
key_column='revision',
return_type='dict'
@ -446,7 +408,7 @@ def test_store_performance_artifact(
replace = [','.join(['%s'] * len(job_ids))]
performance_artifact_signatures = jm.get_jobs_dhub().execute(
performance_artifact_signatures = jm.get_dhub().execute(
proc="jobs.selects.get_performance_artifact",
debug_show=jm.DEBUG,
placeholders=job_ids,
@ -454,7 +416,7 @@ def test_store_performance_artifact(
return_type='set',
key_column='series_signature')
series_signatures = jm.get_jobs_dhub().execute(
series_signatures = jm.get_dhub().execute(
proc="jobs.selects.get_all_series_signatures",
return_type='set',
key_column='signature',
@ -472,7 +434,7 @@ def test_store_performance_series(jm, test_project):
FakePerfData.SERIES_TYPE,
FakePerfData.SIGNATURE,
FakePerfData.SERIES)
stored_series = jm.get_jobs_dhub().execute(
stored_series = jm.get_dhub().execute(
proc="jobs.selects.get_performance_series",
placeholders=[FakePerfData.TIME_INTERVAL, FakePerfData.SIGNATURE])
blob = json.loads(zlib.decompress(stored_series[0]['blob']))
@ -496,7 +458,7 @@ def test_store_duplicate_performance_series(jm, test_project):
FakePerfData.SERIES_TYPE,
FakePerfData.SIGNATURE,
series_copy)
stored_series = jm.get_jobs_dhub().execute(
stored_series = jm.get_dhub().execute(
proc="jobs.selects.get_performance_series",
placeholders=[FakePerfData.TIME_INTERVAL, FakePerfData.SIGNATURE])
blob = json.loads(zlib.decompress(stored_series[0]['blob']))
@ -514,11 +476,11 @@ def test_store_performance_series_timeout_recover(jm, test_project):
# run more or less immediately so that the lock is engaged
def _lock_unlock():
with JobsModel(test_project) as jm2:
jm2.get_jobs_dhub().execute(
jm2.get_dhub().execute(
proc='generic.locks.get_lock',
placeholders=[FakePerfData.get_fake_lock_string()])
time.sleep(1)
jm2.get_jobs_dhub().execute(
jm2.get_dhub().execute(
proc='generic.locks.release_lock',
placeholders=[FakePerfData.get_fake_lock_string()])
t = threading.Thread(target=_lock_unlock)
@ -530,7 +492,7 @@ def test_store_performance_series_timeout_recover(jm, test_project):
FakePerfData.SIGNATURE,
FakePerfData.SERIES)
t.join()
stored_series = jm.get_jobs_dhub().execute(
stored_series = jm.get_dhub().execute(
proc="jobs.selects.get_performance_series",
placeholders=[FakePerfData.TIME_INTERVAL, FakePerfData.SIGNATURE])
@ -544,7 +506,7 @@ def test_store_performance_series_timeout_recover(jm, test_project):
def test_store_performance_series_timeout_fail(jm, test_project):
# timeout case 2: a lock is on our series, but it will not expire in time
jm.get_jobs_dhub().execute(
jm.get_dhub().execute(
proc='generic.locks.get_lock',
placeholders=[FakePerfData.get_fake_lock_string()])
old_timeout = settings.PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT
@ -554,7 +516,7 @@ def test_store_performance_series_timeout_fail(jm, test_project):
FakePerfData.SERIES_TYPE,
FakePerfData.SIGNATURE,
FakePerfData.SERIES)
stored_series = jm.get_jobs_dhub().execute(
stored_series = jm.get_dhub().execute(
proc="jobs.selects.get_performance_series",
placeholders=[FakePerfData.TIME_INTERVAL, FakePerfData.SIGNATURE])
assert not stored_series
@ -597,7 +559,7 @@ def test_ingesting_skip_existing(jm, sample_data, initial_data, refdata,
job_data = sample_data.job_data[:1]
test_utils.do_job_ingestion(jm, refdata, job_data, sample_resultset)
jm.load_job_data(sample_data.job_data[:2])
jm.store_job_data(sample_data.job_data[:2])
jl = jm.get_job_list(0, 10)
assert len(jl) == 2
@ -614,8 +576,7 @@ def test_ingest_job_with_updated_job_group(jm, refdata, sample_data, initial_dat
first_job["job"]["group_name"] = "first group name"
first_job["job"]["group_symbol"] = "1"
first_job["revision_hash"] = result_set_stored[0]["revision_hash"]
jm.load_job_data([first_job])
jm.process_objects(1)
jm.store_job_data([first_job])
second_job = copy.deepcopy(first_job)
# create a new guid to ingest the job again
@ -625,8 +586,7 @@ def test_ingest_job_with_updated_job_group(jm, refdata, sample_data, initial_dat
second_job["job"]["group_symbol"] = "2"
second_job["revision_hash"] = result_set_stored[0]["revision_hash"]
jm.load_job_data([second_job])
jm.process_objects(1)
jm.store_job_data([second_job])
second_job_lookup = jm.get_job_ids_by_guid([second_job_guid])
second_job_stored = jm.get_job(second_job_lookup[second_job_guid]["id"])

View file

@ -1,182 +0,0 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
import json
import pytest
from .sample_data_generator import job_data
from tests.sample_data_generator import result_set
slow = pytest.mark.slow
def test_claim_objects(jm, sample_data):
"""``claim_objects`` claims & returns unclaimed rows up to a limit."""
blobs = []
blob_lookup = set()
for job in sample_data.job_data[:3]:
blobs.append(job)
blob_lookup.add(json.dumps(job))
jm.store_job_data(blobs)
rows1 = jm.claim_objects(2)
# a separate worker with a separate connection
from treeherder.model.derived.jobs import JobsModel
jm2 = JobsModel(jm.project)
rows2 = jm2.claim_objects(2)
loading_rows = jm.get_dhub(jm.CT_OBJECTSTORE).execute(
proc="objectstore_test.counts.loading")[0]["loading_count"]
jm.disconnect()
jm2.disconnect()
assert len(rows1) == 2
# second worker asked for two rows but only got one that was left
assert len(rows2) == 1
# all three blobs were fetched by one of the workers
for r in rows1 + rows2:
assert r['json_blob'] in blob_lookup
# the blobs are all marked as "loading" in the database
assert loading_rows == 3
def test_mark_object_complete(jm):
"""Marks claimed row complete and records run id."""
jm.store_job_data([job_data()])
row_id = jm.claim_objects(1)[0]["id"]
revision_hash = "fakehash"
object_placeholders = [
[revision_hash, row_id]
]
jm.mark_objects_complete(object_placeholders)
row_data = jm.get_dhub(jm.CT_OBJECTSTORE).execute(
proc="objectstore_test.selects.row", placeholders=[row_id])[0]
jm.disconnect()
assert row_data["revision_hash"] == revision_hash
assert row_data["processed_state"] == "complete"
def test_process_objects(jm, initial_data, mock_log_parser):
"""Claims and processes a chunk of unprocessed JSON jobs data blobs."""
# Load some rows into the objectstore
rs = result_set()
blobs = [
job_data(submit_timestamp="1330454755",
job_guid="guid1", revision_hash=rs['revision_hash']),
job_data(submit_timestamp="1330454756",
job_guid="guid2", revision_hash=rs['revision_hash']),
job_data(submit_timestamp="1330454757",
job_guid="guid3", revision_hash=rs['revision_hash']),
]
jm.store_result_set_data([rs])
jm.store_job_data(blobs)
# just process two rows
jm.process_objects(2, raise_errors=True)
test_run_rows = jm.get_dhub(jm.CT_JOBS).execute(
proc="jobs_test.selects.jobs")
date_set = set([r['submit_timestamp'] for r in test_run_rows])
expected_dates = set([1330454755, 1330454756, 1330454757])
complete_count = jm.get_dhub(jm.CT_OBJECTSTORE).execute(
proc="objectstore_test.counts.complete")[0]["complete_count"]
loading_count = jm.get_dhub(jm.CT_OBJECTSTORE).execute(
proc="objectstore_test.counts.loading")[0]["loading_count"]
jm.disconnect()
assert complete_count == 2
assert loading_count == 0
assert date_set.issubset(expected_dates)
assert len(date_set) == 2
def test_process_objects_unknown_error(jm):
"""process_objects fail for invalid json"""
response = jm.store_job_data(['{invalid json}'])
exp_resp = {u'Unknown error: TypeError: string indices must be integers, not str': '{invalid json}'}
row_id = jm._get_last_insert_id("objectstore")
jm.disconnect()
assert row_id == 0
assert response == exp_resp
def test_ingest_sample_data(jm, sample_data, sample_resultset, mock_log_parser):
"""Process all job structures in the job_data.txt file"""
resultset_count = len(sample_resultset)
jm.store_result_set_data(sample_resultset)
blobs = []
for index, job in enumerate(sample_data.job_data[0:resultset_count]):
job['revision_hash'] = sample_resultset[index]['revision_hash']
blobs.append(job)
jm.store_job_data(blobs)
jm.process_objects(resultset_count, raise_errors=True)
job_rows = jm.get_jobs_dhub().execute(
proc="jobs_test.selects.jobs")
complete_count = jm.get_os_dhub().execute(
proc="objectstore_test.counts.complete")[0]["complete_count"]
loading_count = jm.get_os_dhub().execute(
proc="objectstore_test.counts.loading")[0]["loading_count"]
jm.disconnect()
assert complete_count == resultset_count
assert loading_count == 0
assert len(job_rows) == resultset_count
@pytest.mark.xfail
def test_objectstore_update_content(jm, sample_data):
"""
Test updating an object of the objectstore.
"""
original_obj = sample_data.job_data[100]
jm.store_job_data([original_obj])
obj_updated = original_obj.copy()
obj_updated["job"]["state"] = "pending"
jm.store_job_data([obj_updated])
stored_objs = jm.get_os_dhub().execute(
proc="objectstore_test.selects.row_by_guid",
placeholders=[obj_updated["job"]["job_guid"]]
)
jm.disconnect()
# check that it didn't create a new object
assert len(stored_objs) == 1
stored_blob = json.loads(stored_objs[0]["json_blob"])
# check that the blob was updated
assert stored_blob["job"]["state"] == "pending"

View file

@ -1,32 +0,0 @@
{
"counts": {
"loading": {
"sql": "SELECT COUNT(`id`) AS loading_count
FROM `objectstore`
WHERE processed_state = 'loading'
",
"host_type": "master_host"
},
"complete": {
"sql": "SELECT COUNT(`id`) AS complete_count
FROM `objectstore`
WHERE processed_state = 'complete'
",
"host_type": "master_host"
}
},
"selects": {
"all": {
"sql": "SELECT * FROM `objectstore`",
"host_type": "master_host"
},
"row": {
"sql": "SELECT * FROM `objectstore` WHERE id = ?",
"host_type": "master_host"
},
"row_by_guid": {
"sql": "SELECT * FROM `objectstore` WHERE `job_guid` = ?",
"host_type": "master_host"
}
}
}

View file

@ -3,7 +3,6 @@
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
import json
import itertools
from webtest.app import TestApp
from requests import Request
@ -135,12 +134,6 @@ def do_job_ingestion(jm, refdata, job_data, sample_resultset, verify_data=True):
# Store the modified json blobs
jm.store_job_data(blobs)
# Process the job objects in chunks of size == process_objects_limit
process_objects_limit = 1000
chunks = grouper(job_data, process_objects_limit)
for c in chunks:
jm.process_objects(process_objects_limit, raise_errors=True)
if verify_data:
# Confirms stored data matches whats in the reference data structs
verify_build_platforms(refdata, build_platforms_ref)
@ -154,20 +147,6 @@ def do_job_ingestion(jm, refdata, job_data, sample_resultset, verify_data=True):
verify_artifacts(jm, artifacts_ref)
verify_coalesced(jm, coalesced_job_guids, coalesced_replacements)
# Default verification confirms we loaded all of the objects
complete_count = jm.get_os_dhub().execute(
proc="objectstore_test.counts.complete")[0]["complete_count"]
loading_count = jm.get_os_dhub().execute(
proc="objectstore_test.counts.loading")[0]["loading_count"]
assert complete_count == len(job_data)
assert loading_count == 0
def grouper(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return itertools.izip_longest(*args, fillvalue=fillvalue)
def verify_build_platforms(refdata, build_platforms_ref):
@ -249,7 +228,7 @@ def verify_products(refdata, products_ref):
def verify_result_sets(jm, result_sets_ref):
revision_hashes = jm.get_jobs_dhub().execute(
revision_hashes = jm.get_dhub().execute(
proc='jobs.selects.get_all_result_set_revision_hashes',
key_column='revision_hash',
return_type='set'
@ -260,7 +239,7 @@ def verify_result_sets(jm, result_sets_ref):
def verify_log_urls(jm, log_urls_ref):
log_urls = jm.get_jobs_dhub().execute(
log_urls = jm.get_dhub().execute(
proc='jobs.selects.get_all_log_urls',
key_column='url',
return_type='set'
@ -271,7 +250,7 @@ def verify_log_urls(jm, log_urls_ref):
def verify_artifacts(jm, artifacts_ref):
artifacts = jm.get_jobs_dhub().execute(
artifacts = jm.get_dhub().execute(
proc='jobs.selects.get_all_artifacts',
key_column='name',
return_type='dict'
@ -290,7 +269,7 @@ def verify_coalesced(jm, coalesced_job_guids, coalesced_replacements):
if coalesced_job_guid_list:
rep_str = ','.join(coalesced_replacements)
data = jm.get_jobs_dhub().execute(
data = jm.get_dhub().execute(
proc='jobs.selects.get_jobs_by_coalesced_guids',
replace=[rep_str],
placeholders=coalesced_job_guid_list

View file

@ -18,7 +18,7 @@ xfail = pytest.mark.xfail
# we don't have/need an artifact list endpoint.
def test_artifact_detail(webapp, test_project, eleven_jobs_processed, sample_artifacts, jm):
def test_artifact_detail(webapp, test_project, eleven_jobs_stored, sample_artifacts, jm):
"""
test retrieving a single artifact from the artifact-detail
endpoint.
@ -77,7 +77,7 @@ def test_artifact_detail_bad_project(webapp, jm):
jm.disconnect()
def test_artifact_create_text_log_summary(webapp, test_project, eleven_jobs_processed,
def test_artifact_create_text_log_summary(webapp, test_project, eleven_jobs_stored,
mock_post_json, mock_error_summary,
sample_data):
"""
@ -116,7 +116,7 @@ def test_artifact_create_text_log_summary(webapp, test_project, eleven_jobs_proc
def test_artifact_create_text_log_summary_and_bug_suggestions(
webapp, test_project, eleven_jobs_processed,
webapp, test_project, eleven_jobs_stored,
mock_post_json, mock_error_summary,
sample_data):
"""

View file

@ -10,7 +10,7 @@ import json
from time import time
def test_create_bug_job_map_no_auth(eleven_jobs_processed, jm):
def test_create_bug_job_map_no_auth(eleven_jobs_stored, jm):
"""
test creating a single note via endpoint
"""
@ -33,7 +33,7 @@ def test_create_bug_job_map_no_auth(eleven_jobs_processed, jm):
jm.disconnect()
def test_create_bug_job_map(eleven_jobs_processed, mock_message_broker, jm):
def test_create_bug_job_map(eleven_jobs_stored, mock_message_broker, jm):
"""
test creating a single note via endpoint
"""
@ -67,7 +67,7 @@ def test_create_bug_job_map(eleven_jobs_processed, mock_message_broker, jm):
jm.disconnect()
def test_create_bug_job_map_dupe(eleven_jobs_processed, mock_message_broker, jm):
def test_create_bug_job_map_dupe(eleven_jobs_stored, mock_message_broker, jm):
"""
test creating the same bug map skips it
"""
@ -106,7 +106,7 @@ def test_create_bug_job_map_dupe(eleven_jobs_processed, mock_message_broker, jm)
jm.disconnect()
def test_bug_job_map_list(webapp, jm, eleven_jobs_processed):
def test_bug_job_map_list(webapp, jm, eleven_jobs_stored):
"""
test retrieving a list of bug_job_map
"""
@ -139,7 +139,7 @@ def test_bug_job_map_list(webapp, jm, eleven_jobs_processed):
jm.disconnect()
def test_bug_job_map_detail(webapp, jm, eleven_jobs_processed):
def test_bug_job_map_detail(webapp, jm, eleven_jobs_stored):
"""
test retrieving a list of bug_job_map
"""
@ -173,7 +173,7 @@ def test_bug_job_map_detail(webapp, jm, eleven_jobs_processed):
jm.disconnect()
def test_bug_job_map_delete(webapp, eleven_jobs_processed,
def test_bug_job_map_delete(webapp, eleven_jobs_stored,
jm, mock_message_broker):
"""
test retrieving a list of bug_job_map
@ -208,7 +208,7 @@ def test_bug_job_map_delete(webapp, eleven_jobs_processed,
jm.disconnect()
def test_bug_job_map_delete_no_auth(jm, eleven_jobs_processed):
def test_bug_job_map_delete_no_auth(jm, eleven_jobs_stored):
"""
test retrieving a list of bug_job_map
"""

View file

@ -9,7 +9,7 @@ from django.contrib.auth.models import User
import json
def test_job_list(webapp, eleven_jobs_processed, jm):
def test_job_list(webapp, eleven_jobs_stored, jm):
"""
test retrieving a list of ten json blobs from the jobs-list
endpoint.
@ -72,7 +72,7 @@ def test_job_list(webapp, eleven_jobs_processed, jm):
assert jobs[0]['id'] == 10
def test_job_list_bad_project(webapp, eleven_jobs_processed, jm):
def test_job_list_bad_project(webapp, eleven_jobs_stored, jm):
"""
test retrieving a job list with a bad project throws 404.
"""
@ -83,7 +83,7 @@ def test_job_list_bad_project(webapp, eleven_jobs_processed, jm):
webapp.get(badurl, status=404)
def test_job_list_equals_filter(webapp, eleven_jobs_processed, jm):
def test_job_list_equals_filter(webapp, eleven_jobs_stored, jm):
"""
test retrieving a job list with a querystring filter.
"""
@ -96,7 +96,7 @@ def test_job_list_equals_filter(webapp, eleven_jobs_processed, jm):
assert len(resp['results']) == 1
def test_job_list_in_filter(webapp, eleven_jobs_processed, jm):
def test_job_list_in_filter(webapp, eleven_jobs_stored, jm):
"""
test retrieving a job list with a querystring filter.
"""
@ -110,7 +110,7 @@ def test_job_list_in_filter(webapp, eleven_jobs_processed, jm):
assert len(resp['results']) == 2
def test_job_detail(webapp, eleven_jobs_processed, sample_artifacts, jm):
def test_job_detail(webapp, eleven_jobs_stored, sample_artifacts, jm):
"""
test retrieving a single job from the jobs-detail
endpoint.
@ -128,7 +128,7 @@ def test_job_detail(webapp, eleven_jobs_processed, sample_artifacts, jm):
jm.disconnect()
def test_job_retrigger_unauthorized(webapp, eleven_jobs_processed, jm):
def test_job_retrigger_unauthorized(webapp, eleven_jobs_stored, jm):
"""
Validate that only authenticated users can hit this endpoint.
"""
@ -138,7 +138,7 @@ def test_job_retrigger_unauthorized(webapp, eleven_jobs_processed, jm):
webapp.post(url, status=403)
def test_job_retrigger_authorized(webapp, eleven_jobs_processed, jm,
def test_job_retrigger_authorized(webapp, eleven_jobs_stored, jm,
pulse_action_consumer):
"""
Validate that only authenticated users can hit this endpoint.
@ -163,7 +163,7 @@ def test_job_retrigger_authorized(webapp, eleven_jobs_processed, jm,
user.delete()
def test_job_cancel_authorized(webapp, eleven_jobs_processed, jm,
def test_job_cancel_authorized(webapp, eleven_jobs_stored, jm,
pulse_action_consumer):
"""
Validate that only authenticated users can hit this endpoint.
@ -188,7 +188,7 @@ def test_job_cancel_authorized(webapp, eleven_jobs_processed, jm,
user.delete()
def test_job_detail_bad_project(webapp, eleven_jobs_processed, jm):
def test_job_detail_bad_project(webapp, eleven_jobs_stored, jm):
"""
test retrieving a single job from the jobs-detail
endpoint.

View file

@ -124,7 +124,7 @@ def test_note_detail_bad_project(webapp, jm):
jm.disconnect()
def test_create_note(webapp, eleven_jobs_processed, mock_message_broker, jm):
def test_create_note(webapp, eleven_jobs_stored, mock_message_broker, jm):
"""
test creating a single note via endpoint when authenticated
"""
@ -165,7 +165,7 @@ def test_create_note(webapp, eleven_jobs_processed, mock_message_broker, jm):
jm.disconnect()
def test_create_note_no_auth(eleven_jobs_processed, jm):
def test_create_note_no_auth(eleven_jobs_stored, jm):
"""
test creating a single note via endpoint when not authenticated
gets a 403 Forbidden

View file

@ -1,99 +0,0 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
from django.core.urlresolvers import reverse
from treeherder.client import TreeherderJobCollection
from tests import test_utils
def test_objectstore_list(webapp, eleven_jobs_stored, jm):
"""
test retrieving a list of ten json blobs from the objectstore-list
endpoint.
"""
resp = webapp.get(
reverse('objectstore-list',
kwargs={'project': jm.project})
)
assert resp.status_int == 200
assert isinstance(resp.json, list)
assert len(resp.json) == 10
def test_objectstore_detail(webapp, eleven_jobs_stored, sample_data, jm):
"""
test retrieving a single json blobs from the objectstore-detail
endpoint.
"""
job_guid = sample_data.job_data[0]["job"]["job_guid"]
resp = webapp.get(
reverse('objectstore-detail',
kwargs={'project': jm.project, 'pk': job_guid})
)
assert resp.status_int == 200
assert isinstance(resp.json, dict)
assert resp.json['job']['job_guid'] == job_guid
def test_objectstore_detail_not_found(webapp, jm):
"""
test retrieving a HTTP 404 from the objectstore-detail
endpoint.
"""
resp = webapp.get(
reverse('objectstore-detail',
kwargs={'project': jm.project, 'pk': 'myguid1'}),
expect_errors=True
)
assert resp.status_int == 404
def test_objectstore_with_bad_secret(job_sample, jm):
"""
test calling with the wrong project secret.
extected result are:
- return code 403
- return message authentication failed
"""
tjc = TreeherderJobCollection()
tj = tjc.get_job(job_sample)
tjc.add(tj)
resp = test_utils.post_collection(
jm.project, tjc, status=403, consumer_secret='not-so-secret'
)
assert resp.status_int == 403
assert resp.json['detail'] == "Client authentication failed for project, {0}".format(jm.project)
assert resp.json['response'] == "invalid_client"
def test_objectstore_with_bad_key(job_sample, jm):
"""
test calling with the wrong project key.
extected result are:
- return code 403
- return message failed
"""
tjc = TreeherderJobCollection()
tj = tjc.get_job(job_sample)
tjc.add(tj)
resp = test_utils.post_collection(
jm.project, tjc, status=403, consumer_key='wrong-key'
)
assert resp.status_int == 403
assert resp.json['response'] == "access_denied"
assert resp.json['detail'] == "oauth_consumer_key does not match project, {0}, credentials".format(jm.project)

View file

@ -3,7 +3,7 @@
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
def test_project_endpoint(webapp, eleven_jobs_processed, jm):
def test_project_endpoint(webapp, eleven_jobs_stored, jm):
"""
tests the project endpoint
"""
@ -13,7 +13,7 @@ def test_project_endpoint(webapp, eleven_jobs_processed, jm):
assert resp.json['max_performance_artifact_id'] == 0
def test_project_endpoint_does_not_exist(webapp, eleven_jobs_processed, jm):
def test_project_endpoint_does_not_exist(webapp, eleven_jobs_stored, jm):
"""
tests the project endpoint where project does not exist
"""

View file

@ -13,7 +13,7 @@ from treeherder.webapp.api import utils
import json
def test_resultset_list(webapp, eleven_jobs_processed, jm):
def test_resultset_list(webapp, eleven_jobs_stored, jm):
"""
test retrieving a list of ten json blobs from the jobs-list
endpoint. ``full`` set to false, so it doesn't return revisions.
@ -82,7 +82,7 @@ def test_resultset_list_empty_rs_still_show(webapp, initial_data,
jm.disconnect()
def test_resultset_list_filter_by_revision(webapp, eleven_jobs_processed, jm):
def test_resultset_list_filter_by_revision(webapp, eleven_jobs_stored, jm):
"""
test retrieving a resultset list, filtered by a date range
"""
@ -175,7 +175,7 @@ def test_resultset_list_without_jobs(webapp, initial_data,
jm.disconnect()
def test_resultset_detail(webapp, eleven_jobs_processed, jm):
def test_resultset_detail(webapp, eleven_jobs_stored, jm):
"""
test retrieving a resultset from the resultset-detail
endpoint.
@ -240,7 +240,7 @@ def test_resultset_create(sample_resultset, jm, initial_data):
assert resp.status_int == 200
assert resp.json['message'] == 'well-formed JSON stored'
stored_objs = jm.get_jobs_dhub().execute(
stored_objs = jm.get_dhub().execute(
proc="jobs_test.selects.resultset_by_rev_hash",
placeholders=[sample_resultset[0]['revision_hash']]
)
@ -316,7 +316,7 @@ def test_resultset_cancel_all(jm, resultset_with_three_jobs, pulse_action_consum
user.delete()
def test_resultset_status(webapp, eleven_jobs_processed, jm):
def test_resultset_status(webapp, eleven_jobs_stored, jm):
"""
test retrieving the status of a resultset
"""

View file

@ -49,7 +49,7 @@ def sample_artifacts(jm, sample_data):
@pytest.fixture
def sample_notes(jm, sample_data, eleven_jobs_processed):
def sample_notes(jm, sample_data, eleven_jobs_stored):
"""provide 11 jobs with job notes."""
jobs = jm.get_job_list(0, 10)

View file

@ -557,7 +557,7 @@ class TreeherderJobCollection(TreeherderCollection):
Collection of job objects
"""
def __init__(self, data=[], job_type=''):
def __init__(self, data=[]):
super(TreeherderJobCollection, self).__init__('jobs', data)

View file

@ -397,9 +397,7 @@ class PendingRunningTransformerMixin(object):
treeherder_data['job'] = new_job
if project not in th_collections:
th_collections[project] = TreeherderJobCollection(
job_type='update'
)
th_collections[project] = TreeherderJobCollection()
# get treeherder job instance and add the job instance
# to the collection instance

View file

@ -128,8 +128,8 @@ def generate_job_guid(request_id, buildername, endtime=None):
# for some jobs (I'm looking at you, ``retry``) we need the endtime to be
# unique because the job_guid otherwise looks identical
# for all retries and the complete job. The ``job_guid`` needs to be
# unique in the ``objectstore``, or it will skip the rest of the retries
# and/or the completed outcome.
# unique, or else each retry will overwrite the last, and finally the complete
# job will overwrite that. Then you'll never know there were any retries.
if endtime:
job_guid = "{0}_{1}".format(job_guid, str(endtime)[-5:])
return job_guid
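As a small illustration of the suffixing above (values invented), the last five digits of the end time are what keep a retry's job_guid distinct from the final completed job's guid:

# Illustrative values only; mirrors the format() call above.
job_guid = "d2e1ae4a6b7f3c9a"   # hypothetical base guid
endtime = 1434655850            # hypothetical end timestamp
retry_guid = "{0}_{1}".format(job_guid, str(endtime)[-5:])
print(retry_guid)               # -> d2e1ae4a6b7f3c9a_55850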

View file

@ -20,7 +20,7 @@ DEFAULT_CREDENTIALS_PATH = os.path.join(
class Command(BaseCommand):
"""Management command to export project credentials."""
help = "Exports the objectstore Oauth keys for etl data import tasks"
help = "Exports the Oauth keys for etl data import tasks"
option_list = BaseCommand.option_list + (
make_option(

View file

@ -13,7 +13,7 @@ class Command(BaseCommand):
"""Management command to import project credentials."""
help = "Import the objectstore Oauth keys for etl data import tasks"
help = "Import the Oauth keys for etl data import tasks"
args = "<credentials_file>"
def handle(self, *args, **options):
@ -23,7 +23,7 @@ class Command(BaseCommand):
with open(args[0]) as credentials_file:
credentials = json.loads(credentials_file.read())
ds_list = Datasource.objects.filter(project__in=credentials.keys(),
contenttype='objectstore')
contenttype='jobs')
datasource_dict = dict((ds.project, ds) for ds in ds_list)
for project, cred in credentials.items():
if project in datasource_dict:

View file

@ -7,8 +7,7 @@ from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from treeherder.etl.pushlog import HgPushlogProcess
from treeherder.model.derived import JobsModel, RefDataManager
from treeherder.model.tasks import process_objects
from treeherder.model.derived import RefDataManager
from treeherder.etl.buildapi import (RunningJobsProcess,
PendingJobsProcess,
Builds4hJobsProcess)
@ -37,11 +36,6 @@ class Command(BaseCommand):
"(e.g. 'T')")
)
def _process_all_objects_for_project(self, project):
jm = JobsModel(project)
while jm.get_num_unprocessed_objects() > 0:
process_objects.delay(project=project)
def _handle(self, *args, **options):
if len(args) != 2:
raise CommandError("Need to specify (only) branch and changeset")
@ -69,8 +63,6 @@ class Command(BaseCommand):
# job ingestion expects the short version, so we truncate it.
push_sha = process.run(pushlog_url, project, changeset=changeset)[0:12]
self._process_all_objects_for_project(project)
Builds4hJobsProcess().run(filter_to_project=project,
filter_to_revision=push_sha,
filter_to_job_group=options['filter_job_group'])
@ -81,8 +73,6 @@ class Command(BaseCommand):
filter_to_revision=push_sha,
filter_to_job_group=options['filter_job_group'])
self._process_all_objects_for_project(project)
def handle(self, *args, **options):
if options['profile_file']:

View file

@ -6,7 +6,6 @@ from StringIO import StringIO
import gzip
import urllib2
import logging
from collections import defaultdict
import simplejson as json
@ -53,31 +52,6 @@ class JsonLoaderMixin(object):
return urllib2.urlopen(req, json.dumps(data), timeout=settings.TREEHERDER_REQUESTS_TIMEOUT)
class ObjectstoreLoaderMixin(JsonLoaderMixin):
def load(self, jobs):
"""post a list of jobs to the objectstore ingestion endpoint """
# group the jobs by project
projects = defaultdict(list)
for job in jobs:
projects[job['project']].append(job)
for project, jobs in projects.items():
endpoint = reverse('objectstore-list', kwargs={"project": project})
url = "{0}/{1}/".format(
settings.API_HOSTNAME.strip('/'),
endpoint.strip('/')
)
response = super(ObjectstoreLoaderMixin, self).load(url, jobs)
if response.getcode() != 200:
message = json.loads(response.read())
logger.error("Job loading failed: {0}".format(message['message']))
class ResultSetsLoaderMixin(JsonLoaderMixin):
def load(self, result_sets, project):

View file

@ -16,8 +16,7 @@ from treeherder.etl.pushlog import HgPushlogProcess
@task(name='fetch-buildapi-pending', time_limit=3 * 60)
def fetch_buildapi_pending():
"""
Fetches the buildapi pending jobs api and load them to
the objectstore ingestion endpoint
Fetches the buildapi pending jobs api and load them
"""
PendingJobsProcess().run()
@ -25,8 +24,7 @@ def fetch_buildapi_pending():
@task(name='fetch-buildapi-running', time_limit=3 * 60)
def fetch_buildapi_running():
"""
Fetches the buildapi running jobs api and load them to
the objectstore ingestion endpoint
Fetches the buildapi running jobs api and load them
"""
RunningJobsProcess().run()
@ -34,8 +32,7 @@ def fetch_buildapi_running():
@task(name='fetch-buildapi-build4h', time_limit=3 * 60)
def fetch_buildapi_build4h():
"""
Fetches the buildapi running jobs api and load them to
the objectstore ingestion endpoint
Fetches the buildapi running jobs api and load them
"""
Builds4hJobsProcess().run()

View file

@ -21,14 +21,8 @@ class ArtifactsModel(TreeherderModelBase):
"""
Represent the artifacts for a job repository
content-types:
jobs
"""
# content types that every project will have
CT_JOBS = "jobs"
CONTENT_TYPES = [CT_JOBS]
INDEXED_COLUMNS = {
"job_artifact": {
"id": "id",
@ -45,8 +39,8 @@ class ArtifactsModel(TreeherderModelBase):
}
}
def jobs_execute(self, **kwargs):
return utils.retry_execute(self.get_dhub(self.CT_JOBS), logger, **kwargs)
def execute(self, **kwargs):
return utils.retry_execute(self.get_dhub(), logger, **kwargs)
def get_job_artifact_references(self, job_id):
"""
@ -55,7 +49,7 @@ class ArtifactsModel(TreeherderModelBase):
This is everything about the artifact, but not the artifact blob
itself.
"""
data = self.jobs_execute(
data = self.execute(
proc="jobs.selects.get_job_artifact_references",
placeholders=[job_id],
debug_show=self.DEBUG,
@ -79,7 +73,7 @@ class ArtifactsModel(TreeherderModelBase):
proc = "jobs.selects.get_job_artifact"
data = self.jobs_execute(
data = self.execute(
proc=proc,
replace=repl,
placeholders=placeholders,
@ -112,7 +106,7 @@ class ArtifactsModel(TreeherderModelBase):
proc = "jobs.selects.get_performance_artifact_list"
data = self.jobs_execute(
data = self.execute(
proc=proc,
replace=repl,
placeholders=placeholders,
@ -131,7 +125,7 @@ class ArtifactsModel(TreeherderModelBase):
def get_max_performance_artifact_id(self):
"""Get the maximum performance artifact id."""
data = self.jobs_execute(
data = self.execute(
proc="jobs.selects.get_max_performance_artifact_id",
debug_show=self.DEBUG,
)
@ -141,7 +135,7 @@ class ArtifactsModel(TreeherderModelBase):
"""
Store a list of job_artifacts given a list of placeholders
"""
self.jobs_execute(
self.execute(
proc='jobs.inserts.set_job_artifact',
debug_show=self.DEBUG,
placeholders=artifact_placeholders,
@ -181,13 +175,13 @@ class ArtifactsModel(TreeherderModelBase):
# adapt and load data into placeholder structures
tda.adapt_and_load(ref_data, job_data, perf_data)
self.jobs_execute(
self.execute(
proc="jobs.inserts.set_performance_artifact",
debug_show=self.DEBUG,
placeholders=tda.performance_artifact_placeholders,
executemany=True)
self.jobs_execute(
self.execute(
proc='jobs.inserts.set_series_signature',
debug_show=self.DEBUG,
placeholders=tda.signature_property_placeholders,
@ -327,7 +321,7 @@ class ArtifactsModel(TreeherderModelBase):
jobs_signatures_where_in_clause = [','.join(['%s'] * len(job_ids))]
job_data = self.jobs_execute(
job_data = self.execute(
proc='jobs.selects.get_signature_list_from_job_ids',
debug_show=self.DEBUG,
replace=jobs_signatures_where_in_clause,

View file

@ -29,8 +29,8 @@ class TreeherderModelBase(object):
"""Encapsulate the dataset access for this ``project`` """
self.project = project
self.sources = {}
self.dhubs = {}
self.source = None
self.dhub = None
self.DEBUG = settings.DEBUG
self.refdata_model = RefDataManager()
@ -51,9 +51,7 @@ class TreeherderModelBase(object):
for source in Datasource.objects.cached():
if (source.contenttype == 'objectstore') and \
source.oauth_consumer_key and \
source.oauth_consumer_secret:
if source.oauth_consumer_key and source.oauth_consumer_secret:
credentials[source.project] = {
'consumer_key': source.oauth_consumer_key,
@ -62,27 +60,27 @@ class TreeherderModelBase(object):
return credentials
def get_dhub(self, contenttype, procs_file_name=None):
def get_dhub(self, procs_file_name=None):
"""
The configured datahub for the given contenttype
The configured datahub
"""
if not procs_file_name: # pragma: no cover
procs_file_name = "{0}.json".format(contenttype)
procs_file_name = "jobs.json"
if contenttype not in self.dhubs.keys():
datasource = self.get_datasource(contenttype)
if not self.dhub:
datasource = self.get_datasource()
self.dhubs[contenttype] = datasource.dhub(procs_file_name)
return self.dhubs[contenttype]
self.dhub = datasource.dhub(procs_file_name)
return self.dhub
def get_datasource(self, contenttype):
"""The datasource for this contenttype of the project."""
def get_datasource(self):
"""The datasource of the project."""
if contenttype not in self.sources.keys():
self.sources[contenttype] = self._get_datasource(contenttype)
if not self.source:
self.source = self._get_datasource()
return self.sources[contenttype]
return self.source
def get_inserted_row_ids(self, dhub):
"""
@ -144,30 +142,28 @@ class TreeherderModelBase(object):
def disconnect(self):
"""Iterate over and disconnect all data sources."""
self.refdata_model.disconnect()
for dhub in self.dhubs.itervalues():
dhub.disconnect()
if self.dhub:
self.dhub.disconnect()
def _get_datasource(self, contenttype):
"""Find the datasource for this contenttype in the cache."""
def _get_datasource(self):
"""Find the datasource in the cache."""
try:
return next(source for source in Datasource.objects.cached()
if source.project == self.project and source.contenttype == contenttype)
if source.project == self.project and source.contenttype == 'jobs')
except StopIteration:
raise DatasetNotFoundError(self.project, contenttype)
raise DatasetNotFoundError(self.project)
@python_2_unicode_compatible
class DatasetNotFoundError(ValueError):
def __init__(self, project, contenttype, *args, **kwargs):
def __init__(self, project, *args, **kwargs):
super(DatasetNotFoundError, self).__init__(*args, **kwargs)
self.project = project
self.contenttype = contenttype
def __str__(self):
return u"No dataset found for project {0} and contenttype '{1}'".format(
return u"No dataset found for project {0}".format(
self.project,
self.contenttype,
)
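From a caller's point of view, collapsing the two datasources means the accessors lose their contenttype argument. A minimal before/after sketch (proc names taken from elsewhere in this diff; jm is assumed to be a connected JobsModel instance):

# Before this commit: one dhub per contenttype.
# jobs_rows = jm.get_dhub("jobs").execute(proc="jobs_test.selects.jobs")
# os_rows = jm.get_dhub("objectstore").execute(proc="objectstore_test.selects.all")

# After this commit: a single "jobs" datasource, so no contenttype is passed.
jobs_rows = jm.get_dhub().execute(proc="jobs_test.selects.jobs")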

The diff for this file is not shown because it is too large.

View file

@ -22,14 +22,6 @@ class Command(BaseCommand):
default=False,
help='Write debug messages to stdout'),
make_option(
'--objectstore-cycle-interval',
action='store',
dest='os_cycle_interval',
default=0,
type='int',
help='Data cycle interval for the objectstore, expressed in days'),
make_option(
'--cycle-interval',
action='store',
@ -38,15 +30,6 @@ class Command(BaseCommand):
type='int',
help='Data cycle interval expressed in days'),
make_option(
'--objectstore-chunk-size',
action='store',
dest='os_chunk_size',
default=10000,
type='int',
help=('Define the size of the chunks '
'the objectstore data will be divided in')),
make_option(
'--chunk-size',
action='store',
@ -68,18 +51,12 @@ class Command(BaseCommand):
def handle(self, *args, **options):
self.is_debug = options['debug']
if options['os_cycle_interval']:
os_cycle_interval = datetime.timedelta(days=options['os_cycle_interval'])
else:
os_cycle_interval = settings.OBJECTSTORE_CYCLE_INTERVAL
if options['cycle_interval']:
cycle_interval = datetime.timedelta(days=options['cycle_interval'])
else:
cycle_interval = settings.DATA_CYCLE_INTERVAL
self.debug("cycle interval... objectstore: {}, jobs: {}".format(os_cycle_interval,
cycle_interval))
self.debug("cycle interval... jobs: {}".format(cycle_interval))
projects = Datasource.objects\
.filter(contenttype='jobs')\
@ -87,13 +64,10 @@ class Command(BaseCommand):
for project in projects:
self.debug("Cycling Database: {0}".format(project))
with JobsModel(project) as jm:
os_deleted, rs_deleted = jm.cycle_data(os_cycle_interval,
cycle_interval,
options['os_chunk_size'],
options['chunk_size'],
options['sleep_time'])
self.debug("Deleted {} objectstore rows and {} resultsets from {}".format(
os_deleted, rs_deleted, project))
rs_deleted = jm.cycle_data(cycle_interval,
options['chunk_size'],
options['sleep_time'])
self.debug("Deleted {} resultsets from {}".format(rs_deleted, project))
def debug(self, msg):
if self.is_debug:
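
A minimal sketch of driving the slimmed-down command from Python; the option values are illustrative and map to the dest names above, and the objectstore-specific options no longer exist:

    from django.core.management import call_command

    # Roughly equivalent to `./manage.py cycle_data --cycle-interval 120 --chunk-size 500 --debug`
    call_command('cycle_data', cycle_interval=120, chunk_size=500, debug=True)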

View file

@ -24,7 +24,7 @@ class Command(BaseCommand):
def handle(self, *args, **options):
if options["reset"]:
confirm = input("""You have requested an init of the datasources.
This will IRREVERSIBLY DESTROY all data in the jobs and objectstore databases.
This will IRREVERSIBLY DESTROY all data in the jobs database.
Are you sure you want to do this?
Type 'yes' to continue, or 'no' to cancel: """)
@ -34,9 +34,5 @@ Type 'yes' to continue, or 'no' to cancel: """)
projects = Repository.objects.filter(active_status='active').values_list('name', flat=True)
for project in projects:
for contenttype in ("jobs", "objectstore"):
Datasource.objects.get_or_create(
contenttype=contenttype,
project=project,
)
Datasource.objects.get_or_create(contenttype="jobs", project=project)
Datasource.reset_cache()

View file

@ -1,22 +0,0 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
from optparse import make_option
from django.core.management.base import BaseCommand
from treeherder.model.tasks import process_objects
class Command(BaseCommand):
help = """Process a number of objects with status 'ready' in the objectstore"""
option_list = BaseCommand.option_list + (
make_option('--limit',
action='store',
dest='limit',
default=None,
help='Limit the number of objects to process'),
)
def handle(self, *args, **options):
process_objects.delay(limit=options['limit'])

View file

@ -75,9 +75,9 @@ class Command(BaseCommand):
jm.store_performance_series(time_interval, 'talos_data',
str(new_hash), series)
jm.jobs_execute(proc='jobs.deletes.delete_performance_series',
jm.execute(proc='jobs.deletes.delete_performance_series',
placeholders=[signature_hash])
jm.jobs_execute(proc='jobs.deletes.delete_series_signature',
jm.execute(proc='jobs.deletes.delete_series_signature',
placeholders=[signature_hash])
return new_hash

View file

@ -24,15 +24,6 @@ class Command(BaseCommand):
default='all',
help='A comma separated list of datasources to execute the sql code on'),
make_option(
'--data-type',
action='store',
dest='data_type',
default='jobs',
choices=['jobs', 'objectstore'],
help=('The target data-type of the sql code (jobs or objectstore, '
'default jobs)')),
make_option(
'-s', '--sql-statement',
action='store',
@ -63,7 +54,7 @@ class Command(BaseCommand):
self.stdout.write("SQL command: {}".format(sql_code))
datasources = Datasource.objects.filter(contenttype=options['data_type'])
datasources = Datasource.objects.filter(contenttype='jobs')
if options['datasources'] != 'all':
if ',' in options['datasources']:
datasources = datasources.filter(

View file

@ -222,12 +222,8 @@ class Datasource(models.Model):
if '-' in self.name:
self.name = self.name.replace('-', '_')
self.oauth_consumer_key = None
self.oauth_consumer_secret = None
if self.contenttype == 'objectstore':
self.oauth_consumer_key = uuid.uuid4()
self.oauth_consumer_secret = uuid.uuid4()
self.oauth_consumer_key = uuid.uuid4()
self.oauth_consumer_secret = uuid.uuid4()
# validate the model before saving
self.full_clean()

View file

@ -1,197 +0,0 @@
{
"deletes":{
"cycle_objectstore":{
"sql":"DELETE
FROM objectstore
WHERE `processed_state` = 'complete'
AND `loaded_timestamp` < ?
ORDER BY `id`
LIMIT ?",
"host_type": "master_host"
}
},
"inserts":{
"store_json":{
"sql":"INSERT INTO `objectstore` (`loaded_timestamp`,
`job_guid`,
`json_blob`,
`error`,
`error_msg`)
SELECT ?, ?, ?, ?, ?
FROM DUAL
WHERE NOT EXISTS (
SELECT `job_guid` from `objectstore`
WHERE `job_guid` = ?
)",
"host_type":"master_host"
}
},
"selects":{
"get_claimed":{
"sql":"SELECT `json_blob`, `id`
FROM `objectstore`
WHERE `worker_id` = CONNECTION_ID()
AND `processed_state` = 'loading'
AND `error` = 'N'",
"host_type":"master_host"
},
"get_num_unprocessed":{
"sql":"SELECT COUNT(*) as count
FROM `objectstore`
WHERE `processed_state` = 'ready'
AND `error` = 'N'",
"host_type":"master_host"
},
"get_unprocessed":{
"sql":"SELECT `json_blob`, `id`
FROM `objectstore`
WHERE `processed_state` = 'ready'
AND `error` = 'N'
LIMIT ?",
"host_type":"master_host"
},
"get_all_errors":{
"sql":"SELECT `json_blob`, `id`
FROM `objectstore`
WHERE `error` = 'Y'
AND loaded_timestamp BETWEEN ? AND ?",
"host_type":"read_host"
},
"get_error_metadata":{
"sql":"SELECT `id`, job_guid, loaded_timestamp, processed_state, error_msg, worker_id
FROM `objectstore`
WHERE `error` = 'Y'
AND loaded_timestamp BETWEEN ? AND ?",
"host_type":"read_host"
},
"get_json_blob":{
"sql":"SELECT json_blob,
error_msg,
error,
processed_state,
loaded_timestamp,
job_guid
FROM `objectstore` WHERE `id` = ?",
"host_type":"read_host"
},
"get_json_blob_by_guid":{
"sql":"SELECT json_blob,
error_msg,
error,
processed_state,
loaded_timestamp,
job_guid
FROM `objectstore` WHERE `job_guid` = ?",
"host_type":"read_host"
},
"get_json_blob_list":{
"sql":"SELECT json_blob,
error_msg,
error,
processed_state,
loaded_timestamp,
job_guid
FROM `objectstore`
ORDER BY loaded_timestamp DESC
LIMIT ?,?",
"host_type":"read_host"
},
"get_json_blob_for_test_run":{
"sql":"SELECT json_blob, error_msg, error
FROM `objectstore`
WHERE `job_guid` IN (REP0)",
"host_type":"read_host"
},
"get_error_counts":{
"sql":"SELECT
(CASE
WHEN error_msg LIKE 'Malformed JSON%'
THEN 'Malformed JSON'
ELSE 'Other'
END) AS message, count(id) AS count
FROM `objectstore`
WHERE `error` = 'Y'
AND loaded_timestamp BETWEEN REP0 AND REP1
GROUP BY message",
"host_type":"read_host"
}
},
"updates":{
"mark_loading":{
"sql":"UPDATE `objectstore`
SET `processed_state` = 'loading',
`worker_id` = CONNECTION_ID()
WHERE `processed_state` = 'ready'
AND `error` = 'N'
ORDER BY `id`
LIMIT ?
",
"host_type":"master_host"
},
"mark_complete":{
"sql":"UPDATE `objectstore`
SET `processed_state` = 'complete',
`revision_hash` = ?
WHERE `processed_state` = 'loading'
AND `id` = ?
AND `worker_id` = CONNECTION_ID()
",
"host_type":"master_host"
},
"mark_error":{
"sql":"UPDATE `objectstore`
SET `processed_state` = 'ready',
`worker_id` = NULL,
`error` = 'Y',
`error_msg` = ?
WHERE `processed_state` = 'loading'
AND `id` = ?
AND `worker_id` = CONNECTION_ID()
",
"host_type":"master_host"
}
}
}

View file

@ -1,81 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
--
-- Host: localhost Database: project_objectstore_1
-- ------------------------------------------------------
-- Server version 5.6.10
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
--
-- Table structure for table `objectstore`
--
DROP TABLE IF EXISTS `objectstore`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
/**************************
* Table: objectstore
*
* An object store for the incoming JSON structures described in
* sample_data/job_data.json.sample. These structures are transfered to
* project_jobs.sql.tmpl and treeherder_reference_1.sql.tmpl by a
* scheduled job.
*
* Population Method: dynamic from incoming data
*
* Example Data:
*
* job_guid - Referenced project_jobs_1.job.guid
* revision_hash - Hash of any number of revisions associated with the result set.
* loaded_timestamp - Timestamp when the structure was first loaded.
* processed_state - ready | loading | complete
* ready - Object ready for processing
* loading - Object in the process of loading
* complete - Object processing is complete.
* error - N | Y, if yes there may be a error_msg
* error_msg - Any error messages associated with processing the JSON into
* the reference and project job schemas.
* json_blob - The JSON blob.
* worker_id - Identifier for worker process transfering the data.
**************************/
CREATE TABLE `objectstore` (
`id` bigint(11) unsigned NOT NULL AUTO_INCREMENT,
`job_guid` varchar(50) COLLATE utf8_bin NOT NULL,
`revision_hash` varchar(50) COLLATE utf8_bin DEFAULT NULL,
`loaded_timestamp` int(11) unsigned NOT NULL,
`processed_state` enum('ready','loading','complete') COLLATE utf8_bin DEFAULT 'ready',
`error` enum('N','Y') COLLATE utf8_bin DEFAULT 'N',
`error_msg` mediumtext COLLATE utf8_bin,
`json_blob` mediumblob,
`worker_id` int(11) unsigned DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `idx_job_id` (`job_guid`),
KEY `idx_processed_state` (`processed_state`),
KEY `idx_error` (`error`),
KEY `idx_worker_id` (`worker_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
/*!40101 SET character_set_client = @saved_cs_client */;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
-- Dump completed on 2013-02-27 8:55:27

View file

@ -7,7 +7,7 @@ from celery import task
from django.core.management import call_command
from django.conf import settings
from treeherder.model.models import Datasource, Repository
from treeherder.model.models import Repository
from treeherder.model.exchanges import TreeherderPublisher
from treeherder.model.pulse_publisher import load_schemas
from treeherder.model.error_summary import load_error_summary
@ -44,28 +44,6 @@ class LazyPublisher():
pulse_connection = LazyPublisher()
@task(name='process-objects')
def process_objects(limit=None, project=None):
"""
Process a number of objects from the objectstore
and load them to the jobs store
"""
from treeherder.model.derived.jobs import JobsModel
# default limit to 100
limit = limit or 100
if project:
projects_to_process = [project]
else:
projects_to_process = Datasource.objects.values_list(
'project', flat=True).distinct()
for project in projects_to_process:
with JobsModel(project) as jm:
jm.process_objects(limit)
# Run a maximum of 1 per hour
@task(name='cycle-data', rate_limit='1/h')
def cycle_data():
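
With process-objects gone, nothing sits between ingestion and the jobs store any more; a rough sketch of the direct path (the function, project name, and payload are illustrative):

    from treeherder.model.derived.jobs import JobsModel

    def ingest(project, job_collection):
        # job_collection is the already-built list of job structures;
        # store_job_data writes it straight to the jobs datasource, so there
        # is no longer a "ready -> loading -> complete" objectstore cycle.
        with JobsModel(project) as jm:
            jm.store_job_data(job_collection)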

View file

@ -31,7 +31,6 @@ TREEHERDER_PERF_SERIES_TIME_RANGES = [
]
DATA_CYCLE_INTERVAL = timedelta(days=30 * 4)
OBJECTSTORE_CYCLE_INTERVAL = timedelta(days=1)
RABBITMQ_USER = os.environ.get("TREEHERDER_RABBITMQ_USER", "guest")
RABBITMQ_PASSWORD = os.environ.get("TREEHERDER_RABBITMQ_PASSWORD", "guest")
@ -194,7 +193,6 @@ CELERY_QUEUES = (
Queue('buildapi_pending', Exchange('default'), routing_key='buildapi_pending'),
Queue('buildapi_running', Exchange('default'), routing_key='buildapi_running'),
Queue('buildapi_4hr', Exchange('default'), routing_key='buildapi_4hr'),
Queue('process_objects', Exchange('default'), routing_key='process_objects'),
Queue('cycle_data', Exchange('default'), routing_key='cycle_data'),
Queue('calculate_eta', Exchange('default'), routing_key='calculate_eta'),
Queue('populate_performance_series', Exchange('default'), routing_key='populate_performance_series'),
@ -243,14 +241,6 @@ CELERYBEAT_SCHEDULE = {
"queue": "buildapi_4hr"
}
},
'fetch-process-objects-every-minute': {
'task': 'process-objects',
'schedule': timedelta(minutes=1),
'relative': True,
'options': {
'queue': 'process_objects'
}
},
'cycle-data-every-day': {
'task': 'cycle-data',
'schedule': timedelta(days=1),
@ -292,7 +282,6 @@ REST_FRAMEWORK = {
),
'DEFAULT_THROTTLE_RATES': {
'jobs': '220/minute',
'objectstore': '220/minute',
'resultset': '220/minute'
}
}

View file

@ -167,9 +167,7 @@ class JobsViewSet(viewsets.ViewSet):
def create(self, request, project, jm):
"""
This method adds a job to a given resultset.
The incoming data has the same structure as for
the objectstore ingestion.
"""
jm.load_job_data(request.DATA)
jm.store_job_data(request.DATA)
return Response({'message': 'Job successfully updated'})
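
A hedged sketch of exercising this create endpoint over HTTP; the host, payload skeleton, and authentication handling are illustrative (auth is omitted here), and only the /api/project/{project}/jobs/ path and the response message come from the code above:

    import requests

    project = 'mozilla-central'                      # illustrative
    job_payload = [{'project': project, 'job': {}}]  # illustrative skeleton only
    url = 'https://treeherder.example.org/api/project/{}/jobs/'.format(project)
    resp = requests.post(url, json=job_payload)
    print(resp.status_code, resp.json())             # {'message': 'Job successfully updated'}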

View file

@ -2,8 +2,6 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
import simplejson as json
from rest_framework import viewsets
from rest_framework.response import Response
from treeherder.webapp.api.utils import (with_jobs,
@ -18,7 +16,7 @@ class ObjectstoreViewSet(viewsets.ViewSet):
Update will not be implemented as JobModel will always do
a conditional create and then an update.
"""
throttle_scope = 'objectstore'
throttle_scope = 'jobs'
@with_jobs
@oauth_required
@ -33,31 +31,10 @@ class ObjectstoreViewSet(viewsets.ViewSet):
/jobs/ create endpoint for backward compatibility with previous
versions of the Treeherder client and api.
"""
jm.load_job_data(request.DATA)
jm.store_job_data(request.DATA)
return Response('DEPRECATED: {} {}: {}'.format(
"This API will be removed soon.",
"Please change to using",
"/api/project/{}/jobs/".format(project)
))
@with_jobs
def retrieve(self, request, project, jm, pk=None):
"""
GET method implementation for detail view
"""
obj = jm.get_json_blob_by_guid(pk)
if obj:
return Response(json.loads(obj[0]['json_blob']))
else:
return Response("No objectstore entry with guid: {0}".format(pk), 404)
@with_jobs
def list(self, request, project, jm):
"""
GET method implementation for list view
"""
offset = int(request.QUERY_PARAMS.get('offset', 0))
count = min(int(request.QUERY_PARAMS.get('count', 10)), 1000)
objs = jm.get_json_blob_list(offset, count)
return Response([json.loads(obj['json_blob']) for obj in objs])
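
For an illustrative project name, the retained create endpoint now answers with a plain deprecation string built exactly as above:

    # Reproduces the message constructed in create() for a sample project.
    project = 'mozilla-inbound'
    print('DEPRECATED: {} {}: {}'.format(
        "This API will be removed soon.",
        "Please change to using",
        "/api/project/{}/jobs/".format(project)))
    # -> DEPRECATED: This API will be removed soon. Please change to using: /api/project/mozilla-inbound/jobs/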