Merge pull request #268 from mozilla/mysql-retry

Mysql retry
Mauro Doglio 2014-10-30 15:07:33 +00:00
Parents d7f9350560 9d098e8ade
Commit 23ee49570d
8 changed files with 173 additions and 143 deletions

View file

@ -516,3 +516,34 @@ def test_ingest_job_with_updated_job_group(jm, refdata, sample_data, initial_dat
second_job_group_name = second_job_stored[0]["job_group_name"]
assert first_job_group_name == second_job_group_name
def test_retry_on_operational_failure(jm, initial_data, monkeypatch):
"""Test that we retry 20 times on operational failures"""
from _mysql_exceptions import OperationalError
from treeherder.model import utils
from datasource.bases.SQLHub import SQLHub
orig_retry_execute = utils.retry_execute
retry_count = {'num': 0}
def retry_execute_mock(dhub, logger, retries=0, **kwargs):
retry_count['num'] = retries
# if it goes beyond 20, we may be in an infinite retry loop
assert retries <= 20
return orig_retry_execute(dhub, logger, retries, **kwargs)
monkeypatch.setattr(utils, "retry_execute", retry_execute_mock)
def execute_mock(*args, **kwargs):
raise OperationalError("got exception")
monkeypatch.setattr(SQLHub, "execute", execute_mock)
try:
jm.get_job_list(0, 10)
except OperationalError:
assert True
assert retry_count['num'] == 20
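
Equivalently, the final assertion could be phrased with pytest.raises rather than the bare try/except (a sketch only, assuming the same fixtures and monkeypatch setup as the test above):

import pytest
from _mysql_exceptions import OperationalError

# ... monkeypatch utils.retry_execute and SQLHub.execute as above ...
with pytest.raises(OperationalError):
    jm.get_job_list(0, 10)  # execute() always raises, so the error escapes after 20 retries
assert retry_count['num'] == 20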

View file

@ -94,6 +94,10 @@ class TbplBugRequest(object):
'name': set([("=", "buildapi_complete")])
})[0]
job_data = jm.get_job(self.job_id)[0]
except IndexError:
logger.exception(("Unable to find buildapi_complete artifact for "
"job_id: {0}").format(self.job_id))
raise
finally:
jm.disconnect()

View file

@ -3,6 +3,8 @@
access.
"""
import logging
from django.conf import settings
from treeherder.model.models import Datasource
@ -14,6 +16,7 @@ class TreeherderModelBase(object):
Base model class for all derived models
"""
logger = logging.getLogger(__name__)
def __init__(self, project):
"""Encapsulate the dataset access for this ``project`` """
@ -103,22 +106,6 @@ class TreeherderModelBase(object):
args=[self.project, ids, data_type]
)
def get_row_by_id(self, contenttype, table_name, obj_id):
"""
Given an ``id`` get the row for that item.
Return none if not found
"""
proc = "generic.selects.get_row_by_id"
iter_obj = self.get_dhub(contenttype).execute(
proc=proc,
replace=[table_name],
placeholders=[obj_id],
debug_show=self.DEBUG,
return_type='iter',
)
return self.as_single(iter_obj, table_name, id=obj_id)
def disconnect(self):
"""Iterate over and disconnect all data sources."""
self.refdata_model.disconnect()

View file

@ -12,7 +12,6 @@ from django.conf import settings
from django.core.cache import cache
from treeherder.model.models import (Datasource,
ExclusionProfile,
ReferenceDataSignatures)
from treeherder.model import utils
@ -23,13 +22,12 @@ from treeherder.etl.common import get_guid_root
from .base import TreeherderModelBase, ObjectNotFoundException
from datasource.DataHub import DataHub
from treeherder.etl.perf_data_adapters import (PerformanceDataAdapter,
TalosDataAdapter)
logger = logging.getLogger(__name__)
class JobsModel(TreeherderModelBase):
"""
Represent a job repository with objectstore
@ -182,6 +180,9 @@ class JobsModel(TreeherderModelBase):
"""Get the dhub for jobs"""
return self.get_dhub(self.CT_JOBS)
def jobs_execute(self, **kwargs):
return utils.retry_execute(self.get_jobs_dhub(), logger, **kwargs)
##################
#
# Job schema data methods
@ -192,6 +193,9 @@ class JobsModel(TreeherderModelBase):
"""Get the dhub for the objectstore"""
return self.get_dhub(self.CT_OBJECTSTORE)
def os_execute(self, **kwargs):
return utils.retry_execute(self.get_os_dhub(), logger, **kwargs)
def get_build_system_type(self, project=None):
if not project:
project = self.project
@ -209,7 +213,7 @@ class JobsModel(TreeherderModelBase):
def get_job(self, id):
"""Return the job row for this ``job_id``"""
repl = [self.refdata_model.get_db_name()]
data = self.get_jobs_dhub().execute(
data = self.jobs_execute(
proc="jobs.selects.get_job",
placeholders=[id],
debug_show=self.DEBUG,
@ -238,7 +242,7 @@ class JobsModel(TreeherderModelBase):
proc = "jobs.selects.get_job_list_full"
else:
proc = "jobs.selects.get_job_list"
data = self.get_jobs_dhub().execute(
data = self.jobs_execute(
proc=proc,
replace=repl,
placeholders=placeholders,
@ -279,7 +283,7 @@ class JobsModel(TreeherderModelBase):
def set_state(self, job_id, state):
"""Update the state of an existing job"""
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.updates.set_state',
placeholders=[state, job_id],
debug_show=self.DEBUG
@ -287,7 +291,7 @@ class JobsModel(TreeherderModelBase):
def get_incomplete_job_guids(self, resultset_id):
"""Get list of ids for jobs of resultset that are not in complete state."""
return self.get_jobs_dhub().execute(
return self.jobs_execute(
proc='jobs.selects.get_incomplete_job_guids',
placeholders=[resultset_id],
debug_show=self.DEBUG,
@ -300,7 +304,7 @@ class JobsModel(TreeherderModelBase):
"""Set all pending/running jobs in resultset to usercancel."""
jobs = list(self.get_incomplete_job_guids(resultset_id))
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.updates.cancel_all',
placeholders=[resultset_id],
debug_show=self.DEBUG
@ -315,7 +319,7 @@ class JobsModel(TreeherderModelBase):
def cancel_job(self, job_guid):
"""Set job to usercancel."""
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.updates.cancel_job',
placeholders=[job_guid],
debug_show=self.DEBUG
@ -329,7 +333,7 @@ class JobsModel(TreeherderModelBase):
def get_log_references(self, job_id):
"""Return the log references for the given ``job_id``."""
data = self.get_jobs_dhub().execute(
data = self.jobs_execute(
proc="jobs.selects.get_log_references",
placeholders=[job_id],
debug_show=self.DEBUG,
@ -343,7 +347,7 @@ class JobsModel(TreeherderModelBase):
This is everything about the artifact, but not the artifact blob
itself.
"""
data = self.get_jobs_dhub().execute(
data = self.jobs_execute(
proc="jobs.selects.get_job_artifact_references",
placeholders=[job_id],
debug_show=self.DEBUG,
@ -367,7 +371,7 @@ class JobsModel(TreeherderModelBase):
proc = "jobs.selects.get_job_artifact"
data = self.get_jobs_dhub().execute(
data = self.jobs_execute(
proc=proc,
replace=repl,
placeholders=placeholders,
@ -397,7 +401,7 @@ class JobsModel(TreeherderModelBase):
proc = "jobs.selects.get_performance_artifact_list"
data = self.get_jobs_dhub().execute(
data = self.jobs_execute(
proc=proc,
replace=repl,
placeholders=placeholders,
@ -421,7 +425,7 @@ class JobsModel(TreeherderModelBase):
def get_job_note(self, id):
"""Return the job note by id."""
data = self.get_jobs_dhub().execute(
data = self.jobs_execute(
proc="jobs.selects.get_job_note",
placeholders=[id],
debug_show=self.DEBUG,
@ -430,7 +434,7 @@ class JobsModel(TreeherderModelBase):
def get_job_note_list(self, job_id):
"""Return the job notes by job_id."""
data = self.get_jobs_dhub().execute(
data = self.jobs_execute(
proc="jobs.selects.get_job_note_list",
placeholders=[job_id],
debug_show=self.DEBUG,
@ -444,7 +448,7 @@ class JobsModel(TreeherderModelBase):
default value
"""
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.updates.update_last_job_classification',
placeholders=[
job_id,
@ -454,7 +458,7 @@ class JobsModel(TreeherderModelBase):
def insert_job_note(self, job_id, failure_classification_id, who, note):
"""insert a new note for a job and updates its failure classification"""
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.inserts.insert_note',
placeholders=[
job_id,
@ -491,7 +495,7 @@ class JobsModel(TreeherderModelBase):
Delete a job note and update the failure classification for that job
"""
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.deletes.delete_note',
placeholders=[
note_id,
@ -506,7 +510,7 @@ class JobsModel(TreeherderModelBase):
Store a new relation between the given job and bug ids.
"""
try:
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.inserts.insert_bug_job_map',
placeholders=[
job_id,
@ -566,7 +570,7 @@ class JobsModel(TreeherderModelBase):
"""
Delete a bug-job entry identified by bug_id and job_id
"""
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.deletes.delete_bug_job_map',
placeholders=[
job_id,
@ -578,7 +582,7 @@ class JobsModel(TreeherderModelBase):
def calculate_eta(self, sample_window_seconds, debug):
# Get the most recent timestamp from jobs
max_timestamp = self.get_jobs_dhub().execute(
max_timestamp = self.jobs_execute(
proc='jobs.selects.get_max_job_submit_timestamp',
return_type='iter',
debug_show=self.DEBUG
@ -588,7 +592,7 @@ class JobsModel(TreeherderModelBase):
time_window = int(max_timestamp) - sample_window_seconds
eta_groups = self.get_jobs_dhub().execute(
eta_groups = self.jobs_execute(
proc='jobs.selects.get_eta_groups',
placeholders=[time_window],
key_column='signature',
@ -640,7 +644,7 @@ class JobsModel(TreeherderModelBase):
submit_timestamp
])
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.inserts.set_job_eta',
placeholders=placeholders,
executemany=True,
@ -671,7 +675,7 @@ class JobsModel(TreeherderModelBase):
min_date = int(time.time() - self.DATA_CYCLE_INTERVAL)
# Retrieve list of result sets to delete
result_set_data = self.get_jobs_dhub().execute(
result_set_data = self.jobs_execute(
proc='jobs.selects.get_result_sets_to_cycle',
placeholders=[min_date],
debug_show=self.DEBUG
@ -685,7 +689,7 @@ class JobsModel(TreeherderModelBase):
rs_where_in_clause = [ ','.join( ['%s'] * len(rs_placeholders) ) ]
# Retrieve list of revisions associated with result sets
revision_data = self.get_jobs_dhub().execute(
revision_data = self.jobs_execute(
proc='jobs.selects.get_revision_ids_to_cycle',
placeholders=rs_placeholders,
replace=rs_where_in_clause,
@ -696,7 +700,7 @@ class JobsModel(TreeherderModelBase):
rev_where_in_clause = [ ','.join( ['%s'] * len(rev_placeholders) ) ]
# Retrieve list of jobs associated with result sets
job_data = self.get_jobs_dhub().execute(
job_data = self.jobs_execute(
proc='jobs.selects.get_jobs_to_cycle',
placeholders=rs_placeholders,
replace=rs_where_in_clause,
@ -833,7 +837,7 @@ class JobsModel(TreeherderModelBase):
proc = "jobs.selects.get_bug_job_map_list"
data = self.get_jobs_dhub().execute(
data = self.jobs_execute(
proc=proc,
replace=repl,
placeholders=placeholders,
@ -848,7 +852,7 @@ class JobsModel(TreeherderModelBase):
Raises an ObjectNotFoundException when no object is found
"""
data = self.get_jobs_dhub().execute(
data = self.jobs_execute(
proc="jobs.selects.get_bug_job_map_detail",
placeholders=[job_id, bug_id],
debug_show=self.DEBUG,
@ -878,7 +882,7 @@ class JobsModel(TreeherderModelBase):
result_set_id_lookup = {}
if revision_hashes:
result_set_id_lookup = self.get_jobs_dhub().execute(
result_set_id_lookup = self.jobs_execute(
proc='jobs.selects.get_result_set_ids',
placeholders=revision_hashes,
replace=[where_in_list],
@ -898,7 +902,7 @@ class JobsModel(TreeherderModelBase):
proc = "jobs.selects.get_result_set_list_by_ids"
result_set_ids = self.get_jobs_dhub().execute(
result_set_ids = self.jobs_execute(
proc=proc,
replace=[replace_str],
placeholders=placeholders,
@ -931,7 +935,7 @@ class JobsModel(TreeherderModelBase):
# Retrieve the filtered/limited list of result sets
proc = "jobs.selects.get_result_set_list"
result_set_ids = self.get_jobs_dhub().execute(
result_set_ids = self.jobs_execute(
proc=proc,
replace=[replace_str],
placeholders=placeholders,
@ -992,7 +996,7 @@ class JobsModel(TreeherderModelBase):
replacement = " AND revision IN ("+replacement+") "
proc = "jobs.selects.get_revision_resultset_lookup"
lookups = self.get_jobs_dhub().execute(
lookups = self.jobs_execute(
proc=proc,
placeholders=revision_list+[0, len(revision_list)],
debug_show=self.DEBUG,
@ -1009,7 +1013,7 @@ class JobsModel(TreeherderModelBase):
"""
proc = "jobs.selects.get_result_set_details"
lookups = self.get_jobs_dhub().execute(
lookups = self.jobs_execute(
proc=proc,
debug_show=self.DEBUG,
placeholders=[result_set_id],
@ -1042,7 +1046,7 @@ class JobsModel(TreeherderModelBase):
# Retrieve revision details associated with each result_set_id
detail_proc = "jobs.selects.get_result_set_details"
result_set_details = self.get_jobs_dhub().execute(
result_set_details = self.jobs_execute(
proc=detail_proc,
placeholders=ids,
debug_show=self.DEBUG,
@ -1093,7 +1097,7 @@ class JobsModel(TreeherderModelBase):
proc = "jobs.selects.get_result_set_job_list_full"
else:
proc = "jobs.selects.get_result_set_job_list"
data = self.get_jobs_dhub().execute(
data = self.jobs_execute(
proc=proc,
placeholders=result_set_ids,
debug_show=self.DEBUG,
@ -1124,7 +1128,7 @@ class JobsModel(TreeherderModelBase):
repl.append(','.join(id_placeholders))
proc = "jobs.selects.get_result_set_push_timestamp"
data = self.get_jobs_dhub().execute(
data = self.jobs_execute(
proc=proc,
placeholders=result_set_ids,
debug_show=self.DEBUG,
@ -1185,7 +1189,7 @@ class JobsModel(TreeherderModelBase):
if obj_placeholders:
# this query inserts the object if its guid is not present,
# otherwise it does nothing
self.get_os_dhub().execute(
self.os_execute(
proc='objectstore.inserts.store_json',
placeholders=obj_placeholders,
executemany=True,
@ -1205,7 +1209,7 @@ class JobsModel(TreeherderModelBase):
"""
proc = "objectstore.selects.get_unprocessed"
json_blobs = self.get_os_dhub().execute(
json_blobs = self.os_execute(
proc=proc,
placeholders=[limit],
debug_show=self.DEBUG,
@ -1455,7 +1459,7 @@ class JobsModel(TreeherderModelBase):
get_guid_root(row[-1])
]['id']
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.updates.update_job_data',
debug_show=self.DEBUG,
placeholders=job_update_placeholders,
@ -1467,7 +1471,7 @@ class JobsModel(TreeherderModelBase):
# set the job_coalesced_to_guid column for any coalesced
# job found
if coalesced_job_guid_placeholders:
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.updates.update_coalesced_guids',
debug_show=self.DEBUG,
placeholders=coalesced_job_guid_placeholders,
@ -1541,7 +1545,7 @@ class JobsModel(TreeherderModelBase):
replacement = ' OR '.join(state_clauses)
if placeholders:
existing_guids = self.get_jobs_dhub().execute(
existing_guids = self.jobs_execute(
proc='jobs.selects.get_job_guids_in_states',
placeholders=placeholders,
replace=[replacement],
@ -1843,7 +1847,7 @@ class JobsModel(TreeherderModelBase):
return {}
# Store job data
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.inserts.create_job_data',
debug_show=self.DEBUG,
placeholders=job_placeholders,
@ -1860,7 +1864,7 @@ class JobsModel(TreeherderModelBase):
rds_where_in_clause = ','.join( ['%s'] * len(reference_data_signatures) )
job_eta_data = self.get_jobs_dhub().execute(
job_eta_data = self.jobs_execute(
proc='jobs.selects.get_last_eta_by_signatures',
debug_show=self.DEBUG,
replace=[rds_where_in_clause],
@ -1885,7 +1889,7 @@ class JobsModel(TreeherderModelBase):
job_guid_where_in_clause = ",".join(["%s"] * len(job_guid_list))
job_id_lookup = self.get_jobs_dhub().execute(
job_id_lookup = self.jobs_execute(
proc='jobs.selects.get_job_ids_by_guids',
debug_show=self.DEBUG,
replace=[job_guid_where_in_clause],
@ -1931,7 +1935,7 @@ class JobsModel(TreeherderModelBase):
tasks.append(task)
# Store the log references
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.inserts.set_job_log_url',
debug_show=self.DEBUG,
placeholders=log_placeholders,
@ -1963,7 +1967,7 @@ class JobsModel(TreeherderModelBase):
)
def get_job_log_url_detail(self, job_log_url_id):
obj = self.get_jobs_dhub().execute(
obj = self.jobs_execute(
proc='jobs.selects.get_job_log_url_detail',
debug_show=self.DEBUG,
placeholders=[job_log_url_id])
@ -1982,7 +1986,7 @@ class JobsModel(TreeherderModelBase):
id_placeholders = ["%s"] * len(job_ids)
replacement.append(','.join(id_placeholders))
data = self.get_jobs_dhub().execute(
data = self.jobs_execute(
proc="jobs.selects.get_job_log_url_list",
placeholders=job_ids,
replace=replacement,
@ -1993,7 +1997,7 @@ class JobsModel(TreeherderModelBase):
def update_job_log_url_status(self, job_log_url_id,
parse_status, parse_timestamp):
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.updates.update_job_log_url',
debug_show=self.DEBUG,
placeholders=[parse_status, parse_timestamp, job_log_url_id])
@ -2004,7 +2008,7 @@ class JobsModel(TreeherderModelBase):
Store a list of job_artifacts given a list of placeholders
"""
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.inserts.set_job_artifact',
debug_show=self.DEBUG,
placeholders=artifact_placeholders,
@ -2016,7 +2020,7 @@ class JobsModel(TreeherderModelBase):
placeholders = signatures
placeholders.append(str(interval_seconds))
data = self.get_jobs_dhub().execute(
data = self.jobs_execute(
proc="jobs.selects.get_performance_series_from_signatures",
debug_show=self.DEBUG,
placeholders=placeholders,
@ -2039,7 +2043,7 @@ class JobsModel(TreeherderModelBase):
props = [el for x in props.items() for el in x]
props.extend(props)
signatures = self.get_jobs_dhub().execute(
signatures = self.jobs_execute(
proc="jobs.selects.get_signatures_from_properties",
debug_show=self.DEBUG,
placeholders=props,
@ -2052,7 +2056,7 @@ class JobsModel(TreeherderModelBase):
signatures_repl = [ ','.join( ['%s'] * len(signatures) ) ]
properties = self.get_jobs_dhub().execute(
properties = self.jobs_execute(
proc="jobs.selects.get_all_properties_of_signatures",
debug_show=self.DEBUG,
placeholders=signatures,
@ -2076,7 +2080,7 @@ class JobsModel(TreeherderModelBase):
jobs_signatures_where_in_clause = [ ','.join( ['%s'] * len(job_ids) ) ]
job_data = self.get_jobs_dhub().execute(
job_data = self.jobs_execute(
proc='jobs.selects.get_signature_list_from_job_ids',
debug_show=self.DEBUG,
replace=jobs_signatures_where_in_clause,
@ -2126,13 +2130,13 @@ class JobsModel(TreeherderModelBase):
# adapt and load data into placeholder structures
tda.adapt_and_load(ref_data, job_data, perf_data)
self.get_jobs_dhub().execute(
self.jobs_execute(
proc="jobs.inserts.set_performance_artifact",
debug_show=self.DEBUG,
placeholders=tda.performance_artifact_placeholders,
executemany=True)
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.inserts.set_series_signature',
debug_show=self.DEBUG,
placeholders=tda.signature_property_placeholders,
@ -2150,7 +2154,7 @@ class JobsModel(TreeherderModelBase):
# overwriting each other's blobs. The lock incorporates the time
# interval and signature combination and is specific to a single
# json blob.
lock = self.get_jobs_dhub().execute(
lock = self.jobs_execute(
proc='generic.locks.get_lock',
debug_show=self.DEBUG,
placeholders=[lock_string, 60])
@ -2172,13 +2176,13 @@ class JobsModel(TreeherderModelBase):
series_data_json, t_range, signature
]
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.inserts.set_performance_series',
debug_show=self.DEBUG,
placeholders=insert_placeholders)
# Retrieve and update the series
performance_series = self.get_jobs_dhub().execute(
performance_series = self.jobs_execute(
proc='jobs.selects.get_performance_series',
debug_show=self.DEBUG,
placeholders=[t_range, signature])
@ -2212,7 +2216,7 @@ class JobsModel(TreeherderModelBase):
t_range, signature
]
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='jobs.updates.update_performance_series',
debug_show=self.DEBUG,
placeholders=update_placeholders)
@ -2224,7 +2228,7 @@ class JobsModel(TreeherderModelBase):
finally:
# Make sure we release the lock no matter what errors
# are generated
self.get_jobs_dhub().execute(
self.jobs_execute(
proc='generic.locks.release_lock',
debug_show=self.DEBUG,
placeholders=[lock_string])
@ -2414,7 +2418,7 @@ class JobsModel(TreeherderModelBase):
# Note: this claims rows for processing. Failure to call load_job_data
# on this data will result in some json blobs being stuck in limbo
# until another worker comes along with the same connection ID.
self.get_os_dhub().execute(
self.os_execute(
proc=proc_mark,
placeholders=[limit],
debug_show=self.DEBUG,
@ -2424,7 +2428,7 @@ class JobsModel(TreeherderModelBase):
# Return all JSON blobs claimed by this connection ID (could possibly
# include orphaned rows from a previous run).
json_blobs = self.get_os_dhub().execute(
json_blobs = self.os_execute(
proc=proc_get,
debug_show=self.DEBUG,
return_type='tuple'
@ -2442,7 +2446,7 @@ class JobsModel(TreeherderModelBase):
]
"""
if object_placeholders:
self.get_os_dhub().execute(
self.os_execute(
proc="objectstore.updates.mark_complete",
placeholders=object_placeholders,
executemany=True,
@ -2451,7 +2455,7 @@ class JobsModel(TreeherderModelBase):
def mark_object_error(self, object_id, error):
""" Call to database to mark the task completed """
self.get_os_dhub().execute(
self.os_execute(
proc="objectstore.updates.mark_error",
placeholders=[error, object_id],
debug_show=self.DEBUG
@ -2459,7 +2463,7 @@ class JobsModel(TreeherderModelBase):
def get_json_blob_by_guid(self, guid):
"""retrieves a json_blob given its guid"""
data = self.get_os_dhub().execute(
data = self.os_execute(
proc="objectstore.selects.get_json_blob_by_guid",
placeholders=[guid],
debug_show=self.DEBUG,
@ -2472,7 +2476,7 @@ class JobsModel(TreeherderModelBase):
Mainly used by the restful api to list the last blobs stored
"""
proc = "objectstore.selects.get_json_blob_list"
json_blobs = self.get_os_dhub().execute(
json_blobs = self.os_execute(
proc=proc,
placeholders=[offset, limit],
debug_show=self.DEBUG,
@ -2527,8 +2531,6 @@ class JobsModel(TreeherderModelBase):
# revision_map structures
revision_to_rhash_lookup = dict()
dhub = self.get_jobs_dhub()
# TODO: Confirm whether we need to do a lookup in this loop in the
# memcache to reduce query overhead
for result in result_sets:
@ -2591,7 +2593,7 @@ class JobsModel(TreeherderModelBase):
# in the list of unique_revision_hashes. Use it to determine the new
# result_sets found to publish to pulse.
where_in_clause = ','.join(where_in_list)
result_set_ids_before = dhub.execute(
result_set_ids_before = self.jobs_execute(
proc='jobs.selects.get_result_set_ids',
placeholders=unique_revision_hashes,
replace=[where_in_clause],
@ -2601,17 +2603,17 @@ class JobsModel(TreeherderModelBase):
)
# Insert new result sets
dhub.execute(
self.jobs_execute(
proc='jobs.inserts.set_result_set',
placeholders=revision_hash_placeholders,
executemany=True,
debug_show=self.DEBUG
)
lastrowid = dhub.connection['master_host']['cursor'].lastrowid
lastrowid = self.get_jobs_dhub().connection['master_host']['cursor'].lastrowid
# Retrieve new and already existing result set ids
result_set_id_lookup = dhub.execute(
result_set_id_lookup = self.jobs_execute(
proc='jobs.selects.get_result_set_ids',
placeholders=unique_revision_hashes,
replace=[where_in_clause],
@ -2639,7 +2641,7 @@ class JobsModel(TreeherderModelBase):
)
# Insert new revisions
dhub.execute(
self.jobs_execute(
proc='jobs.inserts.set_revision',
placeholders=revision_placeholders,
executemany=True,
@ -2648,7 +2650,7 @@ class JobsModel(TreeherderModelBase):
# Retrieve new revision ids
rev_where_in_clause = ','.join(rev_where_in_list)
revision_id_lookup = dhub.execute(
revision_id_lookup = self.jobs_execute(
proc='jobs.selects.get_revisions',
placeholders=all_revisions,
replace=[rev_where_in_clause],
@ -2673,7 +2675,7 @@ class JobsModel(TreeherderModelBase):
)
# Insert new revision_map entries
dhub.execute(
self.jobs_execute(
proc='jobs.inserts.set_revision_map',
placeholders=revision_map_placeholders,
executemany=True,
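
Every query in this file now routes through one of two thin wrappers (added in the hunks near the top of the class): jobs_execute for the jobs schema and os_execute for the objectstore. Reproduced here for reference, since the rest of this file's diff is a mechanical substitution:

def jobs_execute(self, **kwargs):
    # Jobs-schema queries, retried on OperationalError via utils.retry_execute.
    return utils.retry_execute(self.get_jobs_dhub(), logger, **kwargs)

def os_execute(self, **kwargs):
    # Objectstore queries, retried the same way.
    return utils.retry_execute(self.get_os_dhub(), logger, **kwargs)

Two details are visible above: the lastrowid read still reaches into the dhub connection directly, since retry_execute only wraps execute(); and retrying writes is safe for statements like store_json, which the inline comment describes as inserting only when the guid is not already present.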

View file

@ -1,4 +1,5 @@
import os
import logging
from hashlib import sha1
import time
from datetime import timedelta, datetime
@ -7,6 +8,9 @@ from django.conf import settings
from datasource.bases.BaseHub import BaseHub
from datasource.DataHub import DataHub
from treeherder.model import utils
logger = logging.getLogger(__name__)
class RefDataManager(object):
"""Model for reference data"""
@ -99,6 +103,9 @@ class RefDataManager(object):
def disconnect(self):
self.dhub.disconnect()
def execute(self, **kwargs):
return utils.retry_execute(self.dhub, logger, **kwargs)
def set_all_reference_data(self):
"""This method executes SQL to store data in all loaded reference
data structures. It returns lookup dictionaries where the key is
@ -546,7 +553,7 @@ class RefDataManager(object):
insert_proc = 'reference.inserts.create_reference_data_signature'
self.dhub.execute(
self.execute(
proc=insert_proc,
placeholders=self.build_signature_placeholders,
executemany=True,
@ -632,7 +639,7 @@ class RefDataManager(object):
if update_placeholders:
# Update the job types with the job group id
self.dhub.execute(
self.execute(
proc='reference.updates.update_job_type_group_id',
placeholders=update_placeholders,
executemany=True,
@ -685,13 +692,13 @@ class RefDataManager(object):
insert_proc = 'reference.inserts.create_machine'
update_proc = 'reference.updates.update_machine_timestamp'
self.dhub.execute(
self.execute(
proc=insert_proc,
placeholders=self.machine_name_placeholders,
executemany=True,
debug_show=self.DEBUG)
name_lookup = self.dhub.execute(
name_lookup = self.execute(
proc=select_proc,
placeholders=self.machine_unique_names,
replace=[where_in_clause],
@ -719,7 +726,7 @@ class RefDataManager(object):
the potential to mangle previous stored machine_ids. This would
be bad...
"""
self.dhub.execute(
self.execute(
proc=update_proc,
placeholders=self.machine_timestamp_update_placeholders,
executemany=True,
@ -749,7 +756,7 @@ class RefDataManager(object):
if not self.oc_placeholders:
return {}
self.dhub.execute(
self.execute(
proc='reference.inserts.create_option_collection',
placeholders=self.oc_placeholders,
executemany=True,
@ -768,7 +775,7 @@ class RefDataManager(object):
if where_filters:
self.dhub.execute(
self.execute(
proc=insert_proc,
placeholders=platform_placeholders,
executemany=True,
@ -780,7 +787,7 @@ class RefDataManager(object):
# NOTE: This query is using master_host to ensure we don't have a
# race condition with INSERT into master and SELECT new ids from
# the slave.
data_retrieved = self.dhub.execute(
data_retrieved = self.execute(
proc=select_proc,
placeholders=unique_platforms,
replace=[where_in_clause],
@ -811,13 +818,13 @@ class RefDataManager(object):
# Convert WHERE filters to string
where_in_clause = ",".join(where_in_list)
self.dhub.execute(
self.execute(
proc=insert_proc,
placeholders=name_placeholders,
executemany=True,
debug_show=self.DEBUG)
name_lookup = self.dhub.execute(
name_lookup = self.execute(
proc=select_proc,
placeholders=unique_names,
replace=[where_in_clause],
@ -837,7 +844,7 @@ class RefDataManager(object):
"""
if where_filters:
self.dhub.execute(
self.execute(
proc=insert_proc,
placeholders=name_symbol_placeholders,
executemany=True,
@ -846,7 +853,7 @@ class RefDataManager(object):
# Convert WHERE filters to string
where_in_clause = " OR ".join(where_filters)
data_retrieved = self.dhub.execute(
data_retrieved = self.execute(
proc=select_proc,
placeholders=names_and_symbols,
replace=[where_in_clause],
@ -1128,13 +1135,13 @@ class RefDataManager(object):
insert_proc = 'reference.inserts.create_option'
select_proc='reference.selects.get_options'
self.dhub.execute(
self.execute(
proc=insert_proc,
placeholders=option_placeholders,
executemany=True,
debug_show=self.DEBUG)
option_lookup = self.dhub.execute(
option_lookup = self.execute(
proc=select_proc,
placeholders=unique_options,
replace=[where_in_clause],
@ -1148,17 +1155,6 @@ class RefDataManager(object):
"""The name of the database holding the refdata tables"""
return self.dhub.conf["default_db"]
def get_row_by_id(self, table_name, obj_id):
iter_obj = self.dhub.execute(
proc="reference.selects.get_row_by_id",
replace=[table_name],
placeholders=[obj_id],
debug_show=self.DEBUG,
return_type='iter',
)
return iter_obj
def get_all_option_collections(self):
"""
Returns all option collections in the following data structure
@ -1175,7 +1171,7 @@ class RefDataManager(object):
...
}
"""
return self.dhub.execute(
return self.execute(
proc='reference.selects.get_all_option_collections',
debug_show=self.DEBUG,
key_column='option_collection_hash',
@ -1185,7 +1181,7 @@ class RefDataManager(object):
def get_repository_id(self, name):
"""get the id for the given repository"""
id_iter = self.dhub.execute(
id_iter = self.execute(
proc='reference.selects.get_repository_id',
placeholders=[name],
debug_show=self.DEBUG,
@ -1196,7 +1192,7 @@ class RefDataManager(object):
def get_repository_version_id(self, repository_id):
"""get the latest version available for the given repository"""
id_iter = self.dhub.execute(
id_iter = self.execute(
proc='reference.selects.get_repository_version_id',
placeholders=[repository_id],
debug_show=self.DEBUG,
@ -1207,7 +1203,7 @@ class RefDataManager(object):
def get_or_create_repository_version(self, repository_id, version,
version_timestamp):
self.dhub.execute(
self.execute(
proc='reference.inserts.create_repository_version',
placeholders=[
repository_id,
@ -1247,7 +1243,7 @@ class RefDataManager(object):
version, timestamp_now)
# update the version_timestamp
self.dhub.execute(
self.execute(
proc='reference.updates.update_version_timestamp',
placeholders=[
timestamp_now,
@ -1275,7 +1271,7 @@ class RefDataManager(object):
def get_repository_info(self, repository_id):
"""retrieves all the attributes of a repository"""
repo = self.dhub.execute(
repo = self.execute(
proc='reference.selects.get_repository_info',
placeholders=[repository_id],
debug_show=self.DEBUG,
@ -1285,13 +1281,13 @@ class RefDataManager(object):
return r
def get_all_repository_info(self):
return self.dhub.execute(
return self.execute(
proc='reference.selects.get_all_repository_info',
debug_show=self.DEBUG,
return_type='iter')
def get_bug_numbers_list(self):
return self.dhub.execute(
return self.execute(
proc='reference.selects.get_all_bug_numbers',
debug_show=self.DEBUG,
return_type='iter')
@ -1299,7 +1295,7 @@ class RefDataManager(object):
def delete_bugs(self, bug_ids):
"""delete a list of bugs given the ids"""
self.dhub.execute(
self.execute(
proc='reference.deletes.delete_bugs',
debug_show=self.DEBUG,
replace=[",".join(["%s"] * len(bug_ids))],
@ -1324,7 +1320,7 @@ class RefDataManager(object):
'id', 'status', 'resolution', 'summary',
'cf_crash_signature', 'keywords', 'op_sys', 'last_change_time', 'id')])
self.dhub.execute(
self.execute(
proc='reference.inserts.create_bugscache',
placeholders=placeholders,
executemany=True,
@ -1333,7 +1329,7 @@ class RefDataManager(object):
# remove the first placeholder because it is not used in the update query
del placeholders[0]
self.dhub.execute(
self.execute(
proc='reference.updates.update_bugscache',
placeholders=placeholders,
executemany=True,
@ -1352,12 +1348,12 @@ class RefDataManager(object):
time_limit = datetime.now() - timedelta(days=90)
search_term = search_term.join('""')
open_recent = self.dhub.execute(
open_recent = self.execute(
proc='reference.selects.get_open_recent_bugs',
placeholders=[search_term, search_term, time_limit, max_size + 1],
debug_show=self.DEBUG)
all_others = self.dhub.execute(
all_others = self.execute(
proc='reference.selects.get_all_others_bugs',
placeholders=[search_term, search_term, time_limit, max_size + 1],
debug_show=self.DEBUG)
@ -1382,7 +1378,7 @@ class RefDataManager(object):
','.join( ['%s'] * len(signatures) )
]
reference_data = self.dhub.execute(
reference_data = self.execute(
proc="reference.selects.get_reference_data_signature_names",
placeholders=signatures,
replace=reference_data_signatures_where_in_clause,
@ -1400,7 +1396,7 @@ class RefDataManager(object):
reference_data_signatures_where_in_clause = [ ','.join( ['%s'] * len(signatures) ) ]
reference_data = self.dhub.execute(
reference_data = self.execute(
proc="reference.selects.get_reference_data_for_perf_signature",
placeholders=signatures,
replace=reference_data_signatures_where_in_clause,

View file

@ -30,11 +30,6 @@
GROUP BY TABLE_SCHEMA",
"host":"read_host"
},
"get_row_by_id":{
"sql": "SELECT * FROM REP0
WHERE `id` = ?",
"host":"read_host"
}
},
"locks": {

View file

@ -296,11 +296,6 @@
WHERE `name` = ? AND `active_status` = 'active'",
"host":"read_host"
},
"get_row_by_id":{
"sql": "SELECT * FROM REP0
WHERE `id` = ?",
"host":"read_host"
},
"get_all_option_collections":{
"sql":"SELECT option_collection_hash,
GROUP_CONCAT( name SEPARATOR ' ' ) as opt

View file

@ -1,7 +1,7 @@
import time
import json
import datetime
import sys
import random
from _mysql_exceptions import OperationalError
def get_now_timestamp():
@ -57,3 +57,23 @@ def where_wolf(project, flat_exclusions):
condition = " (mp.platform = %s AND jt.name = %s AND opt.name = %s)"
condition_list = " OR ".join([condition] * (len(values_list)/3))
return " AND ({0})".format(condition_list), values_list
def retry_execute(dhub, logger, retries=0, **kwargs):
"""Retry the query in the case of an OperationalError."""
try:
return dhub.execute(**kwargs)
except OperationalError:
if retries < 20:
retries += 1
sleep_time = round(random.random() * .05, 3) # 0 to 50ms
if logger:
logger.info(
"MySQL operational error hit. Retry #{0} in {1}s: {2}".format(
retries, sleep_time, kwargs
))
time.sleep(sleep_time)
return retry_execute(dhub, logger, retries, **kwargs)
else:
raise
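
A hypothetical, self-contained run of retry_execute against a stand-in hub, showing two transient failures followed by success on the third attempt (FlakyHub is illustrative only and not part of this patch):

import logging
from _mysql_exceptions import OperationalError
from treeherder.model.utils import retry_execute

class FlakyHub(object):
    """Stand-in dhub whose execute() raises twice, then succeeds."""
    def __init__(self):
        self.calls = 0

    def execute(self, **kwargs):
        self.calls += 1
        if self.calls <= 2:
            raise OperationalError("deadlock found when trying to get lock")
        return [{'id': 1}]

hub = FlakyHub()
rows = retry_execute(hub, logging.getLogger(__name__),
                     proc='jobs.selects.get_job', placeholders=[1])
assert hub.calls == 3
assert rows == [{'id': 1}]

With the cap of 20 retries and a sleep of at most 50ms per attempt, a query that never succeeds adds roughly a second of delay at most before the OperationalError finally propagates.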