зеркало из https://github.com/mozilla/MozDef.git
Add SNS SQS mq worker
This commit is contained in:
Родитель
fc3e0e397e
Коммит
9e734175e7
|
@ -0,0 +1,23 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
# Copyright (c) 2017 Mozilla Corporation
|
||||
#
|
||||
# Contributors:
|
||||
# Guillaume Destuynder kang@mozilla.com
|
||||
# Brandon Myers bmyers@mozilla.com
|
||||
|
||||
|
||||
class DotDict(dict):
    '''dict.item notation for dict()'s

    Items can be read, written and deleted as attributes (d.key) as well as
    with the usual subscript syntax (d['key']).  Attribute access is wired
    straight to the dict item methods, so a missing key raises KeyError
    for both access styles.
    '''
    __getattr__ = dict.__getitem__
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__

    def __init__(self, dct=None):
        '''Copy dct into this instance, wrapping nested mappings.

        dct: optional mapping to copy; omitted or None gives an empty
             DotDict.  Any value exposing a keys() method is converted to a
             DotDict recursively so that dotted access works at every level.
        '''
        # None sentinel instead of a mutable {} default shared between calls
        if dct is None:
            dct = {}
        for key, value in dct.items():
            if hasattr(value, 'keys'):
                value = DotDict(value)
            self[key] = value
|
|
@ -0,0 +1,230 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
# Copyright (c) 2017 Mozilla Corporation
|
||||
#
|
||||
# Contributors:
|
||||
# Brandon Myers bmyers@mozilla.com
|
||||
|
||||
|
||||
import json
|
||||
import os
|
||||
|
||||
import sys
|
||||
import socket
|
||||
import time
|
||||
from configlib import getConfig, OptionParser
|
||||
from datetime import datetime, timedelta
|
||||
import pytz
|
||||
|
||||
import boto.sqs
|
||||
from boto.sqs.message import RawMessage
|
||||
import kombu
|
||||
|
||||
import sys
|
||||
import os
|
||||
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '../lib'))
|
||||
from utilities.toUTC import toUTC
|
||||
from elasticsearch_client import ElasticsearchClient, ElasticsearchBadServer, ElasticsearchInvalidIndex, ElasticsearchException
|
||||
|
||||
from lib.plugins import sendEventToPlugins, checkPlugins
|
||||
|
||||
# running under uwsgi?
|
||||
# hasUWSGI records whether the uwsgi module could be imported; main() uses it
# to decide whether uwsgi.mule_id() can be reported at start-up.
try:
    import uwsgi
except ImportError:
    hasUWSGI = False
else:
    hasUWSGI = True
|
||||
|
||||
|
||||
def esConnect():
    '''open or re-open a connection to elastic search

    Builds a new ElasticsearchClient from the module-level options: the
    server list (options.esservers, each entry rendered as a string) and the
    bulk size (options.esbulksize).
    '''
    servers = ['{0}'.format(server) for server in options.esservers]
    return ElasticsearchClient(servers, options.esbulksize)
