allow aliases to load from file, or from ES. Prep for additional aliases coming from other fields

Kyle Lahnakoski 2018-04-02 11:23:45 -04:00
Parent a03d1f0801
Commit dd8341b50a
13 changed files: 314 additions and 208 deletions
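
The change lets the alias map live either in an Elasticsearch index or in a gzipped JSON file. A minimal usage sketch, based on the new AliasAnalyzer signature in this commit (the host, index, and file path below are illustrative values, not taken from the commit):

    from bzETL.alias_analysis import AliasAnalyzer

    # File-backed: load_aliases() falls through to _load_aliases_from_file(), which
    # expects a gzipped JSON document of the form {"aliases": {...}, "not_aliases": {...}}
    analyzer = AliasAnalyzer(file="resources/schema/bugzilla_aliases.json")

    # ES-backed: load_aliases() reads the bug_aliases index, and falls back to the
    # file (when one is configured) if the index holds fewer than 500 aliases
    analyzer = AliasAnalyzer(elasticsearch={"host": "http://localhost", "index": "bug_aliases"})

    # Either way, save_aliases() writes dirty entries back to whichever store is configured
    analyzer.save_aliases()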


@@ -16,14 +16,22 @@ from bzETL.extract_bugzilla import get_all_cc_changes
 from jx_python import jx
 from mo_collections.multiset import Multiset
 from mo_dots import coalesce
+from mo_files import File
 from mo_future import iteritems
-from mo_json import value2json
+from mo_json import value2json, json2value
+from mo_kwargs import override
 from mo_logs import Log, startup, constants
-from pyLibrary.env import elasticsearch
+from mo_testing.fuzzytestcase import assertAlmostEqual
+from pyLibrary.convert import zip2bytes, bytes2zip
+from pyLibrary.env.elasticsearch import Cluster
 from pyLibrary.sql.mysql import MySQL

+DEBUG = True
+MINIMUM_DIFF_ROUGH = 7
+MINIMUM_DIFF_FINE = 4

-def full_analysis(settings, bug_list=None, please_stop=None):
+def full_analysis(kwargs, bug_list=None, please_stop=None):
     """
     THE CC LISTS (AND REVIEWS) ARE EMAIL ADDRESSES THE BELONG TO PEOPLE.
     SINCE THE EMAIL ADDRESS FOR A PERSON CAN CHANGE OVER TIME. THIS CODE
@@ -31,30 +39,26 @@ def full_analysis(settings, bug_list=None, please_stop=None):
     OVER THE LIFETIME OF THE BUGZILLA DATA. 'PERSON' IS ABSTRACT, AND SIMPLY
     ASSIGNED A CANONICAL EMAIL ADDRESS TO FACILITATE IDENTIFICATION
     """
-    if settings.args.quick:
+    if kwargs.args.quick:
         Log.note("Alias analysis skipped (--quick was used)")
         return
-    analyzer = AliasAnalyzer(settings.alias)
+    analyzer = AliasAnalyzer(kwargs.alias)

     if bug_list:
-        with MySQL(kwargs=settings.bugzilla, readonly=True) as db:
+        with MySQL(kwargs=kwargs.bugzilla, readonly=True) as db:
             data = get_all_cc_changes(db, bug_list)
             analyzer.aggregator(data)
             analyzer.analysis(True, please_stop)
         return

-    with MySQL(kwargs=settings.bugzilla, readonly=True) as db:
-        start = coalesce(settings.alias.start, 0)
-        end = coalesce(settings.alias.end, db.query("SELECT max(bug_id)+1 bug_id FROM bugs")[0].bug_id)
+    with MySQL(kwargs=kwargs.bugzilla, readonly=True) as db:
+        start = coalesce(kwargs.alias.start, 0)
+        end = coalesce(kwargs.alias.end, db.query("SELECT max(bug_id)+1 bug_id FROM bugs")[0].bug_id)

         #Perform analysis on blocks of bugs, in case we crash partway through
-        for s, e in jx.reverse(jx.intervals(start, end, settings.alias.increment)):
-            Log.note(
-                "Load range {{start}}-{{end}}",
-                start=s,
-                end=e
-            )
+        for s, e in jx.intervals(start, end, kwargs.alias.increment):
+            Log.note("Load range {{start}}-{{end}}", start=s, end=e)
             if please_stop:
                 break
             data = get_all_cc_changes(db, range(s, e))
@@ -64,43 +68,24 @@ def full_analysis(settings, bug_list=None, please_stop=None):

 class AliasAnalyzer(object):

-    def __init__(self, settings):
-        self.bugs={}
-        self.aliases={}
-        self.not_aliases={}  # EXPLICIT LIST OF NON-MATCHES (HUMAN ADDED)
-        try:
-            self.es = elasticsearch.Cluster(settings.elasticsearch).get_or_create_index(
-                kwargs=settings.elasticsearch,
-                schema=ALIAS_SCHEMA,
-                limit_replicas=True
-            )
-            self.es.add_alias(settings.elasticsearch.index)
-
-            self.esq = jx_elasticsearch.new_instance(self.es.settings)
-            result = self.esq.query({
-                "from": "bug_aliases",
-                "select": ["canonical", "alias"],
-                "where": {"missing": "ignore"},
-                "format": "list",
-                "limit": 50000
-            })
-            for r in result.data:
-                self.aliases[r.alias] = {"canonical": r.canonical, "dirty": False}
-
-            Log.note("{{num}} aliases loaded", num=len(self.aliases.keys()))
-
-            # LOAD THE NON-MATCHES
-            result = self.esq.query({
-                "from": "bug_aliases",
-                "select": ["canonical", "alias"],
-                "where": {"exists": "ignore"},
-                "format": "list"
-            })
-            for r in result.data:
-                self.not_aliases[r.alias] = r.canonical
-        except Exception as e:
-            Log.error("Can not init aliases", cause=e)
+    @override
+    def __init__(
+        self,
+        elasticsearch=None,  # ES INDEX TO STORE THE ALIASES
+        file=None,  # FILE TO STORE ALIASES (IF ES DOES NOT EXIST, OR IS EMPTY)
+        start=0,  # MINIMUM BUG NUMBER TO SCAN
+        increment=100000,  # NUMBER OF BUGS TO REVIEW IN ONE PASS
+        minimum_diff=MINIMUM_DIFF_ROUGH,  # AMOUNT OF DISPARITY BETWEEN BEST AND SECOND-BEST MATCH
+        kwargs=None
+    ):
+        self.bugs = {}
+        self.aliases = {}
+        self.not_aliases = {}  # EXPLICIT LIST OF NON-MATCHES (HUMAN ADDED)
+        self.kwargs = kwargs
+        self.es = None
+        self.load_aliases()

     def aggregator(self, data):
         """
@@ -118,11 +103,13 @@ class AliasAnalyzer(object):
             self.bugs[d.bug_id] = agg

     def analysis(self, last_run, please_stop):
-        minimum_diff = 7
+        minimum_diff = self.kwargs.minimum_diff
         if last_run:
-            minimum_diff = 4  #ONCE WE HAVE ALL THE DATA IN WE CAN BE LESS DISCRIMINATING
+            minimum_diff = min(minimum_diff, MINIMUM_DIFF_FINE)  #ONCE WE HAVE ALL THE DATA IN WE CAN BE LESS DISCRIMINATING
         try_again = True

+        Log.note("running analysis with minimum_diff=={{minimum_diff}}", minimum_diff=minimum_diff)
+
         while try_again and not please_stop:
             #FIND EMAIL MOST NEEDING REPLACEMENT
             problem_agg = Multiset(allow_negative=True)
@@ -171,20 +158,9 @@ class AliasAnalyzer(object):
         self.save_aliases()

     def alias(self, email):
-        canonical = self.aliases.get(email, None)
+        canonical = self.aliases.get(email)
         if not canonical:
-            canonical = self.esq.query({
-                "from":"bug_aliases",
-                "select":"canonical",
-                "where":{"term":{"alias":email}}
-            })
-            if not canonical:
-                canonical = {"canonical":email, "dirty":False}
-            else:
-                canonical = {"canonical":canonical[0], "dirty":False}
-
-            self.aliases[email] = canonical
+            return {"canonical":email, "dirty":False}

         return canonical

     def add_alias(self, lost, found):
@@ -221,9 +197,7 @@ class AliasAnalyzer(object):
         reassign=[]
         for k, v in self.aliases.items():
             if v["canonical"] == old_email:
-                if k == v["canonical"]:
-                    Log.note("ALIAS FOUND : {{alias}} -> {{new}}", alias=k, new=found)
-                else:
+                if k != v["canonical"]:
                     Log.note("ALIAS REMAPPED: {{alias}} -> {{old}} -> {{new}}", alias=k, old=v["canonical"], new=found)

                 reassign.append((k, found))
@@ -231,15 +205,83 @@ class AliasAnalyzer(object):
         for k, found in reassign:
             self.aliases[k] = {"canonical":found, "dirty":True}

-    def save_aliases(self):
-        records = []
-        for k, v in self.aliases.items():
-            if v["dirty"]:
-                records.append({"id": k, "value": {"canonical": v["canonical"], "alias": k}})
-
-        if records:
-            Log.note("Net new aliases saved: {{num}}", num=len(records))
-            self.es.extend(records)
+    def load_aliases(self):
+        try:
+            if self.kwargs.elasticsearch:
+                self.es = Cluster(self.kwargs.elasticsearch).get_or_create_index(
+                    kwargs=self.kwargs.elasticsearch,
+                    schema=ALIAS_SCHEMA,
+                    limit_replicas=True
+                )
+                self.es.add_alias(self.kwargs.elasticsearch.index)
+
+                esq = jx_elasticsearch.new_instance(self.es.kwargs)
+                result = esq.query({
+                    "from": "bug_aliases",
+                    "select": ["canonical", "alias"],
+                    "where": {"missing": "ignore"},
+                    "format": "list",
+                    "limit": 50000
+                })
+                for r in result.data:
+                    self.aliases[r.alias] = {"canonical": r.canonical, "dirty": False}
+
+                num = len(self.aliases.keys())
+                if num < 500:
+                    # LOAD FROM FILE IF THE CLUSTER IS A BIT EMPTY
+                    self._load_aliases_from_file()
+                    return
+
+                Log.note("{{num}} aliases loaded from ES", num=num)
+
+                # LOAD THE NON-MATCHES
+                result = esq.query({
+                    "from": "bug_aliases",
+                    "select": ["canonical", "alias"],
+                    "where": {"exists": "ignore"},
+                    "format": "list"
+                })
+                for r in result.data:
+                    self.not_aliases[r.alias] = r.canonical
+            else:
+                self._load_aliases_from_file()
+        except Exception as e:
+            Log.warning("Can not load aliases", cause=e)
+
+    def _load_aliases_from_file(self):
+        if self.kwargs.file:
+            data = json2value(zip2bytes(File(self.kwargs.file).read_bytes()).decode('utf8'), flexible=False, leaves=False)
+            self.aliases = {a: {"canonical": c, "dirty": True} for a, c in data.aliases.items()}
+            self.not_aliases = data.not_aliases
+            Log.note("{{num}} aliases loaded from file", num=len(self.aliases.keys()))
+
+    def save_aliases(self):
+        if self.es:
+            records = []
+            for k, v in self.aliases.items():
+                if v["dirty"]:
+                    records.append({"id": k, "value": {"canonical": v["canonical"], "alias": k}})
+            if records:
+                Log.note("Net new aliases saved: {{num}}", num=len(records))
+                self.es.extend(records)
+        elif self.kwargs.file:
+            def compact():
+                return {
+                    "aliases": {a: c['canonical'] for a, c in self.aliases.items() if c['canonical'] != a},
+                    "mot_aliases": self.not_aliases
+                }
+            data = compact()
+            File(self.kwargs.file).write_bytes(bytes2zip(value2json(data, pretty=True).encode('utf8')))
+            if DEBUG:
+                Log.note("verify alias file")
+                self.load_aliases()
+                from_file = compact()
+                assertAlmostEqual(from_file, data)
+                assertAlmostEqual(data, from_file)

 def mapper(emails, aliases):
@@ -264,10 +306,10 @@ def split_email(value):
 def start():
     try:
-        settings = startup.read_settings()
-        constants.set(settings.constants)
-        Log.start(settings.debug)
-        full_analysis(settings)
+        kwargs = startup.read_settings()
+        constants.set(kwargs.constants)
+        Log.start(kwargs.debug)
+        full_analysis(kwargs)
     except Exception as e:
         Log.error("Can not start", e)
     finally:
@@ -275,7 +317,7 @@ def start():

 ALIAS_SCHEMA = {
-    "settings": {"index": {
+    "kwargs": {"index": {
         "number_of_shards": 3,
         "number_of_replicas": 0
     }},


@@ -115,6 +115,7 @@ def etl(db, output_queue, param, alias_config, please_stop):
     for i, s in enumerate(sorted):
         process.processRow(s)
     process.processRow(wrap({"bug_id": parse_bug_history.STOP_BUG, "_merge_order": 1}))
+    process.alias_analyzer.save_aliases()

 def run_both_etl(db, output_queue, esq_comments, param, alias_config):


@@ -11,20 +11,18 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import unicode_literals

-from bzETL.parse_bug_history import MAX_TIME
-from bzETL.transform_bugzilla import NUMERIC_FIELDS
 from jx_python import jx
 from mo_dots.datas import Data
 from mo_logs import Log
 from mo_times.timer import Timer
 from pyLibrary import convert
 from pyLibrary.queries.jx_usingMySQL import esfilter2sqlwhere
-from pyLibrary.sql import SQL, SQL_ONE, SQL_NEG_ONE, sql_list, sql_iso
-# USING THE TEXT DATETIME OF EPOCH THROWS A WARNING! USE ONE SECOND PAST EPOCH AS MINIMUM TIME.
+from pyLibrary.sql import SQL
 from pyLibrary.sql.mysql import int_list_packer

+# USING THE TEXT DATETIME OF EPOCH THROWS A WARNING! USE ONE SECOND PAST EPOCH AS MINIMUM TIME.
 MIN_TIMESTAMP = 1000  # MILLISECONDS SINCE EPOCH
+MAX_TIMESTAMP = 9999999999000

 #ALL BUGS IN PRIVATE ETL HAVE SCREENED FIELDS
 SCREENED_FIELDDEFS = [
@@ -239,7 +237,7 @@ def get_bugs(db, param):
         else:
             return db.quote_column(col.column_name)

-    param.bugs_columns = jx.select(bugs_columns, "column_name")
+    param.bugs_columns = bugs_columns.column_name
     param.bugs_columns_SQL = SQL(",\n".join(lower(c) for c in bugs_columns))
     param.bug_filter = esfilter2sqlwhere(db, {"terms": {"b.bug_id": param.bug_list}})
     param.screened_whiteboard = esfilter2sqlwhere(db, {"and": [
@@ -449,7 +447,7 @@ def get_all_cc_changes(db, bug_list):
             {{bug_filter}}
         """,
         {
-            "max_time": MAX_TIME,
+            "max_time": MAX_TIMESTAMP,
             "cc_field_id": CC_FIELD_ID,
             "bug_filter": esfilter2sqlwhere(db, int_list_packer("bug_id", bug_list))
         },
@@ -590,7 +588,7 @@ def get_new_activities(db, param):
             a.id,
             a.bug_id,
             UNIX_TIMESTAMP(bug_when)*1000 AS modified_ts,
-            lower(login_name) AS modified_by,
+            lower(p.login_name) AS modified_by,
             replace(field.`name`, '.', '_') AS field_name,
             CAST(
                 CASE
@@ -598,6 +596,7 @@ def get_new_activities(db, param):
                 WHEN m.bug_id IS NOT NULL AND a.fieldid={{whiteboard_field}} AND added IS NOT NULL AND trim(added)<>'' THEN '[screened]'
                 WHEN a.fieldid IN {{mixed_case_fields}} THEN trim(added)
                 WHEN trim(added)='' THEN NULL
+                WHEN qa_contact.userid IS NOT NULL THEN qa_contact.login_name
                 ELSE lower(trim(added))
                 END
             AS CHAR CHARACTER SET utf8) AS new_value,
@@ -620,6 +619,8 @@ def get_new_activities(db, param):
             fielddefs field ON a.fieldid = field.`id`
         LEFT JOIN
             bug_group_map m on m.bug_id=a.bug_id AND {{screened_whiteboard}}
+        LEFT JOIN
+            profiles qa_contact ON qa_contact.userid=CAST(a.added AS UNSIGNED) and a.fieldid=36 AND a.added REGEXP '^[0-9]+$'
         WHERE
             {{bug_filter}}
             # NEED TO QUERY ES TO GET bug_version_num OTHERWISE WE NEED ALL HISTORY


@@ -44,16 +44,16 @@ import math
 import re

 import jx_elasticsearch
-import jx_python
-from mo_future import text_type
-from bzETL.transform_bugzilla import normalize, NUMERIC_FIELDS, MULTI_FIELDS, DIFF_FIELDS
+from bzETL.alias_analysis import AliasAnalyzer
+from bzETL.extract_bugzilla import MAX_TIMESTAMP
+from bzETL.transform_bugzilla import normalize, NUMERIC_FIELDS, MULTI_FIELDS, DIFF_FIELDS, NULL_VALUES
 from jx_python import jx, meta
 from mo_dots import inverse, coalesce, wrap, unwrap
 from mo_dots.datas import Data
 from mo_dots.lists import FlatList
 from mo_dots.nones import Null
 from mo_files import File
+from mo_future import text_type
 from mo_json import json2value, value2json
 from mo_logs import Log, strings
 from mo_logs.strings import apply_diff
@@ -66,9 +66,9 @@ from pyLibrary import convert
 FLAG_PATTERN = re.compile("^(.*)([?+-])(\\([^)]*\\))?$")

-DEBUG_CHANGES = False   # SHOW ACTIVITY RECORDS BEING PROCESSED
+DEBUG_CHANGES = True   # SHOW ACTIVITY RECORDS BEING PROCESSED
 DEBUG_STATUS = False   # SHOW CURRENT STATE OF PROCESSING
-DEBUG_CC_CHANGES = True  # SHOW MISMATCHED CC CHANGES
+DEBUG_CC_CHANGES = False  # SHOW MISMATCHED CC CHANGES
 DEBUG_FLAG_MATCHES = False

 USE_PREVIOUS_VALUE_OBJECTS = False
@@ -79,8 +79,15 @@ KNOWN_MISSING_KEYWORDS = {
     "dogfood", "beta1", "nsbeta1", "nsbeta2", "nsbeta3", "patch", "mozilla1.0", "correctness",
     "mozilla0.9", "mozilla0.9.9+", "nscatfood", "mozilla0.9.3", "fcc508", "nsbeta1+", "mostfreq"
 }
+KNOWN_INCONSISTENT_FIELDS = {
+    "cf_last_resolved"  # CHANGES IN DATABASE TIMEZONE
+}
+FIELDS_CHANGED = {  # SOME FIELD VALUES ARE CHANGED WITHOUT HISTORY BEING CHANGED TOO https://bugzilla.mozilla.org/show_bug.cgi?id=997228
+    "cf_blocking_b2g":{"1.5":"2.0"}
+}
+EMAIL_FIELDS = {'cc', 'assigned_to', 'modified_by', 'created_by', 'qa_contact', 'bug_mentor'}

 STOP_BUG = 999999999  # AN UNFORTUNATE SIDE EFFECT OF DATAFLOW PROGRAMMING (http://en.wikipedia.org/wiki/Dataflow_programming)
-MAX_TIME = 9999999999000
@@ -91,11 +98,7 @@ class BugHistoryParser(object):
         self.prev_row = Null
         self.settings = settings
         self.output = output_queue
-
-        self.alias_config=alias_config
-        self.aliases = Null
-        self.initialize_aliases()
+        self.alias_analyzer = AliasAnalyzer(alias_config)

     def processRow(self, row_in):
         if not row_in:
@@ -236,7 +239,6 @@ class BugHistoryParser(object):
                 cause=e
             )

-
     def processAttachmentsTableItem(self, row_in):
         currActivityID = BugHistoryParser.uid(self.currBugID, row_in.modified_ts)
         if currActivityID != self.prevActivityID:
@@ -365,28 +367,56 @@ class BugHistoryParser(object):
                 total = self.removeValues(total, multi_field_new_value, "added", row_in.field_name, "currBugState", self.currBugState)
                 total = self.addValues(total, multi_field_old_value, "removed bug", row_in.field_name, self.currBugState)
                 self.currBugState[row_in.field_name] = total
+            elif row_in.field_name in DIFF_FIELDS:
+                diff = row_in.new_value
+                expected_value = self.currBugState[row_in.field_name]
+                try:
+                    old_value = ApplyDiff(row_in.modified_ts, expected_value, diff, reverse=True)
+                    self.currBugState[row_in.field_name] = old_value
+                    self.currActivity.changes.append({
+                        "field_name": row_in.field_name,
+                        "new_value": expected_value,
+                        "old_value": old_value,
+                        "attach_id": row_in.attach_id
+                    })
+                except Exception as e:
+                    Log.warning(
+                        "[Bug {{bug_id}}]: PROBLEM Unable to process {{field_name}} diff:\n{{diff|indent}}",
+                        bug_id=self.currBugID,
+                        field_name=row_in.field_name,
+                        diff=diff,
+                        cause=e
+                    )
             else:
-                if row_in.field_name in DIFF_FIELDS:
-                    diff = row_in.new_value
-                    try:
-                        new_value = self.currBugState[row_in.field_name]
-                        row_in.new_value = new_value
-                        row_in.old_value = ApplyDiff(row_in.modified_ts, new_value, diff, reverse=True)
-                    except Exception as e:
-                        Log.warning(
-                            "[Bug {{bug_id}}]: PROBLEM Unable to process {{field_name}} diff:\n{{diff|indent}}",
-                            bug_id=self.currBugID,
-                            field_name=row_in.field_name,
-                            diff=diff,
-                            cause=e
-                        )
-                self.currBugState[row_in.field_name] = row_in.old_value
+                expected_value = self.canonical(self.currBugState[row_in.field_name])
+                new_value = self.canonical(row_in.field_name, row_in.new_value)
+
+                if text_type(new_value) != text_type(expected_value):
+                    if DEBUG_CHANGES and row_in.field_name not in KNOWN_INCONSISTENT_FIELDS:
+                        if row_in.field_name=='cc':
+                            self.alias_analyzer.add_alias(expected_value, new_value)
+                        else:
+                            lookup = FIELDS_CHANGED.setdefault(row_in.field_name, {})
+                            if expected_value:
+                                lookup[new_value] = expected_value
+                            File("expected_values.json").write(value2json(FIELDS_CHANGED, pretty=True))
+                        Log.note(
+                            "[Bug {{bug_id}}]: PROBLEM inconsistent change: {{field}} was {{expecting|quote}} got {{observed|quote}}",
+                            bug_id=self.currBugID,
+                            field=row_in.field_name,
+                            expecting=expected_value,
+                            observed=new_value
+                        )
+                # WE DO NOT ATTEMPT TO CHANGE THE VALUES IN HISTORY TO BE CONSISTENT WITH THE FUTURE
                 self.currActivity.changes.append({
                     "field_name": row_in.field_name,
-                    "new_value": row_in.new_value,
+                    "new_value": self.currBugState[row_in.field_name],
                     "old_value": row_in.old_value,
                     "attach_id": row_in.attach_id
                 })
+                self.currBugState[row_in.field_name] = row_in.old_value

     def populateIntermediateVersionObjects(self):
         # Make sure the self.bugVersions are in descending order by modification time.
@@ -445,7 +475,7 @@ class BugHistoryParser(object):
                 mergeBugVersion = True

             # Link this version to the next one (if there is a next one)
-            self.currBugState.expires_on = coalesce(nextVersion.modified_ts, MAX_TIME)
+            self.currBugState.expires_on = coalesce(nextVersion.modified_ts, MAX_TIMESTAMP)

             # Copy all attributes from the current version into self.currBugState
             for propName, propValue in currVersion.items():
@@ -477,8 +507,6 @@ class BugHistoryParser(object):
                     changes[c] = Null
                     continue

-                if DEBUG_CHANGES:
-                    Log.note("Processing change: " + value2json(change))
                 target = self.currBugState
                 targetName = "currBugState"
                 attach_id = change.attach_id
@@ -552,8 +580,8 @@ class BugHistoryParser(object):
                 deformat(f.request_type) == deformat(flag.request_type) and
                 f.request_status == flag.request_status and
                 (
-                    (f.request_status!='?' and self.alias(f.modified_by) == self.alias(flag.modified_by)) or
-                    (f.request_status=='?' and self.alias(f.requestee) == self.alias(flag.requestee))
+                    (f.request_status!='?' and self.email_alias(f.modified_by) == self.email_alias(flag.modified_by)) or
+                    (f.request_status=='?' and self.email_alias(f.requestee) == self.email_alias(flag.requestee))
                 )
             ):
                 return f
@@ -649,7 +677,7 @@ class BugHistoryParser(object):
             matched_req = [
                 element
                 for element in candidates
-                if self.alias(added_flag["modified_by"]) == self.alias(element["requestee"])
+                if self.email_alias(added_flag["modified_by"]) == self.email_alias(element["requestee"])
             ]

             if not matched_ts and not matched_req:
@@ -679,7 +707,7 @@ class BugHistoryParser(object):
                 matched_both = [
                     element
                     for element in candidates
-                    if added_flag.modified_ts == element.modified_ts and self.alias(added_flag["modified_by"]) == self.alias(element["requestee"])
+                    if added_flag.modified_ts == element.modified_ts and self.email_alias(added_flag["modified_by"]) == self.email_alias(element["requestee"])
                 ]

                 if matched_both:
@@ -778,9 +806,9 @@ class BugHistoryParser(object):
         if field_name == "flags":
             Log.error("use processFlags")
         elif field_name == "cc":
-            # MAP CANONICAL TO EXISTING (BETWEEN map_* AND self.aliases WE HAVE A BIJECTION)
-            map_total = inverse({t: self.alias(t) for t in total})
-            map_remove = inverse({r: self.alias(r) for r in remove})
+            # MAP CANONICAL TO EXISTING (BETWEEN map_* AND self.email_aliases WE HAVE A BIJECTION)
+            map_total = inverse({t: self.email_alias(t) for t in total})
+            map_remove = inverse({r: self.email_alias(r) for r in remove})
             # CANONICAL VALUES
             c_total = set(map_total.keys())
             c_remove = set(map_remove.keys())
@@ -797,7 +825,7 @@ class BugHistoryParser(object):
                     "field_name": field_name,
                     "missing": jx.sort(jx.map2set(diff, map_remove)),
                     "existing": jx.sort(total),
-                    "candidates": {d: self.aliases.get(d, None) for d in diff},
+                    "candidates": {d: self.email_aliases.get(d, None) for d in diff},
                     "bug_id": self.currBugID
                 })
@@ -893,7 +921,7 @@ class BugHistoryParser(object):
                     "existing": total
                 })
                 if field_name == "keywords":
-                    KNOWN_MISSING_KEYWORDS.extend(diff)
+                    KNOWN_MISSING_KEYWORDS.update(diff)

         return output
@@ -953,25 +981,17 @@ class BugHistoryParser(object):
         return total

-    def alias(self, name):
+    def canonical(self, field, value):
+        if value in NULL_VALUES:
+            return None
+        if field in EMAIL_FIELDS:
+            return self.email_alias(value)
+        return FIELDS_CHANGED.get(field, {}).get(value, value)
+
+    def email_alias(self, name):
         if name == None:
             return Null
-        return coalesce(self.aliases.get(name, Null), name)
-
-    def initialize_aliases(self):
-        try:
-            if self.alias_config.elasticsearch:
-                esq = jx_elasticsearch.new_instance(self.alias_config.elasticsearch)
-                result = esq.query({"select": ["alias", "canonical"], "where": {"missing": "ignore"}, "limit": 10000, "format":"list"})
-                self.aliases = {d.alias:d.canonical for d in result.data}
-            else:
-                alias_json = File(self.alias_config.file).read()
-                self.aliases = {k: wrap(v) for k, v in json2value(alias_json).items()}
-        except Exception as e:
-            Log.warning("Could not load alias file", cause=e)
-            self.aliases = {}
-
-        Log.note("{{num}} aliases loaded", num=len(self.aliases.keys()))
+        return self.alias_analyzer.aliases.get(name, name)

 def parse_flag(flag, modified_ts, modified_by):


@@ -19,6 +19,7 @@ from mo_dots import listwrap
 from mo_future import text_type, long
 from mo_json import json2value, value2json
 from mo_logs import Log
+from mo_times import Date
 from pyLibrary import convert
 from pyLibrary.env import elasticsearch
@@ -39,7 +40,7 @@ NUMERIC_FIELDS=[
     "remaining_time"
 ]

-NULL_VALUES = ['--', '---']
+NULL_VALUES = ['--', '---', '']

 # Used to reformat incoming dates into the expected form.
 # Example match: "2012/01/01 00:00:00.000"
@@ -146,5 +147,7 @@ def normalize(bug, old_school=False):
         bug.votes = None
     bug.exists = True

+    bug.etl.timestamp = Date.now()
+
     return elasticsearch.scrub(bug)


@@ -1,13 +1,14 @@
 {
     "alias": {
         "start": 0,
-        "increment": 1000000,
-        "elasticsearch": {
-            "host": "http://localhost",
-            "index": "bug_aliases"
-        },
+        "increment": 100000,
+        // "elasticsearch": {
+        //     "host": "http://localhost",
+        //     "index": "bug_aliases"
+        // },
+        "minimum_diff": 7,
         "file": {
-            "path": "../schema/bugzilla_aliases.json",
+            "path": "resources/schema/bugzilla_aliases.json",
            "$ref": "//~/private.json#alias_file"
         }
     },
@@ -39,7 +40,15 @@
         {
             "log_type": "stream",
             "stream": "sys.stdout"
+        },
+        {
+            "log_type": "email",
+            "from_address": "klahnakoski@mozilla.com",
+            "to_address": "klahnakoski@mozilla.com",
+            "subject": "[ALERT][DEV]Problem with ActiveData Server",
+            "$ref": "file://~/private.json#email"
         }
     ]
     }
 }


@@ -2,45 +2,45 @@
     "param": {
         "increment": 10000,
         "bugs": [
-            // 384, //minor email diff
-            // 1108, //ok
-            // 1045, //ok
-            // 1046, //ok
-            // 1157, //ok
-            // 1877, //minor email diff
-            // 1865, //ok
-            // 1869, //minor email diff, missing recent history
-            // 2586, //missing recent history
-            // 3140, //minor email
-            // 6810, //ok
-            // 9622, //minor email diff
-            // 10575, //ok
-            // 11040, //alias analysis problem
-            // 12911, //alias analysis problem
-            // 13534, // (REVIEW MOVES TO OTHER PERSON)
-            // 67742, //alias analysis problem
-            // 96421, //minor email diff
-            // 123203,//expiry only
-            // 178960,//minor email
-            // 248970, // another cutoff review request
-            // 367518,//ok
-            // 457765,//ok
-            // 458397,//minor email
-            // 471427,//minor email
-            // 544327,//extra history
-            // 547727,//extra history
+            384, //minor email diff
+            1108, //ok
+            1045, //ok
+            1046, //ok
+            1157, //ok
+            1877, //minor email diff
+            1865, //ok
+            1869, //minor email diff, missing recent history
+            2586, //missing recent history
+            3140, //minor email
+            6810, //ok
+            9622, //minor email diff
+            10575, //ok
+            11040, //alias analysis problem
+            12911, //alias analysis problem
+            13534, // (REVIEW MOVES TO OTHER PERSON)
+            67742, //alias analysis problem
+            96421, //minor email diff
+            123203,//expiry only
+            178960,//minor email
+            248970, // another cutoff review request
+            367518,//ok
+            457765,//ok
+            458397,//minor email
+            471427,//minor email
+            544327,//extra history
+            547727,//extra history
             520943,//review flags bug 927494
-            // 643420,//ok
-            // 692436,//minor email
-            // 726635,//alias problem
-            // 813650,//ERROR in blocked
-            // 937428, // whitespace after comma in user story, complex diff
-            // 1165765, // VERY LONG short_desc
+            643420,//ok
+            692436,//minor email
+            726635,//alias problem
+            813650,//ERROR in blocked
+            937428, // whitespace after comma in user story, complex diff
+            1165765, // VERY LONG short_desc
             // NOT VERIFIED
             // 248971, // another cutoff review request
             // 372836, // (REVIEW FLAGS TEST)
-            // 393845, // added blocking1.9+ twice
+            393845, // added blocking1.9+ twice
             // 671185, // *many* review requests
             // 1007019, // does not have bug_status, or component, or product
         ],
@@ -105,6 +105,13 @@
         {
             "log_type": "stream",
             "stream": "sys.stdout"
+        },
+        {
+            "log_type": "email",
+            "from_address": "klahnakoski@mozilla.com",
+            "to_address": "klahnakoski@mozilla.com",
+            "subject": "[ALERT][testing]Problem with Bugzilla-ETL",
+            "$ref": "file://~/private.json#email"
         }
     ]
 }


@@ -96,7 +96,7 @@ def old2new(bug, max_date):
     bug = json2value(value2json(bug).replace("bugzilla: other b.m.o issues ", "bugzilla: other b.m.o issues"))
     if bug.expires_on > max_date:
-        bug.expires_on = parse_bug_history.MAX_TIME
+        bug.expires_on = parse_bug_history.MAX_TIMESTAMP
     if bug.votes != None:
         bug.votes = int(bug.votes)
     bug.dupe_by = convert.value2intlist(bug.dupe_by)

vendor/jx_python/containers/cube.py (vendored)

@@ -92,7 +92,8 @@ class Cube(Container):
         if not self.edges:
             return 1

-        return len(self.data.values()[0])
+        for d in self.data.values():
+            return len(d)

     def __iter__(self):
         if self.is_value:

vendor/mo_collections/multiset.py (vendored)

@@ -230,7 +230,7 @@ class _NegMultiset(Multiset):
         return set(self.dic.keys())

     def __len__(self):
-        return sum(self.dic.values())
+        return sum(abs(v) for v in self.dic.values())

     def __nonzero__(self):
         if self.dic:

vendor/mo_dots/__init__.py (vendored)

@@ -334,11 +334,11 @@ def _get_attr(obj, path):
         # WE CAN STILL PUT THE PATH TO THE FILE IN THE from CLAUSE
         if len(path) == 1:
             # GET MODULE OBJECT
-            output = __import__(obj.__name__ + b"." + attr_name.decode('utf8'), globals(), locals(), [attr_name.decode('utf8')], 0)
+            output = __import__(obj.__name__ + str(".") + str(attr_name), globals(), locals(), [str(attr_name)], 0)
             return output
         else:
             # GET VARIABLE IN MODULE
-            output = __import__(obj.__name__ + b"." + attr_name.decode('utf8'), globals(), locals(), [path[1].decode('utf8')], 0)
+            output = __import__(obj.__name__ + str(".") + str(attr_name), globals(), locals(), [str(path[1])], 0)
             return _get_attr(output, path[1:])
     except Exception as e:
         Except = get_module("mo_logs.exceptions.Except")

vendor/mo_files/__init__.py (vendored)

@@ -199,11 +199,15 @@ class File(object):
         return File.add_suffix(self._filename, suffix)

     def read(self, encoding="utf8"):
+        """
+        :param encoding:
+        :return:
+        """
         with open(self._filename, "rb") as f:
-            content = f.read().decode(encoding)
             if self.key:
-                return get_module(u"mo_math.crypto").decrypt(content, self.key)
+                return get_module("mo_math.crypto").decrypt(f.read(), self.key)
             else:
+                content = f.read().decode(encoding)
                 return content

     def read_lines(self, encoding="utf8"):
@@ -227,7 +231,10 @@ class File(object):
             if not self.parent.exists:
                 self.parent.create()
             with open(self._filename, "rb") as f:
-                return f.read()
+                if self.key:
+                    return get_module("mo_math.crypto").decrypt(f.read(), self.key)
+                else:
+                    return f.read()
         except Exception as e:
             Log.error(u"Problem reading file {{filename}}", filename=self.abspath, cause=e)
@@ -235,7 +242,10 @@ class File(object):
         if not self.parent.exists:
             self.parent.create()
         with open(self._filename, "wb") as f:
-            f.write(content)
+            if self.key:
+                f.write(get_module("mo_math.crypto").encrypt(content, self.key))
+            else:
+                f.write(content)

     def write(self, data):
         if not self.parent.exists:
@@ -255,7 +265,8 @@ class File(object):
                 if not isinstance(d, text_type):
                     Log.error(u"Expecting unicode data only")
                 if self.key:
-                    f.write(get_module(u"crypto").encrypt(d, self.key).encode("utf8"))
+                    from mo_math.crypto import encrypt
+                    f.write(encrypt(d, self.key).encode("utf8"))
                 else:
                     f.write(d.encode("utf8"))

vendor/mo_math/crypto.py (vendored)

@@ -14,10 +14,9 @@ from __future__ import unicode_literals

 import base64

-from mo_future import text_type, binary_type
 from mo_dots import Data, get_module
+from mo_future import text_type, binary_type, PY2
 from mo_logs import Log
 from mo_math.randoms import Random
 from mo_math.vendor.aespython import key_expander, aes_cipher, cbc_mode
@@ -26,19 +25,26 @@ DEBUG = False

 def encrypt(text, _key, salt=None):
     """
-    RETURN JSON OF ENCRYPTED DATA {"salt":s, "length":l, "data":d}
+    RETURN {"salt":s, "length":l, "data":d} -> JSON -> UTF8
     """
-    if not isinstance(text, text_type):
-        Log.error("only unicode is encrypted")
+    if isinstance(text, text_type):
+        encoding = 'utf8'
+        data = bytearray(text.encode("utf8"))
+    elif isinstance(text, binary_type):
+        encoding = None
+        if PY2:
+            data = bytearray(text)
+        else:
+            data = text
+
     if _key is None:
         Log.error("Expecting a key")
-    if isinstance(_key, str):
+    if isinstance(_key, binary_type):
         _key = bytearray(_key)
     if salt is None:
         salt = Random.bytes(16)
-    data = bytearray(text.encode("utf8"))

     # Initialize encryption using key and iv
     key_expander_256 = key_expander.KeyExpander(256)
     expanded_key = key_expander_256.expand(_key)
@@ -50,12 +56,13 @@ def encrypt(text, _key, salt=None):
     output.type = "AES256"
     output.salt = bytes2base64(salt)
     output.length = len(data)
+    output.encoding = encoding

     encrypted = bytearray()
     for _, d in _groupby16(data):
         encrypted.extend(aes_cbc_256.encrypt_block(d))
     output.data = bytes2base64(encrypted)
-    json = get_module("mo_json").value2json(output)
+    json = get_module("mo_json").value2json(output, pretty=True).encode('utf8')

     if DEBUG:
         test = decrypt(json, _key)
@@ -67,13 +74,13 @@ def encrypt(text, _key, salt=None):

 def decrypt(data, _key):
     """
-    ACCEPT JSON OF ENCRYPTED DATA {"salt":s, "length":l, "data":d}
+    ACCEPT BYTES -> UTF8 -> JSON -> {"salt":s, "length":l, "data":d}
     """

     # Key and iv have not been generated or provided, bail out
     if _key is None:
         Log.error("Expecting a key")

-    _input = get_module("mo_json").json2value(data)
+    _input = get_module("mo_json").json2value(data.decode('utf8'), leaves=False, flexible=False)

     # Initialize encryption using key and iv
     key_expander_256 = key_expander.KeyExpander(256)
@@ -87,7 +94,11 @@ def decrypt(data, _key):
     for _, e in _groupby16(raw):
         out_data.extend(aes_cbc_256.decrypt_block(e))

-    return binary_type(out_data[:_input.length:]).decode("utf8")
+    if _input.encoding:
+        return binary_type(out_data[:_input.length:]).decode(_input.encoding)
+    else:
+        return binary_type(out_data[:_input.length:])

 def bytes2base64(value):
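
With the crypto change above, encrypt() now accepts either unicode text or raw bytes, records the encoding in the output, and returns UTF8-encoded JSON bytes; decrypt() reverses both steps. A rough round-trip sketch, inferred from the diff (the 32-byte key below is illustrative, not from the commit):

    from mo_math import crypto
    from mo_math.randoms import Random

    key = bytearray(Random.bytes(32))      # AES256 expects a 32-byte key

    blob = crypto.encrypt(u"hello world", key)            # unicode in -> JSON bytes out
    assert crypto.decrypt(blob, key) == u"hello world"    # text should come back as text

    raw = crypto.encrypt(b"\x00\x01\x02", key)            # bytes in (encoding recorded as None)
    assert crypto.decrypt(raw, key) == b"\x00\x01\x02"    # bytes should come back as bytes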