base model generic to datasource now

Cameron Dawson 2013-04-18 22:26:30 -07:00
Parent a025220123
Commit 1290f6c646
8 changed files with 168 additions and 69 deletions

View file

@@ -102,8 +102,8 @@ def jm():
     """ Give a test access to a JobsModel instance. """
     from django.conf import settings
     from treeherder.model.derived.jobs import JobsModel
-    return JobsModel(settings.DATABASES["default"]["TEST_NAME"])
+    return JobsModel.create(settings.DATABASES["default"]["TEST_NAME"])
+    # return JobsModel(settings.DATABASES["default"]["TEST_NAME"])


 @pytest.fixture(scope='session')
 def jobs_ds():
@@ -113,7 +113,7 @@ def jobs_ds():
         project=settings.DATABASES["default"]["TEST_NAME"],
         dataset=1,
         contenttype="jobs",
-        host="localhost",
+        host=settings.DATABASES['default']['HOST'],
     )
@@ -125,5 +125,8 @@ def objectstore_ds():
         project=settings.DATABASES["default"]["TEST_NAME"],
         dataset=1,
         contenttype="objectstore",
-        host="localhost",
+        host=settings.DATABASES['default']['HOST'],
     )
+    #ARE WE SURE LOCALHOST IS RIGHT HERE, FOR THE VAGRANT VM?
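The `jm` fixture now goes through `JobsModel.create()`, so the backing datasource rows exist before any test touches the model. A minimal sketch of how these fixtures combine in a test (the test name and blob literal are illustrative, not part of this commit):

    # Hypothetical test: `jm` is already backed by both datasources,
    # so storing a blob and claiming it back should round-trip.
    def test_store_and_claim(jm, jobs_ds, objectstore_ds):
        jm.store_job_data('{"sample": "blob"}')   # insert into the objectstore
        rows = jm.claim_objects(1)                # claim at most one unprocessed row
        assert len(rows) == 1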

View file

@@ -55,4 +55,3 @@ def test_job_group_manager(refdata):
     assert row_data["name"] == 'mygroup'
     assert row_data["description"] == 'fill me'
     assert row_data["active_status"] == 'active'
-

View file

@@ -23,10 +23,11 @@ def xtest_disconnect(jm):
     assert src.dhub.connection["master_host"]["con_obj"].open is False


-def test_claim_objects(objectstore_ds, jm):
+def test_claim_objects(objectstore_ds, jobs_ds, jm):
     """``claim_objects`` claims & returns unclaimed rows up to a limit."""
-    # s = objectstore_ds
+    os = objectstore_ds
+    j = jobs_ds

     blobs = [
         job_json(testrun={"date": "1330454755"}),
@@ -59,7 +60,7 @@ def test_claim_objects(objectstore_ds, jm):
     assert loading_rows == 3


-def xtest_mark_object_complete(jobs_ds, jm):
+def test_mark_object_complete(jobs_ds, jm):
     """Marks claimed row complete and records run id."""
     jm.store_job_data(job_json())
     row_id = jm.claim_objects(1)[0]["id"]
@@ -81,7 +82,7 @@ def xtest_process_objects(jobs_ds, jm):
         job_json(testrun={"date": "1330454755"}),
         job_json(testrun={"date": "1330454756"}),
         job_json(testrun={"date": "1330454757"}),
     ]
     for blob in blobs:
         jm.store_job_data(blob)

View file

@@ -5,31 +5,72 @@ access.
 """
 from django.conf import settings

-from treeherder.model.sql.sql_datasource import SQLDataSource
+from treeherder.model.models import Datasource


 class TreeherderModelBase(object):
-    """Base model class for all TreeHerder models"""
-
-    CONTENT_TYPES = []
+    """
+    Base model class for all derived models
+    """

     def __init__(self, project):
+        """Encapsulate the dataset access for this ``project`` """
         self.project = project
         self.sources = {}
-        for ct in self.CONTENT_TYPES:
-            self.sources[ct] = SQLDataSource(project, ct)
+        self.dhubs = {}
         self.DEBUG = settings.DEBUG

     def __unicode__(self):
         """Unicode representation is project name."""
         return self.project

-    def disconnect(self):
-        """Iterate over and disconnect all data sources."""
-        for src in self.sources.itervalues():
-            src.disconnect()
-
-    def get_project_cache_key(self, str_data):
-        return "{0}_{1}".format(self.project, str_data)
+    def get_dhub(self, contenttype, procs_file_name=None):
+        """
+        The configured datahub for the given contenttype
+        """
+        if not procs_file_name:
+            procs_file_name = "{0}.json".format(contenttype)
+        if not contenttype in self.dhubs.keys():
+            self.dhubs[contenttype] = self.get_datasource(
+                contenttype).dhub(procs_file_name)
+        return self.dhubs[contenttype]
+
+    def get_datasource(self, contenttype):
+        """The datasource for this contenttype of the project."""
+        if not contenttype in self.sources.keys():
+            self.sources[contenttype] = self._get_datasource(contenttype)
+            # self.sources[contenttype] = Datasource.objects.get(
+            #     project = self.project,
+            #     contenttype = contenttype,
+            #     dataset = 1,
+            # )
+        return self.sources[contenttype]
+
+    def _get_datasource(self, contenttype):
+        """Find the datasource for this contenttype in the cache."""
+        candidate_sources = []
+        for source in Datasource.objects.cached():
+            if (source.project == self.project and
+                    source.contenttype == contenttype):
+                candidate_sources.append(source)
+
+        if not candidate_sources:
+            raise DatasetNotFoundError(
+                "No dataset found for project %r, contenttype %r."
+                % (self.project, contenttype)
+            )
+
+        candidate_sources.sort(key=lambda s: s.dataset, reverse=True)
+        return candidate_sources[0]
+
+
+class DatasetNotFoundError(ValueError):
+    pass
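The rewritten base class resolves storage lazily: `get_datasource()` memoizes the highest-`dataset` `Datasource` for each contenttype out of the model cache, and `get_dhub()` memoizes one datahub per contenttype on top of that. A sketch of what a derived model gains, assuming a hypothetical subclass, contenttype, and proc name (none of these appear in the commit):

    # Hypothetical derived model: it declares nothing up front and gets a
    # cached datasource + dhub per contenttype on first use.
    class PerfModel(TreeherderModelBase):

        def get_perf_dhub(self):
            # defaults to the "perf.json" procs file and caches the hub
            return self.get_dhub("perf")

    pm = PerfModel("myproject")
    pm.get_perf_dhub().execute(
        proc="perf.selects.get_all",   # assumed proc; not in this commit
        debug_show=pm.DEBUG,
    )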

View file

@@ -26,47 +26,56 @@ class JobsModel(TreeherderModelBase):
     CT_OBJECTSTORE = "objectstore"
     CONTENT_TYPES = [CT_JOBS, CT_OBJECTSTORE]

-    # @classmethod
-    # def create(cls, project, hosts=None, types=None):
-    #     """
-    #     Create all the datasource tables for this project.
-    #
-    #     ``hosts`` is an optional dictionary mapping contenttype names to the
-    #     database server host on which the database for that contenttype should
-    #     be created. Not all contenttypes need to be represented; any that
-    #     aren't will use the default (``TREEHERDER_DATABASE_HOST``).
-    #
-    #     ``types`` is an optional dictionary mapping contenttype names to the
-    #     type of database that should be created. For MySQL/MariaDB databases,
-    #     use "MySQL-Engine", where "Engine" could be "InnoDB", "Aria", etc. Not
-    #     all contenttypes need to be represented; any that aren't will use the
-    #     default (``MySQL-InnoDB``).
-    #
-    #
-    #     """
-    #     hosts = hosts or {}
-    #     types = types or {}
-    #
-    #     for ct in cls.CONTENT_TYPES:
-    #         Datasource.create(
-    #             project,
-    #             ct,
-    #             host=hosts.get(ct),
-    #             db_type=types.get(ct),
-    #         )
-    #
-    #     return cls(project=project)
+    @classmethod
+    def create(cls, project, hosts=None, types=None):
+        """
+        Create all the datasource tables for this project.
+
+        ``hosts`` is an optional dictionary mapping contenttype names to the
+        database server host on which the database for that contenttype should
+        be created. Not all contenttypes need to be represented; any that
+        aren't will use the default (``TREEHERDER_DATABASE_HOST``).
+
+        ``types`` is an optional dictionary mapping contenttype names to the
+        type of database that should be created. For MySQL/MariaDB databases,
+        use "MySQL-Engine", where "Engine" could be "InnoDB", "Aria", etc. Not
+        all contenttypes need to be represented; any that aren't will use the
+        default (``MySQL-InnoDB``).
+
+        """
+        hosts = hosts or {}
+        types = types or {}
+
+        for ct in [cls.CT_JOBS, cls.CT_OBJECTSTORE]:
+            dataset = Datasource.get_latest_dataset(project, ct)
+            source = Datasource(
+                project=project,
+                contenttype=ct,
+                dataset=dataset or 1,
+            )
+            source.save()
+
+        return cls(project=project)
+
+    def get_jobs_dhub(self):
+        """Get the dhub for jobs"""
+        return self.get_dhub(self.CT_JOBS)
+
+    def get_os_dhub(self):
+        """Get the dhub for the objectstore"""
+        return self.get_dhub(self.CT_OBJECTSTORE)

     def get_oauth_consumer_secret(self, key):
-        ds = self.sources[self.CT_OBJECTSTORE].datasource
+        ds = self.get_datasource(self.CT_OBJECTSTORE)
         secret = ds.get_oauth_consumer_secret(key)
         return secret

-    def _get_last_insert_id(self, source=None):
+    def _get_last_insert_id(self, contenttype=None):
         """Return last-inserted ID."""
-        if not source:
-            source = self.CT_JOBS
-        return self.sources[source].dhub.execute(
+        if not contenttype:
+            contenttype = self.CT_JOBS
+        return self.get_dhub(contenttype).execute(
             proc='generic.selects.get_last_insert_id',
             debug_show=self.DEBUG,
             return_type='iter',
@@ -79,7 +88,7 @@ class JobsModel(TreeherderModelBase):
         error = "N" if error is None else "Y"
         error_msg = error or ""

-        self.sources[self.CT_OBJECTSTORE].dhub.execute(
+        self.get_os_dhub().execute(
             proc='objectstore.inserts.store_json',
             placeholders=[loaded_timestamp, json_data, error, error_msg],
             debug_show=self.DEBUG
@@ -98,7 +107,7 @@ class JobsModel(TreeherderModelBase):
         """
         proc = "objectstore.selects.get_unprocessed"
-        json_blobs = self.sources[self.CT_OBJECTSTORE].dhub.execute(
+        json_blobs = self.get_os_dhub().execute(
             proc=proc,
             placeholders=[limit],
             debug_show=self.DEBUG,
@@ -274,7 +283,7 @@ class JobsModel(TreeherderModelBase):
         return job_id

     def _insert_data(self, statement, placeholders, executemany=False):
-        self.sources[self.CT_JOBS].dhub.execute(
+        self.get_jobs_dhub().execute(
             proc='jobs.inserts.' + statement,
             debug_show=self.DEBUG,
             placeholders=placeholders,
@@ -288,7 +297,7 @@ class JobsModel(TreeherderModelBase):
     def _get_last_insert_id(self, source=CT_JOBS):
         """Return last-inserted ID."""
-        return self.sources[source].dhub.execute(
+        return self.get_dhub(source).execute(
             proc='generic.selects.get_last_insert_id',
             debug_show=self.DEBUG,
             return_type='iter',
@@ -360,17 +369,17 @@ class JobsModel(TreeherderModelBase):
         # Note: this claims rows for processing. Failure to call load_job_data
         # on this data will result in some json blobs being stuck in limbo
         # until another worker comes along with the same connection ID.
-        self.sources[self.CT_OBJECTSTORE].dhub.execute(
+        self.get_os_dhub().execute(
             proc=proc_mark,
             placeholders=[limit],
             debug_show=self.DEBUG,
         )

         resetwarnings()

         # Return all JSON blobs claimed by this connection ID (could possibly
         # include orphaned rows from a previous run).
-        json_blobs = self.sources[self.CT_OBJECTSTORE].dhub.execute(
+        json_blobs = self.get_os_dhub().execute(
             proc=proc_get,
             debug_show=self.DEBUG,
             return_type='tuple'
@@ -380,7 +389,7 @@ class JobsModel(TreeherderModelBase):
     def mark_object_complete(self, object_id, job_id):
         """ Call to database to mark the task completed """
-        self.sources[self.CT_OBJECTSTORE].dhub.execute(
+        self.get_os_dhub().execute(
             proc="objectstore.updates.mark_complete",
             placeholders=[job_id, object_id],
             debug_show=self.DEBUG
@@ -388,7 +397,7 @@ class JobsModel(TreeherderModelBase):
     def mark_object_error(self, object_id, error):
         """ Call to database to mark the task completed """
-        self.sources[self.CT_OBJECTSTORE].dhub.execute(
+        self.get_os_dhub().execute(
             proc="objectstore.updates.mark_error",
             placeholders=[error, object_id],
             debug_show=self.DEBUG
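With `create()` un-commented and the dhub accessors in place, the objectstore worker cycle reads end to end: store raw JSON, claim a batch under this connection's ID, process each blob, then mark it complete or record the error. A rough sketch of that cycle (the `process_blob` helper and the `"json_blob"` row key are stand-ins, not confirmed by this commit):

    # Sketch of the intended worker cycle under the new accessors.
    jm = JobsModel.create("myproject")        # ensures both datasources exist
    jm.store_job_data('{"job": {}}')          # raw blob into the objectstore
    for row in jm.claim_objects(100):         # claims rows for this connection ID
        try:
            job_id = process_blob(row["json_blob"])    # hypothetical helper
            jm.mark_object_complete(row["id"], job_id)
        except ValueError as exc:
            jm.mark_object_error(row["id"], str(exc))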

View file

@@ -2,7 +2,6 @@ import os
 from django.conf import settings
 from datasource.bases.BaseHub import BaseHub
 from datasource.DataHub import DataHub
-from .base import TreeherderModelBase


 class RefDataManager(object):

View file

@@ -9,6 +9,7 @@ from datasource.hubs.MySQL import MySQL
 from django.conf import settings
 from django.core.cache import cache
 from django.db import models
+from django.db.models import Max

 from treeherder import path
@@ -157,13 +158,26 @@ class MachineNote(models.Model):
 class DatasourceManager(models.Manager):

     def cached(self):
-        """Return all datasources, caching the results."""
+        """
+        Return all datasources, caching the results.
+        """
         sources = cache.get(SOURCES_CACHE_KEY)
         if not sources:
             sources = list(self.all())
             cache.set(SOURCES_CACHE_KEY, sources)
         return sources

+    def latest(self, project, contenttype):
+        """
+        @@@ TODO: this needs to use the cache, probably
+        """
+        ds = Datasource.get_latest_dataset(project, contenttype)
+        return self.get(
+            project=project,
+            contenttype=contenttype,
+            dataset=ds)


 class Datasource(models.Model):

     id = models.IntegerField(primary_key=True)
@@ -192,6 +206,14 @@ class Datasource(models.Model):
         cache.delete(SOURCES_CACHE_KEY)
         cls.objects.cached()

+    @classmethod
+    def get_latest_dataset(cls, project, contenttype):
+        """get the latest dataset"""
+        return cls.objects.filter(
+            project=project,
+            contenttype=contenttype,
+        ).aggregate(Max("dataset"))["dataset__max"]

     @property
     def key(self):
         """Unique key for a data source is the project, contenttype, dataset."""
@@ -202,6 +224,31 @@ class Datasource(models.Model):
         """Unicode representation is the project's unique key."""
         return unicode(self.key)

+    def create_next_dataset(self, schema_file=None):
+        """
+        Create and return the next dataset for this project/contenttype.
+
+        The database for the new dataset will be located on the same host.
+        """
+        dataset = Datasource.objects.filter(
+            project=self.project,
+            contenttype=self.contenttype
+        ).order_by("-dataset")[0].dataset + 1
+
+        # @@@ should we store the schema file name used for the previous
+        # dataset in the db and use the same one again automatically? or should
+        # we actually copy the schema of an existing dataset rather than using
+        # a schema file at all?
+        return Datasource.objects.create(
+            project=self.project,
+            contenttype=self.contenttype,
+            dataset=dataset,
+            host=self.datasource.host,
+            db_type=self.datasource.type,
+            schema_file=schema_file,
+        )

     def save(self, *args, **kwargs):
         inserting = not self.pk
         # in case you want to add a new datasource and provide
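`get_latest_dataset` is a thin `Max("dataset")` aggregate, and `create_next_dataset` bumps that number by one for the same project/contenttype pair. Roughly how the new pieces compose (project and contenttype names are illustrative; `get_latest_dataset` returns None when no rows match):

    # Illustrative use of the new manager and model helpers.
    latest = Datasource.get_latest_dataset("myproject", "jobs")   # e.g. 1, or None
    if latest is not None:
        current = Datasource.objects.latest("myproject", "jobs")  # that Datasource row
        next_ds = current.create_next_dataset()
        assert next_ds.dataset == latest + 1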

View file

@@ -2,7 +2,7 @@
 "inserts":{
     "store_json":{
-        "sql":"INSERT INTO `objectstore` (`date_timestamp`,
+        "sql":"INSERT INTO `objectstore` (`loaded_timestamp`,
                         `json_blob`,
                         `error`,
                         `error_msg`)