|
||||
|
||||
|
||||
class taskConsumer(object):
    '''Poll a queue of SNS notifications and index each one as an event.

    run() fetches raw messages from the task queue, on_message() turns the
    decoded notification into an event dict and passes it through the
    plugins, and save_event() posts the result to elastic search.
    '''

    def __init__(self, mqConnection, taskQueue, esConnection, options):
        '''Store the connections and options and load the plugin list.

        mqConnection: connection object for the message queue service
        taskQueue:    queue object the messages are read from
        esConnection: elastic search client used to save events
        options:      settings object; this class reads plugincheckfrequency,
                      esbulksize, prefetch, mozdefhostname and taskexchange
        '''
        self.connection = mqConnection
        self.esConnection = esConnection
        self.taskQueue = taskQueue
        self.options = options

        # Back-date the last check by an hour so checkPlugins treats the empty
        # list as stale (for check frequencies shorter than that) and loads it.
        lastPluginCheck = datetime.now() - timedelta(minutes=60)
        self.pluginList, lastPluginCheck = checkPlugins(list(), lastPluginCheck, options.plugincheckfrequency)

        if self.options.esbulksize != 0:
            # if we are bulk posting enable a timer to occasionally flush the bulker even if it's not full
            # to prevent events from sticking around an idle worker
            self.esConnection.start_bulk_timer()

    def run(self):
        '''Loop forever: fetch messages, process them and delete them.

        A message whose body is not a JSON object is reported and deleted so
        it cannot be redelivered.  A ValueError raised outside the per-message
        handling terminates the process with exit status 1.
        '''
        self.taskQueue.set_message_class(RawMessage)

        while True:
            try:
                records = self.taskQueue.get_messages(self.options.prefetch)
                for msg in records:
                    msg_body = msg.get_body()
                    try:
                        # get_body() should be json
                        message_json = json.loads(msg_body)
                        # Valid JSON that is not an object (a list, number,
                        # string ...) cannot be a notification; reject it here
                        # instead of letting on_message die on a missing method.
                        if not isinstance(message_json, dict):
                            raise ValueError('message body is not a JSON object')
                        self.on_message(message_json)
                        # delete message from queue
                        self.taskQueue.delete_message(msg)
                    except ValueError:
                        # NOTE: a ValueError escaping on_message lands here too
                        sys.stdout.write('Invalid message, not JSON <dropping message and continuing>: %r\n' % msg_body)
                        self.taskQueue.delete_message(msg)
                        continue
                # short pause between polls
                time.sleep(.1)
            except ValueError as e:
                sys.stdout.write('Exception while handling message: %r' % e)
                sys.exit(1)

    @staticmethod
    def _map_inner_message(event, inner_message):
        '''Copy the fields of the decoded 'Message' payload onto event.

        Well-known keys are mapped onto top level event fields; every other
        key is stored under event['details'].  Keys are compared exactly: the
        previous `key in ('hostname')` form was a substring test against a
        plain string, so keys such as 'host', 'name' or 'id' were misrouted.
        '''
        for key, value in inner_message.items():
            if key == 'processid':
                event['processid'] = value
            elif key in ('pname', 'pid'):
                # NOTE(review): 'pid' feeding processname looks unintended
                # (process id vs process name); left unchanged pending
                # confirmation from the log producers.
                event['processname'] = value
            elif key == 'hostname':
                event['hostname'] = value
            elif key in ('time', 'timestamp'):
                event['timestamp'] = toUTC(value).isoformat()
                event['utctimestamp'] = toUTC(event['timestamp']).astimezone(pytz.utc).isoformat()
            elif key == 'type':
                event['category'] = value
            elif key in ('payload', 'message'):
                event['summary'] = value
            else:
                event.setdefault('details', {})[key] = value

    def on_message(self, message):
        '''Build an event from a decoded notification, then save it.

        message: dict decoded from the queue message body.  Only its
                 'Message' entry is used; when that entry is a JSON object
                 its fields are mapped onto the event, otherwise the raw
                 value becomes the event summary.

        Returns the event as returned by the plugins (None if a plugin
        discarded it).
        '''
        # default elastic search metadata for an event
        metadata = {
            'index': 'events',
            'doc_type': 'event',
            'id': None
        }
        # The event always starts empty, so the tags list is created directly
        # (the old "extend existing tags" branch could never run).
        event = {
            'receivedtimestamp': toUTC(datetime.now()).isoformat(),
            'mozdefhostname': self.options.mozdefhostname,
            'tags': [self.options.taskexchange],
            'severity': 'INFO',
        }

        for message_key, message_value in message.items():
            if message_key != 'Message':
                continue
            try:
                inner_message = json.loads(message_value)
                if not isinstance(inner_message, dict):
                    # e.g. a bare number or quoted string: nothing to map
                    raise ValueError('Message is not a JSON object')
                self._map_inner_message(event, inner_message)
            except ValueError:
                # Not JSON, not an object, or a ValueError raised while
                # mapping a field: fall back to the raw text as summary.
                event['summary'] = message_value

        (event, metadata) = sendEventToPlugins(event, metadata, self.pluginList)
        self.save_event(event, metadata)

        return event

    def save_event(self, event, metadata):
        '''Post event to elastic search using the index data in metadata.

        event:    dict to index, or None when a plugin asked for a discard
        metadata: dict with 'index', 'id' and 'doc_type' entries

        Elastic search failures are handled here and never propagate; the
        event being saved is not retried.
        '''
        try:
            # drop the message if a plug in set it to None
            # signaling a discard
            if event is None:
                return

            # make a json version for posting to elastic search
            jbody = json.JSONEncoder().encode(event)

            try:
                self.esConnection.save_object(
                    index=metadata['index'],
                    doc_id=metadata['id'],
                    doc_type=metadata['doc_type'],
                    body=jbody,
                    # bulk posting is enabled by a non-zero bulk size
                    bulk=self.options.esbulksize != 0
                )

            except (ElasticsearchBadServer, ElasticsearchInvalidIndex):
                # handle loss of server or race condition with index rotation/creation/aliasing
                # by reconnecting; the current event is dropped
                try:
                    self.esConnection = esConnect()
                    return
                except kombu.exceptions.MessageStateError:
                    return
            except ElasticsearchException as e:
                sys.stderr.write('ElasticSearchException: {0} reported while indexing event'.format(e))
                return
        except ValueError as e:
            sys.stderr.write("esworker.sqs exception in events queue %r\n" % e)
