Bug 1151806 - Implement chunking for job ingestion

Cameron Dawson 2015-05-12 17:29:53 -07:00
Parent 8507928699
Commit e71e781565
8 changed files with 89 additions and 23 deletions

View file

@@ -248,6 +248,29 @@ class TreeherderArtifactCollectionTest(DataSetup, unittest.TestCase):
 
         self.assertTrue(len(self.artifact_data) == len(tac.data))
 
+    def test_collection_chunking(self):
+        tac = TreeherderArtifactCollection()
+
+        for artifact in self.artifact_data:
+            ta = TreeherderArtifact(artifact)
+            tac.add(ta)
+
+        # reconstruct the chunks and make sure we have the same data
+        rebuilt_data = []
+        chunk_num = 0
+        for chunk in tac.get_chunks(3):
+            chunk_data = chunk.get_collection_data()
+            rebuilt_data.extend(chunk_data)
+            chunk_num += 1
+
+            # the last one should be the "remainder" in an uneven size
+            if chunk_num == 4:
+                assert len(chunk_data) == 1
+            else:
+                assert len(chunk_data) == 3
+
+        assert rebuilt_data == tac.get_collection_data()
+
 
 class TreeherderJobCollectionTest(DataSetup, unittest.TestCase):
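The assertions in test_collection_chunking imply the sample data holds ten artifacts: chunked by three, that yields three full chunks plus a one-item remainder. A minimal standalone sketch of the same slicing arithmetic (illustration only, not part of the commit; plain lists instead of Treeherder collection types):

def chunked(data, chunk_size):
    # the same stride-based slicing get_chunks() performs
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

chunks = list(chunked(list(range(10)), 3))
assert [len(c) for c in chunks] == [3, 3, 3, 1]  # 3 + 3 + 3 + 1 == 10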

View file

@@ -490,3 +490,17 @@ def pulse_resultset_consumer(request):
 @pytest.fixture
 def pulse_action_consumer(request):
     return pulse_consumer('job-actions', request)
+
+
+@pytest.fixture
+def mock_error_summary(monkeypatch):
+    bs_obj = ["foo", "bar"]
+
+    from treeherder.model import error_summary
+
+    def _get_error_summary(params):
+        return bs_obj
+
+    monkeypatch.setattr(error_summary, "get_error_summary", _get_error_summary)
+
+    return bs_obj

View file

@@ -15,8 +15,8 @@ from tests.sampledata import SampleData
 
 @pytest.fixture
 def mock_post_json_data(monkeypatch, jm):
-    def _post_json_data(url, data):
-
+    def _post_json_data(url, data, chunk_size=1):
+        # does not do any chunking in this test
 
         if data:
             th_collection = data[jm.project]

View file

@@ -552,6 +552,19 @@ class TreeherderCollection(object):
 
         for d in self.data:
             d.validate()
 
+    def get_chunks(self, chunk_size):
+        """
+        Return a generator of new collections broken into chunks of size ``chunk_size``.
+
+        Each chunk is a ``TreeherderCollection`` of the same type as the
+        original, holding at most ``chunk_size`` ``TreeherderData``
+        objects.
+
+        Each collection must then be POSTed individually.
+        """
+        for i in range(0, len(self.data), chunk_size):
+            yield self.__class__(self.data[i:i + chunk_size])
+
 
 class TreeherderJobCollection(TreeherderCollection):
     """

View file

@@ -445,7 +445,8 @@ class Builds4hJobsProcess(JsonExtractorMixin,
             self.transform(extracted_content,
                            filter_to_revision=filter_to_revision,
                            filter_to_project=filter_to_project,
-                           filter_to_job_group=filter_to_job_group)
+                           filter_to_job_group=filter_to_job_group),
+            chunk_size=settings.BUILDAPI_BUILDS4H_CHUNK_SIZE
         )
@@ -462,7 +463,8 @@ class PendingJobsProcess(JsonExtractorMixin,
                 'pending',
                 filter_to_revision=filter_to_revision,
                 filter_to_project=filter_to_project,
-                filter_to_job_group=filter_to_job_group)
+                filter_to_job_group=filter_to_job_group),
+            chunk_size=settings.BUILDAPI_PENDING_CHUNK_SIZE
         )
@@ -479,5 +481,6 @@ class RunningJobsProcess(JsonExtractorMixin,
                 'running',
                 filter_to_revision=filter_to_revision,
                 filter_to_project=filter_to_project,
-                filter_to_job_group=filter_to_job_group)
+                filter_to_job_group=filter_to_job_group),
+            chunk_size=settings.BUILDAPI_RUNNING_CHUNK_SIZE
         )

View file

@@ -121,5 +121,5 @@ class ResultSetsLoaderMixin(JsonLoaderMixin):
 
 class OAuthLoaderMixin(object):
 
-    def load(self, th_collections):
-        th_publisher.post_treeherder_collections(th_collections)
+    def load(self, th_collections, chunk_size=1):
+        th_publisher.post_treeherder_collections(th_collections, chunk_size)

View file

@@ -14,31 +14,36 @@ from treeherder.etl.oauth_utils import OAuthCredentials
 
 logger = logging.getLogger(__name__)
 
 
-def post_treeherder_collections(th_collections):
+def post_treeherder_collections(th_collections, chunk_size=1):
     errors = []
+    cli = TreeherderClient(
+        protocol=settings.TREEHERDER_REQUEST_PROTOCOL,
+        host=settings.TREEHERDER_REQUEST_HOST,
+    )
     for project in th_collections:
         credentials = OAuthCredentials.get_credentials(project)
 
-        cli = TreeherderClient(
-            protocol=settings.TREEHERDER_REQUEST_PROTOCOL,
-            host=settings.TREEHERDER_REQUEST_HOST,
-        )
-
         logger.info(
             "collection loading request for project {0}: {1}".format(
                 project,
                 th_collections[project].endpoint_base))
 
-        try:
-            cli.post_collection(project, credentials.get('consumer_key'),
-                                credentials.get('consumer_secret'),
-                                th_collections[project])
-        except Exception, e:
-            errors.append({
-                "project": project,
-                "url": th_collections[project].endpoint_base,
-                "message": str(e)
-            })
+        collection_chunks = th_collections[project].get_chunks(chunk_size)
+
+        for collection in collection_chunks:
+            try:
+                cli.post_collection(project, credentials.get('consumer_key'),
+                                    credentials.get('consumer_secret'),
+                                    collection)
+            except Exception, e:
+                errors.append({
+                    "project": project,
+                    "url": th_collections[project].endpoint_base,
+                    "message": str(e)
+                })
 
     if errors:
         raise CollectionNotLoadedException(errors)
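Two behavioral notes on the rewritten function: the TreeherderClient is now constructed once per call rather than once per project, and each chunk gets its own try/except, so one failed POST is recorded without aborting the remaining chunks. A minimal sketch of that error isolation (illustration only), with a hypothetical post() callable standing in for cli.post_collection():

def post_in_chunks(collection, chunk_size, post):
    # post() is assumed to raise on HTTP failure, like cli.post_collection()
    errors = []
    for chunk in collection.get_chunks(chunk_size):
        try:
            post(chunk)
        except Exception as e:
            # record the failure, but keep posting the remaining chunks
            errors.append(str(e))
    return errors  # the caller decides whether to raise, as th_publisher does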

View file

@@ -304,6 +304,14 @@ BUILDAPI_PENDING_URL = "https://secure.pub.build.mozilla.org/builddata/buildjson
 BUILDAPI_RUNNING_URL = "https://secure.pub.build.mozilla.org/builddata/buildjson/builds-running.js"
 BUILDAPI_BUILDS4H_URL = "https://secure.pub.build.mozilla.org/builddata/buildjson/builds-4hr.js.gz"
 
+# The maximum size of a single request POSTed to the Treeherder client
+# during Buildbot job ingestion.
+# TreeherderCollections larger than this are broken into chunks of
+# this size.
+BUILDAPI_PENDING_CHUNK_SIZE = 50
+BUILDAPI_RUNNING_CHUNK_SIZE = 50
+BUILDAPI_BUILDS4H_CHUNK_SIZE = 50
+
 PARSER_MAX_STEP_ERROR_LINES = 100
 PARSER_MAX_SUMMARY_LINES = 200
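The effect of these settings on request volume: with a chunk size of 50, a collection of N jobs turns into ceil(N / 50) POST requests. A quick sanity check of that arithmetic (illustration only, not part of the commit):

import math

def request_count(n_jobs, chunk_size=50):
    # number of chunked POSTs a collection of n_jobs produces
    return int(math.ceil(n_jobs / float(chunk_size)))

assert request_count(120) == 3  # 50 + 50 + 20
assert request_count(50) == 1   # exactly one full chunk
assert request_count(0) == 0    # an empty collection posts nothing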