From 1f9db13f0d72f0ef4d70db0e2b254d51d09bf29a Mon Sep 17 00:00:00 2001 From: Kyle Lahnakoski Date: Tue, 9 Jan 2018 10:06:17 -0500 Subject: [PATCH] tests running, but failing with multiple data problems --- bzETL/bz_etl.py | 6 ++-- bzETL/parse_bug_history.py | 20 ++++++----- bzETL/replicate.py | 7 ++-- bzETL/transform_bugzilla.py | 8 +++-- resources/config/test_one_settings.json | 2 +- resources/{json => schema}/bug_comments.json | 0 resources/{json => schema}/bug_version.json | 0 tests/resources/python/leak_check.py | 4 +-- tests/test_etl.py | 11 +++--- tests/test_one_etl.py | 6 ++-- tests/util/compare_es.py | 35 +++++++++++-------- .../containers/list_usingPythonList.py | 20 +++++++---- vendor/pyLibrary/testing/elasticsearch.py | 2 ++ 13 files changed, 75 insertions(+), 46 deletions(-) rename resources/{json => schema}/bug_comments.json (100%) rename resources/{json => schema}/bug_version.json (100%) diff --git a/bzETL/bz_etl.py b/bzETL/bz_etl.py index 950c035..cfb3112 100644 --- a/bzETL/bz_etl.py +++ b/bzETL/bz_etl.py @@ -11,6 +11,8 @@ from __future__ import absolute_import from __future__ import division from __future__ import unicode_literals +from mo_future import text_type + from bzETL import extract_bugzilla, alias_analysis, parse_bug_history from bzETL.extract_bugzilla import * from bzETL.parse_bug_history import BugHistoryParser @@ -175,7 +177,7 @@ def setup_es(settings, db, es, es_comments): settings.es_comments.index = Cluster.proto_name(settings.es_comments.alias) es_comments = Cluster.create_index(kwargs=settings.es_comments, limit_replicas=True) - File(settings.param.first_run_time).write(unicode(convert.datetime2milli(current_run_time))) + File(settings.param.first_run_time).write(text_type(convert.datetime2milli(current_run_time))) return current_run_time, es, es_comments, last_run_time @@ -401,7 +403,7 @@ def main(settings, es=None, es_comments=None): es.delete_all_but(settings.es_comments.alias, settings.es_comments.index) es_comments.add_alias(settings.es_comments.alias) - File(settings.param.last_run_time).write(unicode(convert.datetime2milli(current_run_time))) + File(settings.param.last_run_time).write(text_type(convert.datetime2milli(current_run_time))) except Exception as e: Log.error("Problem with main ETL loop", cause=e) finally: diff --git a/bzETL/parse_bug_history.py b/bzETL/parse_bug_history.py index 020c9d1..957db43 100644 --- a/bzETL/parse_bug_history.py +++ b/bzETL/parse_bug_history.py @@ -43,6 +43,8 @@ from __future__ import unicode_literals import math import re +from mo_future import text_type + from bzETL.transform_bugzilla import normalize, NUMERIC_FIELDS, MULTI_FIELDS, DIFF_FIELDS from jx_python import jx from mo_dots import inverse, coalesce, wrap, unwrap @@ -183,7 +185,7 @@ class BugHistoryParser(object): if modified_ts == None: Log.error("modified_ts can not be Null") - return unicode(bug_id) + "_" + unicode(modified_ts)[0:-3] + return text_type(bug_id) + "_" + text_type(modified_ts)[0:-3] def startNewBug(self, row_in): self.prevBugID = row_in.bug_id @@ -252,7 +254,7 @@ class BugHistoryParser(object): self.bugVersions.append(self.currActivity) self.bugVersionsMap[currActivityID] = self.currActivity - att = self.currBugAttachmentsMap[unicode(row_in.attach_id)] + att = self.currBugAttachmentsMap[text_type(row_in.attach_id)] if att == None: att = { "attach_id": row_in.attach_id, @@ -261,7 +263,7 @@ class BugHistoryParser(object): "modified_by": row_in.modified_by, "flags": FlatList() } - self.currBugAttachmentsMap[unicode(row_in.attach_id)] = att + self.currBugAttachmentsMap[text_type(row_in.attach_id)] = att att["created_ts"] = MIN([row_in.modified_ts, att["created_ts"]]) if row_in.field_name == "created_ts" and row_in.new_value == None: @@ -273,15 +275,15 @@ class BugHistoryParser(object): def processFlagsTableItem(self, row_in): flag = self.makeFlag(row_in.new_value, row_in.modified_ts, row_in.modified_by) if row_in.attach_id != None: - if self.currBugAttachmentsMap[unicode(row_in.attach_id)] == None: + if self.currBugAttachmentsMap[text_type(row_in.attach_id)] == None: Log.note("[Bug {{bug_id}}]: Unable to find attachment {{attach_id}} for bug_id {{bug_id}}", attach_id=row_in.attach_id, bug_id=self.currBugID ) else: - if self.currBugAttachmentsMap[unicode(row_in.attach_id)].flags == None: + if self.currBugAttachmentsMap[text_type(row_in.attach_id)].flags == None: Log.error("should never happen") - self.currBugAttachmentsMap[unicode(row_in.attach_id)].flags.append(flag) + self.currBugAttachmentsMap[text_type(row_in.attach_id)].flags.append(flag) else: self.currBugState.flags.append(flag) @@ -313,7 +315,7 @@ class BugHistoryParser(object): self.prevActivityID = currActivityID if row_in.attach_id != None: - attachment = self.currBugAttachmentsMap[unicode(row_in.attach_id)] + attachment = self.currBugAttachmentsMap[text_type(row_in.attach_id)] if attachment == None: #we are going backwards in time, no need to worry about these? maybe delete this change for public bugs Log.note( @@ -480,12 +482,12 @@ class BugHistoryParser(object): # Handle the special change record that signals the creation of the attachment if change.field_name == "attachment_added": # This change only exists when the attachment has been added to the map, so no missing case needed. - att = self.currBugAttachmentsMap[unicode(attach_id)] + att = self.currBugAttachmentsMap[text_type(attach_id)] self.currBugState.attachments.append(att) continue else: # Attachment change - target = self.currBugAttachmentsMap[unicode(attach_id)] + target = self.currBugAttachmentsMap[text_type(attach_id)] targetName = "attachment" if target == None: Log.note("[Bug {{bug_id}}]: Encountered a change to missing attachment: {{change}}", { diff --git a/bzETL/replicate.py b/bzETL/replicate.py index 6413e6a..4d8cc28 100644 --- a/bzETL/replicate.py +++ b/bzETL/replicate.py @@ -21,6 +21,9 @@ from __future__ import division from __future__ import absolute_import from datetime import datetime, timedelta + +from mo_future import text_type + from bzETL import transform_bugzilla from jx_python import jx from mo_collections.multiset import Multiset @@ -54,7 +57,7 @@ def extract_from_file(source_settings, destination): Log.note("add {{num}} records", num=len(d2)) destination.extend(d2) except Exception as e: - filename = "Error_" + unicode(g) + ".txt" + filename = "Error_" + text_type(g) + ".txt" File(filename).write(d) Log.warning("Can not convert block {{block}} (file={{host}})", { "block": g, @@ -239,7 +242,7 @@ def main(settings): replicate(source, data_sink, pending, last_updated) # RECORD LAST UPDATED - time_file.write(unicode(convert.datetime2milli(current_time))) + time_file.write(text_type(convert.datetime2milli(current_time))) def start(): diff --git a/bzETL/transform_bugzilla.py b/bzETL/transform_bugzilla.py index d149db0..b887157 100644 --- a/bzETL/transform_bugzilla.py +++ b/bzETL/transform_bugzilla.py @@ -14,6 +14,8 @@ from __future__ import unicode_literals import re from datetime import date +from mo_future import text_type + from jx_python import jx from mo_json import json2value, value2json from mo_logs import Log @@ -53,7 +55,7 @@ def rename_attachments(bug_version): #NORMALIZE BUG VERSION TO STANDARD FORM def normalize(bug, old_school=False): bug=bug.copy() - bug.id = unicode(bug.bug_id) + "_" + unicode(bug.modified_ts)[:-3] + bug.id = text_type(bug.bug_id) + "_" + text_type(bug.modified_ts)[:-3] bug._id = None #ENSURE STRUCTURES ARE SORTED @@ -106,9 +108,9 @@ def normalize(bug, old_school=False): try: if isinstance(v, date): bug[dateField] = convert.datetime2milli(v) - elif isinstance(v, (long, int, float)) and len(unicode(v)) in [12, 13]: + elif isinstance(v, (long, int, float)) and len(text_type(v)) in [12, 13]: bug[dateField] = v - elif not isinstance(v, basestring): + elif not isinstance(v, text_type): Log.error("situation not handled") elif DATE_PATTERN_STRICT.match(v): # Convert to "2012/01/01 00:00:00.000" diff --git a/resources/config/test_one_settings.json b/resources/config/test_one_settings.json index 4bbb4e2..41d4e44 100644 --- a/resources/config/test_one_settings.json +++ b/resources/config/test_one_settings.json @@ -36,7 +36,7 @@ "class": "logging.handlers.RotatingFileHandler", "filename": "./tests/results/logs/test_etl.log", "maxBytes": 10000000, - "backupCount": 200, + "backupCount": 20, "encoding": "utf8" }, { diff --git a/resources/json/bug_comments.json b/resources/schema/bug_comments.json similarity index 100% rename from resources/json/bug_comments.json rename to resources/schema/bug_comments.json diff --git a/resources/json/bug_version.json b/resources/schema/bug_version.json similarity index 100% rename from resources/json/bug_version.json rename to resources/schema/bug_version.json diff --git a/tests/resources/python/leak_check.py b/tests/resources/python/leak_check.py index 8a25600..f504e31 100644 --- a/tests/resources/python/leak_check.py +++ b/tests/resources/python/leak_check.py @@ -323,9 +323,9 @@ def main(): def error(results): content = [] for e in results.errors: - content.append("ERROR: "+unicode(e[0]._testMethodName)) + content.append("ERROR: "+text_type(e[0]._testMethodName)) for f in results.failures: - content.append("FAIL: "+unicode(f[0]._testMethodName)) + content.append("FAIL: "+text_type(f[0]._testMethodName)) Emailer(SETTINGS.email).send_email( text_data = "\n".join(content) diff --git a/tests/test_etl.py b/tests/test_etl.py index 452e9d1..c80a6e2 100644 --- a/tests/test_etl.py +++ b/tests/test_etl.py @@ -14,6 +14,8 @@ from __future__ import unicode_literals import unittest from datetime import datetime +from mo_future import text_type + from bzETL import extract_bugzilla, bz_etl from bzETL.bz_etl import etl, esfilter2sqlwhere from bzETL.extract_bugzilla import get_current_time, SCREENED_WHITEBOARD_BUG_GROUPS @@ -605,7 +607,8 @@ def compare_both(candidate, reference, settings, some_bugs): try: versions = jx.sort( get_all_bug_versions(candidate, bug_id, datetime.utcnow()), - "modified_ts") + "modified_ts" + ) # WE CAN NOT EXPECT candidate TO BE UP TO DATE BECAUSE IT IS USING AN OLD IMAGE if not versions: max_time = convert.milli2datetime(settings.bugzilla.expires_on) @@ -624,11 +627,11 @@ def compare_both(candidate, reference, settings, some_bugs): ref = value2json(ref_versions, pretty=True) if can != ref: found_errors = True - File(try_dir + unicode(bug_id) + ".txt").write(can) - File(ref_dir + unicode(bug_id) + ".txt").write(ref) + File(try_dir + text_type(bug_id) + ".txt").write(can) + File(ref_dir + text_type(bug_id) + ".txt").write(ref) except Exception as e: found_errors = True - Log.warning("Problem ETL'ing bug {{bug_id}}", {"bug_id": bug_id}, e) + Log.warning("Problem ETL'ing bug {{bug_id}}", bug_id=bug_id, cause=e) if found_errors: Log.error("DIFFERENCES FOUND (Differences shown in {{path}})", { diff --git a/tests/test_one_etl.py b/tests/test_one_etl.py index f71db18..c4a05b8 100644 --- a/tests/test_one_etl.py +++ b/tests/test_one_etl.py @@ -14,7 +14,7 @@ from __future__ import unicode_literals import unittest from mo_dots import Data -from mo_logs import startup, Log +from mo_logs import startup, Log, constants from bzETL import bz_etl, extract_bugzilla from bzETL.bz_etl import etl @@ -25,6 +25,7 @@ from pyLibrary.sql.mysql import all_db, MySQL from pyLibrary.testing import elasticsearch from pyLibrary.testing.elasticsearch import FakeES from test_etl import compare_both +from util.database import make_test_instance class TestOneETL(unittest.TestCase): @@ -34,9 +35,9 @@ class TestOneETL(unittest.TestCase): """ def setUp(self): self.settings = startup.read_settings(filename="test_one_settings.json") + constants.set(self.settings.constants) Log.start(self.settings.debug) - def tearDown(self): #CLOSE THE CACHED MySQL CONNECTIONS bz_etl.close_db_connections() @@ -55,6 +56,7 @@ class TestOneETL(unittest.TestCase): reference = FakeES(self.settings.reference) candidate = elasticsearch.make_test_instance("candidate", self.settings.elasticsearch) + make_test_instance(self.settings.bugzilla) with MySQL(self.settings.bugzilla) as db: #SETUP RUN PARAMETERS diff --git a/tests/util/compare_es.py b/tests/util/compare_es.py index f887c8d..7698c09 100644 --- a/tests/util/compare_es.py +++ b/tests/util/compare_es.py @@ -14,6 +14,10 @@ from __future__ import unicode_literals from datetime import datetime +from mo_logs import Log + +import jx_elasticsearch +import jx_python from bzETL import transform_bugzilla, parse_bug_history from jx_python import jx from mo_dots import coalesce, unwrap @@ -21,26 +25,29 @@ from mo_json import json2value, value2json from mo_math import Math from mo_times.timer import Timer from pyLibrary import convert +from pyLibrary.env import elasticsearch +from pyLibrary.testing.elasticsearch import FakeES def get_all_bug_versions(es, bug_id, max_time=None): max_time = coalesce(max_time, datetime.max) - data = es.search({ - "query": {"filtered": { - "query": {"match_all": {}}, - "filter": {"and": [ - {"term": {"bug_id": bug_id}}, - {"range": {"modified_ts": {"lte": convert.datetime2milli(max_time)}}} - ]} - }}, - "from": 0, - "size": 200000, - "sort": [] + if isinstance(es, elasticsearch.Index): + esq = jx_elasticsearch.new_instance(es.settings) + elif isinstance(es, FakeES): + esq = jx_python.wrap_from(es.data.values()) + else: + raise Log.error("unknown container") + + response = esq.query({ + "from": es.settings.alias, + "where": {"and": [ + {"eq": {"bug_id": bug_id}}, + {"lte": {"modified_ts": convert.datetime2milli(max_time)}} + ]}, + "format": "list" }) - - return jx.select(data.hits.hits, "_source") - + return response.data def get_private_bugs(es): """ diff --git a/vendor/jx_python/containers/list_usingPythonList.py b/vendor/jx_python/containers/list_usingPythonList.py index 94e2fdc..36eb328 100644 --- a/vendor/jx_python/containers/list_usingPythonList.py +++ b/vendor/jx_python/containers/list_usingPythonList.py @@ -63,9 +63,9 @@ class ListContainer(Container): def query(self, q): q = wrap(q) - frum = self + output = self if is_aggs(q): - frum = list_aggs(frum.data, q) + output = list_aggs(output.data, q) else: # SETOP try: if q.filter != None or q.esfilter != None: @@ -74,18 +74,24 @@ class ListContainer(Container): pass if q.where is not TRUE and not q.where is TRUE: - frum = frum.filter(q.where) + output = output.filter(q.where) if q.sort: - frum = frum.sort(q.sort) + output = output.sort(q.sort) if q.select: - frum = frum.select(q.select) + output = output.select(q.select) #TODO: ADD EXTRA COLUMN DESCRIPTIONS TO RESULTING SCHEMA for param in q.window: - frum.window(param) + output.window(param) - return frum + if q.format: + if q.format=="list": + return Data(data=output.data) + else: + Log.error("unknown format {{format}}", format=q.format) + else: + return output def update(self, command): """ diff --git a/vendor/pyLibrary/testing/elasticsearch.py b/vendor/pyLibrary/testing/elasticsearch.py index 069fe3d..cb142ec 100644 --- a/vendor/pyLibrary/testing/elasticsearch.py +++ b/vendor/pyLibrary/testing/elasticsearch.py @@ -51,6 +51,8 @@ def open_test_instance(name, settings): Log.error("unexpected", cause=e) es = cluster.create_index(limit_replicas=True, limit_replicas_warning=False, kwargs=settings) + es.delete_all_but_self() + es.add_alias(settings.index) return es