|
||||
|
||||
|
||||
def main():
    '''Connect to the configured SQS queue and consume it until exit.

    Relies on the module-level options (set up by initConfig) and es (the
    elastic search connection).  Writes a message and exits with status 1
    when the protocol is not 'sqs' or the region/queue cannot be resolved.
    '''
    if hasUWSGI:
        sys.stdout.write("started as uwsgi mule {0}\n".format(uwsgi.mule_id()))
    else:
        sys.stdout.write('started without uwsgi\n')

    # Exact comparison: `not in ('sqs')` tested for a substring of the string
    # 'sqs', so values such as 's', 'q' or '' were wrongly accepted.
    if options.mqprotocol != 'sqs':
        sys.stdout.write('Can only process SQS queues, terminating\n')
        sys.exit(1)

    mqConn = boto.sqs.connect_to_region(options.region, aws_access_key_id=options.accesskey, aws_secret_access_key=options.secretkey)
    # boto hands back None (no exception) for a region name it does not know
    if mqConn is None:
        sys.stdout.write('Unable to connect to SQS region {0}, terminating\n'.format(options.region))
        sys.exit(1)

    # attach to the queue; boto likewise returns None when no queue matches
    eventTaskQueue = mqConn.get_queue(options.taskexchange)
    if eventTaskQueue is None:
        sys.stdout.write('SQS queue {0} not found, terminating\n'.format(options.taskexchange))
        sys.exit(1)

    # consume our queue
    taskConsumer(mqConn, eventTaskQueue, es, options).run()
