This commit is contained in:
Kyle Lahnakoski 2018-07-17 08:16:36 -04:00
Родитель 756cbf150b
Коммит da2d32e043
6 изменённых файлов: 42 добавлений и 34 удалений

Просмотреть файл

@@ -288,7 +288,7 @@ def full_etl(resume_from_last_run, param, db, esq, esq_comments, output_queue, k
end = coalesce(param.end, db.query("SELECT max(bug_id)+1 bug_id FROM bugs")[0].bug_id) end = coalesce(param.end, db.query("SELECT max(bug_id)+1 bug_id FROM bugs")[0].bug_id)
start = coalesce(param.start, 0) start = coalesce(param.start, 0)
if resume_from_last_run: if resume_from_last_run:
end = coalesce(param.end, Math.ceiling(get_max_bug_id(esq), param.increment), 0) end = coalesce(param.end, Math.ceiling(get_min_bug_id(esq), param.increment), 0)
############################################################# #############################################################
## MAIN ETL LOOP ## MAIN ETL LOOP
@@ -449,7 +449,7 @@ def get_bug_ids(esq, filter):
) )
def get_max_bug_id(esq): def get_min_bug_id(esq):
try: try:
result = esq.query({"from":esq.name, "select": {"value": "bug_id", "aggregate": "max"}, "format": "cube"}) result = esq.query({"from":esq.name, "select": {"value": "bug_id", "aggregate": "max"}, "format": "cube"})
return result.data['bug_id'] return result.data['bug_id']

Просмотреть файл

@@ -48,6 +48,8 @@
"timeout": 60 "timeout": 60
}, },
"constants":{ "constants":{
"jx_elasticsearch.meta.DEBUG": true,
"jx_elasticsearch.meta.ENABLE_META_SCAN": false,
"pyLibrary.sql.mysql.EXECUTE_TIMEOUT": 0, "pyLibrary.sql.mysql.EXECUTE_TIMEOUT": 0,
"pyLibrary.env.http.default_headers": {"Referer": "https://github.com/mozilla/Bugzilla-ETL"}, "pyLibrary.env.http.default_headers": {"Referer": "https://github.com/mozilla/Bugzilla-ETL"},
}, },

Просмотреть файл

@@ -50,6 +50,8 @@
"timeout": 60 "timeout": 60
}, },
"constants":{ "constants":{
"jx_elasticsearch.meta.DEBUG": true,
"jx_elasticsearch.meta.ENABLE_META_SCAN": false,
"pyLibrary.sql.mysql.EXECUTE_TIMEOUT": 0, "pyLibrary.sql.mysql.EXECUTE_TIMEOUT": 0,
"pyLibrary.env.http.default_headers": {"Referer": "https://github.com/mozilla/Bugzilla-ETL"} "pyLibrary.env.http.default_headers": {"Referer": "https://github.com/mozilla/Bugzilla-ETL"}
}, },

40
vendor/jx_elasticsearch/meta.py поставляемый
Просмотреть файл

@@ -360,7 +360,7 @@ class ElasticsearchMetadata(Namespace):
}) })
return return
elif column.es_type in elasticsearch.ES_NUMERIC_TYPES and cardinality > 30: elif column.es_type in elasticsearch.ES_NUMERIC_TYPES and cardinality > 30:
DEBUG and Log.note("{{field}} has {{num}} parts", field=column.es_index, num=cardinality) DEBUG and Log.note("{{table}}.{{field}} has {{num}} parts", table=column.es_index, field=column.es_column, num=cardinality)
self.meta.columns.update({ self.meta.columns.update({
"set": { "set": {
"count": count, "count": count,
@@ -458,23 +458,23 @@ class ElasticsearchMetadata(Namespace):
if column is THREAD_STOP: if column is THREAD_STOP:
continue continue
DEBUG and Log.note("update {{table}}.{{column}}", table=column.es_index, column=column.es_column) with Timer("update {{table}}.{{column}}", param={"table":column.es_index, "column":column.es_column}, debug=DEBUG):
if column.es_index in self.index_does_not_exist: if column.es_index in self.index_does_not_exist:
self.meta.columns.update({ self.meta.columns.update({
"clear": ".", "clear": ".",
"where": {"eq": {"es_index": column.es_index}} "where": {"eq": {"es_index": column.es_index}}
}) })
continue continue
if column.jx_type in STRUCT or column.es_column.endswith("." + EXISTS_TYPE): if column.jx_type in STRUCT or column.es_column.endswith("." + EXISTS_TYPE):
column.last_updated = Date.now() column.last_updated = Date.now()
continue continue
elif column.last_updated >= Date.now()-TOO_OLD: elif column.last_updated >= Date.now()-TOO_OLD:
continue continue
try: try:
self._update_cardinality(column) self._update_cardinality(column)
(DEBUG and not column.es_index.startswith(TEST_TABLE_PREFIX)) and Log.note("updated {{column.name}}", column=column) (DEBUG and not column.es_index.startswith(TEST_TABLE_PREFIX)) and Log.note("updated {{column.name}}", column=column)
except Exception as e: except Exception as e:
Log.warning("problem getting cardinality for {{column.name}}", column=column, cause=e) Log.warning("problem getting cardinality for {{column.name}}", column=column, cause=e)
except Exception as e: except Exception as e:
Log.warning("problem in cardinality monitor", cause=e) Log.warning("problem in cardinality monitor", cause=e)
@@ -545,8 +545,8 @@ class Snowflake(object):
""" """
RETURN ALL COLUMNS FROM ORIGIN OF FACT TABLE RETURN ALL COLUMNS FROM ORIGIN OF FACT TABLE
""" """
if any("verify_no_private_attachments" in t['method'] for t in extract_stack()):
pass
return self.namespace.get_columns(literal_field(self.alias)) return self.namespace.get_columns(literal_field(self.alias))

4
vendor/jx_python/meta.py поставляемый
Просмотреть файл

@@ -116,13 +116,13 @@ class ColumnList(Table):
return [ return [
column column
for t, cs in self.data.items() for t, cs in self.data.items()
for c, css in cs.items() for _, css in cs.items()
for column in css for column in css
] ]
def __iter__(self): def __iter__(self):
self._update_meta()
with self.locker: with self.locker:
self._update_meta()
return iter(self._all_columns()) return iter(self._all_columns())
def __len__(self): def __len__(self):

24
vendor/mo_logs/strings.py поставляемый
Просмотреть файл

@@ -484,16 +484,20 @@ _SNIP = "...<snip>..."
@formatter @formatter
def limit(value, length): def limit(value, length):
# LIMIT THE STRING value TO GIVEN LENGTH, CHOPPING OUT THE MIDDLE IF REQUIRED try:
if len(value) <= length: # LIMIT THE STRING value TO GIVEN LENGTH, CHOPPING OUT THE MIDDLE IF REQUIRED
return value if len(value) <= length:
elif length < len(_SNIP) * 2: return value
return value[0:length] elif length < len(_SNIP) * 2:
else: return value[0:length]
lhs = int(round((length - len(_SNIP)) / 2, 0)) else:
rhs = length - len(_SNIP) - lhs lhs = int(round((length - len(_SNIP)) / 2, 0))
return value[:lhs] + _SNIP + value[-rhs:] rhs = length - len(_SNIP) - lhs
return value[:lhs] + _SNIP + value[-rhs:]
except Exception as e:
if not _Duration:
_late_import()
_Log.error("Not expected", cause=e)
@formatter @formatter
def split(value, sep="\n"): def split(value, sep="\n"):