Merge pull request #554 from mozilla/improve_cloudtrail_worker

Improve cloudtrail worker
This commit is contained in:
A Smith 2017-11-27 14:41:21 -08:00 коммит произвёл GitHub
Родитель b09c700cb9 4190ef43d6
Коммит 501819cfb5
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 370 добавлений и 146 удалений

Просмотреть файл

@ -18,10 +18,10 @@ class AlertCloudtrailLoggingDisabled(AlertTask):
search_query.add_must([
TermMatch('_type', 'cloudtrail'),
TermMatch('eventName', 'StopLogging'),
TermMatch('details.eventName', 'StopLogging'),
])
search_query.add_must_not(TermMatch('errorCode', 'AccessDenied'))
search_query.add_must_not(TermMatch('details.errorCode', 'AccessDenied'))
self.filtersManual(search_query)
self.searchEventsSimple()
@ -32,6 +32,6 @@ class AlertCloudtrailLoggingDisabled(AlertTask):
tags = ['cloudtrail', 'aws', 'cloudtrailpagerduty']
severity = 'CRITICAL'
summary = 'Cloudtrail Logging Disabled: ' + event['_source']['requestParameters']['name']
summary = 'Cloudtrail Logging Disabled: ' + event['_source']['details']['requestParameters']['name']
return self.createAlertDict(summary, category, tags, [event], severity)

Просмотреть файл