|
||||
|
||||
|
||||
def initConfig():
    '''Populate the module-level options from the configuration file.

    Every setting is looked up with getConfig in options.configfile and
    falls back to the default given here.
    '''
    def setting(name, default):
        # all lookups share the same configuration file
        return getConfig(name, default, options.configfile)

    # name of this host, recorded on every event as mozdefhostname
    options.mozdefhostname = setting('mozdefhostname', socket.gethostname())

    # elastic search: comma separated server list; a non-zero esbulksize
    # enables bulk posting, esbulktimeout is the timeout to post no matter
    # how many events after X seconds
    options.esservers = setting('esservers', 'http://localhost:9200').split(',')
    options.esbulksize = setting('esbulksize', 0)
    options.esbulktimeout = setting('esbulktimeout', 30)

    # message queue protocol; main() only accepts sqs
    options.mqprotocol = setting('mqprotocol', 'sqs')

    # name of the queue main() attaches to; also added to each event's tags
    options.taskexchange = setting('taskexchange', 'eventtask')
    # how many messages to ask for at once from the message queue
    options.prefetch = setting('prefetch', 10)

    # aws credentials and region handed to boto
    options.accesskey = setting('accesskey', '')
    options.secretkey = setting('secretkey', '')
    options.region = setting('region', 'us-west-1')

    # secs to pass before checking for new/updated plugins;
    # regular updates seem to cause memory leaks and are disabled for now,
    # though the frequency is set anyway
    options.plugincheckfrequency = setting('plugincheckfrequency', 120)
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Command line: -c picks the configuration file, which defaults to the
    # script path with '.py' swapped for '.conf'.
    parser = OptionParser()
    parser.add_option(
        "-c",
        dest='configfile',
        default=sys.argv[0].replace('.py', '.conf'),
        help="configuration file to use"
    )
    options, args = parser.parse_args()
    initConfig()

    # one module-wide ES connection so it is not re-opened for every message
    es = esConnect()
    main()
|
|
@ -0,0 +1,85 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
# Copyright (c) 2017 Mozilla Corporation
|
||||
#
|
||||
# Contributors:
|
||||
# Brandon Myers bmyers@mozilla.com
|
||||
|
||||
|
||||
import sys
|
||||
import os
|
||||
from operator import itemgetter
|
||||
from datetime import datetime
|
||||
import pynsive
|
||||
|
||||
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '../lib'))
|
||||
from utilities.dict2List import dict2List
|
||||
|
||||
|
||||
def sendEventToPlugins(anevent, metadata, pluginList):
    '''Run an event through every plugin whose registration matches it.

    pluginList holds (plugin, registration, priority) tuples.  Plugins are
    visited in ascending priority order; a plugin receives the event when
    its registration list shares at least one entry with the keys/values
    that dict2List yields for the event.

    Returns the (event, metadata) pair after the last plugin ran.  A plugin
    returning None as the event drops it and stops further processing.
    Raises TypeError when anevent is not a dict.
    '''
    if not isinstance(anevent, dict):
        raise TypeError('event is type {0}, should be a dict'.format(type(anevent)))

    for plugin in sorted(pluginList, key=itemgetter(2)):
        criteria = plugin[1]
        # only list registrations can match an event
        if not isinstance(criteria, list):
            continue

        try:
            matched = set(criteria).intersection(list(dict2List(anevent)))
        except TypeError:
            sys.stderr.write('TypeError on set intersection for dict {0}'.format(anevent))
            return (anevent, metadata)

        if not matched:
            continue

        (anevent, metadata) = plugin[0].onMessage(anevent, metadata)
        if anevent is None:
            # the plugin asked for this message to be dropped: stop here
            return (anevent, metadata)

    return (anevent, metadata)
|
||||
|
||||
|
||||
def registerPlugins():
    '''Import every module under ./plugins and collect its message plugin.

    A module takes part when it exposes a `message` callable; the object it
    returns must carry a `registration` list and may carry a `priority`
    (default 100).

    Returns a list of (plugin instance, registration list, priority) tuples;
    the list is empty when no 'plugins' directory exists relative to the
    current working directory.
    Raises ImportError when a listed module cannot be loaded.
    '''
    pluginList = list()   # tuples of (plugin instance, registration list, priority)
    if not os.path.exists('plugins'):
        return pluginList

    for mname in pynsive.list_modules('plugins'):
        module = pynsive.import_module(mname)
        # Validate before reloading: the check used to come after
        # reload(module), where a failed import had already blown up.
        if not module:
            raise ImportError('Unable to load module {}'.format(mname))
        # reload so that edits to an already imported plugin are picked up
        reload(module)

        if not hasattr(module, 'message'):
            continue
        mclass = module.message()
        mreg = mclass.registration
        mpriority = getattr(mclass, 'priority', 100)
        if isinstance(mreg, list):
            print('[*] plugin {0} registered to receive messages with {1}'.format(mname, mreg))
            pluginList.append((mclass, mreg, mpriority))
    return pluginList
