Revert "Bug 1395254 - Consume Taskcluster jobs from standard queue exchanges (#5042)"

This reverts commit 75dc375a93.
This commit is contained in:
Armen Zambrano G 2019-07-23 16:24:13 -04:00
Родитель 905869c2fd
Коммит 7f04a91af7
29 изменённых файлов: 73 добавлений и 20687 удалений

Просмотреть файл

@ -167,7 +167,7 @@ If new pushes or CI job results are not appearing in Treeherder's UI:
it suggests that Treeherder's `pulse_listener_{pushes,jobs}` dynos have stopped
consuming Pulse events, and so might need [restarting].
3. Failing that, it's possible the issue might lie in the services that send events to
those Pulse exchanges, such as `taskcluster-github` or
those Pulse exchanges, such as `taskcluster-treeherder`, `taskcluster-github` or
the Taskcluster systems upstream of those. Ask for help in the IRC channel
`#taskcluster`.

Просмотреть файл

@ -2,7 +2,7 @@
For ingestion from **Pulse** exchanges, on your local machine, you can choose
to ingest from any exchange you like. Some exchanges will be registered in
`sources.py` for use by the Treeherder servers. You can use those to get the
`settings.py` for use by the Treeherder servers. You can use those to get the
same data as Treeherder. Or you can specify your own and experiment with
posting your own data.
@ -32,7 +32,8 @@ string would be:
### 3. Read Pushes
On your localhost set your Pulse config environment variable:
On the **host machine**, set your Pulse config environment variable, so that it's available
for docker-compose to use:
```bash
export PULSE_URL="amqp://foo:bar@pulse.mozilla.org:5671/?ssl=1"
@ -98,18 +99,23 @@ push_sources = [
#### Jobs
Job Exchanges and Projects are defined in `job_sources`, however, it can
also be configured in the environment by using the `PULSE_JOB_SOURCES` environment variables.
This defines a list of exchanges with projects.
Job Exchanges and Projects are defined in `job_sources`, however can
also be configured in the environment like so:
`PULSE_JOB_SOURCES` defines a list of exchanges with projects.
```bash
export PULSE_JOB_SOURCES="exchange/taskcluster-queue/v1/task-pending.#,exchange/taskcluster-queue/v1/task-completed.#",
export PULSE_JOB_SOURCES="exchange/taskcluster-treeherder/v1/jobs.mozilla-central:mozilla-inbound",
```
In this example we've defined two exchanges:
In this example we've defined one exchange:
- `exchange/taskcluster-queue/v1/task-pending`
- `exchange/taskcluster-queue/v1/task-completed`
- `exchange/taskcluster-treeherder/v1/jobs`
The taskcluster-treeherder exchange defines two projects:
- `mozilla-central`
- `mozilla-inbound`
When Jobs are read from Pulse and added to Treeherder's celery queue we generate a routing key by prepending `#.` to each project key.

Просмотреть файл

