Merge pull request #291 from mozilla/bug-1102228-improve-data-cycling

Bug 1102228 improve data cycling
Mauro Doglio 2014-11-27 17:13:11 +00:00
Parents 78ecb0900c 06f62d21f6
Commit 0a02d494ef
7 changed files with 230 additions and 203 deletions

View file

@@ -3,7 +3,7 @@
     "get_jobs_for_cycling": {
         "sql": "SELECT id FROM `job` WHERE `result_set_id` IN (
-            SELECT id FROM result_set WHERE push_timestamp >= ?
+            SELECT id FROM result_set WHERE push_timestamp < ?
         )",
         "host": "master_host"
     },
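The predicate flips from push_timestamp >= ? to push_timestamp < ?, so this test proc now selects the jobs that will be cycled, i.e. those whose result set was pushed before a cutoff, mirroring the production get_result_sets_to_cycle query further down. A minimal sketch of the cutoff arithmetic the updated tests rely on (values are illustrative):

    import time

    time_now = time.time()
    push_ts = time_now - 7 * 24 * 3600   # result sets are backdated one week
    cutoff = time_now - 24 * 3600        # bound passed to get_jobs_for_cycling

    # push_timestamp < cutoff holds, so these jobs are selected for cycling
    assert push_ts < cutoff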

View file

@@ -9,6 +9,9 @@ import itertools
 import pprint
 import copy
+from django.conf import settings
+from django.core.management import call_command
+from treeherder.model.derived.base import DatasetNotFoundError
 from tests.sample_data_generator import job_data, result_set
 from tests.sampledata import SampleData
@@ -211,45 +214,41 @@ def test_ingest_retry_sample_job_no_running(jm, refdata, sample_data, initial_da
     assert jl[0]['result'] == 'retry'


-def test_cycle_all_data(jm, refdata, sample_data, initial_data, sample_resultset, mock_log_parser):
+def test_cycle_all_data(jm, refdata, sample_data, initial_data,
+                        sample_resultset, mock_log_parser):
     """
     Test cycling the sample data
     """
     job_data = sample_data.job_data[:20]
     test_utils.do_job_ingestion(jm, refdata, job_data, sample_resultset, False)

     # build a date that will cause the data to be cycled
-    cycle_date_ts = int(time.time() - (jm.DATA_CYCLE_INTERVAL + 100000))
+    time_now = time.time()
+    cycle_date_ts = time_now - 7 * 24 * 3600

-    jm.get_dhub(jm.CT_JOBS).execute(
+    jm.jobs_execute(
         proc="jobs_test.updates.set_result_sets_push_timestamp",
         placeholders=[cycle_date_ts]
     )

-    jobs_to_be_deleted = jm.get_dhub(jm.CT_JOBS).execute(
+    jobs_to_be_deleted = jm.jobs_execute(
         proc="jobs_test.selects.get_jobs_for_cycling",
-        placeholders=[cycle_date_ts]
+        placeholders=[time_now - 24 * 3600]
     )

-    job_count = len(jobs_to_be_deleted)
-
-    jobs_before = jm.get_dhub(jm.CT_JOBS).execute(proc="jobs_test.selects.jobs")
+    jobs_before = jm.jobs_execute(proc="jobs_test.selects.jobs")

-    sql_targets = jm.cycle_data({}, False)
+    call_command('cycle_data', sleep_time=0, cycle_interval=1)

-    jobs_after = jm.get_dhub(jm.CT_JOBS).execute(proc="jobs_test.selects.jobs")
+    jobs_after = jm.jobs_execute(proc="jobs_test.selects.jobs")

     jm.disconnect()
     refdata.disconnect()

-    assert len(jobs_before) == job_count
     assert len(jobs_after) == len(jobs_before) - len(jobs_to_be_deleted)
-    assert sql_targets['jobs.deletes.cycle_job'] == job_count
+    # There should be no jobs after cycling
+    assert len(jobs_after) == 0


-def test_cycle_one_job(jm, refdata, sample_data, initial_data, sample_resultset, mock_log_parser):
+def test_cycle_one_job(jm, refdata, sample_data, initial_data,
+                       sample_resultset, mock_log_parser):
     """
     Test cycling one job in a group of jobs to confirm there are no
     unexpected deletions
@@ -258,43 +257,75 @@ def test_cycle_one_job(jm, refdata, sample_data, initial_data, sample_resultset,
     job_data = sample_data.job_data[:20]
     test_utils.do_job_ingestion(jm, refdata, job_data, sample_resultset, False)

     # set all the result_sets to a non cycle time
-    non_cycle_date_ts = int(time.time() - (jm.DATA_CYCLE_INTERVAL - 100000))
-    jm.get_dhub(jm.CT_JOBS).execute(
+    time_now = time.time()
+    cycle_date_ts = int(time_now - 7 * 24 * 3600)
+    jm.jobs_execute(
         proc="jobs_test.updates.set_result_sets_push_timestamp",
-        placeholders=[ non_cycle_date_ts ]
+        placeholders=[time_now]
     )

-    # build a date that will cause the data to be cycled
-    cycle_date_ts = int(time.time() - (jm.DATA_CYCLE_INTERVAL + 100000))
-    jm.get_dhub(jm.CT_JOBS).execute(
+    jm.jobs_execute(
         proc="jobs_test.updates.set_one_result_set_push_timestamp",
         placeholders=[cycle_date_ts]
     )

-    jobs_to_be_deleted = jm.get_dhub(jm.CT_JOBS).execute(
+    jobs_to_be_deleted = jm.jobs_execute(
         proc="jobs_test.selects.get_result_set_jobs",
         placeholders=[1]
     )

-    job_count = len(jobs_to_be_deleted)
+    jobs_before = jm.jobs_execute(proc="jobs_test.selects.jobs")

-    sql_targets = jm.cycle_data({}, False)
+    call_command('cycle_data', sleep_time=0, cycle_interval=1, debug=True)

-    assert sql_targets['jobs.deletes.cycle_job'] == job_count
+    jobs_after = jm.jobs_execute(proc="jobs_test.selects.jobs")

     #Confirm that the target result set has no jobs in the
     #jobs table
-    jobs_count_after_delete = jm.get_dhub(jm.CT_JOBS).execute(
+    jobs_to_be_deleted_after = jm.jobs_execute(
         proc="jobs_test.selects.get_result_set_jobs",
         placeholders=[1]
     )

-    assert len(jobs_count_after_delete) == 0
+    assert len(jobs_to_be_deleted_after) == 0
+    assert len(jobs_after) == len(jobs_before) - len(jobs_to_be_deleted)


+def test_cycle_all_data_in_chunks(jm, refdata, sample_data, initial_data,
+                                  sample_resultset, mock_log_parser):
+    """
+    Test cycling the sample data in chunks.
+    """
+    job_data = sample_data.job_data[:20]
+    test_utils.do_job_ingestion(jm, refdata, job_data, sample_resultset, False)
+
+    # build a date that will cause the data to be cycled
+    time_now = time.time()
+    cycle_date_ts = int(time_now - 7 * 24 * 3600)
+
+    jm.jobs_execute(
+        proc="jobs_test.updates.set_result_sets_push_timestamp",
+        placeholders=[cycle_date_ts]
+    )
+
+    jobs_to_be_deleted = jm.jobs_execute(
+        proc="jobs_test.selects.get_jobs_for_cycling",
+        placeholders=[time_now - 24 * 3600]
+    )
+
+    jobs_before = jm.jobs_execute(proc="jobs_test.selects.jobs")
+
+    call_command('cycle_data', sleep_time=0, cycle_interval=1, chunk_size=3)
+
+    jobs_after = jm.jobs_execute(proc="jobs_test.selects.jobs")
+
+    assert len(jobs_after) == len(jobs_before) - len(jobs_to_be_deleted)
+    # There should be no jobs after cycling
+    assert len(jobs_after) == 0
+
+    jm.disconnect()
+    refdata.disconnect()


 def test_bad_date_value_ingestion(jm, initial_data, mock_log_parser):
     """

View file

@@ -6,6 +6,7 @@ import json
 import MySQLdb
 import time
 import logging
+from datetime import datetime
 from operator import itemgetter
@@ -155,9 +156,6 @@ class JobsModel(TreeherderModelBase):
         "jobs.deletes.cycle_result_set"
     ]

-    # 6 months in seconds
-    DATA_CYCLE_INTERVAL = 15552000
-
     @classmethod
     def create(cls, project, host=None):
         """
@@ -184,6 +182,16 @@ class JobsModel(TreeherderModelBase):
         """Get the dhub for jobs"""
         return self.get_dhub(self.CT_JOBS)

+    def execute(self, data_type, **kwargs):
+        """
+        Execute a query based on the data_type provided.
+        """
+        if data_type == 'jobs':
+            dhub = self.get_jobs_dhub()
+        else:
+            dhub = self.get_os_dhub()
+        return utils.retry_execute(dhub, logger, **kwargs)
+
     def jobs_execute(self, **kwargs):
         return utils.retry_execute(self.get_jobs_dhub(), logger, **kwargs)
@@ -732,155 +740,123 @@ class JobsModel(TreeherderModelBase):
         return round( sorted_list[length / 2], 0 )

-    def cycle_data(self, sql_targets={}, execute_sleep=True):
-
-        min_date = int(time.time() - self.DATA_CYCLE_INTERVAL)
+    def cycle_data(self, cycle_interval, chunk_size, sleep_time):
+        """Delete data older than cycle_interval, splitting the target data
+        into chunks of chunk_size size. Returns the number of result sets deleted"""
+
+        max_date = datetime.now() - cycle_interval
+        max_timestamp = int(time.mktime(max_date.timetuple()))

         # Retrieve list of result sets to delete
         result_set_data = self.jobs_execute(
             proc='jobs.selects.get_result_sets_to_cycle',
-            placeholders=[min_date],
+            placeholders=[max_timestamp],
             debug_show=self.DEBUG
         )
-        if len(result_set_data) == 0:
-            sql_targets['total_count'] = 0
-            return sql_targets
+        if not result_set_data:
+            return 0

-        rs_placeholders = map( lambda x:x['id'], result_set_data )
-        rs_where_in_clause = [ ','.join( ['%s'] * len(rs_placeholders) ) ]
+        # group the result_set data in chunks
+        result_set_chunk_list = zip(*[iter(result_set_data)] * chunk_size)
+        # append the remaining result_set not fitting in a complete chunk
+        result_set_chunk_list.append(
+            result_set_data[-(len(result_set_data) % chunk_size):])

-        # Retrieve list of revisions associated with result sets
-        revision_data = self.jobs_execute(
-            proc='jobs.selects.get_revision_ids_to_cycle',
-            placeholders=rs_placeholders,
-            replace=rs_where_in_clause,
-            debug_show=self.DEBUG
-        )
+        for result_set_chunks in result_set_chunk_list:

-        rev_placeholders = map( lambda x:x['revision_id'], revision_data )
-        rev_where_in_clause = [ ','.join( ['%s'] * len(rev_placeholders) ) ]
+            # Retrieve list of revisions associated with result sets
+            rs_placeholders = [x['id'] for x in result_set_chunks]
+            rs_where_in_clause = [','.join(['%s'] * len(rs_placeholders))]
+            revision_data = self.jobs_execute(
+                proc='jobs.selects.get_revision_ids_to_cycle',
+                placeholders=rs_placeholders,
+                replace=rs_where_in_clause,
+                debug_show=self.DEBUG
+            )

-        # Retrieve list of jobs associated with result sets
-        job_data = self.jobs_execute(
-            proc='jobs.selects.get_jobs_to_cycle',
-            placeholders=rs_placeholders,
-            replace=rs_where_in_clause,
-            debug_show=self.DEBUG
-        )
+            rev_placeholders = [x['revision_id'] for x in revision_data]
+            rev_where_in_clause = [','.join(['%s'] * len(rev_placeholders))]

-        guid_placeholders = []
-        job_id_placeholders = []
-
-        for d in job_data:
-            guid_placeholders.append(d['job_guid'])
-            job_id_placeholders.append(d['id'])
+            # Retrieve list of jobs associated with result sets
+            job_data = self.jobs_execute(
+                proc='jobs.selects.get_jobs_to_cycle',
+                placeholders=rs_placeholders,
+                replace=rs_where_in_clause,
+                debug_show=self.DEBUG
+            )

-        jobs_where_in_clause = [ ','.join( ['%s'] * len(job_id_placeholders) ) ]
+            job_guid_dict = dict((d['id'], d['job_guid']) for d in job_data)
+            job_where_in_clause = [','.join(['%s'] * len(job_guid_dict))]

-        # Associate placeholders and replace data with sql
-        obj_targets = []
-        for sql in self.OBJECTSTORE_CYCLE_TARGETS:
-            obj_targets.append(
-                { "sql":sql,
-                  "placeholders":guid_placeholders,
-                  "replace":jobs_where_in_clause } )
+            # Associate placeholders and replace data with sql
+            obj_targets = []
+            for sql in self.OBJECTSTORE_CYCLE_TARGETS:
+                obj_targets.append({
+                    "proc": sql,
+                    "placeholders": job_guid_dict.values(),
+                    "replace": job_where_in_clause
+                })

-        jobs_targets = []
-        for sql in self.JOBS_CYCLE_TARGETS:
-
-            if sql == 'cycle_revision':
-                jobs_targets.append(
-                    { "sql":sql,
-                      "placeholders":rev_placeholders,
-                      "replace":rev_where_in_clause } )
-            elif sql == 'cycle_revision_map':
-                jobs_targets.append(
-                    { "sql":sql,
-                      "placeholders":rs_placeholders,
-                      "replace":rs_where_in_clause } )
-            elif sql == 'cycle_result_set':
-                jobs_targets.append(
-                    { "sql":sql,
-                      "placeholders":rs_placeholders,
-                      "replace":rs_where_in_clause } )
-            else:
-                jobs_targets.append(
-                    { "sql":sql,
-                      "placeholders":job_id_placeholders,
-                      "replace":jobs_where_in_clause } )
+            jobs_targets = []
+            for proc in self.JOBS_CYCLE_TARGETS:
+                query_name = proc.split('.')[-1]
+                if query_name == 'cycle_revision':
+                    jobs_targets.append({
+                        "proc": proc,
+                        "placeholders": rev_placeholders,
+                        "replace": rev_where_in_clause
+                    })
+                elif query_name == 'cycle_revision_map':
+                    jobs_targets.append({
+                        "proc": proc,
+                        "placeholders": rs_placeholders,
+                        "replace": rs_where_in_clause
+                    })
+                elif query_name == 'cycle_result_set':
+                    jobs_targets.append({
+                        "proc": proc,
+                        "placeholders": rs_placeholders,
+                        "replace": rs_where_in_clause
+                    })
+                else:
+                    jobs_targets.append({
+                        "proc": proc,
+                        "placeholders": job_guid_dict.keys(),
+                        "replace": job_where_in_clause
+                    })

-        sql_targets['total_count'] = 0
-
-        # remove data from specified objectstore and jobs tables that is
-        # older than 6 months
-        self._execute_table_deletes(
-            min_date, self.get_os_dhub(), obj_targets, sql_targets,
-            execute_sleep
-        )
-        self._execute_table_deletes(
-            min_date, self.get_jobs_dhub(), jobs_targets, sql_targets,
-            execute_sleep
-        )
-
-        return sql_targets
+            # remove data from specified objectstore and jobs tables that is
+            # older than max_timestamp
+            self._execute_table_deletes(obj_targets, 'objectstore', sleep_time)
+            self._execute_table_deletes(jobs_targets, 'jobs', sleep_time)
+
+        return len(result_set_data)

-    def _execute_table_deletes(
-        self, min_date, dhub, sql_to_execute, sql_targets, execute_sleep):
+    def _execute_table_deletes(self, sql_to_execute, data_type, sleep_time):

         for sql_obj in sql_to_execute:

-            sql = sql_obj['sql']
-            placeholders = sql_obj['placeholders']
-            replace = sql_obj['replace']
-
-            if sql not in sql_targets:
-                # First pass for sql
-                sql_targets[sql] = None
-
-            if len(placeholders) == 0:
-                sql_targets[sql] = 0
+            if not sql_obj['placeholders']:
                 continue
+            sql_obj['debug_show'] = self.DEBUG

-            if (sql_targets[sql] == None) or (sql_targets[sql] > 0):
-
-                # Disable foreign key checks to improve performance
-                dhub.execute(
-                    proc='generic.db_control.disable_foreign_key_checks',
-                    debug_show=self.DEBUG
-                )
-
-                dhub.execute(
-                    proc=sql,
-                    placeholders=placeholders,
-                    replace=replace,
-                    debug_show=self.DEBUG,
-                )
-                row_count = dhub.connection['master_host']['cursor'].rowcount
-                dhub.commit('master_host')
-
-                # Re-enable foreign key checks to improve performance
-                dhub.execute(
-                    proc='generic.db_control.enable_foreign_key_checks',
-                    debug_show=self.DEBUG
-                )
-
-                sql_targets[sql] = row_count
-                sql_targets['total_count'] += row_count
-
-                if execute_sleep:
-                    # Allow some time for other queries to get through
-                    time.sleep(5)
+            # Disable foreign key checks to improve performance
+            self.execute(data_type,
+                         proc='generic.db_control.disable_foreign_key_checks',
+                         debug_show=self.DEBUG)
+
+            self.execute(data_type, **sql_obj)
+            self.get_dhub(data_type).commit('master_host')
+
+            # Re-enable foreign key checks to improve performance
+            self.execute(data_type,
+                         proc='generic.db_control.enable_foreign_key_checks',
+                         debug_show=self.DEBUG)
+
+            if sleep_time:
+                # Allow some time for other queries to get through
+                time.sleep(sleep_time)

     def get_bug_job_map_list(self, offset, limit, conditions=None):
         """

View file

@@ -1,34 +1,77 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, you can obtain one at http://mozilla.org/MPL/2.0/.
 from optparse import make_option
+import datetime

 from django.core.management.base import BaseCommand
-from treeherder.model.models import Repository
-from treeherder.model.tasks import cycle_data
+from treeherder.model.derived import JobsModel
+from treeherder.model.models import Datasource
+from django.conf import settings


 class Command(BaseCommand):
     help = """Cycle data that exceeds the time constraint limit"""

     option_list = BaseCommand.option_list + (
-        make_option('--debug',
+        make_option(
+            '--debug',
             action='store_true',
             dest='debug',
-            default=None,
+            default=False,
             help='Write debug messages to stdout'),
-        make_option('--iterations',
+        make_option(
+            '--cycle-interval',
             action='store',
-            dest='iterations',
-            default=5,
-            help='Number of data cycle iterations to execute in a single run'),
+            dest='cycle_interval',
+            default=0,
+            type='int',
+            help='Data cycle interval expressed in days'),
+        make_option(
+            '--chunk-size',
+            action='store',
+            dest='chunk_size',
+            default=100,
+            type='int',
+            help=('Define the size of the chunks '
+                  'the target data will be divided in')),
+        make_option(
+            '--sleep-time',
+            action='store',
+            dest='sleep_time',
+            default=2,
+            type='int',
+            help='How many seconds to pause between each query'),
     )

     def handle(self, *args, **options):
-        debug = options.get("debug", None)
-        max_iterations = int(options.get("iterations"))
+        self.is_debug = options['debug']

-        cycle_data(max_iterations, debug)
+        if options['cycle_interval']:
+            cycle_interval = datetime.timedelta(days=options['cycle_interval'])
+        else:
+            cycle_interval = settings.DATA_CYCLE_INTERVAL
+
+        self.debug("cycle interval: {0}".format(cycle_interval))
+
+        projects = Datasource.objects\
+            .filter(contenttype='jobs')\
+            .values_list('project', flat=True)
+
+        for project in projects:
+            self.debug("Cycling Database: {0}".format(project))
+            jm = JobsModel(project)
+            try:
+                num_deleted = jm.cycle_data(cycle_interval,
+                                            options['chunk_size'],
+                                            options['sleep_time'])
+                self.debug("Deleted {0} resultsets from {1}".format(
+                    num_deleted, project))
+            finally:
+                jm.disconnect()
+
+    def debug(self, msg):
+        if self.is_debug:
+            self.stdout.write(msg)
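The rewritten command owns the whole cycling loop: it resolves the interval (days from the CLI, else settings.DATA_CYCLE_INTERVAL), iterates over every jobs datasource, and reports how many result sets each project dropped. Typical invocations, with illustrative values:

    from django.core.management import call_command

    # use the default interval from settings.DATA_CYCLE_INTERVAL
    call_command('cycle_data')

    # keep 30 days of data, delete in batches of 500, pause 1s between queries;
    # equivalent shell form:
    #   ./manage.py cycle_data --cycle-interval 30 --chunk-size 500 --sleep-time 1 --debug
    call_command('cycle_data', cycle_interval=30, chunk_size=500,
                 sleep_time=1, debug=True)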

View file

@@ -495,7 +495,7 @@
     },
     "get_result_sets_to_cycle":{
-        "sql":"SELECT id FROM result_set WHERE push_timestamp < ? LIMIT 5000",
+        "sql":"SELECT id FROM result_set WHERE push_timestamp < ?",
         "host":"master_host"
     },
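Dropping LIMIT 5000 moves batching out of SQL: a single query now returns every expired result set, and the new chunk_size option decides how many are deleted per pass (previously the celery task looped up to a fixed number of iterations over 5000-row slices). Roughly:

    # illustration only: batch size is now controlled client-side
    expired_ids = list(range(12500))   # pretend ids older than the cutoff
    chunk_size = 5000
    batches = [expired_ids[i:i + chunk_size]
               for i in range(0, len(expired_ids), chunk_size)]
    assert [len(b) for b in batches] == [5000, 5000, 2500]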

View file

@@ -1,13 +1,14 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, you can obtain one at http://mozilla.org/MPL/2.0/.
 from celery import task
+from django.core.management import call_command
 from django.conf import settings
 from treeherder.model.derived import JobsModel
 from treeherder.model.models import Datasource, Repository


 @task(name='process-objects')
 def process_objects(limit=None):
     """
@@ -23,39 +24,12 @@ def process_objects(limit=None):
     finally:
         jm.disconnect()


 # Run a maximum of 1 per hour
 @task(name='cycle-data', rate_limit='1/h')
-def cycle_data(max_iterations=50, debug=False):
-
-    projects = Repository.objects.all().values_list('name', flat=True)
-
-    for project in projects:
-
-        jm = JobsModel(project)
-
-        sql_targets = {}
-
-        if debug:
-            print "Cycling Database: {0}".format(project)
-
-        cycle_iterations = max_iterations
-
-        while cycle_iterations > 0:
-
-            sql_targets = jm.cycle_data(sql_targets)
-
-            if debug:
-                print "Iterations: {0}".format(str(cycle_iterations))
-                print "sql_targets"
-                print sql_targets
-
-            cycle_iterations -= 1
-
-            # No more items to delete
-            if sql_targets['total_count'] == 0:
-                cycle_iterations = 0
-
-        jm.disconnect()
+def cycle_data():
+    call_command('cycle_data')


 @task(name='calculate-eta', rate_limit='1/h')
 def calculate_eta(sample_window_seconds=21600, debug=False):
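The celery task shrinks to a thin wrapper over the management command, so the hourly rate-limited run and a manual run share one code path; the old in-task iteration loop is gone because cycle_data now chunks internally. For example:

    from treeherder.model.tasks import cycle_data

    # asynchronous, still rate-limited to one run per hour by the decorator
    cycle_data.delay()

    # identical code path from a shell:
    #   ./manage.py cycle_data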

View file

@@ -5,6 +5,7 @@
 # Django settings for webapp project.
 import os
 import sys
+from datetime import timedelta
 from treeherder import path

 # Insure the vendor libraries are added to the python path
@@ -39,6 +40,8 @@ TREEHERDER_PERF_SERIES_TIME_RANGES = [
     { "seconds":7776000, "days":90 },
 ]

+DATA_CYCLE_INTERVAL = timedelta(days=30*6)
+
 RABBITMQ_USER = os.environ.get("TREEHERDER_RABBITMQ_USER", "")
 RABBITMQ_PASSWORD = os.environ.get("TREEHERDER_RABBITMQ_PASSWORD", "")
 RABBITMQ_VHOST = os.environ.get("TREEHERDER_RABBITMQ_VHOST", "")
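DATA_CYCLE_INTERVAL becomes a timedelta in settings, equal to the old hard-coded constant removed from JobsModel, and cycle_data converts it back to a Unix-timestamp cutoff. A quick check of that equivalence and conversion:

    import time
    from datetime import datetime, timedelta

    DATA_CYCLE_INTERVAL = timedelta(days=30 * 6)
    assert int(DATA_CYCLE_INTERVAL.total_seconds()) == 15552000  # the old constant

    # as done in JobsModel.cycle_data:
    max_date = datetime.now() - DATA_CYCLE_INTERVAL
    max_timestamp = int(time.mktime(max_date.timetuple()))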