|
||||
|
||||
|
||||
def checkPlugins(pluginList, lastPluginCheck, checkFrequency):
    '''Re-register the plugins when the last check is older than allowed.

    pluginList:      current list of plugin tuples
    lastPluginCheck: datetime of the previous check
    checkFrequency:  maximum age of the list in seconds

    Returns (pluginList, lastPluginCheck): a fresh list and the current time
    when a check was due, otherwise both arguments unchanged.
    '''
    # total_seconds() covers the whole interval; the .seconds attribute drops
    # whole days, so a list that was 1 day and 30s stale looked 30s old.
    elapsed = abs(datetime.now() - lastPluginCheck).total_seconds()
    if elapsed > checkFrequency:
        lastPluginCheck = datetime.now()
        pluginList = registerPlugins()
    return pluginList, lastPluginCheck
|
|
@ -0,0 +1,47 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
# Copyright (c) 2017 Mozilla Corporation
|
||||
#
|
||||
# Contributors:
|
||||
# Brandon Myers bmyers@mozilla.com
|
||||
|
||||
|
||||
import pytest
|
||||
|
||||
import os
|
||||
import sys
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../../../lib"))
|
||||
from utilities.dot_dict import DotDict
|
||||
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
|
||||
from unit_test_suite import UnitTestSuite
|
||||
|
||||
|
||||
class TestDotDict(UnitTestSuite):
    '''Unit tests for utilities.dot_dict.DotDict.'''

    def test_blank_init(self):
        # a DotDict built without arguments has no keys
        empty = DotDict()
        assert empty.keys() == []

    def test_nonexisting_key(self):
        # attribute access to a missing key surfaces as KeyError
        empty = DotDict()
        with pytest.raises(KeyError):
            empty.abcd

    def test_basic_init(self):
        # flat dict: every key is reachable as an attribute
        flat = DotDict({'key1': 'value1', 'key2': 'value2'})
        assert sorted(flat.keys()) == sorted(['key1', 'key2'])
        assert flat.key1 == 'value1'
        assert flat.key2 == 'value2'

    def test_complex_init(self):
        # nested dict: inner dicts support dotted access as well
        source = {'details': {'key1': 'value1'}}
        nested = DotDict(source)
        assert nested.details == {'key1': 'value1'}
        assert nested.details.key1 == 'value1'