@ -122,7 +122,7 @@ import yaml
import jsonschema
with open('schemas/text-log-summary-artifact.yml') as f:
schema = yaml.load(f, Loader=yaml.FullLoader)
schema = yaml.load(f)
jsonschema.validate(data, schema)
```

Просмотреть файл

@ -1,99 +0,0 @@
""" Script to compare two pushes from different Treeherder instances"""
import logging
import pprint
import uuid
import slugid
from deepdiff import DeepDiff
from thclient import TreeherderClient
logging.basicConfig()
logger = logging.getLogger(__name__).setLevel(logging.DEBUG)
def remove_some_attributes(job, remote_job):
# I belive these differences are expected since they are dependant to when the data
# was inserted inside of the database
del job["build_platform_id"]
del job["id"]
del job["job_group_id"]
del job["job_type_id"]
del job["last_modified"]
del job["push_id"]
del job["result_set_id"]
del remote_job["build_platform_id"]
del remote_job["id"]
del remote_job["job_group_id"]
del remote_job["job_type_id"]
del remote_job["last_modified"]
del remote_job["push_id"]
del remote_job["result_set_id"]
if job.get("end_timestamp"):
del job["end_timestamp"]
del job["start_timestamp"]
del remote_job["end_timestamp"]
del remote_job["start_timestamp"]
if job.get("failure_classification_id"):
del job["failure_classification_id"]
del remote_job["failure_classification_id"]
def print_url_to_taskcluster(job_guid):
job_guid = job["job_guid"]
(decoded_task_id, retry_id) = job_guid.split("/")
# As of slugid v2, slugid.encode() returns a string not bytestring under Python 3.
taskId = slugid.encode(uuid.UUID(decoded_task_id))
print("https://taskcluster-ui.herokuapp.com/tasks/{}".format(taskId))
if __name__ == "__main__":
# XXX: This script should take arguments instead being hardcoded
# http://localhost:5000/#/jobs?repo=mozilla-central&revision=eb7f4d56f54b3283fc15983ee859b5e62fcb9f3b
local = TreeherderClient(server_url="http://localhost:8000")
local_jobs = local.get_jobs("mozilla-central", push_id=8717, count=None)
# https://treeherder.mozilla.org/#/jobs?repo=mozilla-central&revision=eb7f4d56f54b3283fc15983ee859b5e62fcb9f3b
remote = TreeherderClient("https://treeherder.mozilla.org")
remote_jobs = remote.get_jobs("mozilla-central", push_id=516192, count=None)
remote_dict = {}
for job in remote_jobs:
remote_dict[job["job_guid"]] = job
local_dict = {}
local_not_found = []
for job in local_jobs:
remote_job = remote_dict.get(job["job_guid"])
if remote_job is None:
local_not_found.append(job)
else:
# You can use this value in a url with &selectedJob=
jobId = job["id"]
remove_some_attributes(job, remote_job)
differences = DeepDiff(job, remote_dict[job["job_guid"]])
if differences:
pprint.pprint(differences)
print(jobId)
else:
# Delete jobs that don"t have any differences
del remote_dict[job["job_guid"]]
print("We have found: {} jobs on the local instance.".format(len(local_jobs)))
print("We have found: {} jobs on the remote instance.".format(len(remote_jobs)))
if remote_dict:
print("There are the first 10 remote jobs we do not have locally. Follow the link to investigate.")
for job in list(remote_dict.values())[0:10]:
print_url_to_taskcluster(job["job_guid"])
if local_not_found:
print("Number of jobs not found locally: {} jobs".format(len(local_not_found)))
for job in local_not_found:
print_url_to_taskcluster(job["job_guid"])
if remote_dict is None and local_not_found is None:
print("We have not found any differences between the two pushes!! :D")

Просмотреть файл

@ -229,30 +229,7 @@ idna==2.8 \
--hash=sha256:c357b3f628cf53ae2c4c05627ecc484553142ca23264e593d327bcde5e9c3407
# required for taskcluster
slugid==2.0.0 \
--hash=sha256:aec8b0e01c4ad32e38e12d609eab3ec912fd129aaf6b2ded0199b56a5f8fd67c \
--hash=sha256:a950d98b72691178bdd4d6c52743c4a2aa039207cf7a97d71060a111ff9ba297
taskcluster==7.0.1 \
--hash=sha256:97e19674738515e6891f4d7928280aecb2686def8b9d72a15477d7297f34c18c
aiohttp==3.5.4 \
--hash=sha256:00d198585474299c9c3b4f1d5de1a576cc230d562abc5e4a0e81d71a20a6ca55
mohawk==1.0.0 \
--hash=sha256:fca4e34d8f5492f1c33141c98b96e168a089e5692ce65fb747e4bb613f5fe552
async-timeout==3.0.1 \
--hash=sha256:4291ca197d287d274d0b6cb5d6f8f8f82d434ed288f962539ff18cc9012f9ea3
multidict==4.5.2 \
--hash=sha256:c18498c50c59263841862ea0501da9f2b3659c00db54abfbf823a80787fde8ce
yarl==1.3.0 \
--hash=sha256:024ecdc12bc02b321bc66b41327f930d1c2c543fa9a561b39861da9388ba7aa9
taskcluster-urls==11.0.0 \
--hash=sha256:2aceab7cf5b1948bc197f2e5e50c371aa48181ccd490b8bada00f1e3baf0c5cc
slugid==2.0.0 --hash=sha256:aec8b0e01c4ad32e38e12d609eab3ec912fd129aaf6b2ded0199b56a5f8fd67c --hash=sha256:a950d98b72691178bdd4d6c52743c4a2aa039207cf7a97d71060a111ff9ba297
graphene-django==2.3.2 \
--hash=sha256:7720a459da5bc99fba251f697c4d41858612bf1a36096326af86739dd31705f3 \

Просмотреть файл

@ -148,7 +148,3 @@ zope.deprecation==4.4.0 \
--hash=sha256:0d453338f04bacf91bbfba545d8bcdf529aa829e67b705eac8c1a7fdce66e2df
zope.proxy==4.3.2 \
--hash=sha256:ab6d6975d9c51c13cac828ff03168de21fb562b0664c59bcdc4a4b10f39a5b17
# To test async code
pytest-asyncio==0.10.0 \
--hash=sha256:d734718e25cfc32d2bf78d346e99d33724deeba774cc4afdf491530c6184b63b

Просмотреть файл

@ -14,18 +14,8 @@ properties:
pattern: '^[A-Za-z0-9/+-]+$'
minLength: 1
maxLength: 50
realTaskId:
title: 'Real task ID'
description: |
On a follow-up PR we will replace taskId with realTaskId and have a jobGuid instead of the
current taskId
type: 'string'
# From TaskCluster https://github.com/taskcluster/taskcluster/blob/master/services/queue/schemas/constants.yml
pattern: '^[A-Za-z0-9_-]{8}[Q-T][A-Za-z0-9_-][CGKOSWaeimquy26-][A-Za-z0-9_-]{10}[AQgw]$'
minLength: 1
maxLength: 50
runId:
title: 'Run ID'
retryId:
title: 'Retry ID'
description: |
The infrastructure retry iteration on this job. The number of times this
job has been retried by the infrastructure.
@ -386,6 +376,11 @@ properties:
extra:
type: 'object'
description: Extra information that Treeherder reads on a best-effort basis
version:
type: 'integer'
description: Message version
enum:
- 1
additionalProperties: false
required:
@ -395,6 +390,7 @@ required:
- display
- state
- jobKind
- version
definitions:
machine:

Просмотреть файл

@ -1,134 +0,0 @@
$schema: 'http://json-schema.org/draft-07/schema#'
title: 'Treeherder Configuration'
description: |
Definition of the Treeherder configuration data that can be contained within
a task definition under task.extra.treeherder. This information is useful for
determining job properties to report to Treeherder.
type: object
properties:
reason:
description: |
Examples include:
- scheduled
- scheduler
- Self-serve: Rebuilt by foo@example.com
- Self-serve: Requested by foo@example.com
- The Nightly scheduler named 'mozilla-inbound periodic' triggered this build
- unknown
type: 'string'
minLength: 1
maxLength: 125
tier:
type: 'integer'
description: |
Tiers are used for classifying jobs according to the Sheriffing policy.
These jobs can be hidden based on exclusion profiles within Treeherder and
display of these jobs toggled by UI settings.
By default jobs which do not specify a tier will be classified as Tier 1.
minimum: 1
maximum: 3
jobKind:
type: 'string'
description: |
jobKind specifies the type of task that should be reported to Treeherder.
The jobKind could cause Treeherder to display/treat the task differently.
For instance, tasks with a jobKind of 'build' will be reported as red when
the task fails, 'test' as orange, and any jobs not specifying jobKind or
'other' will be red.
default: 'other'
enum:
- build
- test
- other
machine:
type: 'object'
properties:
platform:
type: 'string'
description: |
The platform specified here maps to platforms that Treeherder recognizes.
Jobs with the same platform will be displayed within the same row on
Treeherder and obey any ordering that is defined'.
If no build platform is specified, the workerType specified for the job
will be used.
pattern: '^[A-Za-z0-9_-]+$'
minLength: 1
maxLength: 50
os:
type: 'string'
pattern: '^[A-Za-z0-9_-]+$'
minLength: 1
maxLength: 25
architecture:
type: 'string'
pattern: '^[A-Za-z0-9_-]+$'
minLength: 1
maxLength: 25
additionalProperties: false
required: [platform]
labels:
title: 'labels'
description: |
Labels are a dimension of a platform. The values here can vary wildly,
so most strings are valid for this. The list of labels that are used
is malleable going forward.
These were formerly known as "Collection"; we call them labels now so they
can be understood to be just strings that denote a characteristic of the job.
These labels will be used for grouping jobs with a particular job platform.
For instance, a job with the label "debug" will be put into the debug platform
on Treeherder. By default, if no label is specified, the job will be classified
as "opt"
Some examples of labels that have been used:
opt Optimize Compiler GCC optimize flags
debug Debug flags passed in
pgo Profile Guided Optimization - Like opt, but runs with profiling, then builds again using that profiling
asan Address Sanitizer
tsan Thread Sanitizer Build
type: 'array'
uniqueItems: false
items:
type: 'string'
minLength: 1
maxLength: 50
pattern: '^[A-Za-z0-9_-]+$'
symbol:
title: 'symbol'
description: |
This is the symbol that will appear in a Treeherder resultset for a
given push. This symbol could be something such as "B" or a number representing
the current chunk.
type: 'string'
minLength: 0
maxLength: 25
groupName:
title: 'group name'
type: 'string'
minLength: 1
maxLength: 100
groupSymbol:
title: 'group symbol'
description: |
Group Symbol is the symbol that job symbols will be grouped under. This
is useful if there is a particular group of jobs that should be displayed
together. For example, a test suite named "Media Tests" with the group symbol
of "ME" would have all jobs with that group symbol appear as
ME(symbol 1, symbol 2, ...).
type: 'string'
minLength: 1
maxLength: 25
productName:
description: |
Examples include:
- 'firefox'
- 'taskcluster'
- 'xulrunner'
type: 'string'
minLength: 1
maxLength: 125
required: [symbol]
additionalProperties: true

Просмотреть файл

@ -1,11 +1,9 @@
import copy
import pytest
import responses
from treeherder.etl.exceptions import MissingPushException
from treeherder.etl.job_loader import JobLoader
from treeherder.etl.taskcluster_pulse.handler import handleMessage
from treeherder.model.models import (Job,
JobDetail,
JobLog,
@ -37,48 +35,6 @@ def transformed_pulse_jobs(sample_data, test_repository):
return jobs
def mock_artifact(taskId, runId, artifactName):
# Mock artifact with empty body
baseUrl = "https://queue.taskcluster.net/v1/task/{taskId}/runs/{runId}/artifacts/{artifactName}"
responses.add(
responses.GET,
baseUrl.format(taskId=taskId, runId=runId, artifactName=artifactName),
body="",
content_type='text/plain',
status=200)
@pytest.fixture
async def new_pulse_jobs(sample_data, test_repository, push_stored):
revision = push_stored[0]["revisions"][0]["revision"]
pulseMessages = copy.deepcopy(sample_data.taskcluster_pulse_messages)
tasks = copy.deepcopy(sample_data.taskcluster_tasks)
jobs = []
# Over here we transform the Pulse messages into the intermediary taskcluster-treeherder
# generated messages
for message in list(pulseMessages.values()):
taskId = message["payload"]["status"]["taskId"]
task = tasks[taskId]
# If we pass task to handleMessage we won't hit the network
taskRuns = await handleMessage(message, task)
# handleMessage returns [] when it is a task that is not meant for Treeherder
for run in reversed(taskRuns):
mock_artifact(taskId, run["runId"], "public/logs/live_backing.log")
run["origin"]["project"] = test_repository.name
run["origin"]["revision"] = revision
jobs.append(run)
return jobs
@pytest.fixture
def new_transformed_jobs(sample_data, test_repository, push_stored):
revision = push_stored[0]["revisions"][0]["revision"]
jobs = copy.deepcopy(sample_data.taskcluster_transformed_jobs)
for job in jobs.values():
job["revision"] = revision
return jobs
def test_job_transformation(pulse_jobs, transformed_pulse_jobs):
import json
jl = JobLoader()
@ -87,17 +43,6 @@ def test_job_transformation(pulse_jobs, transformed_pulse_jobs):
assert transformed_pulse_jobs[idx] == json.loads(json.dumps(jl.transform(pulse_job)))
@responses.activate
def test_new_job_transformation(new_pulse_jobs, new_transformed_jobs, failure_classifications):
jl = JobLoader()
for message in new_pulse_jobs:
taskId = message["realTaskId"]
transformed_job = jl.process_job(message)
# Not all messages from Taskcluster will be processed
if transformed_job:
assert new_transformed_jobs[taskId] == transformed_job
def test_ingest_pulse_jobs(pulse_jobs, test_repository, push_stored,
failure_classifications, mock_log_parser):
"""

Просмотреть файл

@ -1,7 +1,7 @@
import jsonschema
import pytest
from treeherder.etl.schema import get_json_schema
from treeherder.etl.schema import job_json_schema
# The test data in this file are a representative sample-set from
# production Treeherder
@ -16,7 +16,7 @@ def test_group_symbols(sample_data, group_symbol):
job["origin"]["project"] = "proj"
job["origin"]["revision"] = "1234567890123456789012345678901234567890"
job["display"]["groupSymbol"] = group_symbol
jsonschema.validate(job, get_json_schema("pulse-job.yml"))
jsonschema.validate(job, job_json_schema)
@pytest.mark.parametrize("job_symbol", ['1.1g', '1g', '20', 'A', 'GBI10', 'en-US-1'])
@ -28,4 +28,4 @@ def test_job_symbols(sample_data, job_symbol):
job["origin"]["project"] = "proj"
job["origin"]["revision"] = "1234567890123456789012345678901234567890"
job["display"]["jobSymbol"] = job_symbol
jsonschema.validate(job, get_json_schema("pulse-job.yml"))
jsonschema.validate(job, job_json_schema)

Просмотреть файл

@ -42,6 +42,8 @@
"timeCompleted": "2014-12-19T18:39:57-08:00",
"labels": ["debug"],
"version": 1,
"logs": [
{
"url": "http://ftp.mozilla.org/pub/mozilla.org/spidermonkey/tinderbox-builds/mozilla-inbound-linux64/mozilla-inbound_linux64_spidermonkey-warnaserr-bm57-build1-build352.txt.gz",
@ -66,7 +68,7 @@
{
"taskId": "5c909b6d-143a-4a9f-942f-594bfadc399e/0",
"runId": 0,
"retryId": 0,
"origin": {
"kind": "hg.mozilla.org",
"project": "set by test",
@ -100,12 +102,13 @@
"timeScheduled": "2014-12-19T16:39:57-08:00",
"timeStarted": "2014-12-19T17:39:57-08:00",
"timeCompleted": "2014-12-19T18:39:57-08:00",
"labels": ["debug"]
"labels": ["debug"],
"version": 1
},
{
"taskId": "5c909b6d-143a-4a9f-942f-594bfadc399e/1",
"runId": 1,
"retryId": 1,
"origin": {
"kind": "hg.mozilla.org",
"project": "set by test",
@ -139,12 +142,13 @@
"timeScheduled": "2014-12-19T16:39:57-08:00",
"timeStarted": "2014-12-19T17:39:57-08:00",
"timeCompleted": "2014-12-19T18:39:57-08:00",
"labels": ["debug"]
"labels": ["debug"],
"version": 1
},
{
"taskId": "66c4b325-0bb7-43ba-b631-f7a120103329/0",
"runId": 0,
"retryId": 0,
"isRetried": true,
"origin": {
"kind": "hg.mozilla.org",
@ -186,6 +190,8 @@
"timeStarted": "2014-12-19T17:39:57-08:00",
"timeCompleted": "2014-12-19T18:39:57-08:00",
"version": 1,
"logs": [
{
"url": "http://ftp.mozilla.org/pub/mozilla.org/spidermonkey/tinderbox-builds/mozilla-inbound-linux64/mozilla-inbound_linux64_spidermonkey-warnaserr-bm57-build1-build352.txt.gz",
@ -237,7 +243,7 @@
},
{
"taskId": "6c8cd566-df63-4102-a0bd-0603ddad8743/3",
"runId": 3,
"retryId": 3,
"origin": {
"kind": "hg.mozilla.org",
"project": "set by test",
@ -278,6 +284,8 @@
"timeStarted": "2014-12-19T17:39:57-08:00",
"timeCompleted": "2014-12-19T18:39:57-08:00",
"version": 1,
"logs": [
{
"url": "http://ftp.mozilla.org/pub/mozilla.org/spidermonkey/tinderbox-builds/mozilla-inbound-linux64/mozilla-inbound_linux64_spidermonkey-warnaserr-bm57-build1-build352.txt.gz",

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -31,18 +31,6 @@ class SampleData:
os.path.dirname(__file__))) as f:
self.text_log_summary = json.load(f)
with open("{0}/sample_data/pulse_consumer/taskcluster_pulse_messages.json".format(
os.path.dirname(__file__))) as f:
self.taskcluster_pulse_messages = json.load(f)
with open("{0}/sample_data/pulse_consumer/taskcluster_tasks.json".format(
os.path.dirname(__file__))) as f:
self.taskcluster_tasks = json.load(f)
with open("{0}/sample_data/pulse_consumer/taskcluster_transformed_jobs.json".format(
os.path.dirname(__file__))) as f:
self.taskcluster_transformed_jobs = json.load(f)
with open("{0}/sample_data/pulse_consumer/job_data.json".format(
os.path.dirname(__file__))) as f:
self.pulse_jobs = json.load(f)

Просмотреть файл

@ -8,7 +8,7 @@ from .utils import create_and_destroy_exchange
def test_bind_to(pulse_connection, pulse_exchange):
job_consumer = JobConsumer(pulse_connection)
exchange = pulse_exchange("exchange/taskcluster-queue/v1/task-running", create_exchange=True)
exchange = pulse_exchange("exchange/taskcluster-treeherder/v1/jobs", create_exchange=True)
routing_key = "test_routing_key"
binding = bind_to(job_consumer, exchange, routing_key)
@ -28,7 +28,7 @@ def test_bind_to(pulse_connection, pulse_exchange):
def test_prepare_consumer(pulse_connection, pulse_exchange):
# create the exchange in the local RabbitMQ instance
job_exchange = "exchange/taskcluster-queue/v1/task-running"
job_exchange = "exchange/taskcluster-treeherder/v1/jobs"
with create_and_destroy_exchange(pulse_connection, job_exchange):
job_consumer = prepare_consumer(
pulse_connection,
@ -42,7 +42,7 @@ def test_prepare_consumer(pulse_connection, pulse_exchange):
queue = job_consumer.consumers[0]["queues"]
assert queue.routing_key == "foo.test_project"
assert queue.exchange.name == "exchange/taskcluster-queue/v1/task-running"
assert queue.exchange.name == "exchange/taskcluster-treeherder/v1/jobs"
push_exchange = "exchange/taskcluster-github/v1/push"
with create_and_destroy_exchange(pulse_connection, push_exchange):

Просмотреть файл

@ -37,3 +37,18 @@ def test_retry_missing_revision_succeeds(sample_data, sample_push,
assert Job.objects.count() == 1
assert Job.objects.values()[0]["guid"] == job["taskId"]
assert thread_data.retries == 1
def test_retry_missing_revision_never_succeeds(sample_data, test_repository,
mock_log_parser, monkeypatch):
"""
Ensure that when the missing push exists after a retry, that the job
is then ingested.
"""
job = sample_data.pulse_jobs[0]
job["origin"]["project"] = test_repository.name
with pytest.raises(MissingPushException):
store_pulse_jobs.delay(job, "foo", "bar")
assert Job.objects.count() == 0

Просмотреть файл

@ -8,7 +8,7 @@ import slugid
from treeherder.etl.common import to_timestamp
from treeherder.etl.exceptions import MissingPushException
from treeherder.etl.jobs import store_job_data
from treeherder.etl.schema import get_json_schema
from treeherder.etl.schema import job_json_schema
from treeherder.model.models import (Push,
Repository)
@ -55,9 +55,7 @@ class JobLoader:
if pulse_job["state"] != "unscheduled":
try:
self.validate_revision(repository, pulse_job)
transformed_job = self.transform(pulse_job)
store_job_data(repository, [transformed_job])
return transformed_job
store_job_data(repository, [self.transform(pulse_job)])
except AttributeError:
logger.warning("Skipping job due to bad attribute", exc_info=1)
except Repository.DoesNotExist:
@ -133,7 +131,6 @@ class JobLoader:
platform_src = pulse_job[v] if v in pulse_job else default_platform
x["job"][k] = self._get_platform(platform_src)
# TODO: Improve the code https://bugzilla.mozilla.org/show_bug.cgi?id=1560596
# add some taskcluster metadata if it's available
# currently taskcluster doesn't pass the taskId directly, so we'll
# derive it from the guid, where it is stored in uncompressed
@ -310,7 +307,7 @@ class JobLoader:
def _is_valid_job(self, pulse_job):
try:
jsonschema.validate(pulse_job, get_json_schema("pulse-job.yml"))
jsonschema.validate(pulse_job, job_json_schema)
except (jsonschema.ValidationError, jsonschema.SchemaError) as e:
logger.error("JSON Schema validation error during job ingestion: %s", e)
return False

Просмотреть файл

@ -1,4 +1,3 @@
import copy
import logging
import os
import time
@ -373,7 +372,7 @@ def _schedule_log_parsing(job, job_logs, result):
args=[job.id, job_log_ids, priority])
def store_job_data(repository, originalData):
def store_job_data(repository, data):
"""
Store job data instances into jobs db
@ -430,7 +429,6 @@ def store_job_data(repository, originalData):
]
"""
data = copy.deepcopy(originalData)
# Ensure that we have job data to process
if not data:
return

Просмотреть файл

@ -1,142 +0,0 @@
import asyncio
import logging

import aiohttp
import taskcluster
import taskcluster.aio
from django.conf import settings
from django.core.management.base import BaseCommand

from treeherder.etl.job_loader import JobLoader
from treeherder.etl.pushlog import HgPushlogProcess
from treeherder.etl.taskcluster_pulse.handler import (EXCHANGE_EVENT_MAP,
                                                      handleMessage)
from treeherder.model.models import Repository

logger = logging.getLogger(__name__)

rootUrl = "https://taskcluster.net"
options = {"rootUrl": rootUrl}
loop = asyncio.get_event_loop()
# Limiting the connection pool just in case we have too many
conn = aiohttp.TCPConnector(limit=10)
# Remove default timeout limit of 5 minutes
timeout = aiohttp.ClientTimeout(total=0)
session = taskcluster.aio.createSession(loop=loop, connector=conn, timeout=timeout)
asyncQueue = taskcluster.aio.Queue(options, session=session)

# Invert EXCHANGE_EVENT_MAP so we can look up the exchange name for a run state.
stateToExchange = {state: exchange for exchange, state in EXCHANGE_EVENT_MAP.items()}


async def handleTaskId(taskId):
    """Fetch one task's status and definition, then ingest all of its runs."""
    status, definition = await asyncio.gather(asyncQueue.status(taskId),
                                              asyncQueue.task(taskId))
    await handleTask({
        "status": status["status"],
        "task": definition,
    })


async def handleTask(task):
    """Replay every run of *task* through the Pulse message handler."""
    taskId = task["status"]["taskId"]
    runs = task["status"]["runs"]
    # If we iterate in order of the runs, we will not be able to mark older
    # runs as "retry" instead of exception
    for run in reversed(runs):
        message = {
            "exchange": stateToExchange[run["state"]],
            "payload": {
                "status": {
                    "taskId": taskId,
                    "runs": runs,
                },
                "runId": run["runId"],
            }
        }
        try:
            taskRuns = await handleMessage(message, task["task"])
            if taskRuns:
                for taskRun in taskRuns:
                    logger.info("Loading into DB:\t%s/%s", taskId, taskRun["runId"])
                    # XXX: This seems our current bottleneck
                    JobLoader().process_job(taskRun)
        except Exception as e:
            logger.exception(e)


async def fetchGroupTasks(taskGroupId):
    """Page through listTaskGroup until every task in the group is collected."""
    tasks = []
    query = {}
    continuationToken = ""
    while True:
        if continuationToken:
            query = {"continuationToken": continuationToken}
        response = await asyncQueue.listTaskGroup(taskGroupId, query=query)
        tasks.extend(response['tasks'])
        continuationToken = response.get('continuationToken')
        if continuationToken is None:
            break
        logger.info('Requesting more tasks. %s tasks so far...', len(tasks))
    return tasks


async def processTasks(taskGroupId):
    """Ingest every task in a task group, concurrently."""
    tasks = await fetchGroupTasks(taskGroupId)
    logger.info("We have %s tasks to process", len(tasks))
    asyncTasks = [asyncio.create_task(handleTask(task)) for task in tasks]
    await asyncio.gather(*asyncTasks)


class Command(BaseCommand):
    """Management command to ingest data from a single push."""

    help = "Ingests a single push and tasks into Treeherder"

    def add_arguments(self, parser):
        parser.add_argument("--project", help="repository to query")
        parser.add_argument("--changeset", nargs="?", help="changeset to import")
        parser.add_argument("--task-id", dest="taskId", nargs="?",
                            help="taskId to ingest")

    def handle(self, *args, **options):
        taskId = options["taskId"]
        if taskId:
            loop.run_until_complete(handleTaskId(taskId))
            return

        project = options["project"]
        changeset = options["changeset"]
        # get reference to repo
        repo = Repository.objects.get(name=project, active_status='active')
        fetch_push_id = None
        # make sure all tasks are run synchronously / immediately
        settings.CELERY_TASK_ALWAYS_EAGER = True
        # get hg pushlog
        pushlog_url = '%s/json-pushes/?full=1&version=2' % repo.url
        # ingest this particular revision for this project
        process = HgPushlogProcess()
        # Use the actual push SHA, in case the changeset specified was a tag
        # or branch name (eg tip). HgPushlogProcess returns the full SHA.
        process.run(pushlog_url, project, changeset=changeset, last_push_id=fetch_push_id)
        # XXX: Need logic to get from project/revision to taskGroupId
        taskGroupId = 'ZYnMSfwCS5Cc_Wi_e-ZlSA'
        logger.info("## START ##")
        loop.run_until_complete(processTasks(taskGroupId))
        logger.info("## END ##")

Просмотреть файл

@ -1,69 +0,0 @@
import asyncio
import json
import logging
import os

from django.core.management.base import BaseCommand

from treeherder.etl.job_loader import JobLoader
from treeherder.etl.taskcluster_pulse.handler import (fetchTask,
                                                      handleMessage)
from treeherder.services.pulse import (UpdateJobFixtures,
                                       job_sources,
                                       prepare_consumer,
                                       pulse_conn)

logger = logging.getLogger(__name__)
tests_path = os.path.join('tests', 'sample_data', 'pulse_consumer')


class Command(BaseCommand):
    """
    Management command to read jobs from Pulse and store it as test fixtures
    """
    help = "Read jobs and store it as test fixtures."

    def handle(self, *args, **options):
        """Consume up to maxMessages Pulse jobs and regenerate the JSON fixtures."""
        UpdateJobFixtures.maxMessages = 100
        self.stdout.write("The Pulse consumer will consume {number} messages".format(number=UpdateJobFixtures.maxMessages))
        with pulse_conn as connection:
            consumer = prepare_consumer(
                connection,
                UpdateJobFixtures,
                job_sources,
                lambda key: "#.{}".format(key),
            )
            try:
                consumer.run()
            except Exception:
                # The consumer raises once maxMessages have been collected;
                # everything gathered so far is available in consumer.messages.
                loop = asyncio.get_event_loop()
                tc_messages = {}
                tc_tasks = {}
                th_jobs = {}
                jl = JobLoader()
                for message in consumer.messages:
                    taskId = message["payload"]["status"]["taskId"]
                    # NOTE(review): assuming fetchTask is synchronous — confirm;
                    # handleMessage, by contrast, is declared ``async def``.
                    task = fetchTask(taskId)
                    # BUG FIX: handleMessage is a coroutine function; the
                    # original called it without awaiting, so ``runs`` was a
                    # coroutine object and the loop below never saw real runs.
                    runs = loop.run_until_complete(handleMessage(message, task))
                    for run in runs:
                        try:
                            th_jobs[taskId] = jl.transform(run)
                            tc_messages[taskId] = message
                            tc_tasks[taskId] = task
                        except Exception:
                            logger.info('Issue validating this message: %s', run)

                logger.info("Updating Taskcluster jobs: %s entries", len(tc_messages))
                with open(os.path.join(tests_path, 'taskcluster_pulse_messages.json'), 'w') as fh:
                    # Write new line at the end to satisfy prettier
                    fh.write(json.dumps(tc_messages, sort_keys=True, indent=2) + "\n")

                logger.info("Updating Taskcluster task: %s entries", len(tc_tasks))
                with open(os.path.join(tests_path, 'taskcluster_tasks.json'), 'w') as fh:
                    # Write new line at the end to satisfy prettier
                    fh.write(json.dumps(tc_tasks, sort_keys=True, indent=2) + "\n")

                logger.info("Updating transformed messages: %s entries", len(th_jobs))
                with open(os.path.join(tests_path, 'taskcluster_transformed_jobs.json'), 'w') as fh:
                    # Write new line at the end to satisfy prettier
                    fh.write(json.dumps(th_jobs, sort_keys=True, indent=2) + "\n")
        self.stdout.write("Pulse Job listening stopped...")

Просмотреть файл

@ -10,5 +10,8 @@ def get_json_schema(filename):
"""
file_path = os.path.join("schemas", filename)
with open(file_path) as f:
schema = yaml.load(f, Loader=yaml.FullLoader)
schema = yaml.load(f)
return schema
job_json_schema = get_json_schema("pulse-job.yml")

Просмотреть файл

Просмотреть файл

@ -1,340 +0,0 @@
# Code imported from https://github.com/taskcluster/taskcluster/blob/32629c562f8d6f5a6b608a3141a8ee2e0984619f/services/treeherder/src/handler.js
import asyncio
import logging
import os
import jsonschema
import slugid
import taskcluster
import taskcluster.aio
import taskcluster_urls
from treeherder.etl.schema import get_json_schema
from treeherder.etl.taskcluster_pulse.parse_route import parseRoute
logger = logging.getLogger(__name__)
# NOTE(review): hard-coded to the legacy https://taskcluster.net deployment —
# confirm this predates configurable root URLs before reusing elsewhere.
root_url = "https://taskcluster.net"
options = {"rootUrl": root_url}
# Module-level event loop and async Queue client; handleMessage uses
# asyncQueue to fetch a task definition when one is not supplied.
loop = asyncio.get_event_loop()
session = taskcluster.aio.createSession(loop=loop)
asyncQueue = taskcluster.aio.Queue(options, session=session)
# Build a mapping from exchange name to task status
EXCHANGE_EVENT_MAP = {
"exchange/taskcluster-queue/v1/task-pending": "pending",
"exchange/taskcluster-queue/v1/task-running": "running",
"exchange/taskcluster-queue/v1/task-completed": "completed",
"exchange/taskcluster-queue/v1/task-failed": "failed",
"exchange/taskcluster-queue/v1/task-exception": "exception",
}
class PulseHandlerError(Exception):
    """Base error raised for unrecoverable pulse-handling problems."""
def stateFromRun(jobRun):
    """Map a Taskcluster run state to a Treeherder job state.

    Both "exception" and "failed" runs are reported as completed jobs;
    every other state passes through unchanged.
    """
    state = jobRun["state"]
    if state in ("exception", "failed"):
        return "completed"
    return state
def resultFromRun(jobRun):
    """Map a Taskcluster run to a Treeherder job result.

    "completed" -> "success", "failed" -> "fail". An "exception" run resolves
    to its ``reasonResolved`` when that is "canceled" or "superseded",
    otherwise to "exception". Any other state yields "unknown".
    """
    RUN_TO_RESULT = {
        "completed": "success",
        "failed": "fail",
    }
    state = jobRun["state"]
    # Idiom fix: test membership directly on the dict instead of
    # materializing list(RUN_TO_RESULT.keys()).
    if state in RUN_TO_RESULT:
        return RUN_TO_RESULT[state]
    if state == "exception":
        reasonResolved = jobRun.get("reasonResolved")
        if reasonResolved in ("canceled", "superseded"):
            return reasonResolved
        return "exception"
    return "unknown"
# Creates a log entry for Treeherder to retrieve and parse. This log is
# displayed on the Treeherder Log Viewer once parsed.
def createLogReference(taskId, runId):
    """Return the Treeherder log-reference dict for a single task run."""
    artifact_path = "task/{taskId}/runs/{runId}/artifacts/public/logs/live_backing.log"
    logUrl = taskcluster_urls.api(root_url, "queue", "v1", artifact_path).format(
        taskId=taskId, runId=runId)
    return {
        # XXX: This is a magical name see 1147958 which enables the log viewer.
        "name": "builds-4h",
        "url": logUrl,
    }
# Filters the task routes for the treeherder specific route. Once found,
# the route is parsed into distinct parts used for constructing the
# Treeherder job message.
# TODO: Refactor https://bugzilla.mozilla.org/show_bug.cgi?id=1560596
def parseRouteInfo(prefix, taskId, routes, task):
    """Find the single route whose first segment equals *prefix* and parse it.

    Raises PulseHandlerError when zero or more than one matching route exists.
    ``task`` is unused but kept for interface compatibility with callers.
    """
    # Fix: honour the ``prefix`` parameter instead of the hard-coded
    # "tc-treeherder" literal (callers pass that same value today, so
    # behaviour is unchanged for existing call sites).
    matchingRoutes = [route for route in routes if route.split(".")[0] == prefix]
    if len(matchingRoutes) != 1:
        # Fix: the original concatenation produced "exists.Task ID:" with no
        # separating space in the error message.
        raise PulseHandlerError(
            "Could not determine Treeherder route. Either there is no route, " +
            "or more than one matching route exists. " +
            "Task ID: {taskId} Routes: {routes}".format(taskId=taskId, routes=routes)
        )
    return parseRoute(matchingRoutes[0])
def validateTask(task):
    """Return True when task.extra.treeherder exists and passes schema validation.

    Validation failures are logged and reported as False rather than raised,
    since tasks without Treeherder metadata are common and expected.
    """
    treeherderMetadata = task.get("extra", {}).get("treeherder")
    if not treeherderMetadata:
        logger.debug("Task metadata is missing Treeherder job configuration.")
        return False
    try:
        jsonschema.validate(treeherderMetadata, get_json_schema("task-treeherder-config.yml"))
    except (jsonschema.ValidationError, jsonschema.SchemaError) as e:
        # Fixed typo in the log message: "Taskcluser" -> "Taskcluster".
        logger.error("JSON Schema validation error during Taskcluster message ingestion: %s", e)
        return False
    return True
# Listens for Task event messages and invokes the appropriate handler
# for the type of message received.
# Only messages that contain the properly formatted routing key and contain
# treeherder job information in task.extra.treeherder are accepted.
# This will generate a list of messages that need to be ingested by Treeherder.
async def handleMessage(message, taskDefinition=None):
    """Translate a Taskcluster pulse message into Treeherder job messages.

    message: dict with "exchange" (queue exchange name) and "payload"
        (task status) keys.
    taskDefinition: optional pre-fetched task definition; when omitted the
        task is fetched from the Taskcluster queue.

    Returns a (possibly empty) list of Treeherder job messages.
    Raises Exception for an unknown exchange.
    """
    jobs = []
    taskId = message["payload"]["status"]["taskId"]
    task = (await asyncQueue.task(taskId)) if not taskDefinition else taskDefinition

    try:
        parsedRoute = parseRouteInfo("tc-treeherder", taskId, task["routes"], task)
    except PulseHandlerError as e:
        logger.warning("%s", str(e))
        return jobs

    logger.debug("Message received for task %s with route %s", taskId, str(task["routes"])[0:100])

    # Validation failures are common and logged, so do nothing more.
    if not validateTask(task):
        return jobs

    taskType = EXCHANGE_EVENT_MAP.get(message["exchange"])

    # Originally this code was only within the "pending" case; it was moved
    # out to support ingesting all tasks at once, which might not have a
    # "pending" case.
    # If the job is an automatic rerun we mark the previous run as "retry".
    # This will only work if the previous run has not yet been processed by
    # Treeherder, since _remove_existing_jobs() will prevent it.
    if message["payload"]["runId"] > 0:
        jobs.append(await handleTaskRerun(parsedRoute, task, message["payload"]))

    if not taskType:
        raise Exception("Unknown exchange: {exchange}".format(exchange=message["exchange"]))
    elif taskType == "pending":
        jobs.append(handleTaskPending(parsedRoute, task, message["payload"]))
    elif taskType == "running":
        jobs.append(handleTaskRunning(parsedRoute, task, message["payload"]))
    elif taskType in ("completed", "failed"):
        jobs.append(await handleTaskCompleted(parsedRoute, task, message["payload"]))
    elif taskType == "exception":
        job = await handleTaskException(parsedRoute, task, message["payload"])
        # BUG FIX: handleTaskException returns None for runs that were
        # created as an exception (e.g. deadline-exceeded). Previously that
        # None was appended to `jobs` and handed to the job loader downstream.
        if job is not None:
            jobs.append(job)

    return jobs
# Builds the basic Treeherder job message that's universal for all
# message types.
#
# Specific handlers for each message type will add/remove information necessary
# for the type of task event.
def buildMessage(pushInfo, task, runId, message):
    """Build the base Treeherder job message for a task run.

    pushInfo: parsed route info (see parseRoute) identifying the push.
    task: the Taskcluster task definition (must carry extra.treeherder).
    runId: index of the run within message["status"]["runs"].
    message: the pulse message payload (task status).
    """
    taskId = message["status"]["taskId"]
    jobRun = message["status"]["runs"][runId]
    treeherderConfig = task["extra"]["treeherder"]

    job = {
        "buildSystem": "taskcluster",
        "owner": task["metadata"]["owner"],
        "taskId": "{taskId}/{runId}".format(taskId=slugid.decode(taskId), runId=runId),
        # TODO: In a follow up we will use the right name - bug 1560596
        "realTaskId": taskId,
        "runId": runId,
        "isRetried": False,
        "display": {
            # jobSymbols could be an integer (i.e. Chunk ID) but need to be strings
            # for treeherder
            "jobSymbol": str(treeherderConfig["symbol"]),
            "groupSymbol": treeherderConfig.get("groupSymbol", "?"),
            # Maximum job name length is 100 chars. BUG FIX: the slice was
            # [0:99], which truncated names to 99 characters.
            "jobName": task["metadata"]["name"][0:100],
        },
        "state": stateFromRun(jobRun),
        "result": resultFromRun(jobRun),
        "tier": treeherderConfig.get("tier", 1),
        "timeScheduled": task["created"],
        "jobKind": treeherderConfig.get("jobKind", "other"),
        "reason": treeherderConfig.get("reason", "scheduled"),
        "jobInfo": {
            "links": [],
            "summary": task["metadata"]["description"],
        },
    }

    job["origin"] = {
        "kind": pushInfo["origin"],
        "project": pushInfo["project"],
        "revision": pushInfo["revision"],
    }

    # hg pushes are identified by a push log id; github pushes by a pull
    # request id plus the repo owner.
    if pushInfo["origin"] == "hg.mozilla.org":
        job["origin"]["pushLogID"] = pushInfo["id"]
    else:
        job["origin"]["pullRequestID"] = pushInfo["id"]
        job["origin"]["owner"] = pushInfo["owner"]

    # Transform "collection" into an array of labels if task doesn't
    # define "labels".
    labels = treeherderConfig.get("labels", [])
    if not labels:
        if not treeherderConfig.get("collection"):
            labels = ["opt"]
        else:
            labels = list(treeherderConfig["collection"].keys())
    job["labels"] = labels

    machine = treeherderConfig.get("machine", {})
    job["buildMachine"] = {
        "name": jobRun.get("workerId", "unknown"),
        "platform": machine.get("platform", task["workerType"]),
        "os": machine.get("os", "-"),
        "architecture": machine.get("architecture", "-"),
    }

    if treeherderConfig.get("productName"):
        job["productName"] = treeherderConfig["productName"]

    if treeherderConfig.get("groupName"):
        job["display"]["groupName"] = treeherderConfig["groupName"]

    return job
def handleTaskPending(pushInfo, task, message):
    """Handle a task-pending message: just the base job message, no extras."""
    run_id = message["runId"]
    return buildMessage(pushInfo, task, run_id, message)
async def handleTaskRerun(pushInfo, task, message):
    """Mark the run preceding an automatic rerun as a retried failure."""
    previous_run_id = message["runId"] - 1
    job = buildMessage(pushInfo, task, previous_run_id, message)
    job.update({
        "state": "completed",
        "result": "fail",
        "isRetried": True,
        # Reruns often have no logs, so in the interest of not linking to a
        # 404'ing artifact, don't include a link.
        "logs": [],
    })
    return await addArtifactUploadedLinks(
        message["status"]["taskId"],
        previous_run_id,
        job)
def handleTaskRunning(pushInfo, task, message):
    """Handle a task-running message: record when the run started."""
    run_id = message["runId"]
    job = buildMessage(pushInfo, task, run_id, message)
    job["timeStarted"] = message["status"]["runs"][run_id]["started"]
    return job
async def handleTaskCompleted(pushInfo, task, message):
    """Handle completed/failed messages: add timings and the live log link."""
    run_id = message["runId"]
    run = message["status"]["runs"][run_id]
    task_id = message["status"]["taskId"]

    job = buildMessage(pushInfo, task, run_id, message)
    job["timeStarted"] = run["started"]
    job["timeCompleted"] = run["resolved"]
    job["logs"] = [createLogReference(task_id, run["runId"])]
    return await addArtifactUploadedLinks(task_id, run_id, job)
async def handleTaskException(pushInfo, task, message):
    """Handle a task-exception message; returns None for skipped runs."""
    run_id = message["runId"]
    run = message["status"]["runs"][run_id]

    # Do not report runs that were created as an exception. Such cases
    # are deadline-exceeded.
    if run["reasonCreated"] == "exception":
        return None

    job = buildMessage(pushInfo, task, run_id, message)
    job["timeStarted"] = run["started"]
    job["timeCompleted"] = run["resolved"]
    # Exceptions generally have no logs, so in the interest of not linking to
    # a 404'ing artifact, don't include a link.
    job["logs"] = []
    return await addArtifactUploadedLinks(
        message["status"]["taskId"],
        run_id,
        job)
async def fetchArtifacts(taskId, runId):
    """Return all artifacts for a run, following queue pagination.

    Errors while fetching a continuation page are swallowed: the artifacts
    collected so far are returned (best effort).
    """
    res = await asyncQueue.listArtifacts(taskId, runId)
    artifacts = res["artifacts"]
    continuationToken = res.get("continuationToken")
    while continuationToken is not None:
        continuation = {
            "continuationToken": continuationToken
        }
        try:
            res = await asyncQueue.listArtifacts(taskId, runId, continuation)
        except Exception:
            break
        # BUG FIX: Python lists have no .concat() — that was a leftover from
        # the JavaScript original and raised AttributeError on any paginated
        # artifact listing. Extend the list in place instead.
        artifacts.extend(res["artifacts"])
        continuationToken = res.get("continuationToken")
    return artifacts
async def addArtifactUploadedLinks(taskId, runId, job):
    """Attach "artifact uploaded" links for every artifact of a run.

    Duplicate basenames are disambiguated with a " (N)" suffix. If the
    artifact listing fails, the job is returned unchanged.
    """
    try:
        artifacts = await fetchArtifacts(taskId, runId)
    except Exception:
        logger.warning("Artifacts could not be found for task: %s run: %s", taskId, runId)
        return job

    seen = {}
    links = []
    for artifact in artifacts:
        label = os.path.basename(artifact["name"])
        bucket = seen.setdefault(label, [])
        bucket.append(artifact["name"])
        if len(bucket) > 1:
            # Second occurrence of a basename becomes "name (1)", and so on.
            label = "{name} ({length})".format(name=label, length=len(bucket) - 1)
        links.append({
            "label": "artifact uploaded",
            "linkText": label,
            "url": taskcluster_urls.api(
                root_url,
                "queue",
                "v1",
                "task/{taskId}/runs/{runId}/artifacts/{artifact_name}".format(
                    taskId=taskId, runId=runId, artifact_name=artifact["name"]
                )),
        })

    job["jobInfo"]["links"] = links
    return job

Просмотреть файл

@ -1,39 +0,0 @@
# Code imported from https://github.com/taskcluster/taskcluster/blob/32629c562f8d6f5a6b608a3141a8ee2e0984619f/services/treeherder/src/util/route_parser.js
# A Taskcluster routing key will be in the form:
# treeherder.<version>.<user/project>|<project>.<revision>.<pushLogId/pullRequestId>
# [0] Routing key prefix used for listening to only treeherder relevant messages
# [1] Routing key version
# [2] In the form of user/project for github repos and just project for hg.mozilla.org
# [3] Top level revision for the push
# [4] Pull Request ID (github) or Push Log ID (hg.mozilla.org) of the push
# Note: pushes on a branch on Github would not have a PR ID
# Function extracted from
# https://github.com/taskcluster/taskcluster/blob/32629c562f8d6f5a6b608a3141a8ee2e0984619f/services/treeherder/src/util/route_parser.js
def parseRoute(route):
    """Parse a Treeherder routing key into push information.

    A Taskcluster routing key has the form:
        treeherder.<version>.<user/project>|<project>.<revision>.<pushLogId/pullRequestId>

    Returns a dict with "destination", "revision", "origin", "project",
    "owner" (github routes only), and "id" — the push log id
    (hg.mozilla.org) or pull request id (github.com), or None when the
    route has no trailing id (e.g. a push to a Github branch without a PR).
    """
    parsedRoute = route.split('.')
    pushInfo = {
        "destination": parsedRoute[0],
        "revision": parsedRoute[3],
    }

    project = parsedRoute[2]
    if len(project.split('/')) == 2:
        owner, parsedProject = project.split('/')
        # BUG FIX: these were attribute assignments on a dict
        # (pushInfo.owner = ..., pushInfo.origin = ...), which raised
        # AttributeError for every github-originated route.
        pushInfo["owner"] = owner
        pushInfo["origin"] = 'github.com'
    else:
        parsedProject = project
        pushInfo["origin"] = 'hg.mozilla.org'
    pushInfo["project"] = parsedProject

    # Renamed from `id` to avoid shadowing the builtin.
    push_id = parsedRoute[4] if len(parsedRoute) == 5 else None
    pushInfo["id"] = int(push_id) if push_id else None
    return pushInfo

Просмотреть файл

@ -1,13 +1,10 @@
"""
This module contains tasks related to pulse job ingestion
"""
import asyncio
import newrelic.agent
from treeherder.etl.job_loader import JobLoader
from treeherder.etl.push_loader import PushLoader
from treeherder.etl.taskcluster_pulse.handler import handleMessage
from treeherder.workers.task import retryable_task
@ -16,16 +13,10 @@ def store_pulse_jobs(pulse_job, exchange, routing_key):
"""
Fetches the jobs pending from pulse exchanges and loads them.
"""
loop = asyncio.get_event_loop()
newrelic.agent.add_custom_parameter("exchange", exchange)
newrelic.agent.add_custom_parameter("routing_key", routing_key)
# handleMessage expects messages in this format
runs = loop.run_until_complete(handleMessage({
"exchange": exchange,
"payload": pulse_job,
}))
for run in runs:
JobLoader().process_job(run)
JobLoader().process_job(pulse_job)
@retryable_task(name='store-pulse-pushes', max_retries=10)

Просмотреть файл

@ -1,7 +1,6 @@
from .connection import pulse_conn
from .consumers import (JobConsumer,
PushConsumer,
UpdateJobFixtures,
prepare_consumer)
from .sources import (job_sources,
push_sources)
@ -9,7 +8,6 @@ from .sources import (job_sources,
__all__ = [
"JobConsumer",
"PushConsumer",
"UpdateJobFixtures",
"job_sources",
"prepare_consumer",
"pulse_conn",

Просмотреть файл

@ -105,30 +105,6 @@ class JobConsumer(PulseConsumer):
message.ack()
class UpdateJobFixtures(PulseConsumer):
    # Count of messages handled so far. NOTE(review): class attribute, so the
    # counter is shared across instances — presumably only one consumer runs.
    processedMessages = 0
    # Stop collecting after this many messages have been processed.
    maxMessages = 5
    # This is the name of your queue on Pulse Guardian
    queue_suffix = "foo"
    # Raw pulse messages collected for fixture generation. NOTE(review):
    # mutable class attribute shared by all instances — confirm intentional.
    messages = []

    def on_message(self, body, message):
        exchange = message.delivery_info['exchange']
        routing_key = message.delivery_info['routing_key']
        logger.debug('received job message from %s', exchange)
        # handleMessage expects messages in this format
        self.messages.append({
            "exchange": exchange,
            "routes": routing_key,
            "payload": body,
        })
        message.ack()
        self.processedMessages += 1
        # NOTE(review): close() is called on every message, not only the last
        # one — confirm the consumer is meant to stop after each message.
        self.close()
        if self.processedMessages > self.maxMessages:
            raise Exception('We have processed {} and need to store them'.format(self.processedMessages))
class PushConsumer(PulseConsumer):
queue_suffix = "resultsets"

Просмотреть файл

@ -18,11 +18,8 @@ env = environ.Env()
job_sources = env.list(
"PULSE_JOB_SOURCES",
default=[
"exchange/taskcluster-queue/v1/task-pending.#",
"exchange/taskcluster-queue/v1/task-running.#",
"exchange/taskcluster-queue/v1/task-completed.#",
"exchange/taskcluster-queue/v1/task-failed.#",
"exchange/taskcluster-queue/v1/task-exception.#",
"exchange/taskcluster-treeherder/v1/jobs.#",
# ... other CI systems
],
)