tests running, but failing with multiple data problems
This commit is contained in:
Parent 99653e0865
Commit 1f9db13f0d
@@ -11,6 +11,8 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import unicode_literals
 
+from mo_future import text_type
+
 from bzETL import extract_bugzilla, alias_analysis, parse_bug_history
 from bzETL.extract_bugzilla import *
 from bzETL.parse_bug_history import BugHistoryParser
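For context, mo_future.text_type is the usual Python 2/3 compatibility alias this commit switches to. A minimal sketch of the assumed shim (the real mo_future module defines more aliases than this):

    import sys

    if sys.version_info[0] >= 3:
        text_type = str        # Python 3: the built-in str is already unicode
    else:
        text_type = unicode    # Python 2: the explicit unicode type

Replacing bare unicode(...) calls with text_type(...) keeps the code importable on Python 3, where the unicode builtin no longer exists.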
@@ -175,7 +177,7 @@ def setup_es(settings, db, es, es_comments):
         settings.es_comments.index = Cluster.proto_name(settings.es_comments.alias)
         es_comments = Cluster.create_index(kwargs=settings.es_comments, limit_replicas=True)
 
-    File(settings.param.first_run_time).write(unicode(convert.datetime2milli(current_run_time)))
+    File(settings.param.first_run_time).write(text_type(convert.datetime2milli(current_run_time)))
 
     return current_run_time, es, es_comments, last_run_time
 
@@ -401,7 +403,7 @@ def main(settings, es=None, es_comments=None):
             es.delete_all_but(settings.es_comments.alias, settings.es_comments.index)
             es_comments.add_alias(settings.es_comments.alias)
 
-        File(settings.param.last_run_time).write(unicode(convert.datetime2milli(current_run_time)))
+        File(settings.param.last_run_time).write(text_type(convert.datetime2milli(current_run_time)))
     except Exception as e:
         Log.error("Problem with main ETL loop", cause=e)
     finally:

@@ -43,6 +43,8 @@ from __future__ import unicode_literals
 import math
 import re
 
+from mo_future import text_type
+
 from bzETL.transform_bugzilla import normalize, NUMERIC_FIELDS, MULTI_FIELDS, DIFF_FIELDS
 from jx_python import jx
 from mo_dots import inverse, coalesce, wrap, unwrap
@@ -183,7 +185,7 @@ class BugHistoryParser(object):
         if modified_ts == None:
             Log.error("modified_ts can not be Null")
 
-        return unicode(bug_id) + "_" + unicode(modified_ts)[0:-3]
+        return text_type(bug_id) + "_" + text_type(modified_ts)[0:-3]
 
     def startNewBug(self, row_in):
         self.prevBugID = row_in.bug_id
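The ID built above pairs the bug_id with its modification timestamp truncated from milliseconds to seconds. A small illustration with hypothetical values:

    from mo_future import text_type

    bug_id = 123456
    modified_ts = 1350000000123                # epoch milliseconds
    version_id = text_type(bug_id) + "_" + text_type(modified_ts)[0:-3]
    assert version_id == "123456_1350000000"   # dropping the last 3 digits leaves seconds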
@@ -252,7 +254,7 @@ class BugHistoryParser(object):
         self.bugVersions.append(self.currActivity)
         self.bugVersionsMap[currActivityID] = self.currActivity
 
-        att = self.currBugAttachmentsMap[unicode(row_in.attach_id)]
+        att = self.currBugAttachmentsMap[text_type(row_in.attach_id)]
         if att == None:
             att = {
                 "attach_id": row_in.attach_id,
@@ -261,7 +263,7 @@ class BugHistoryParser(object):
                 "modified_by": row_in.modified_by,
                 "flags": FlatList()
             }
-            self.currBugAttachmentsMap[unicode(row_in.attach_id)] = att
+            self.currBugAttachmentsMap[text_type(row_in.attach_id)] = att
 
         att["created_ts"] = MIN([row_in.modified_ts, att["created_ts"]])
         if row_in.field_name == "created_ts" and row_in.new_value == None:
@@ -273,15 +275,15 @@ class BugHistoryParser(object):
     def processFlagsTableItem(self, row_in):
         flag = self.makeFlag(row_in.new_value, row_in.modified_ts, row_in.modified_by)
         if row_in.attach_id != None:
-            if self.currBugAttachmentsMap[unicode(row_in.attach_id)] == None:
+            if self.currBugAttachmentsMap[text_type(row_in.attach_id)] == None:
                 Log.note("[Bug {{bug_id}}]: Unable to find attachment {{attach_id}} for bug_id {{bug_id}}",
                     attach_id=row_in.attach_id,
                     bug_id=self.currBugID
                 )
             else:
-                if self.currBugAttachmentsMap[unicode(row_in.attach_id)].flags == None:
+                if self.currBugAttachmentsMap[text_type(row_in.attach_id)].flags == None:
                     Log.error("should never happen")
-                self.currBugAttachmentsMap[unicode(row_in.attach_id)].flags.append(flag)
+                self.currBugAttachmentsMap[text_type(row_in.attach_id)].flags.append(flag)
         else:
             self.currBugState.flags.append(flag)
 
@@ -313,7 +315,7 @@ class BugHistoryParser(object):
         self.prevActivityID = currActivityID
 
         if row_in.attach_id != None:
-            attachment = self.currBugAttachmentsMap[unicode(row_in.attach_id)]
+            attachment = self.currBugAttachmentsMap[text_type(row_in.attach_id)]
             if attachment == None:
                 #we are going backwards in time, no need to worry about these? maybe delete this change for public bugs
                 Log.note(
@@ -480,12 +482,12 @@ class BugHistoryParser(object):
             # Handle the special change record that signals the creation of the attachment
             if change.field_name == "attachment_added":
                 # This change only exists when the attachment has been added to the map, so no missing case needed.
-                att = self.currBugAttachmentsMap[unicode(attach_id)]
+                att = self.currBugAttachmentsMap[text_type(attach_id)]
                 self.currBugState.attachments.append(att)
                 continue
             else:
                 # Attachment change
-                target = self.currBugAttachmentsMap[unicode(attach_id)]
+                target = self.currBugAttachmentsMap[text_type(attach_id)]
                 targetName = "attachment"
                 if target == None:
                     Log.note("[Bug {{bug_id}}]: Encountered a change to missing attachment: {{change}}", {

@@ -21,6 +21,9 @@ from __future__ import division
 from __future__ import absolute_import
 
 from datetime import datetime, timedelta
+
+from mo_future import text_type
+
 from bzETL import transform_bugzilla
 from jx_python import jx
 from mo_collections.multiset import Multiset
@@ -54,7 +57,7 @@ def extract_from_file(source_settings, destination):
             Log.note("add {{num}} records", num=len(d2))
             destination.extend(d2)
         except Exception as e:
-            filename = "Error_" + unicode(g) + ".txt"
+            filename = "Error_" + text_type(g) + ".txt"
             File(filename).write(d)
             Log.warning("Can not convert block {{block}} (file={{host}})", {
                 "block": g,
@@ -239,7 +242,7 @@ def main(settings):
         replicate(source, data_sink, pending, last_updated)
 
         # RECORD LAST UPDATED
-        time_file.write(unicode(convert.datetime2milli(current_time)))
+        time_file.write(text_type(convert.datetime2milli(current_time)))
 
 
 def start():

@@ -14,6 +14,8 @@ from __future__ import unicode_literals
 import re
 from datetime import date
 
+from mo_future import text_type
+
 from jx_python import jx
 from mo_json import json2value, value2json
 from mo_logs import Log
@@ -53,7 +55,7 @@ def rename_attachments(bug_version):
 #NORMALIZE BUG VERSION TO STANDARD FORM
 def normalize(bug, old_school=False):
     bug=bug.copy()
-    bug.id = unicode(bug.bug_id) + "_" + unicode(bug.modified_ts)[:-3]
+    bug.id = text_type(bug.bug_id) + "_" + text_type(bug.modified_ts)[:-3]
     bug._id = None
 
     #ENSURE STRUCTURES ARE SORTED
@@ -106,9 +108,9 @@ def normalize(bug, old_school=False):
         try:
             if isinstance(v, date):
                 bug[dateField] = convert.datetime2milli(v)
-            elif isinstance(v, (long, int, float)) and len(unicode(v)) in [12, 13]:
+            elif isinstance(v, (long, int, float)) and len(text_type(v)) in [12, 13]:
                 bug[dateField] = v
-            elif not isinstance(v, basestring):
+            elif not isinstance(v, text_type):
                 Log.error("situation not handled")
             elif DATE_PATTERN_STRICT.match(v):
                 # Convert to "2012/01/01 00:00:00.000"

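The len(...) in [12, 13] test above is a heuristic: epoch-millisecond timestamps for dates between roughly 1973 and 2286 are 12 or 13 digits long, so numbers of that width are assumed to be already-converted millisecond values. A quick check with hypothetical values:

    assert len(text_type(1350000000123)) == 13   # millisecond timestamp, kept as-is
    assert len(text_type(2012)) == 4             # too short; falls through to string parsing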
@@ -36,7 +36,7 @@
             "class": "logging.handlers.RotatingFileHandler",
             "filename": "./tests/results/logs/test_etl.log",
             "maxBytes": 10000000,
-            "backupCount": 200,
+            "backupCount": 20,
             "encoding": "utf8"
         },
         {

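With maxBytes at 10000000, a backupCount of 20 caps the rotated test logs at roughly 20 × 10 MB = 200 MB, where the previous setting of 200 allowed about 2 GB on disk.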
@@ -323,9 +323,9 @@ def main():
     def error(results):
         content = []
         for e in results.errors:
-            content.append("ERROR: "+unicode(e[0]._testMethodName))
+            content.append("ERROR: "+text_type(e[0]._testMethodName))
         for f in results.failures:
-            content.append("FAIL: "+unicode(f[0]._testMethodName))
+            content.append("FAIL: "+text_type(f[0]._testMethodName))
 
         Emailer(SETTINGS.email).send_email(
             text_data = "\n".join(content)

@@ -14,6 +14,8 @@ from __future__ import unicode_literals
 import unittest
 from datetime import datetime
 
+from mo_future import text_type
+
 from bzETL import extract_bugzilla, bz_etl
 from bzETL.bz_etl import etl, esfilter2sqlwhere
 from bzETL.extract_bugzilla import get_current_time, SCREENED_WHITEBOARD_BUG_GROUPS
@@ -605,7 +607,8 @@ def compare_both(candidate, reference, settings, some_bugs):
         try:
             versions = jx.sort(
                 get_all_bug_versions(candidate, bug_id, datetime.utcnow()),
-                "modified_ts")
+                "modified_ts"
+            )
             # WE CAN NOT EXPECT candidate TO BE UP TO DATE BECAUSE IT IS USING AN OLD IMAGE
             if not versions:
                 max_time = convert.milli2datetime(settings.bugzilla.expires_on)
@@ -624,11 +627,11 @@ def compare_both(candidate, reference, settings, some_bugs):
             ref = value2json(ref_versions, pretty=True)
             if can != ref:
                 found_errors = True
-                File(try_dir + unicode(bug_id) + ".txt").write(can)
-                File(ref_dir + unicode(bug_id) + ".txt").write(ref)
+                File(try_dir + text_type(bug_id) + ".txt").write(can)
+                File(ref_dir + text_type(bug_id) + ".txt").write(ref)
         except Exception as e:
             found_errors = True
-            Log.warning("Problem ETL'ing bug {{bug_id}}", {"bug_id": bug_id}, e)
+            Log.warning("Problem ETL'ing bug {{bug_id}}", bug_id=bug_id, cause=e)
 
     if found_errors:
         Log.error("DIFFERENCES FOUND (Differences shown in {{path}})", {

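For reference, mo_logs fills {{name}} placeholders from keyword arguments, so the positional-dict form above stopped matching the expected signature; cause= additionally chains the original exception into the warning. The same pattern already appears in the main ETL loop:

    Log.error("Problem with main ETL loop", cause=e)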
@@ -14,7 +14,7 @@ from __future__ import unicode_literals
 import unittest
 
 from mo_dots import Data
-from mo_logs import startup, Log
+from mo_logs import startup, Log, constants
 
 from bzETL import bz_etl, extract_bugzilla
 from bzETL.bz_etl import etl
@@ -25,6 +25,7 @@ from pyLibrary.sql.mysql import all_db, MySQL
 from pyLibrary.testing import elasticsearch
 from pyLibrary.testing.elasticsearch import FakeES
 from test_etl import compare_both
+from util.database import make_test_instance
 
 
 class TestOneETL(unittest.TestCase):
@@ -34,9 +35,9 @@ class TestOneETL(unittest.TestCase):
     """
     def setUp(self):
         self.settings = startup.read_settings(filename="test_one_settings.json")
+        constants.set(self.settings.constants)
         Log.start(self.settings.debug)
 
-
     def tearDown(self):
         #CLOSE THE CACHED MySQL CONNECTIONS
         bz_etl.close_db_connections()
@@ -55,6 +56,7 @@ class TestOneETL(unittest.TestCase):
         reference = FakeES(self.settings.reference)
         candidate = elasticsearch.make_test_instance("candidate", self.settings.elasticsearch)
 
+        make_test_instance(self.settings.bugzilla)
         with MySQL(self.settings.bugzilla) as db:
 
             #SETUP RUN PARAMETERS

@@ -14,6 +14,10 @@ from __future__ import unicode_literals
 
 from datetime import datetime
 
+from mo_logs import Log
+
+import jx_elasticsearch
+import jx_python
 from bzETL import transform_bugzilla, parse_bug_history
 from jx_python import jx
 from mo_dots import coalesce, unwrap
@@ -21,26 +25,29 @@ from mo_json import json2value, value2json
 from mo_math import Math
 from mo_times.timer import Timer
 from pyLibrary import convert
 from pyLibrary.env import elasticsearch
 from pyLibrary.testing.elasticsearch import FakeES
 
 
 def get_all_bug_versions(es, bug_id, max_time=None):
     max_time = coalesce(max_time, datetime.max)
 
-    data = es.search({
-        "query": {"filtered": {
-            "query": {"match_all": {}},
-            "filter": {"and": [
-                {"term": {"bug_id": bug_id}},
-                {"range": {"modified_ts": {"lte": convert.datetime2milli(max_time)}}}
-            ]}
-        }},
-        "from": 0,
-        "size": 200000,
-        "sort": []
-    })
-
-    return jx.select(data.hits.hits, "_source")
+    if isinstance(es, elasticsearch.Index):
+        esq = jx_elasticsearch.new_instance(es.settings)
+    elif isinstance(es, FakeES):
+        esq = jx_python.wrap_from(es.data.values())
+    else:
+        raise Log.error("unknown container")
+
+    response = esq.query({
+        "from": es.settings.alias,
+        "where": {"and": [
+            {"eq": {"bug_id": bug_id}},
+            {"lte": {"modified_ts": convert.datetime2milli(max_time)}}
+        ]},
+        "format": "list"
+    })
+
+    return response.data
 
 
 def get_private_bugs(es):
     """

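A hedged usage sketch of the rewritten helper: the same call now works against a live cluster (elasticsearch.Index) or the in-memory FakeES the tests use, because both are wrapped in a jx query container; the bug_id value here is hypothetical:

    versions = get_all_bug_versions(candidate, bug_id=123456)
    for v in versions:                  # plain list of bug-version documents
        Log.note("version {{ts}}", ts=v["modified_ts"])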
@@ -63,9 +63,9 @@ class ListContainer(Container):
 
     def query(self, q):
         q = wrap(q)
-        frum = self
+        output = self
         if is_aggs(q):
-            frum = list_aggs(frum.data, q)
+            output = list_aggs(output.data, q)
         else:  # SETOP
             try:
                 if q.filter != None or q.esfilter != None:
@@ -74,18 +74,24 @@ class ListContainer(Container):
                 pass
 
             if q.where is not TRUE:
-                frum = frum.filter(q.where)
+                output = output.filter(q.where)
 
             if q.sort:
-                frum = frum.sort(q.sort)
+                output = output.sort(q.sort)
 
             if q.select:
-                frum = frum.select(q.select)
+                output = output.select(q.select)
         #TODO: ADD EXTRA COLUMN DESCRIPTIONS TO RESULTING SCHEMA
         for param in q.window:
-            frum.window(param)
+            output.window(param)
 
-        return frum
+        if q.format:
+            if q.format=="list":
+                return Data(data=output.data)
+            else:
+                Log.error("unknown format {{format}}", format=q.format)
+        else:
+            return output
 
     def update(self, command):
         """

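A small sketch of the new format handling (the container and its contents are hypothetical): a query that asks for format="list" now returns a Data wrapper around the raw rows instead of another ListContainer, which is what the rewritten get_all_bug_versions above relies on:

    result = container.query({
        "where": {"eq": {"bug_id": 123456}},
        "format": "list"
    })
    rows = result.data                  # plain list of matching records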
@@ -51,6 +51,8 @@ def open_test_instance(name, settings):
         Log.error("unexpected", cause=e)
 
     es = cluster.create_index(limit_replicas=True, limit_replicas_warning=False, kwargs=settings)
+    es.delete_all_but_self()
+    es.add_alias(settings.index)
     return es