diff --git a/bzETL/alias_analysis.py b/bzETL/alias_analysis.py
index 543a487..d9b10ac 100644
--- a/bzETL/alias_analysis.py
+++ b/bzETL/alias_analysis.py
@@ -16,14 +16,22 @@ from bzETL.extract_bugzilla import get_all_cc_changes
 from jx_python import jx
 from mo_collections.multiset import Multiset
 from mo_dots import coalesce
+from mo_files import File
 from mo_future import iteritems
-from mo_json import value2json
+from mo_json import value2json, json2value
+from mo_kwargs import override
 from mo_logs import Log, startup, constants
-from pyLibrary.env import elasticsearch
+from mo_testing.fuzzytestcase import assertAlmostEqual
+from pyLibrary.convert import zip2bytes, bytes2zip
+from pyLibrary.env.elasticsearch import Cluster
 from pyLibrary.sql.mysql import MySQL
 
+DEBUG = True
+MINIMUM_DIFF_ROUGH = 7
+MINIMUM_DIFF_FINE = 4
 
-def full_analysis(settings, bug_list=None, please_stop=None):
+
+def full_analysis(kwargs, bug_list=None, please_stop=None):
     """
     THE CC LISTS (AND REVIEWS) ARE EMAIL ADDRESSES THE BELONG TO PEOPLE.
     SINCE THE EMAIL ADDRESS FOR A PERSON CAN CHANGE OVER TIME. THIS CODE
@@ -31,30 +39,26 @@ def full_analysis(settings, bug_list=None, please_stop=None):
     OVER THE LIFETIME OF THE BUGZILLA DATA. 'PERSON' IS ABSTRACT, AND SIMPLY
     ASSIGNED A CANONICAL EMAIL ADDRESS TO FACILITATE IDENTIFICATION
     """
-    if settings.args.quick:
+    if kwargs.args.quick:
         Log.note("Alias analysis skipped (--quick was used)")
         return
 
-    analyzer = AliasAnalyzer(settings.alias)
+    analyzer = AliasAnalyzer(kwargs.alias)
 
     if bug_list:
-        with MySQL(kwargs=settings.bugzilla, readonly=True) as db:
+        with MySQL(kwargs=kwargs.bugzilla, readonly=True) as db:
             data = get_all_cc_changes(db, bug_list)
             analyzer.aggregator(data)
             analyzer.analysis(True, please_stop)
         return
 
-    with MySQL(kwargs=settings.bugzilla, readonly=True) as db:
-        start = coalesce(settings.alias.start, 0)
-        end = coalesce(settings.alias.end, db.query("SELECT max(bug_id)+1 bug_id FROM bugs")[0].bug_id)
+    with MySQL(kwargs=kwargs.bugzilla, readonly=True) as db:
+        start = coalesce(kwargs.alias.start, 0)
+        end = coalesce(kwargs.alias.end, db.query("SELECT max(bug_id)+1 bug_id FROM bugs")[0].bug_id)
 
         #Perform analysis on blocks of bugs, in case we crash partway through
-        for s, e in jx.reverse(jx.intervals(start, end, settings.alias.increment)):
-            Log.note(
-                "Load range {{start}}-{{end}}",
-                start=s,
-                end=e
-            )
+        for s, e in jx.intervals(start, end, kwargs.alias.increment):
+            Log.note("Load range {{start}}-{{end}}", start=s, end=e)
             if please_stop:
                 break
             data = get_all_cc_changes(db, range(s, e))
@@ -64,43 +68,24 @@ def full_analysis(settings, bug_list=None, please_stop=None):
 
 class AliasAnalyzer(object):
 
-    def __init__(self, settings):
-        self.bugs={}
-        self.aliases={}
-        self.not_aliases={}  # EXPLICIT LIST OF NON-MATCHES (HUMAN ADDED)
-        try:
-            self.es = elasticsearch.Cluster(settings.elasticsearch).get_or_create_index(
-                kwargs=settings.elasticsearch,
-                schema=ALIAS_SCHEMA,
-                limit_replicas=True
-            )
-            self.es.add_alias(settings.elasticsearch.index)
+    @override
+    def __init__(
+        self,
+        elasticsearch=None,  # ES INDEX TO STORE THE ALIASES
+        file=None,  # FILE TO STORE ALIASES (IF ES DOES NOT EXIST, OR IS EMPTY)
+        start=0,  # MINIMUM BUG NUMBER TO SCAN
+        increment=100000,  # NUMBER OF BUGS TO REVIEW IN ONE PASS
+        minimum_diff=MINIMUM_DIFF_ROUGH,  # AMOUNT OF DISPARITY BETWEEN BEST AND SECOND-BEST MATCH
+        kwargs=None
+    ):
+        self.bugs = {}
+        self.aliases = {}
+        self.not_aliases = {}  # EXPLICIT LIST OF NON-MATCHES (HUMAN ADDED)
+        self.kwargs = kwargs
+        self.es = None
 
-            self.esq = jx_elasticsearch.new_instance(self.es.settings)
-            result = self.esq.query({
-                "from": "bug_aliases",
-                "select": ["canonical", "alias"],
-                "where": {"missing": "ignore"},
-                "format": "list",
-                "limit": 50000
-            })
-            for r in result.data:
-                self.aliases[r.alias] = {"canonical": r.canonical, "dirty": False}
+        self.load_aliases()
 
-            Log.note("{{num}} aliases loaded", num=len(self.aliases.keys()))
-
-            # LOAD THE NON-MATCHES
-            result = self.esq.query({
-                "from": "bug_aliases",
-                "select": ["canonical", "alias"],
-                "where": {"exists": "ignore"},
-                "format": "list"
-            })
-            for r in result.data:
-                self.not_aliases[r.alias] = r.canonical
-
-        except Exception as e:
-            Log.error("Can not init aliases", cause=e)
 
     def aggregator(self, data):
         """
@@ -118,11 +103,13 @@ class AliasAnalyzer(object):
             self.bugs[d.bug_id] = agg
 
     def analysis(self, last_run, please_stop):
-        minimum_diff = 7
+        minimum_diff = self.kwargs.minimum_diff
         if last_run:
-            minimum_diff = 4  #ONCE WE HAVE ALL THE DATA IN WE CAN BE LESS DISCRIMINATING
+            minimum_diff = min(minimum_diff, MINIMUM_DIFF_FINE)  #ONCE WE HAVE ALL THE DATA IN WE CAN BE LESS DISCRIMINATING
         try_again = True
+        Log.note("running analysis with minimum_diff=={{minimum_diff}}", minimum_diff=minimum_diff)
+
         while try_again and not please_stop:
             #FIND EMAIL MOST NEEDING REPLACEMENT
             problem_agg = Multiset(allow_negative=True)
@@ -171,20 +158,9 @@ class AliasAnalyzer(object):
         self.save_aliases()
 
     def alias(self, email):
-        canonical = self.aliases.get(email, None)
+        canonical = self.aliases.get(email)
         if not canonical:
-            canonical = self.esq.query({
-                "from":"bug_aliases",
-                "select":"canonical",
-                "where":{"term":{"alias":email}}
-            })
-            if not canonical:
-                canonical = {"canonical":email, "dirty":False}
-            else:
-                canonical = {"canonical":canonical[0], "dirty":False}
-
-            self.aliases[email] = canonical
-
+            return {"canonical":email, "dirty":False}
         return canonical
 
     def add_alias(self, lost, found):
@@ -221,9 +197,7 @@ class AliasAnalyzer(object):
         reassign=[]
         for k, v in self.aliases.items():
             if v["canonical"] == old_email:
-                if k == v["canonical"]:
-                    Log.note("ALIAS FOUND : {{alias}} -> {{new}}", alias=k, new=found)
-                else:
+                if k != v["canonical"]:
                     Log.note("ALIAS REMAPPED: {{alias}} -> {{old}} -> {{new}}", alias=k, old=v["canonical"], new=found)
 
                 reassign.append((k, found))
@@ -231,15 +205,83 @@ class AliasAnalyzer(object):
         for k, found in reassign:
             self.aliases[k] = {"canonical":found, "dirty":True}
 
-    def save_aliases(self):
-        records = []
-        for k, v in self.aliases.items():
-            if v["dirty"]:
-                records.append({"id": k, "value": {"canonical": v["canonical"], "alias": k}})
+    def load_aliases(self):
+        try:
+            if self.kwargs.elasticsearch:
+                self.es = Cluster(self.kwargs.elasticsearch).get_or_create_index(
+                    kwargs=self.kwargs.elasticsearch,
+                    schema=ALIAS_SCHEMA,
+                    limit_replicas=True
+                )
+                self.es.add_alias(self.kwargs.elasticsearch.index)
+
+                esq = jx_elasticsearch.new_instance(self.es.kwargs)
+                result = esq.query({
+                    "from": "bug_aliases",
+                    "select": ["canonical", "alias"],
+                    "where": {"missing": "ignore"},
+                    "format": "list",
+                    "limit": 50000
+                })
+                for r in result.data:
+                    self.aliases[r.alias] = {"canonical": r.canonical, "dirty": False}
+
+                num = len(self.aliases.keys())
+                if num < 500:
+                    # LOAD FROM FILE IF THE CLUSTER IS A BIT EMPTY
+                    self._load_aliases_from_file()
+                    return
+
+                Log.note("{{num}} aliases loaded from ES", num=num)
+
+                # LOAD THE NON-MATCHES
+                result = esq.query({
+                    "from": "bug_aliases",
+                    "select": ["canonical", "alias"],
+                    "where": {"exists": "ignore"},
+                    "format": "list"
+                })
+                for r in result.data:
+                    self.not_aliases[r.alias] = r.canonical
+            else:
+                self._load_aliases_from_file()
+        except Exception as e:
+            Log.warning("Can not load aliases", cause=e)
+
+    def _load_aliases_from_file(self):
+        if self.kwargs.file:
+            data = json2value(zip2bytes(File(self.kwargs.file).read_bytes()).decode('utf8'), flexible=False, leaves=False)
+            self.aliases = {a: {"canonical": c, "dirty": True} for a, c in data.aliases.items()}
+            self.not_aliases = data.not_aliases
+            Log.note("{{num}} aliases loaded from file", num=len(self.aliases.keys()))
+
+    def save_aliases(self):
+        if self.es:
+            records = []
+            for k, v in self.aliases.items():
+                if v["dirty"]:
+                    records.append({"id": k, "value": {"canonical": v["canonical"], "alias": k}})
+
+            if records:
+                Log.note("Net new aliases saved: {{num}}", num=len(records))
+                self.es.extend(records)
+        elif self.kwargs.file:
+            def compact():
+                return {
+                    "aliases": {a: c['canonical'] for a, c in self.aliases.items() if c['canonical'] != a},
+                    "not_aliases": self.not_aliases
+                }
+
+            data = compact()
+            File(self.kwargs.file).write_bytes(bytes2zip(value2json(data, pretty=True).encode('utf8')))
+            if DEBUG:
+                Log.note("verify alias file")
+                self.load_aliases()
+                from_file = compact()
+                assertAlmostEqual(from_file, data)
+                assertAlmostEqual(data, from_file)
+
 
-        if records:
-            Log.note("Net new aliases saved: {{num}}", num=len(records))
-            self.es.extend(records)
 
 def mapper(emails, aliases):
@@ -264,10 +306,10 @@ def split_email(value):
 
 def start():
     try:
-        settings = startup.read_settings()
-        constants.set(settings.constants)
-        Log.start(settings.debug)
-        full_analysis(settings)
+        kwargs = startup.read_settings()
+        constants.set(kwargs.constants)
+        Log.start(kwargs.debug)
+        full_analysis(kwargs)
     except Exception as e:
         Log.error("Can not start", e)
     finally:
@@ -275,7 +317,7 @@ def start():
 
 
 ALIAS_SCHEMA = {
-    "settings": {"index": {
+    "kwargs": {"index": {
         "number_of_shards": 3,
         "number_of_replicas": 0
     }},
diff --git a/bzETL/bz_etl.py b/bzETL/bz_etl.py
index da77a58..4054dbf 100644
--- a/bzETL/bz_etl.py
+++ b/bzETL/bz_etl.py
@@ -115,6 +115,7 @@ def etl(db, output_queue, param, alias_config, please_stop):
     for i, s in enumerate(sorted):
         process.processRow(s)
     process.processRow(wrap({"bug_id": parse_bug_history.STOP_BUG, "_merge_order": 1}))
+    process.alias_analyzer.save_aliases()
 
 
 def run_both_etl(db, output_queue, esq_comments, param, alias_config):
diff --git a/bzETL/extract_bugzilla.py b/bzETL/extract_bugzilla.py
index 9587181..8486b77 100644
--- a/bzETL/extract_bugzilla.py
+++ b/bzETL/extract_bugzilla.py
@@ -11,20 +11,18 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import unicode_literals
 
-from bzETL.parse_bug_history import MAX_TIME
-from bzETL.transform_bugzilla import NUMERIC_FIELDS
 from jx_python import jx
 from mo_dots.datas import Data
 from mo_logs import Log
 from mo_times.timer import Timer
 from pyLibrary import convert
 from pyLibrary.queries.jx_usingMySQL import esfilter2sqlwhere
-from pyLibrary.sql import SQL, SQL_ONE, SQL_NEG_ONE, sql_list, sql_iso
-
-# USING THE TEXT DATETIME OF EPOCH THROWS A WARNING! USE ONE SECOND PAST EPOCH AS MINIMUM TIME.
+from pyLibrary.sql import SQL
 from pyLibrary.sql.mysql import int_list_packer
 
+# USING THE TEXT DATETIME OF EPOCH THROWS A WARNING! USE ONE SECOND PAST EPOCH AS MINIMUM TIME.
+MIN_TIMESTAMP = 1000  # MILLISECONDS SINCE EPOCH
+MAX_TIMESTAMP = 9999999999000
 
 #ALL BUGS IN PRIVATE ETL HAVE SCREENED FIELDS
 SCREENED_FIELDDEFS = [
@@ -239,7 +237,7 @@ def get_bugs(db, param):
         else:
             return db.quote_column(col.column_name)
 
-    param.bugs_columns = jx.select(bugs_columns, "column_name")
+    param.bugs_columns = bugs_columns.column_name
     param.bugs_columns_SQL = SQL(",\n".join(lower(c) for c in bugs_columns))
     param.bug_filter = esfilter2sqlwhere(db, {"terms": {"b.bug_id": param.bug_list}})
     param.screened_whiteboard = esfilter2sqlwhere(db, {"and": [
@@ -449,7 +447,7 @@ def get_all_cc_changes(db, bug_list):
            {{bug_filter}}
        """,
        {
-            "max_time": MAX_TIME,
+            "max_time": MAX_TIMESTAMP,
            "cc_field_id": CC_FIELD_ID,
            "bug_filter": esfilter2sqlwhere(db, int_list_packer("bug_id", bug_list))
        },
@@ -590,7 +588,7 @@ def get_new_activities(db, param):
            a.id,
            a.bug_id,
            UNIX_TIMESTAMP(bug_when)*1000 AS modified_ts,
-            lower(login_name) AS modified_by,
+            lower(p.login_name) AS modified_by,
            replace(field.`name`, '.', '_') AS field_name,
            CAST(
                CASE
@@ -598,6 +596,7 @@ def get_new_activities(db, param):
                WHEN m.bug_id IS NOT NULL AND a.fieldid={{whiteboard_field}} AND added IS NOT NULL AND trim(added)<>'' THEN '[screened]'
                WHEN a.fieldid IN {{mixed_case_fields}} THEN trim(added)
                WHEN trim(added)='' THEN NULL
+                WHEN qa_contact.userid IS NOT NULL THEN qa_contact.login_name
                ELSE lower(trim(added))
                END
            AS CHAR CHARACTER SET utf8) AS new_value,
@@ -620,6 +619,8 @@ def get_new_activities(db, param):
            fielddefs field ON a.fieldid = field.`id`
        LEFT JOIN
            bug_group_map m on m.bug_id=a.bug_id AND {{screened_whiteboard}}
+        LEFT JOIN
+            profiles qa_contact ON qa_contact.userid=CAST(a.added AS UNSIGNED) and a.fieldid=36 AND a.added REGEXP '^[0-9]+$'
        WHERE
            {{bug_filter}}
            # NEED TO QUERY ES TO GET bug_version_num OTHERWISE WE NEED ALL HISTORY
diff --git a/bzETL/parse_bug_history.py b/bzETL/parse_bug_history.py
index b24e0ca..f2c9619 100644
--- a/bzETL/parse_bug_history.py
+++ b/bzETL/parse_bug_history.py
@@ -44,16 +44,16 @@ import math
 import re
 
 import jx_elasticsearch
-import jx_python
-from mo_future import text_type
-
-from bzETL.transform_bugzilla import normalize, NUMERIC_FIELDS, MULTI_FIELDS, DIFF_FIELDS
+from bzETL.alias_analysis import AliasAnalyzer
+from bzETL.extract_bugzilla import MAX_TIMESTAMP
+from bzETL.transform_bugzilla import normalize, NUMERIC_FIELDS, MULTI_FIELDS, DIFF_FIELDS, NULL_VALUES
 from jx_python import jx, meta
 from mo_dots import inverse, coalesce, wrap, unwrap
 from mo_dots.datas import Data
 from mo_dots.lists import FlatList
 from mo_dots.nones import Null
 from mo_files import File
+from mo_future import text_type
 from mo_json import json2value, value2json
 from mo_logs import Log, strings
 from mo_logs.strings import apply_diff
@@ -66,9 +66,9 @@ from pyLibrary import convert
 
 FLAG_PATTERN = re.compile("^(.*)([?+-])(\\([^)]*\\))?$")
 
-DEBUG_CHANGES = False  # SHOW ACTIVITY RECORDS BEING PROCESSED
+DEBUG_CHANGES = True  # SHOW ACTIVITY RECORDS BEING PROCESSED
 DEBUG_STATUS = False  # SHOW CURRENT STATE OF PROCESSING
-DEBUG_CC_CHANGES = True  # SHOW MISMATCHED CC CHANGES
+DEBUG_CC_CHANGES = False  # SHOW MISMATCHED CC CHANGES
 DEBUG_FLAG_MATCHES = False
 
 USE_PREVIOUS_VALUE_OBJECTS = False
@@ -79,8 +79,15 @@
 KNOWN_MISSING_KEYWORDS = {
     "dogfood", "beta1", "nsbeta1", "nsbeta2", "nsbeta3", "patch", "mozilla1.0", "correctness", "mozilla0.9",
     "mozilla0.9.9+", "nscatfood", "mozilla0.9.3", "fcc508", "nsbeta1+", "mostfreq"
 }
+KNOWN_INCONSISTENT_FIELDS = {
+    "cf_last_resolved"  # CHANGES IN DATABASE TIMEZONE
+}
+FIELDS_CHANGED = {  # SOME FIELD VALUES ARE CHANGED WITHOUT HISTORY BEING CHANGED TOO https://bugzilla.mozilla.org/show_bug.cgi?id=997228
+    "cf_blocking_b2g":{"1.5":"2.0"}
+}
+EMAIL_FIELDS = {'cc', 'assigned_to', 'modified_by', 'created_by', 'qa_contact', 'bug_mentor'}
+
 STOP_BUG = 999999999  # AN UNFORTUNATE SIDE EFFECT OF DATAFLOW PROGRAMMING (http://en.wikipedia.org/wiki/Dataflow_programming)
-MAX_TIME = 9999999999000
 
 
@@ -91,11 +98,7 @@ class BugHistoryParser(object):
         self.prev_row = Null
         self.settings = settings
         self.output = output_queue
-
-        self.alias_config=alias_config
-        self.aliases = Null
-        self.initialize_aliases()
-
+        self.alias_analyzer = AliasAnalyzer(alias_config)
 
     def processRow(self, row_in):
         if not row_in:
@@ -236,7 +239,6 @@ class BugHistoryParser(object):
                 cause=e
             )
 
-
     def processAttachmentsTableItem(self, row_in):
         currActivityID = BugHistoryParser.uid(self.currBugID, row_in.modified_ts)
         if currActivityID != self.prevActivityID:
@@ -365,28 +367,56 @@ class BugHistoryParser(object):
             total = self.removeValues(total, multi_field_new_value, "added", row_in.field_name, "currBugState", self.currBugState)
             total = self.addValues(total, multi_field_old_value, "removed bug", row_in.field_name, self.currBugState)
             self.currBugState[row_in.field_name] = total
+        elif row_in.field_name in DIFF_FIELDS:
+            diff = row_in.new_value
+            expected_value = self.currBugState[row_in.field_name]
+            try:
+                old_value = ApplyDiff(row_in.modified_ts, expected_value, diff, reverse=True)
+                self.currBugState[row_in.field_name] = old_value
+                self.currActivity.changes.append({
+                    "field_name": row_in.field_name,
+                    "new_value": expected_value,
+                    "old_value": old_value,
+                    "attach_id": row_in.attach_id
+                })
+            except Exception as e:
+                Log.warning(
+                    "[Bug {{bug_id}}]: PROBLEM Unable to process {{field_name}} diff:\n{{diff|indent}}",
+                    bug_id=self.currBugID,
+                    field_name=row_in.field_name,
+                    diff=diff,
+                    cause=e
+                )
         else:
-            if row_in.field_name in DIFF_FIELDS:
-                diff = row_in.new_value
-                try:
-                    new_value = self.currBugState[row_in.field_name]
-                    row_in.new_value = new_value
-                    row_in.old_value = ApplyDiff(row_in.modified_ts, new_value, diff, reverse=True)
-                except Exception as e:
-                    Log.warning(
-                        "[Bug {{bug_id}}]: PROBLEM Unable to process {{field_name}} diff:\n{{diff|indent}}",
+            expected_value = self.canonical(row_in.field_name, self.currBugState[row_in.field_name])
+            new_value = self.canonical(row_in.field_name, row_in.new_value)
+
+            if text_type(new_value) != text_type(expected_value):
+                if DEBUG_CHANGES and row_in.field_name not in KNOWN_INCONSISTENT_FIELDS:
+                    if row_in.field_name=='cc':
+                        self.alias_analyzer.add_alias(expected_value, new_value)
+                    else:
+                        lookup = FIELDS_CHANGED.setdefault(row_in.field_name, {})
+                        if expected_value:
+                            lookup[new_value] = expected_value
+                        File("expected_values.json").write(value2json(FIELDS_CHANGED, pretty=True))
+
+                    Log.note(
+                        "[Bug {{bug_id}}]: PROBLEM inconsistent change: {{field}} was {{expecting|quote}} got {{observed|quote}}",
                         bug_id=self.currBugID,
-                        field_name=row_in.field_name,
-                        diff=diff,
-                        cause=e
+                        field=row_in.field_name,
+                        expecting=expected_value,
+                        observed=new_value
                     )
-            self.currBugState[row_in.field_name] = row_in.old_value
+
+            # WE DO NOT ATTEMPT TO CHANGE THE VALUES IN HISTORY TO BE CONSISTENT WITH THE FUTURE
             self.currActivity.changes.append({
                 "field_name": row_in.field_name,
-                "new_value": row_in.new_value,
+                "new_value": self.currBugState[row_in.field_name],
                 "old_value": row_in.old_value,
                 "attach_id": row_in.attach_id
             })
+            self.currBugState[row_in.field_name] = row_in.old_value
 
     def populateIntermediateVersionObjects(self):
         # Make sure the self.bugVersions are in descending order by modification time.
@@ -445,7 +475,7 @@ class BugHistoryParser(object):
                 mergeBugVersion = True
 
             # Link this version to the next one (if there is a next one)
-            self.currBugState.expires_on = coalesce(nextVersion.modified_ts, MAX_TIME)
+            self.currBugState.expires_on = coalesce(nextVersion.modified_ts, MAX_TIMESTAMP)
 
             # Copy all attributes from the current version into self.currBugState
             for propName, propValue in currVersion.items():
@@ -477,8 +507,6 @@ class BugHistoryParser(object):
                     changes[c] = Null
                     continue
 
-                if DEBUG_CHANGES:
-                    Log.note("Processing change: " + value2json(change))
                 target = self.currBugState
                 targetName = "currBugState"
                 attach_id = change.attach_id
@@ -552,8 +580,8 @@ class BugHistoryParser(object):
                 deformat(f.request_type) == deformat(flag.request_type) and
                 f.request_status == flag.request_status and
                 (
-                    (f.request_status!='?' and self.alias(f.modified_by) == self.alias(flag.modified_by)) or
-                    (f.request_status=='?' and self.alias(f.requestee) == self.alias(flag.requestee))
+                    (f.request_status!='?' and self.email_alias(f.modified_by) == self.email_alias(flag.modified_by)) or
+                    (f.request_status=='?' and self.email_alias(f.requestee) == self.email_alias(flag.requestee))
                 )
             ):
                 return f
@@ -649,7 +677,7 @@ class BugHistoryParser(object):
             matched_req = [
                 element
                 for element in candidates
-                if self.alias(added_flag["modified_by"]) == self.alias(element["requestee"])
+                if self.email_alias(added_flag["modified_by"]) == self.email_alias(element["requestee"])
             ]
 
             if not matched_ts and not matched_req:
@@ -679,7 +707,7 @@ class BugHistoryParser(object):
                 matched_both = [
                     element
                     for element in candidates
-                    if added_flag.modified_ts == element.modified_ts and self.alias(added_flag["modified_by"]) == self.alias(element["requestee"])
+                    if added_flag.modified_ts == element.modified_ts and self.email_alias(added_flag["modified_by"]) == self.email_alias(element["requestee"])
                 ]
 
                 if matched_both:
@@ -778,9 +806,9 @@ class BugHistoryParser(object):
         if field_name == "flags":
             Log.error("use processFlags")
         elif field_name == "cc":
-            # MAP CANONICAL TO EXISTING (BETWEEN map_* AND self.aliases WE HAVE A BIJECTION)
-            map_total = inverse({t: self.alias(t) for t in total})
-            map_remove = inverse({r: self.alias(r) for r in remove})
+            # MAP CANONICAL TO EXISTING (BETWEEN map_* AND self.alias_analyzer.aliases WE HAVE A BIJECTION)
+            map_total = inverse({t: self.email_alias(t) for t in total})
+            map_remove = inverse({r: self.email_alias(r) for r in remove})
             # CANONICAL VALUES
             c_total = set(map_total.keys())
             c_remove = set(map_remove.keys())
@@ -797,7 +825,7 @@ class BugHistoryParser(object):
                     "field_name": field_name,
                     "missing": jx.sort(jx.map2set(diff, map_remove)),
                     "existing": jx.sort(total),
-                    "candidates": {d: self.aliases.get(d, None) for d in diff},
+                    "candidates": {d: self.alias_analyzer.aliases.get(d, None) for d in diff},
                     "bug_id": self.currBugID
                 })
@@ -893,7 +921,7 @@ class BugHistoryParser(object):
                     "existing": total
                 })
                 if field_name == "keywords":
-                    KNOWN_MISSING_KEYWORDS.extend(diff)
+                    KNOWN_MISSING_KEYWORDS.update(diff)
 
         return output
@@ -953,25 +981,17 @@ class BugHistoryParser(object):
 
         return total
 
-    def alias(self, name):
+    def canonical(self, field, value):
+        if value in NULL_VALUES:
+            return None
+        if field in EMAIL_FIELDS:
+            return self.email_alias(value)
+        return FIELDS_CHANGED.get(field, {}).get(value, value)
+
+    def email_alias(self, name):
         if name == None:
             return Null
 
-        return coalesce(self.aliases.get(name, Null), name)
-
-    def initialize_aliases(self):
-        try:
-            if self.alias_config.elasticsearch:
-                esq = jx_elasticsearch.new_instance(self.alias_config.elasticsearch)
-                result = esq.query({"select": ["alias", "canonical"], "where": {"missing": "ignore"}, "limit": 10000, "format":"list"})
-                self.aliases = {d.alias:d.canonical for d in result.data}
-            else:
-                alias_json = File(self.alias_config.file).read()
-                self.aliases = {k: wrap(v) for k, v in json2value(alias_json).items()}
-        except Exception as e:
-            Log.warning("Could not load alias file", cause=e)
-            self.aliases = {}
-
-        Log.note("{{num}} aliases loaded", num=len(self.aliases.keys()))
+        return self.alias_analyzer.alias(name)["canonical"]
 
 
 def parse_flag(flag, modified_ts, modified_by):
diff --git a/bzETL/transform_bugzilla.py b/bzETL/transform_bugzilla.py
index 9d24310..20377ab 100644
--- a/bzETL/transform_bugzilla.py
+++ b/bzETL/transform_bugzilla.py
@@ -19,6 +19,7 @@ from mo_dots import listwrap
 from mo_future import text_type, long
 from mo_json import json2value, value2json
 from mo_logs import Log
+from mo_times import Date
 from pyLibrary import convert
 from pyLibrary.env import elasticsearch
 
@@ -39,7 +40,7 @@ NUMERIC_FIELDS=[
     "remaining_time"
 ]
 
-NULL_VALUES = ['--', '---']
+NULL_VALUES = ['--', '---', '']
 
 # Used to reformat incoming dates into the expected form.
 # Example match: "2012/01/01 00:00:00.000"
@@ -146,5 +147,7 @@ def normalize(bug, old_school=False):
         bug.votes = None
     bug.exists = True
 
+    bug.etl.timestamp = Date.now()
+
     return elasticsearch.scrub(bug)
 
diff --git a/resources/config/alias_analysis.json b/resources/config/alias_analysis.json
index 20b4754..a7d7507 100644
--- a/resources/config/alias_analysis.json
+++ b/resources/config/alias_analysis.json
@@ -1,13 +1,14 @@
 {
     "alias": {
         "start": 0,
-        "increment": 1000000,
-        "elasticsearch": {
-            "host": "http://localhost",
-            "index": "bug_aliases"
-        },
+        "increment": 100000,
+//        "elasticsearch": {
+//            "host": "http://localhost",
+//            "index": "bug_aliases"
+//        },
+        "minimum_diff": 7,
         "file": {
-            "path": "../schema/bugzilla_aliases.json",
+            "path": "resources/schema/bugzilla_aliases.json",
             "$ref": "//~/private.json#alias_file"
         }
     },
@@ -39,7 +40,15 @@
             {
                 "log_type": "stream",
                 "stream": "sys.stdout"
+            },
+            {
+                "log_type": "email",
+                "from_address": "klahnakoski@mozilla.com",
+                "to_address": "klahnakoski@mozilla.com",
+                "subject": "[ALERT][DEV]Problem with Bugzilla-ETL",
+                "$ref": "file://~/private.json#email"
             }
+
         ]
     }
 }
diff --git a/tests/resources/config/test_one.json b/tests/resources/config/test_one.json
index 41d4c60..8e24c84 100644
--- a/tests/resources/config/test_one.json
+++ b/tests/resources/config/test_one.json
@@ -2,45 +2,45 @@
     "param": {
         "increment": 10000,
         "bugs": [
-//            384, //minor email diff
-//            1108, //ok
-//            1045, //ok
-//            1046, //ok
-//            1157, //ok
-//            1877, //minor email diff
-//            1865, //ok
-//            1869, //minor email diff, missing recent history
-//            2586, //missing recent history
-//            3140, //minor email
-//            6810, //ok
-//            9622, //minor email diff
-//            10575, //ok
-//            11040, //alias analysis problem
-//            12911, //alias analysis problem
-//            13534, // (REVIEW MOVES TO OTHER PERSON)
-//            67742, //alias analysis problem
-//            96421, //minor email diff
-//            123203,//expiry only
-//            178960,//minor email
-//            248970, // another cutoff review request
-//            367518,//ok
-//            457765,//ok
-//            458397,//minor email
-//            471427,//minor email
-//            544327,//extra history
-//            547727,//extra history
+            384, //minor email diff
+            1108, //ok
+            1045, //ok
+            1046, //ok
+            1157, //ok
+            1877, //minor email diff
+            1865, //ok
+            1869, //minor email diff, missing recent history
+            2586, //missing recent history
+            3140, //minor email
+            6810, //ok
+            9622, //minor email diff
+            10575, //ok
+            11040, //alias analysis problem
+            12911, //alias analysis problem
+            13534, // (REVIEW MOVES TO OTHER PERSON)
+            67742, //alias analysis problem
+            96421, //minor email diff
+            123203,//expiry only
+            178960,//minor email
+            248970, // another cutoff review request
+            367518,//ok
+            457765,//ok
+            458397,//minor email
+            471427,//minor email
+            544327,//extra history
+            547727,//extra history
             520943,//review flags bug 927494
-//            643420,//ok
-//            692436,//minor email
-//            726635,//alias problem
-//            813650,//ERROR in blocked
-//            937428, // whitespace after comma in user story, complex diff
-//            1165765, // VERY LONG short_desc
+            643420,//ok
+            692436,//minor email
+            726635,//alias problem
+            813650,//ERROR in blocked
+            937428, // whitespace after comma in user story, complex diff
+            1165765, // VERY LONG short_desc
 
             // NOT VERIFIED
             // 248971, // another cutoff review request
             // 372836, // (REVIEW FLAGS TEST)
-//            393845, // added blocking1.9+ twice
+            393845, // added blocking1.9+ twice
             // 671185, // *many* review requests
             // 1007019, // does not have bug_status, or component, or product
         ],
@@ -105,6 +105,13 @@
             {
                 "log_type": "stream",
                 "stream": "sys.stdout"
+            },
+            {
+                "log_type": "email",
+                "from_address": "klahnakoski@mozilla.com",
+                "to_address": "klahnakoski@mozilla.com",
+                "subject": "[ALERT][testing]Problem with Bugzilla-ETL",
+                "$ref": "file://~/private.json#email"
             }
         ]
     }
diff --git a/tests/util/compare_es.py b/tests/util/compare_es.py
index 06aa94c..2a72f01 100644
--- a/tests/util/compare_es.py
+++ b/tests/util/compare_es.py
@@ -96,7 +96,7 @@ def old2new(bug, max_date):
         bug = json2value(value2json(bug).replace("bugzilla: other b.m.o issues ", "bugzilla: other b.m.o issues"))
 
     if bug.expires_on > max_date:
-        bug.expires_on = parse_bug_history.MAX_TIME
+        bug.expires_on = parse_bug_history.MAX_TIMESTAMP
     if bug.votes != None:
         bug.votes = int(bug.votes)
     bug.dupe_by = convert.value2intlist(bug.dupe_by)
diff --git a/vendor/jx_python/containers/cube.py b/vendor/jx_python/containers/cube.py
index 11a849b..b41371d 100644
--- a/vendor/jx_python/containers/cube.py
+++ b/vendor/jx_python/containers/cube.py
@@ -92,7 +92,8 @@ class Cube(Container):
         if not self.edges:
             return 1
 
-        return len(self.data.values()[0])
+        for d in self.data.values():
+            return len(d)
 
     def __iter__(self):
         if self.is_value:
diff --git a/vendor/mo_collections/multiset.py b/vendor/mo_collections/multiset.py
index e4ea3bc..d10dc62 100644
--- a/vendor/mo_collections/multiset.py
+++ b/vendor/mo_collections/multiset.py
@@ -230,7 +230,7 @@ class _NegMultiset(Multiset):
         return set(self.dic.keys())
 
     def __len__(self):
-        return sum(self.dic.values())
+        return sum(abs(v) for v in self.dic.values())
 
     def __nonzero__(self):
         if self.dic:
diff --git a/vendor/mo_dots/__init__.py b/vendor/mo_dots/__init__.py
index a7114c1..14913a0 100644
--- a/vendor/mo_dots/__init__.py
+++ b/vendor/mo_dots/__init__.py
@@ -334,11 +334,11 @@ def _get_attr(obj, path):
             # WE CAN STILL PUT THE PATH TO THE FILE IN THE from CLAUSE
             if len(path) == 1:
                 # GET MODULE OBJECT
-                output = __import__(obj.__name__ + b"." + attr_name.decode('utf8'), globals(), locals(), [attr_name.decode('utf8')], 0)
+                output = __import__(obj.__name__ + str(".") + str(attr_name), globals(), locals(), [str(attr_name)], 0)
                 return output
             else:
                 # GET VARIABLE IN MODULE
-                output = __import__(obj.__name__ + b"." + attr_name.decode('utf8'), globals(), locals(), [path[1].decode('utf8')], 0)
+                output = __import__(obj.__name__ + str(".") + str(attr_name), globals(), locals(), [str(path[1])], 0)
                 return _get_attr(output, path[1:])
         except Exception as e:
             Except = get_module("mo_logs.exceptions.Except")
diff --git a/vendor/mo_files/__init__.py b/vendor/mo_files/__init__.py
index 7964521..4832989 100644
--- a/vendor/mo_files/__init__.py
+++ b/vendor/mo_files/__init__.py
@@ -199,11 +199,15 @@ class File(object):
         return File.add_suffix(self._filename, suffix)
 
     def read(self, encoding="utf8"):
+        """
+        :param encoding:
+        :return:
+        """
         with open(self._filename, "rb") as f:
-            content = f.read().decode(encoding)
             if self.key:
-                return get_module(u"mo_math.crypto").decrypt(content, self.key)
+                return get_module("mo_math.crypto").decrypt(f.read(), self.key)
             else:
+                content = f.read().decode(encoding)
                 return content
 
     def read_lines(self, encoding="utf8"):
@@ -227,7 +231,10 @@ class File(object):
             if not self.parent.exists:
                 self.parent.create()
             with open(self._filename, "rb") as f:
-                return f.read()
+                if self.key:
+                    return get_module("mo_math.crypto").decrypt(f.read(), self.key)
+                else:
+                    return f.read()
         except Exception as e:
             Log.error(u"Problem reading file {{filename}}", filename=self.abspath, cause=e)
 
@@ -235,7 +242,10 @@ class File(object):
         if not self.parent.exists:
             self.parent.create()
         with open(self._filename, "wb") as f:
-            f.write(content)
+            if self.key:
+                f.write(get_module("mo_math.crypto").encrypt(content, self.key))
+            else:
+                f.write(content)
 
     def write(self, data):
         if not self.parent.exists:
@@ -255,7 +265,8 @@ class File(object):
                 if not isinstance(d, text_type):
                     Log.error(u"Expecting unicode data only")
                 if self.key:
-                    f.write(get_module(u"crypto").encrypt(d, self.key).encode("utf8"))
+                    from mo_math.crypto import encrypt
+                    f.write(encrypt(d, self.key).encode("utf8"))
                 else:
                     f.write(d.encode("utf8"))
 
diff --git a/vendor/mo_math/crypto.py b/vendor/mo_math/crypto.py
index 7168cc7..8371096 100644
--- a/vendor/mo_math/crypto.py
+++ b/vendor/mo_math/crypto.py
@@ -14,10 +14,9 @@ from __future__ import unicode_literals
 
 import base64
 
-from mo_future import text_type, binary_type
 from mo_dots import Data, get_module
+from mo_future import text_type, binary_type, PY2
 from mo_logs import Log
-
 from mo_math.randoms import Random
 from mo_math.vendor.aespython import key_expander, aes_cipher, cbc_mode
 
@@ -26,19 +25,26 @@ DEBUG = False
 
 def encrypt(text, _key, salt=None):
     """
-    RETURN JSON OF ENCRYPTED DATA {"salt":s, "length":l, "data":d}
+    RETURN {"salt":s, "length":l, "data":d} -> JSON -> UTF8
     """
-    if not isinstance(text, text_type):
-        Log.error("only unicode is encrypted")
+
+    if isinstance(text, text_type):
+        encoding = 'utf8'
+        data = bytearray(text.encode("utf8"))
+    elif isinstance(text, binary_type):
+        encoding = None
+        if PY2:
+            data = bytearray(text)
+        else:
+            data = text
+
     if _key is None:
         Log.error("Expecting a key")
-    if isinstance(_key, str):
+    if isinstance(_key, binary_type):
         _key = bytearray(_key)
     if salt is None:
         salt = Random.bytes(16)
 
-    data = bytearray(text.encode("utf8"))
-
     # Initialize encryption using key and iv
     key_expander_256 = key_expander.KeyExpander(256)
     expanded_key = key_expander_256.expand(_key)
@@ -50,12 +56,13 @@ def encrypt(text, _key, salt=None):
     output.type = "AES256"
     output.salt = bytes2base64(salt)
     output.length = len(data)
+    output.encoding = encoding
 
     encrypted = bytearray()
     for _, d in _groupby16(data):
         encrypted.extend(aes_cbc_256.encrypt_block(d))
     output.data = bytes2base64(encrypted)
-    json = get_module("mo_json").value2json(output)
+    json = get_module("mo_json").value2json(output, pretty=True).encode('utf8')
 
     if DEBUG:
         test = decrypt(json, _key)
@@ -67,13 +74,13 @@ def encrypt(text, _key, salt=None):
 
 def decrypt(data, _key):
     """
-    ACCEPT JSON OF ENCRYPTED DATA {"salt":s, "length":l, "data":d}
+    ACCEPT BYTES -> UTF8 -> JSON -> {"salt":s, "length":l, "data":d}
     """
     # Key and iv have not been generated or provided, bail out
     if _key is None:
         Log.error("Expecting a key")
 
-    _input = get_module("mo_json").json2value(data)
+    _input = get_module("mo_json").json2value(data.decode('utf8'), leaves=False, flexible=False)
 
     # Initialize encryption using key and iv
     key_expander_256 = key_expander.KeyExpander(256)
@@ -87,7 +94,11 @@ def decrypt(data, _key):
     for _, e in _groupby16(raw):
         out_data.extend(aes_cbc_256.decrypt_block(e))
 
-    return binary_type(out_data[:_input.length:]).decode("utf8")
+    if _input.encoding:
+        return binary_type(out_data[:_input.length:]).decode(_input.encoding)
+    else:
+        return binary_type(out_data[:_input.length:])
+
 
 def bytes2base64(value):
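
Note (not part of the patch): the revised mo_math.crypto hunks above imply the round-trip behaviour sketched below. This is an illustrative sketch only, assuming Random.bytes(32) yields a suitable 256-bit key (the patch itself uses Random.bytes(16) for the salt) and that encrypt() returns the UTF8-encoded JSON it builds.

    # sketch: expected round-trip of the revised encrypt/decrypt API
    from mo_math.crypto import encrypt, decrypt
    from mo_math.randoms import Random

    key = Random.bytes(32)                         # KeyExpander(256) expects a 256-bit key
    token = encrypt(u"hello world", key)           # now returns pretty JSON as UTF8 bytes
    assert decrypt(token, key) == u"hello world"   # text input round-trips to text (encoding='utf8')

    blob = encrypt(b"\x00\x01\x02", key)           # binary input is now accepted (encoding=None)
    assert decrypt(blob, key) == b"\x00\x01\x02"   # binary input round-trips to bytes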