Kyle Lahnakoski 2017-10-31 05:28:36 -04:00
Parent 220ccbec2a
Commit cf7ff47974
12 changed files with 476 additions and 179 deletions

.editorconfig Normal file (+32)

@@ -0,0 +1,32 @@
# EditorConfig helps developers define and maintain consistent
# coding styles between different editors and IDEs
# editorconfig.org
root = true
[*]
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true
indent_style = space
indent_size = 4
[*.js]
indent_style = space
indent_size = 2
[*.json]
indent_style = tab
indent_size = 4
[*.css]
indent_style = space
indent_size = 4
[*.html]
indent_style = space
indent_size = 4
[*.{diff,md}]
trim_trailing_whitespace = false


@@ -87,7 +87,7 @@ class ETL(Thread):
Log.error("Can not find {{path}} to transformer (are you sure you are pointing to a function? Do you have all dependencies?)", path=t_name)
elif isinstance(w._transformer, object.__class__) and issubclass(w._transformer, Transform):
# WE EXPECT A FUNCTION. THE Transform INSTANCES ARE, AT LEAST, CALLABLE
w._transformer = w._transformer.__new__()
w._transformer = w._transformer(w.config)
w._source = get_container(w.source)
w._destination = get_container(w.destination)
kwargs.workers.append(w)
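The change above fixes transformer loading: the old code instantiated a `Transform` subclass with `__new__()`, which returns an uninitialized object and drops the worker's settings, while the new code calls the class with `w.config`. A minimal sketch of a transformer that fits this loader (the class name is hypothetical; the `Transform` base and the constructor signature are taken from the new module later in this commit):

from activedata_etl.transforms import Transform

class FilterByBranch(Transform):
    # hypothetical example: the loader detects this class via
    # issubclass(w._transformer, Transform) and instantiates it with the
    # worker's "config" property from the JSON settings
    def __init__(self, config):
        self.task_filter = config.task_filter

    def __call__(self, source_key, source, destination, resources, please_stop=None):
        # Transform instances are callable: read records from source,
        # write them to destination, return the list of keys written
        return []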


@@ -9,15 +9,13 @@
from __future__ import division
from __future__ import unicode_literals
from mo_dots import Data
from mo_logs import Log, machine_metadata
from mo_threads import Signal
from activedata_etl.transforms import EtlHeadGenerator, verify_blobber_file
from activedata_etl.transforms.pulse_block_to_es import scrub_pulse_record, transform_buildbot
from activedata_etl.transforms.unittest_logs_to_sink import process_unittest
from mo_hg.hg_mozilla_org import minimize_repo
from mo_dots import Data
from mo_logs import Log, machine_metadata
from mo_logs.strings import utf82unicode
from mo_threads import Signal
from mo_times.timer import Timer
from pyLibrary.env import http


@@ -1,111 +0,0 @@
# encoding: utf-8
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from __future__ import division
from __future__ import unicode_literals
from activedata_etl.imports.task import minimize_task
from activedata_etl.transforms import EtlHeadGenerator
from mo_dots import listwrap, set_default
from mo_json import json2value, stream
from mo_logs import Log, machine_metadata
from mo_times.dates import Date
from pyLibrary.env import http
from pyLibrary.env.big_data import scompressed2ibytes
def process(source_key, source, destination, resources, please_stop=None):
"""
READ pulse_block AND GET THE FILE -> COMPONENT MAPS
"""
etl_header_gen = EtlHeadGenerator(source_key)
existing_keys = destination.keys(prefix=source_key)
for e in existing_keys:
destination.delete_key(e)
file_num = 0
lines = list(source.read_lines())
output = []
for i, line in enumerate(lines):
if please_stop:
Log.error("Shutdown detected. Stopping early")
task = json2value(line)
etl = task.etl
artifacts = listwrap(task.task.artifacts)
if "public/components.json.gz" not in artifacts.name or "public/missing.json.gz" not in artifacts.name:
continue
minimize_task(task)
# REVIEW THE ARTIFACTS, LOOK FOR
for a in artifacts:
if Date(a.expires) < Date.now():
Log.note("Expired url: expires={{date}} url={{url}}", date=Date(a.expires), url=a.url)
continue # ARTIFACT IS GONE
if "components.json.gz" in a.url:
dest_key, dest_etl = etl_header_gen.next(etl, a.name)
dest_etl.machine = machine_metadata
dest_etl.url = a.url
destination.extend(
normalize_property(source_key, data, task)
for data in stream.parse(
scompressed2ibytes(http.get(a.url).raw),
{"items": "."},
{"name", "value"}
)
)
file_num += 1
output.append(dest_key)
elif "missing.json.gz" in a.url:
dest_key, dest_etl = etl_header_gen.next(etl, a.name)
dest_etl.machine = machine_metadata
dest_etl.url = a.url
destination.extend(
normalize_missing(source_key, data, task)
for data in stream.parse(
scompressed2ibytes(http.get(a.url).raw),
"missing",
{"missing"}
)
)
file_num += 1
output.append(dest_key)
return output
def normalize_property(source_key, data, task):
return set_default(
{
"file": {"name": data.name},
"bug": {
"product": data.value[0],
"component": data.value[1]
}
},
task
)
def normalize_missing(source_key, data, task):
return set_default(
{
"file": {"name": data}
},
task
)


@@ -0,0 +1,181 @@
# encoding: utf-8
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from __future__ import division
from __future__ import unicode_literals
from activedata_etl import etl2key
from activedata_etl.imports.task import minimize_task
from activedata_etl.transforms import EtlHeadGenerator, Transform
from activedata_etl.transforms.grcov_to_es import download_file
from jx_python import jx
from jx_python.expressions import jx_expression_to_function
from mo_dots import listwrap, wrap, Data, Null
from mo_files import TempFile
from mo_json import json2value, stream, value2json
from mo_logs import Log, machine_metadata
from mo_times import Timer
from mo_times.dates import Date
from pyLibrary.env.big_data import scompressed2ibytes
class ETL(Transform):
def __init__(self, config):
self.filter = jx_expression_to_function(config.task_filter)
def __call__(self, source_key, source, destination, resources, please_stop=None):
"""
READ pulse_block AND GET THE FILE -> COMPONENT MAPS
"""
existing_keys = destination.keys(prefix=source_key)
for e in existing_keys:
destination.delete_key(e)
file_num = 0
lines = list(source.read_lines())
output = []
for i, line in enumerate(lines):
if please_stop:
Log.error("Shutdown detected. Stopping early")
task = json2value(line)
if not self.filter(task):
continue
etl = task.etl
etl_header_gen = EtlHeadGenerator(etl2key(etl))
artifacts = listwrap(task.task.artifacts)
if "public/components.json.gz" not in artifacts.name or "public/missing.json.gz" not in artifacts.name:
continue
minimize_task(task)
repo = task.repo
repo.branch = None
repo.push = None
# REVIEW THE ARTIFACTS, LOOK FOR
for a in artifacts:
if Date(a.expires) < Date.now():
Log.note("Expired url: expires={{date}} url={{url}}", date=Date(a.expires), url=a.url)
continue # ARTIFACT IS GONE
if "components.json.gz" in a.url:
pass
dest_key, dest_etl = etl_header_gen.next(etl, a.name)
dest_etl.machine = machine_metadata
dest_etl.url = a.url
with TempFile() as tempfile:
Log.note("download {{url}}", url=a.url)
download_file(a.url, tempfile.abspath)
with open(tempfile.abspath, b"rb") as fstream:
with Timer("process {{url}}", param={"url": a.url}):
destination.write_lines(
dest_key,
(
value2json(normalize_property(source_key, data, repo, dest_etl, i, please_stop))
for i, data in enumerate(stream.parse(
scompressed2ibytes(fstream),
{"items": "."},
{"name", "value"}
))
)
)
file_num += 1
output.append(dest_key)
elif "missing.json.gz" in a.url:
dest_key, dest_etl = etl_header_gen.next(etl, a.name)
dest_etl.machine = machine_metadata
dest_etl.url = a.url
with TempFile() as tempfile:
Log.note("download {{url}}", url=a.url)
download_file(a.url, tempfile.abspath)
with open(tempfile.abspath, b"rb") as fstream:
with Timer("process {{url}}", param={"url": a.url}):
destination.write_lines(
dest_key,
(
value2json(normalize_property(source_key, Data(name=data.missing, value=Null), repo, dest_etl, i, please_stop))
for i, data in enumerate(stream.parse(
scompressed2ibytes(fstream),
"missing",
{"missing"}
))
)
)
file_num += 1
output.append(dest_key)
return output
def normalize_property(source_key, data, repo, parent_etl, i, please_stop):
if please_stop:
Log.error("Shutdown detected. Stopping early")
etl = {
"id": i,
"source": parent_etl,
"type": "join"
}
repo.changeset.description = None
repo.branch = None
value = {
"_id": etl2key(wrap(etl)),
"file": {
"full_name": data.name,
"name": data.name.split('/')[-1],
"type": extension(data.name),
"path": path(data.name)
},
"bug": {
"product": data.value[0],
"component": data.value[1]
},
"repo": repo,
"etl": etl
}
return value
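# e.g. (illustrative values): for data = {"name": "dom/base/nsDocument.cpp", "value": ["Core", "DOM"]}
# the result has file.name == "nsDocument.cpp", file.type == "cpp",
# file.path == ["dom", "dom/base", "dom/base/nsDocument.cpp"],
# and bug.product == "Core", bug.component == "DOM"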
# def normalize_missing(source_key, data, repo, parent_etl, i):
# etl = {
# "id": i,
# "source": parent_etl,
# "type": "join"
# }
# value = {
# "_id": etl2key(wrap(etl)),
# "file": {"name": data.missing},
# "revision": repo.changeset.id,
# "etl": etl
# }
#
# return value
def path(name):
return ['/'.join(p) for p in jx.prefixes(name.split('/'))]
def extension(filename):
parts = filename.split('/')[-1].split('.')
if len(parts) == 1:
return None
else:
return parts[-1]
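For illustration, the two helpers compose like this (the file name is hypothetical):

path("python/mach/setup.py")       # ["python", "python/mach", "python/mach/setup.py"]
extension("python/mach/setup.py")  # "py"
extension("Makefile")              # None -- no dot in the file name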


@@ -1040,6 +1040,15 @@ def intervals(_min, _max=None, size=1):
return output
def prefixes(vals):
"""
:param vals: iterable
:return: vals[:1], vals[:2], ... , vals[:n]
"""
for i in range(len(vals)):
yield vals[:i + 1]
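# e.g. (illustrative): prefixes(["a", "b", "c"]) yields ["a"], ["a", "b"], ["a", "b", "c"]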
def accumulate(vals):
"""
RETURN PAIRS IN FORM (sum(vals[0:i-1]), vals[i])


@@ -15,11 +15,12 @@ import json
from collections import Mapping
from types import GeneratorType
from mo_dots import split_field, startswith_field, relative_field, Data, join_field, Null, wrap
from mo_logs import Log
from mo_dots import split_field, startswith_field, relative_field, concat_field, Data, join_field, Null, wrap
DEBUG = True
MIN_READ_SIZE = 8*1024
DEBUG = False
WHITESPACE = b" \n\r\t"
CLOSE = {
b"{": b"}",
@@ -42,13 +43,15 @@ def parse(json, query_path, expected_vars=NO_VARS):
LARGE MANY-PROPERTY OBJECTS CAN BE HANDLED BY `items()`
:param json: SOME STRING-LIKE STRUCTURE THAT CAN ASSUME WE LOOK AT ONE
CHARACTER AT A TIME, IN ORDER
:param query_path: AN ARRAY OF DOT-SEPARATED STRINGS INDICATING THE
NESTED ARRAY BEING ITERATED.
:param json: SOME STRING-LIKE STRUCTURE THAT CAN ASSUME WE LOOK AT
ONE CHARACTER AT A TIME, IN ORDER
:param query_path: A DOT-SEPARATED STRING INDICATING THE PATH TO THE
NESTED ARRAY OPTIONALLY, {"items":query_path} TO
FURTHER ITERATE OVER PROPERTIES OF OBJECTS FOUND AT
query_path
:param expected_vars: REQUIRED PROPERTY NAMES, USED TO DETERMINE IF
MORE-THAN-ONE PASS IS REQUIRED
:return: RETURNS AN ITERATOR OVER ALL OBJECTS FROM NESTED path IN LEAF FORM
:return: RETURNS AN ITERATOR OVER ALL OBJECTS FROM ARRAY LOCATED AT query_path
"""
if hasattr(json, "read"):
# ASSUME IT IS A STREAM
@@ -161,6 +164,7 @@ def parse(json, query_path, expected_vars=NO_VARS):
ITERATE THROUGH THE PROPERTIES OF AN OBJECT
"""
c, index = skip_whitespace(index)
num_items = 0
while True:
if c == b',':
c, index = skip_whitespace(index)
@@ -177,9 +181,12 @@ def parse(json, query_path, expected_vars=NO_VARS):
c, index = skip_whitespace(index)
child_expected = needed("value", expected_vars)
_assign_token(index, c, child_expected)
index = _assign_token(index, c, child_expected)
c, index = skip_whitespace(index)
if DEBUG and not num_items % 1000:
Log.note("{{num}} items iterated", num=num_items)
yield index
num_items += 1
elif c == "}":
break
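The docstring above describes the whole calling convention; here is a minimal sketch of driving it the way the firefox-files transform in this commit does (the payload is invented, but its shape matches a components.json mapping of file name to [product, component]):

from io import BytesIO
from mo_json import stream

data = BytesIO(b'{"dom/base/nsDocument.cpp": ["Core", "DOM"]}')
# {"items": "."} iterates the properties of the top-level object; each one
# is yielded with the expected_vars "name" and "value" already read
for rec in stream.parse(data, {"items": "."}, {"name", "value"}):
    product, component = rec.value[0], rec.value[1]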


@@ -0,0 +1,63 @@
{
"settings": {
"index.number_of_replicas": 1,
"index.number_of_shards": 20
},
"mappings": {
"files": {
"_source": {
"compress": true
},
"dynamic_templates": [
{
"default_ids": {
"mapping": {
"index": "not_analyzed",
"type": "string",
"doc_values": true
},
"match": "id"
}
},
{
"default_strings": {
"mapping": {
"index": "not_analyzed",
"type": "string",
"doc_values": true
},
"match_mapping_type": "string",
"match": "*"
}
},
{
"default_doubles": {
"mapping": {
"index": "not_analyzed",
"type": "double",
"doc_values": true
},
"match_mapping_type": "double",
"match": "*"
}
},
{
"default_longs": {
"mapping": {
"index": "not_analyzed",
"type": "long",
"doc_values": true
},
"match_mapping_type": "long|integer",
"match_pattern": "regex",
"path_match": ".*"
}
}
],
"_all": {
"enabled": false
}
}
}
}
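A hedged sketch of registering this schema by hand (the ETL's "elasticsearch" worker config normally creates the index itself; host, port, and index name are copied from the dev config further down, and the schema file path is an assumption):

import json
import requests

with open("schema/files.json") as f:
    schema = json.load(f)

# settings and mappings go in one body when the index is created
response = requests.put("http://localhost:9200/firefox-files", data=json.dumps(schema))
response.raise_for_status()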


@@ -46,6 +46,32 @@
}
},
"workers": [
{
"name": "firefox files",
"source": {
"bucket": "active-data-task-cluster-normalized",
"key_format": "t.a:b",
"$ref": "file://~/private.json#aws_credentials"
},
"destination": {
"bucket": "active-data-firefox-files",
"public": true,
"key_format": "t.a:b.c.d",
"$ref": "file://~/private.json#aws_credentials"
},
"notify": [
{
"name": "active-data-index-dev",
"$ref": "file://~/private.json#aws_credentials"
}
],
"transformer": "activedata_etl.transforms.task_cluster_to_firefox_files.process",
"type": "join",
"config": {
"task_filter": {"repo.branch.name": "mozilla-central"}
}
}
// {
// "name": "fx_test to normalized",
// "source": {
@@ -82,8 +108,7 @@
// "index": "pulse",
// "type": "pulse_log",
// "timeout": 300,
// "consistency": "one",
// // WE ONLY NEED ONE SHARD TO BE AVAILABLE
// "consistency": "one", // WE ONLY NEED ONE SHARD TO BE AVAILABLE
// "schema": {
// "$ref": "//../../schema/pulse_logs.json"
// },
@@ -246,27 +271,27 @@
// "transformer": "activedata_etl.transforms.pulse_block_to_test_result_logs.process",
// "type": "join"
// },
{
"name": "tc_normalized to codecoverage",
"source": {
"$ref": "file://~/private.json#aws_credentials",
"bucket": "active-data-task-cluster-normalized",
"key_format": "t.a:b"
},
"destination": {
"$ref": "file://~/private.json#aws_credentials",
"bucket": "active-data-codecoverage-dev",
"public": true,
"key_format": "t.a:b.c.d"
},
"notify": {
"name": "active-data-index",
"$ref": "file://~/private.json#aws_credentials"
},
"transform_type": "bulk",
"transformer": "activedata_etl.transforms.cov_to_es.process",
"type": "join"
}
// {
// "name": "tc_normalized to codecoverage",
// "source": {
// "$ref": "file://~/private.json#aws_credentials",
// "bucket": "active-data-task-cluster-normalized",
// "key_format": "t.a:b"
// },
// "destination": {
// "$ref": "file://~/private.json#aws_credentials",
// "bucket": "active-data-codecoverage-dev",
// "public": true,
// "key_format": "t.a:b.c.d"
// },
// "notify": {
// "name": "active-data-index",
// "$ref": "file://~/private.json#aws_credentials"
// },
// "transform_type": "bulk",
// "transformer": "activedata_etl.transforms.cov_to_es.process",
// "type": "join"
// }
],
"debug": {
"trace": true,


@@ -6,7 +6,41 @@
"$ref": "file://~/private.json#aws_credentials"
},
"workers":[
// {
{
"name": "firefox-files",
"batch_size": 1000,
"skip": 0,
"rollover": {
"field": "etl.source.source.timestamp",
"interval": "year",
"max": "year"
},
"sample_only": [],
"elasticsearch": {
"host": "http://localhost",
"port": 9200,
"index": "firefox-files",
"type": "files",
"timeout": 300,
"consistency": "one",
"schema": {
"$ref": "//../../schema/files.json"
},
"debug": true,
"limit_replicas": false
},
"source": {
"$ref": "file://~/private.json#aws_credentials",
"bucket": "active-data-firefox-files",
"key_format": "t.a:b.c.d"
},
"backfill": {
"prime_key": "etl.source.source.source.id",
"source_key": "etl.source.source.source.source.source.code"
}
}
// {
// "name": "fx_test",
// "batch_size": 1000,
// "skip": 0,
@@ -153,36 +187,36 @@
// "key_format": "a.b"
// }
// },
{
"name":"code coverage",
"skip": 0.0,
"queue_size": 200,
"batch_size": 100,
"rollover": {
"field": "repo.push.date",
"interval": "week",
"max": "2week"
},
"sample_only": [],
"elasticsearch": {
"host": "http://localhost",
"port": 9200,
"index": "coverage",
"type": "code_coverage",
"consistency": "one", // WE ONLY NEED ONE SHARD TO BE AVAILABLE
"timeout": 300,
"schema": {
"$ref": "//../../schema/code_coverage.json"
},
"debug": false,
"limit_replicas": false
},
"source": {
"bucket": "active-data-codecoverage",
"$ref": "file://~/private.json#aws_credentials",
"key_format": "t.a:b.c.d"
}
},
// {
// "name":"code coverage",
// "skip": 0.0,
// "queue_size": 200,
// "batch_size": 100,
// "rollover": {
// "field": "repo.push.date",
// "interval": "week",
// "max": "2week"
// },
// "sample_only": [],
// "elasticsearch": {
// "host": "http://localhost",
// "port": 9200,
// "index": "coverage",
// "type": "code_coverage",
// "consistency": "one", // WE ONLY NEED ONE SHARD TO BE AVAILABLE
// "timeout": 300,
// "schema": {
// "$ref": "//../../schema/code_coverage.json"
// },
// "debug": false,
// "limit_replicas": false
// },
// "source": {
// "bucket": "active-data-codecoverage",
// "$ref": "file://~/private.json#aws_credentials",
// "key_format": "t.a:b.c.d"
// }
// },
// {
// "name": "task",
// "queue_size": 2000,


@@ -47,6 +47,32 @@
}
},
"workers": [
{
"name": "firefox files",
"source": {
"bucket": "active-data-task-cluster-normalized",
"key_format": "t.a:b",
"$ref": "file://~/private.json#aws_credentials"
},
"destination": {
"bucket": "active-data-firefox-files",
"public": true,
"key_format": "t.a:b.c.d",
"$ref": "file://~/private.json#aws_credentials"
},
"notify": [
{
"name": "active-data-index",
"$ref": "file://~/private.json#aws_credentials"
}
],
"type": "join",
"transformer": "activedata_etl.transforms.task_cluster_to_firefox_files.ETL",
"config": {
"task_filter": {"eq":{"repo.branch.name": "mozilla-central"}}
}
},
{
"name": "fx_test to normalized",
"source": {


@@ -5,6 +5,39 @@
"$ref": "file://~/private.json#aws_credentials"
},
"workers":[
{
"name": "firefox-files",
"batch_size": 1000,
"skip": 0,
"rollover":{
"field": "etl.source.source.timestamp",
"interval": "year",
"max": "year"
},
"sample_only": [],
"elasticsearch": {
"host": "http://activedata.allizom.org",
"port": 9200,
"index": "firefox-files",
"type": "files",
"timeout": 300,
"consistency": "one",
"schema": {
"$ref": "//../../schema/files.json"
},
"debug": true,
"limit_replicas": false
},
"source": {
"$ref": "file://~/private.json#aws_credentials",
"bucket": "active-data-firefox-files",
"key_format": "t.a:b.c.d"
},
"backfill":{
"prime_key": "etl.source.source.source.id",
"source_key": "etl.source.source.source.source.source.code"
}
},
{
"name": "fx_test",
"batch_size": 1000,