@ -11,13 +11,10 @@
import json
import os
import pynsive
import sys
import socket
import time
from configlib import getConfig, OptionParser
from datetime import datetime, timedelta
from operator import itemgetter
from datetime import datetime
import boto.sqs
import boto.sts
import boto.s3
@ -35,6 +32,9 @@ from utilities.toUTC import toUTC
from elasticsearch_client import ElasticsearchClient
from utilities.logger import logger, initLogger
from lib.plugins import sendEventToPlugins, registerPlugins
CLOUDTRAIL_VERB_REGEX = re.compile(r'^([A-Z][^A-Z]*)')
# running under uwsgi?
@ -127,6 +127,152 @@ class RoleManager:
'security_token': credential.session_token} if credential else {}
def toUnicode(obj, encoding='utf-8'):
if type(obj) in [int, long, float, complex]:
# likely a number, convert it to string to get to unicode
obj = str(obj)
if isinstance(obj, basestring):
if not isinstance(obj, unicode):
obj = unicode(obj, encoding)
return obj
def removeAt(astring):
'''remove the leading @ from a string'''
return astring.replace('@', '')
def keyMapping(aDict):
'''map common key/fields to a normalized structure,
explicitly typed when possible to avoid schema changes for upsteam consumers
Special accomodations made for logstash,nxlog, beaver, heka and CEF
Some shippers attempt to conform to logstash-style @fieldname convention.
This strips the leading at symbol since it breaks some elastic search
libraries like elasticutils.
'''
returndict = dict()
returndict['source'] = 'cloudtrail'
returndict['details'] = {}
returndict['category'] = 'cloudtrail'
returndict['processid'] = str(os.getpid())
returndict['processname'] = sys.argv[0]
returndict['severity'] = 'INFO'
if 'sourceIPAddress' in aDict and 'eventName' in aDict and 'eventSource' in aDict:
summary_str = "{0} performed {1} in {2}".format(
aDict['sourceIPAddress'],
aDict['eventName'],
aDict['eventSource']
)
returndict['summary'] = summary_str
if 'eventName' in aDict:
returndict['details']['eventVerb'] = CLOUDTRAIL_VERB_REGEX.findall(aDict['eventName'])[0]
returndict['details']['eventReadOnly'] = (returndict['details']['eventVerb'] in ['Describe', 'Get', 'List'])
# set the timestamp when we received it, i.e. now
returndict['receivedtimestamp'] = toUTC(datetime.now()).isoformat()
returndict['mozdefhostname'] = options.mozdefhostname
try:
for k, v in aDict.iteritems():
k = removeAt(k).lower()
if k == 'sourceip':
returndict[u'details']['sourceipaddress'] = v
elif k == 'sourceipaddress':
returndict[u'details']['sourceipaddress'] = v
elif k == 'facility':
returndict[u'source'] = v
elif k in ('eventsource'):
returndict[u'hostname'] = v
elif k in ('message', 'summary'):
returndict[u'summary'] = toUnicode(v)
elif k in ('payload') and 'summary' not in aDict.keys():
# special case for heka if it sends payload as well as a summary, keep both but move payload to the details section.
returndict[u'summary'] = toUnicode(v)
elif k in ('payload'):
returndict[u'details']['payload'] = toUnicode(v)
elif k in ('eventtime', 'timestamp', 'utctimestamp', 'date'):
returndict[u'utctimestamp'] = toUTC(v).isoformat()
returndict[u'timestamp'] = toUTC(v).isoformat()
elif k in ('hostname', 'source_host', 'host'):
returndict[u'hostname'] = toUnicode(v)
elif k in ('tags'):
if 'tags' not in returndict.keys():
returndict[u'tags'] = []
if type(v) == list:
returndict[u'tags'] += v
else:
if len(v) > 0:
returndict[u'tags'].append(v)
# nxlog keeps the severity name in syslogseverity,everyone else should use severity or level.
elif k in ('syslogseverity', 'severity', 'severityvalue', 'level', 'priority'):
returndict[u'severity'] = toUnicode(v).upper()
elif k in ('facility', 'syslogfacility'):
returndict[u'facility'] = toUnicode(v)
elif k in ('pid', 'processid'):
returndict[u'processid'] = toUnicode(v)
# nxlog sets sourcename to the processname (i.e. sshd), everyone else should call it process name or pname
elif k in ('pname', 'processname', 'sourcename', 'program'):
returndict[u'processname'] = toUnicode(v)
# the file, or source
elif k in ('path', 'logger', 'file'):
returndict[u'eventsource'] = toUnicode(v)
elif k in ('type', 'eventtype', 'category'):
returndict[u'category'] = toUnicode(v)
# custom fields as a list/array
elif k in ('fields', 'details'):
if len(v) > 0:
returndict[u'details'] = v
# custom fields/details as a one off, not in an array
# i.e. fields.something=value or details.something=value
# move them to a dict for consistency in querying
elif k.startswith('fields.') or k.startswith('details.'):
newName = k.replace('fields.', '')
newName = newName.lower().replace('details.', '')
# add a dict to hold the details if it doesn't exist
if 'details' not in returndict.keys():
returndict[u'details'] = dict()
# add field with a special case for shippers that
# don't send details
# in an array as int/floats/strings
# we let them dictate the data type with field_datatype
# convention
if newName.endswith('_int'):
returndict[u'details'][unicode(newName)] = int(v)
elif newName.endswith('_float'):
returndict[u'details'][unicode(newName)] = float(v)
else:
returndict[u'details'][unicode(newName)] = toUnicode(v)
else:
returndict[u'details'][k] = v
if 'utctimestamp' not in returndict.keys():
# default in case we don't find a reasonable timestamp
returndict['utctimestamp'] = toUTC(datetime.now()).isoformat()
except Exception as e:
sys.stderr.write('esworker-cloudtrail exception normalizing the message %r\n' % e)
return None
return returndict
def esConnect():
'''open or re-open a connection to elastic search'''
return ElasticsearchClient(
@ -221,131 +367,83 @@ class taskConsumer(object):
time.sleep(.1)
def on_message(self, message):
message['category'] = 'cloudtrail'
message['utctimestamp'] = toUTC(message['eventTime']).isoformat()
message['receivedtimestamp'] = toUTC(datetime.now()).isoformat()
message['mozdefhostname'] = socket.gethostname()
message['hostname'] = message['eventSource']
message['processid'] = os.getpid()
message['processname'] = sys.argv[0]
message['severity'] = 'INFO'
summary_str = "{0} performed {1} in {2}".format(
message['sourceIPAddress'],
message['eventName'],
message['eventSource']
)
message['summary'] = summary_str
message['eventVerb'] = CLOUDTRAIL_VERB_REGEX.findall(
message['eventName'])[0]
message['eventReadOnly'] = (
message['eventVerb'] in ['Describe', 'Get', 'List'])
es.save_event(body=message, doc_type='cloudtrail', bulk=True)
def registerPlugins():
pluginList = list() # tuple of module,registration dict,priority
if os.path.exists('plugins'):
modules = pynsive.list_modules('plugins')
for mname in modules:
module = pynsive.import_module(mname)
reload(module)
if not module:
raise ImportError('Unable to load module {}'.format(mname))
else:
if 'message' in dir(module):
mclass = module.message()
mreg = mclass.registration
if 'priority' in dir(mclass):
mpriority = mclass.priority
else:
mpriority = 100
if isinstance(mreg, list):
print('[*] plugin {0} registered to receive messages with {1}'.format(mname, mreg))
pluginList.append((mclass, mreg, mpriority))
return pluginList
def checkPlugins(pluginList, lastPluginCheck):
if abs(datetime.now() - lastPluginCheck).seconds > options.plugincheckfrequency:
# print('[*] checking plugins')
lastPluginCheck = datetime.now()
pluginList = registerPlugins()
return pluginList, lastPluginCheck
else:
return pluginList, lastPluginCheck
def dict2List(inObj):
'''given a dictionary, potentially with multiple sub dictionaries
return a list of the dict keys and values
'''
if isinstance(inObj, dict):
for key, value in inObj.iteritems():
if isinstance(value, dict):
for d in dict2List(value):
yield d
elif isinstance(value, list):
yield key.encode('ascii', 'ignore').lower()
for l in dict2List(value):
yield l
else:
yield key.encode('ascii', 'ignore').lower()
if isinstance(value, str):
yield value.lower()
elif isinstance(value, unicode):
yield value.encode('ascii', 'ignore').lower()
else:
yield value
elif isinstance(inObj, list):
for v in inObj:
if isinstance(v, str):
yield v.lower()
elif isinstance(v, unicode):
yield v.encode('ascii', 'ignore').lower()
elif isinstance(v, list):
for l in dict2List(v):
yield l
elif isinstance(v, dict):
for d in dict2List(v):
yield d
else:
yield v
else:
yield ''
def sendEventToPlugins(anevent, metadata, pluginList):
'''compare the event to the plugin registrations.
plugins register with a list of keys or values
or values they want to match on
this function compares that registration list
to the current event and sends the event to plugins
in order
'''
if not isinstance(anevent, dict):
raise TypeError('event is type {0}, should be a dict'.format(type(anevent)))
# expecting tuple of module,criteria,priority in pluginList
# sort the plugin list by priority
for plugin in sorted(pluginList, key=itemgetter(2), reverse=False):
# assume we don't run this event through the plugin
send = False
if isinstance(plugin[1], list):
def on_message(self, body):
#print("RECEIVED MESSAGE: %r" % (body, ))
try:
if (set(plugin[1]).intersection([e for e in dict2List(anevent)])):
send = True
except TypeError:
sys.stderr.write('TypeError on set intersection for dict {0}'.format(anevent))
return (anevent, metadata)
if send:
(anevent, metadata) = plugin[0].onMessage(anevent, metadata)
if anevent is None:
# plug-in is signalling to drop this message
# early exit
return (anevent, metadata)
# default elastic search metadata for an event
metadata = {
'index': 'events',
'doc_type': 'cloudtrail',
'id': None
}
# just to be safe..check what we were sent.
if isinstance(body, dict):
bodyDict = body
elif isinstance(body, str) or isinstance(body, unicode):
try:
bodyDict = json.loads(body) # lets assume it's json
except ValueError as e:
# not json..ack but log the message
sys.stderr.write("esworker exception: unknown body type received %r\n" % body)
return
else:
sys.stderr.write("esworker exception: unknown body type received %r\n" % body)
return
return (anevent, metadata)
if 'customendpoint' in bodyDict.keys() and bodyDict['customendpoint']:
# custom document
# send to plugins to allow them to modify it if needed
(normalizedDict, metadata) = sendEventToPlugins(bodyDict, metadata, pluginList)
else:
# normalize the dict
# to the mozdef events standard
normalizedDict = keyMapping(bodyDict)
# send to plugins to allow them to modify it if needed
if normalizedDict is not None and isinstance(normalizedDict, dict) and normalizedDict.keys():
(normalizedDict, metadata) = sendEventToPlugins(normalizedDict, metadata, pluginList)
# drop the message if a plug in set it to None
# signaling a discard
if normalizedDict is None:
return
# make a json version for posting to elastic search
jbody = json.JSONEncoder().encode(normalizedDict)
try:
bulk = False
if options.esbulksize != 0:
bulk = True
bulk = False
res = self.esConnection.save_event(
index=metadata['index'],
doc_id=metadata['id'],
doc_type=metadata['doc_type'],
body=jbody,
bulk=bulk
)
except (ElasticsearchBadServer, ElasticsearchInvalidIndex) as e:
# handle loss of server or race condition with index rotation/creation/aliasing
try:
self.esConnection = esConnect()
return
except kombu.exceptions.MessageStateError:
# state may be already set.
return
except ElasticsearchException as e:
# exception target for queue capacity issues reported by elastic search so catch the error, report it and retry the message
try:
sys.stderr.write('ElasticSearchException: {0} reported while indexing event'.format(e))
return
except kombu.exceptions.MessageStateError:
# state may be already set.
return
except ValueError as e:
sys.stderr.write("esworker exception in events queue %r\n" % e)
def main():
@ -370,6 +468,9 @@ def main():
def initConfig():
# capture the hostname
options.mozdefhostname = getConfig('mozdefhostname', socket.gethostname(), options.configfile)
# output our log to stdout or syslog
options.output = getConfig('output', 'stdout', options.configfile)
options.sysloghostname = getConfig('sysloghostname', 'localhost', options.configfile)
@ -396,23 +497,11 @@ def initConfig():
options.mqport = getConfig('mqport', 5672, options.configfile)
options.mqvhost = getConfig('mqvhost', '/', options.configfile)
# rabbit: run with message acking?
# also toggles transient/persistant delivery (messages in memory only or stored on disk)
# ack=True sets persistant delivery, False sets transient delivery
options.mqack = getConfig('mqack', True, options.configfile)
# aws options
options.accesskey = getConfig('accesskey', '', options.configfile)
options.secretkey = getConfig('secretkey', '', options.configfile)
options.region = getConfig('region', 'us-west-1', options.configfile)
# plugin options
# secs to pass before checking for new/updated plugins
# seems to cause memory leaks..
# regular updates are disabled for now,
# though we set the frequency anyway.
options.plugincheckfrequency = getConfig('plugincheckfrequency', 120, options.configfile)
# This is the full ARN that the s3 bucket lives under
options.cloudtrail_arn = getConfig('cloudtrail_arn', 'cloudtrail_arn', options.configfile)
@ -428,8 +517,5 @@ if __name__ == '__main__':
# open ES connection globally so we don't waste time opening it per message
es = esConnect()
# force a check for plugins and establish the plugin list
pluginList = list()
lastPluginCheck = datetime.now() - timedelta(minutes=60)
pluginList, lastPluginCheck = checkPlugins(pluginList, lastPluginCheck)
pluginList = registerPlugins()
main()

Просмотреть файл

@ -13,9 +13,11 @@ class TestAlertCloudtrailDeadman(AlertTestSuite):
default_event = {
"_type": "cloudtrail",
"_source": {
"details": {
"eventName": "somename"
}
}
}
# This alert is the expected result from running this task
default_alert = {

Просмотреть файл

@ -12,12 +12,14 @@ class TestAlertCloudtrailLoggingDisabled(AlertTestSuite):
default_event = {
"_type": "cloudtrail",
"_source": {
"details": {
"eventName": "StopLogging",
"requestParameters": {
"name": "cloudtrail_example_name"
}
}
}
}
# This alert is the expected result from running this task
default_alert = {
@ -59,7 +61,7 @@ class TestAlertCloudtrailLoggingDisabled(AlertTestSuite):
)
event = AlertTestSuite.create_event(default_event)
event['_source']['eventName'] = 'Badeventname'
event['_source']['details']['eventName'] = 'Badeventname'
test_cases.append(
NegativeAlertTestCase(
description="Negative test case with bad eventName",
@ -78,7 +80,7 @@ class TestAlertCloudtrailLoggingDisabled(AlertTestSuite):
)
event = AlertTestSuite.create_event(default_event)
event['_source']['errorCode'] = 'AccessDenied'
event['_source']['details']['errorCode'] = 'AccessDenied'
test_cases.append(
NegativeAlertTestCase(
description="Negative test case with excluding errorCode",

Просмотреть файл

@ -0,0 +1,134 @@
#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# Copyright (c) 2017 Mozilla Corporation
#
# Contributors:
# Brandon Myers bmyers@mozilla.com
import pytz
import tzlocal
def utc_timezone():
return pytz.timezone('UTC')
tzlocal.get_localzone = utc_timezone
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), "../../mq"))
from mq import esworker_cloudtrail
class MockOptions():
@property
def mozdefhostname(self):
return 'sample'
class TestKeyMapping():
def setup(self):
mock_options = MockOptions()
esworker_cloudtrail.options = mock_options
self.key_mapping = esworker_cloudtrail.keyMapping
def test_cloudtrail_dict(self):
cloudtrail_dict = {
u'apiVersion': u'20140328',
u'awsRegion': u'us-west-2',
u'eventID': u'd29avef1-1a81-4125-4cb1-3ddca313b6c3',
u'eventName': u'CreateLogStream',
u'eventSource': u'logs.amazonaws.com',
u'eventTime': u'2017-10-14T19:54:01Z',
u'eventType': u'AwsApiCall',
u'eventVersion': u'1.04',
u'recipientAccountId': u'125234098624',
u'requestID': u'6146b9c2-153e-31e2-6782-1fb937ca1c57',
u'requestParameters': {
u'logGroupName': u'/aws/lambda/abcd-webhooks-pulse',
u'logStreamName': u'2017/10/14/[$LATEST]a7918c9450164d3db2cef43f95bba7a7'
},
u'responseElements': None,
u'sourceIPAddress': u'1.2.3.4',
u'userAgent': u'awslambda-worker',
u'userIdentity': {
u'accessKeyId': u'ASBSDGLKHSDGBD2YXSGSLDHTJA',
u'accountId': u'125234098624',
u'arn': u'arn:aws:sts::125234098624:assumed-role/lambda-abcd-webhooks-pulse/abcd-webhooks-pulse',
u'principalId': u'AROABRMQYSEGL3VWEDW3K:abcd-webhooks-pulse',
u'sessionContext': {
u'attributes': {
u'creationDate': u'2017-10-14T19:47:02Z',
u'mfaAuthenticated': u'false'
},
u'sessionIssuer': {
u'accountId': u'125234098624',
u'arn': u'arn:aws:iam::125234098624:role/lambda-abcd-webhooks-pulse',
u'principalId': u'AROABRMQYSEGL3VWEDW3K',
u'type': u'Role',
u'userName': u'lambda-abcd-webhooks-pulse'
}
},
u'type': u'AssumedRole'
}
}
result = self.key_mapping(cloudtrail_dict)
assert result['category'] == 'AwsApiCall'
assert result['hostname'] == 'logs.amazonaws.com'
assert result['mozdefhostname'] == 'sample'
assert type(result['processid']) is str
# verify processid is an integer inside of that string
assert int(result['processid'])
assert type(result['processname']) is str
assert result['severity'] == 'INFO'
assert result['summary'] == '1.2.3.4 performed CreateLogStream in logs.amazonaws.com'
assert result['timestamp'] == '2017-10-14T19:54:01+00:00'
assert result['utctimestamp'] == '2017-10-14T19:54:01+00:00'
assert result['receivedtimestamp'] != result['utctimestamp']
expected_details = {
u'apiversion': u'20140328',
u'awsregion': u'us-west-2',
'eventReadOnly': False,
'eventVerb': u'Create',
u'eventid': u'd29avef1-1a81-4125-4cb1-3ddca313b6c3',
u'eventname': u'CreateLogStream',
u'eventversion': u'1.04',
u'recipientaccountid': u'125234098624',
u'requestid': u'6146b9c2-153e-31e2-6782-1fb937ca1c57',
u'requestparameters': {
u'logGroupName': u'/aws/lambda/abcd-webhooks-pulse',
u'logStreamName': u'2017/10/14/[$LATEST]a7918c9450164d3db2cef43f95bba7a7'
},
u'responseelements': None,
'sourceipaddress': u'1.2.3.4',
u'useragent': u'awslambda-worker',
u'useridentity': {
u'accessKeyId': u'ASBSDGLKHSDGBD2YXSGSLDHTJA',
u'accountId': u'125234098624',
u'arn': u'arn:aws:sts::125234098624:assumed-role/lambda-abcd-webhooks-pulse/abcd-webhooks-pulse',
u'principalId': u'AROABRMQYSEGL3VWEDW3K:abcd-webhooks-pulse',
u'sessionContext': {
u'attributes': {
u'creationDate': u'2017-10-14T19:47:02Z',
u'mfaAuthenticated': u'false'
},
u'sessionIssuer': {
u'accountId': u'125234098624',
u'arn': u'arn:aws:iam::125234098624:role/lambda-abcd-webhooks-pulse',
u'principalId': u'AROABRMQYSEGL3VWEDW3K',
u'type': u'Role',
u'userName': u'lambda-abcd-webhooks-pulse'
}
},
u'type': u'AssumedRole'
}
}
assert result['details'] == expected_details