Merge pull request #733 from mozilla/properly_kill_bulk_queue

Modify workers to stop bulk queue on errors
This commit is contained in:
A Smith 2018-08-08 14:06:10 -05:00 коммит произвёл GitHub
Родитель fa2a60af4f 80e3cc78b9
Коммит 371158e5db
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 47 добавлений и 29 удалений

Просмотреть файл

@ -505,4 +505,10 @@ if __name__ == '__main__':
es = esConnect()
pluginList = registerPlugins()
main()
try:
main()
except Exception as e:
if options.esbulksize != 0:
es.finish_bulk()
raise

Просмотреть файл

@ -362,4 +362,9 @@ if __name__ == '__main__':
pluginList = registerPlugins()
main()
try:
main()
except Exception as e:
if options.esbulksize != 0:
es.finish_bulk()
raise

Просмотреть файл

@ -267,11 +267,8 @@ class taskConsumer(object):
time.sleep(options.ptinterval)
except KeyboardInterrupt:
sys.exit(1)
except ValueError as e:
logger.exception('Exception while handling message: %r' % e)
sys.exit(1)
def on_message(self, body, message):
# print("RECEIVED MESSAGE: %r" % (body, ))
@ -417,4 +414,9 @@ if __name__ == '__main__':
pluginList = registerPlugins()
main()
try:
main()
except Exception as e:
if options.esbulksize != 0:
es.finish_bulk()
raise

Просмотреть файл

@ -61,24 +61,20 @@ class taskConsumer(object):
self.taskQueue.set_message_class(RawMessage)
while True:
try:
records = self.taskQueue.get_messages(self.options.prefetch)
for msg in records:
msg_body = msg.get_body()
try:
# get_body() should be json
message_json = json.loads(msg_body)
self.on_message(message_json)
# delete message from queue
self.taskQueue.delete_message(msg)
except ValueError:
logger.error('Invalid message, not JSON <dropping message and continuing>: %r' % msg_body)
self.taskQueue.delete_message(msg)
continue
time.sleep(.1)
except Exception as e:
logger.exception(e)
sys.exit(1)
records = self.taskQueue.get_messages(self.options.prefetch)
for msg in records:
msg_body = msg.get_body()
try:
# get_body() should be json
message_json = json.loads(msg_body)
self.on_message(message_json)
# delete message from queue
self.taskQueue.delete_message(msg)
except ValueError:
logger.error('Invalid message, not JSON <dropping message and continuing>: %r' % msg_body)
self.taskQueue.delete_message(msg)
continue
time.sleep(.1)
def on_message(self, message):
try:
@ -241,4 +237,10 @@ if __name__ == '__main__':
# open ES connection globally so we don't waste time opening it per message
es = esConnect()
main()
try:
main()
except Exception as e:
if options.esbulksize != 0:
es.finish_bulk()
raise

Просмотреть файл

@ -218,11 +218,9 @@ class taskConsumer(object):
self.taskQueue.delete_message(msg)
time.sleep(.1)
except KeyboardInterrupt:
sys.exit(1)
except ValueError as e:
logger.exception('Exception while handling message: %r' % e)
sys.exit(1)
self.taskQueue.delete_message(msg)
def on_message(self, body, message):
# print("RECEIVED MESSAGE: %r" % (body, ))
@ -399,4 +397,9 @@ if __name__ == '__main__':
pluginList = registerPlugins()
main()
try:
main()
except Exception as e:
if options.esbulksize != 0:
es.finish_bulk()
raise