|
|
@ -0,0 +1,79 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
# Copyright (c) 2017 Mozilla Corporation
|
||||
#
|
||||
# Contributors:
|
||||
# Brandon Myers bmyers@mozilla.com
|
||||
|
||||
|
||||
import os
|
||||
import sys
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../../mq"))
|
||||
from esworker_sns_sqs import taskConsumer
|
||||
from lib.plugins import checkPlugins
|
||||
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../../lib"))
|
||||
from utilities.dot_dict import DotDict
|
||||
from query_models import SearchQuery, ExistsMatch
|
||||
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../"))
|
||||
from unit_test_suite import UnitTestSuite
|
||||
|
||||
|
||||
class TestEsworkerSNSSQS(UnitTestSuite):
    '''Tests for the SNS/SQS worker: notification in, saved event out.'''

    def setup(self):
        '''Create a taskConsumer wired to the test elastic search client.'''
        super(TestEsworkerSNSSQS, self).setup()
        queue_name = 'example-logs-mozdef'
        worker_options = DotDict(
            {
                "esbulksize": 0,
                "mozdefhostname": "unittest.hostname",
                "taskexchange": queue_name,
                'plugincheckfrequency': 120,
            }
        )
        # the connection and queue are placeholders: on_message never uses them
        self.consumer = taskConsumer('abc', queue_name, self.es_client, worker_options)

    def search_and_verify_event(self, expected_event):
        '''Fetch the single stored event that has tags and compare it.'''
        self.es_client.flush('events')
        query = SearchQuery(minutes=5)
        query.add_must(ExistsMatch('tags'))
        results = query.execute(self.es_client)
        assert len(results['hits']) == 1
        stored = results['hits'][0]['_source']
        self.verify_event(stored, expected_event)

    def test_event1(self):
        '''A syslog line wrapped in an SNS notification is mapped to an event.'''
        notification = {
            "Type": "Notification",
            "MessageId": "abcdefg",
            "TopicArn": "arn:aws:sns:us-west-2:123456789:example-logs-mozdef",
            "Subject": "Fluentd-Notification",
            "Message": "{\"time\":\"2017-05-25 07:14:15 +0000\",\"timestamp\":\"2017-05-25T07:14:15+00:00\",\"hostname\":\"abcdefghostname\",\"pname\":\"dhclient\",\"processid\":\"\",\"type\":\"syslog\",\"logger\":\"systemslogs\",\"payload\":\"DHCPREQUEST of 1.2.3.4 on eth0 to 5.6.7.8 port 67 (xid=0x123456)\"}",
            "Timestamp": "2017-05-25T07:14:16.103Z",
            "SignatureVersion": "1",
            "Signature": "examplesignatureabcd",
            "SigningCertURL": "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-12345.pem",
            "UnsubscribeURL": "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:123456789:example-logs-mozdef:adsf0laser013"
        }
        self.consumer.on_message(notification)

        # receivedtimestamp is only type-checked by verify_event
        expected = {
            u'category': u'syslog',
            u'details': {u'logger': u'systemslogs'},
            u'hostname': u'abcdefghostname',
            u'mozdefhostname': u'unittest.hostname',
            u'processid': u'',
            u'processname': u'dhclient',
            u'receivedtimestamp': u'2017-05-26T17:47:17.813876+00:00',
            u'severity': u'INFO',
            u'summary': u'DHCPREQUEST of 1.2.3.4 on eth0 to 5.6.7.8 port 67 (xid=0x123456)',
            u'tags': [u'example-logs-mozdef'],
            u'timestamp': u'2017-05-25T07:14:15+00:00',
            u'utctimestamp': u'2017-05-25T07:14:15+00:00'
        }
        self.search_and_verify_event(expected)
|
|
@ -1,3 +1,14 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
# Copyright (c) 2017 Mozilla Corporation
|
||||
#
|
||||
# Contributors:
|
||||
# Brandon Myers bmyers@mozilla.com
|
||||
|
||||
|
||||
import os
|
||||
import sys
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../alerts/lib"))
|
||||
|
@ -86,6 +97,14 @@ class UnitTestSuite(object):
|
|||
|
||||
return event
|
||||
|
||||
def verify_event(self, event, expected_event):
    '''Assert that event matches expected_event field by field.

    Both dicts must have exactly the same keys.  Every value must be equal
    except 'receivedtimestamp', which changes on every run and is therefore
    only checked to be a unicode string.
    Raises AssertionError naming the first mismatch.
    '''
    actual_keys = sorted(event.keys())
    expected_keys = sorted(expected_event.keys())
    assert actual_keys == expected_keys, 'Incorrect keys, expected: {0}, got: {1}'.format(expected_keys, actual_keys)

    # type(u'') is unicode on python 2 (and str on python 3)
    text_type = type(u'')
    for key, value in expected_event.items():
        if key == 'receivedtimestamp':
            assert isinstance(event[key], text_type)
        else:
            assert event[key] == value, 'Incorrect match for {0}, expected: {1}, got: {2}'.format(key, value, event[key])
|
||||
|
||||
@staticmethod
def current_timestamp():
    '''Return the isoformat() string of the current time passed through toUTC.'''
    now = datetime.now()
    return toUTC(now).isoformat()
|
||||
|
|
Загрузка…
Ссылка в новой задаче