revert sqs changes due to kombu issues

This commit is contained in:
Jeff Bryner 2015-10-12 13:59:32 -07:00
Родитель dad1ca412e
Коммит af526d6e4e
1 изменённых файлов: 38 добавлений и 58 удалений

Просмотреть файл

@ -258,10 +258,11 @@ def esConnect(conn):
class taskConsumer(ConsumerMixin): class taskConsumer(ConsumerMixin):
def __init__(self, mqConnection, taskQueue, esConnection): def __init__(self, mqConnection, taskQueue, topicExchange, esConnection):
self.connection = mqConnection self.connection = mqConnection
self.esConnection = esConnection self.esConnection = esConnection
self.taskQueue = taskQueue self.taskQueue = taskQueue
self.topicExchange = topicExchange
self.mqproducer = self.connection.Producer(serializer='json') self.mqproducer = self.connection.Producer(serializer='json')
if hasUWSGI: if hasUWSGI:
self.muleid = uwsgi.mule_id() self.muleid = uwsgi.mule_id()
@ -379,7 +380,10 @@ class taskConsumer(ConsumerMixin):
except kombu.exceptions.MessageStateError: except kombu.exceptions.MessageStateError:
# state may be already set. # state may be already set.
return return
# post the dict (kombu serializes it to json) to the events topic queue
# using the ensure function to shortcut connection/queue drops/stalls, etc.
# ensurePublish = self.connection.ensure(self.mqproducer, self.mqproducer.publish, max_retries=10)
# ensurePublish(normalizedDict, exchange=self.topicExchange, routing_key='mozdef.event')
message.ack() message.ack()
except ValueError as e: except ValueError as e:
sys.stderr.write("esworker exception in events queue %r\n" % e) sys.stderr.write("esworker exception in events queue %r\n" % e)
@ -530,53 +534,39 @@ def sendEventToPlugins(anevent, metadata, pluginList):
def main(): def main():
# connect and declare the message queue/kombu objects. # connect and declare the message queue/kombu objects.
# only py-amqp supports ssl and doesn't recognize amqps
# so fix up the connection string accordingly
connString = 'amqp://{0}:{1}@{2}:{3}/{4}'.format(options.mquser, options.mqpassword, options.mqserver, options.mqport, options.mqvhost)
if options.mqprotocol == 'amqps':
mqSSL = True
else:
mqSSL = False
mqConn = Connection(connString, ssl=mqSSL)
# Task Exchange for events sent via http for us to normalize and post to elastic search
if options.mqack:
# conservative, store msgs to disk, ack each message
eventTaskExchange = Exchange(name=options.taskexchange, type='direct', durable=True, delivery_mode=2)
else:
# fast, transient delivery, store in memory only, auto-ack messages
eventTaskExchange = Exchange(name=options.taskexchange, type='direct', durable=True, delivery_mode=1)
eventTaskExchange(mqConn).declare()
# Queue for the exchange
if options.mqack:
eventTaskQueue = Queue(options.taskexchange, exchange=eventTaskExchange, routing_key=options.taskexchange, durable=True, no_ack=False)
else:
eventTaskQueue = Queue(options.taskexchange, exchange=eventTaskExchange, routing_key=options.taskexchange, durable=True, no_ack=True)
eventTaskQueue(mqConn).declare()
# what sort of message queue are we talking to? # topic exchange for anyone who wants to queue and listen for mozdef.event
if options.mqprotocol in ('amqp', 'amqps'): eventTopicExchange = Exchange(name=options.eventexchange, type='topic', durable=False, delivery_mode=1)
eventTopicExchange(mqConn).declare()
# only py-amqp supports ssl and doesn't recognize amqps
# so fix up the connection string accordingly
connString = 'amqp://{0}:{1}@{2}:{3}/{4}'.format(options.mquser, options.mqpassword, options.mqserver, options.mqport, options.mqvhost)
if options.mqprotocol == 'amqps':
mqSSL = True
else:
mqSSL = False
mqConn = Connection(connString, ssl=mqSSL)
# Task Exchange for events sent via http for us to normalize and post to elastic search
if options.mqack:
# conservative, store msgs to disk, ack each message
eventTaskExchange = Exchange(name=options.taskexchange, type='direct', durable=True, delivery_mode=2)
else:
# fast, transient delivery, store in memory only, auto-ack messages
eventTaskExchange = Exchange(name=options.taskexchange, type='direct', durable=True, delivery_mode=1)
eventTaskExchange(mqConn).declare()
# Queue for the exchange
if options.mqack:
eventTaskQueue = Queue(options.taskexchange, exchange=eventTaskExchange, routing_key=options.taskexchange, durable=True, no_ack=False)
else:
eventTaskQueue = Queue(options.taskexchange, exchange=eventTaskExchange, routing_key=options.taskexchange, durable=True, no_ack=True)
eventTaskQueue(mqConn).declare()
# topic exchange for anyone who wants to queue and listen for mozdef.event
# commented out to begin deprecation for this feature
# eventTopicExchange = Exchange(name=options.eventexchange, type='topic', durable=False, delivery_mode=1)
# eventTopicExchange(mqConn).declare()
if options.mqprotocol in ('sqs'):
# amazon SQS
connString = 'sqs://%s:%s@' % (urllib.quote(options.accesskey, safe=''), urllib.quote(options.secretkey, safe=''))
mqConn = Connection(connString, transport_options=dict(region=options.region))
# for sqs, set taskexchange to the sqs queue name.
eventTaskQueue = mqConn.SimpleQueue(options.taskexchange)
if hasUWSGI: if hasUWSGI:
sys.stdout.write("started as uwsgi mule {0}\n".format(uwsgi.mule_id())) sys.stdout.write("started as uwsgi mule {0}\n".format(uwsgi.mule_id()))
else: else:
sys.stdout.write('started without uwsgi\n') sys.stdout.write('started without uwsgi\n')
# consume our queue # consume our queue and publish on the topic exchange
taskConsumer(mqConn, eventTaskQueue, es).run() taskConsumer(mqConn, eventTaskQueue, eventTopicExchange, es).run()
def initConfig(): def initConfig():
@ -591,33 +581,23 @@ def initConfig():
options.esbulksize = getConfig('esbulksize', 0, options.configfile) options.esbulksize = getConfig('esbulksize', 0, options.configfile)
options.esbulktimeout = getConfig('esbulktimeout', 30, options.configfile) options.esbulktimeout = getConfig('esbulktimeout', 30, options.configfile)
# set to either amqp or amqps for rabbitmq without/with ssl # message queue options
# set to sqs for Amazon
options.mqprotocol = getConfig('mqprotocol', 'amqp', options.configfile)
# rabbit message queue options
options.mqserver = getConfig('mqserver', 'localhost', options.configfile) options.mqserver = getConfig('mqserver', 'localhost', options.configfile)
options.taskexchange = getConfig('taskexchange', 'eventtask', options.configfile) options.taskexchange = getConfig('taskexchange', 'eventtask', options.configfile)
options.eventexchange = getConfig('eventexchange', 'events', options.configfile) options.eventexchange = getConfig('eventexchange', 'events', options.configfile)
# rabbit: how many messages to ask for at once from the message queue # how many messages to ask for at once from the message queue
options.prefetch = getConfig('prefetch', 50, options.configfile) options.prefetch = getConfig('prefetch', 50, options.configfile)
# rabbit: user creds
options.mquser = getConfig('mquser', 'guest', options.configfile) options.mquser = getConfig('mquser', 'guest', options.configfile)
options.mqpassword = getConfig('mqpassword', 'guest', options.configfile) options.mqpassword = getConfig('mqpassword', 'guest', options.configfile)
# rabbit: port/vhost
options.mqport = getConfig('mqport', 5672, options.configfile) options.mqport = getConfig('mqport', 5672, options.configfile)
options.mqvhost = getConfig('mqvhost', '/', options.configfile) options.mqvhost = getConfig('mqvhost', '/', options.configfile)
# set to either amqp or amqps for ssl
# rabbit: run with message acking? options.mqprotocol = getConfig('mqprotocol', 'amqp', options.configfile)
# run with message acking?
# also toggles transient/persistant delivery (messages in memory only or stored on disk) # also toggles transient/persistant delivery (messages in memory only or stored on disk)
# ack=True sets persistant delivery, False sets transient delivery # ack=True sets persistant delivery, False sets transient delivery
options.mqack = getConfig('mqack', True, options.configfile) options.mqack = getConfig('mqack', True, options.configfile)
# aws options
options.accesskey = getConfig('accesskey', '', options.configfile)
options.secretkey = getConfig('secretkey', '', options.configfile)
options.region = getConfig('region', 'us-west-1', options.configfile)
# plugin options # plugin options
# secs to pass before checking for new/updated plugins # secs to pass before checking for new/updated plugins
# seems to cause memory leaks.. # seems to cause memory leaks..