This commit is contained in:
Kyle Lahnakoski 2019-02-14 11:54:45 -05:00
Родитель e50929c41a
Коммит 0b2337857f
1 изменённых файлов: 25 добавлений и 18 удалений

43
vendor/jx_elasticsearch/meta_columns.py поставляемый
Просмотреть файл

@ -20,12 +20,15 @@ from mo_json.typed_encoder import unnest_path, untype_path, untyped
from mo_logs import Log
from mo_math import MAX
from mo_threads import Lock, Queue, Thread, Till, MAIN_THREAD
from mo_times import Timer
from mo_times.dates import Date
DEBUG = False
singlton = None
META_INDEX_NAME = "meta.columns"
META_TYPE_NAME = "column"
COLUMN_LOAD_PERIOD = 10
COLUMN_EXTRACT_PERIOD = 2*60
class ColumnList(Table, jx_base.Container):
@ -48,7 +51,7 @@ class ColumnList(Table, jx_base.Container):
self._db_load()
Thread.run(
"update " + META_INDEX_NAME,
self._push_to_es_worker,
self._synch_with_es,
parent_thread=MAIN_THREAD,
)
@ -119,26 +122,30 @@ class ColumnList(Table, jx_base.Container):
)
self._db_create()
def _push_to_es_worker(self, please_stop):
def _synch_with_es(self, please_stop):
try:
last_extract = Date.now()
while not please_stop:
now =Date.now()
try:
result = self.es_index.search(
{
"query": {
"range": {"last_updated.~n~": {"gt": self.last_load}}
},
"sort": ["es_index.~s~", "name.~s~", "es_column.~s~"],
"from": 0,
"size": 10000,
}
)
if (now-last_extract).seconds > COLUMN_EXTRACT_PERIOD:
result = self.es_index.search(
{
"query": {
"range": {"last_updated.~n~": {"gt": self.last_load}}
},
"sort": ["es_index.~s~", "name.~s~", "es_column.~s~"],
"from": 0,
"size": 10000,
}
)
last_extract = now
with self.locker:
for r in result.hits.hits._source:
c = doc_to_column(r)
self._add(c)
self.last_load = MAX((self.last_load, c.last_updated))
with self.locker:
for r in result.hits.hits._source:
c = doc_to_column(r)
self._add(c)
self.last_load = MAX((self.last_load, c.last_updated))
while not please_stop:
updates = self.todo.pop_all()
@ -154,7 +161,7 @@ class ColumnList(Table, jx_base.Container):
except Exception as e:
Log.warning("problem updating database", cause=e)
(Till(seconds=10) | please_stop).wait()
(Till(seconds=COLUMN_LOAD_PERIOD) | please_stop).wait()
finally:
Log.note("done")