diff --git a/alerts/cloudtrail_logging_disabled.py b/alerts/cloudtrail_logging_disabled.py
index d197cb02..0920075a 100644
--- a/alerts/cloudtrail_logging_disabled.py
+++ b/alerts/cloudtrail_logging_disabled.py
@@ -18,10 +18,10 @@ class AlertCloudtrailLoggingDisabled(AlertTask):
         search_query.add_must([
             TermMatch('_type', 'cloudtrail'),
-            TermMatch('eventName', 'StopLogging'),
+            TermMatch('details.eventName', 'StopLogging'),
         ])
 
-        search_query.add_must_not(TermMatch('errorCode', 'AccessDenied'))
+        search_query.add_must_not(TermMatch('details.errorCode', 'AccessDenied'))
 
         self.filtersManual(search_query)
         self.searchEventsSimple()
@@ -32,6 +32,6 @@ class AlertCloudtrailLoggingDisabled(AlertTask):
         tags = ['cloudtrail', 'aws', 'cloudtrailpagerduty']
         severity = 'CRITICAL'
 
-        summary = 'Cloudtrail Logging Disabled: ' + event['_source']['requestParameters']['name']
+        summary = 'Cloudtrail Logging Disabled: ' + event['_source']['details']['requestParameters']['name']
 
         return self.createAlertDict(summary, category, tags, [event], severity)
diff --git a/mq/esworker_cloudtrail.py b/mq/esworker_cloudtrail.py
index 9dd58b66..641a8c00 100755
--- a/mq/esworker_cloudtrail.py
+++ b/mq/esworker_cloudtrail.py
@@ -11,13 +11,11 @@
 import json
 import os
-import pynsive
 import sys
 import socket
 import time
 from configlib import getConfig, OptionParser
-from datetime import datetime, timedelta
-from operator import itemgetter
+from datetime import datetime
 import boto.sqs
 import boto.sts
 import boto.s3
@@ -35,6 +32,9 @@
 from utilities.toUTC import toUTC
 from elasticsearch_client import ElasticsearchClient
 from utilities.logger import logger, initLogger
+from lib.plugins import sendEventToPlugins, registerPlugins
+
+
 CLOUDTRAIL_VERB_REGEX = re.compile(r'^([A-Z][^A-Z]*)')
 
 # running under uwsgi?
@@ -127,6 +127,152 @@ class RoleManager:
             'security_token': credential.session_token} if credential else {}
 
 
+def toUnicode(obj, encoding='utf-8'):
+    if type(obj) in [int, long, float, complex]:
+        # likely a number, convert it to string to get to unicode
+        obj = str(obj)
+    if isinstance(obj, basestring):
+        if not isinstance(obj, unicode):
+            obj = unicode(obj, encoding)
+    return obj
+
+
+def removeAt(astring):
+    '''remove the leading @ from a string'''
+    return astring.replace('@', '')
+
+
+def keyMapping(aDict):
+    '''map common keys/fields to a normalized structure,
+    explicitly typed when possible to avoid schema changes for upstream consumers.
+    Special accommodations made for logstash, nxlog, beaver, heka and CEF.
+    Some shippers attempt to conform to the logstash-style @fieldname convention;
+    this strips the leading at symbol since it breaks some elastic search
+    libraries like elasticutils.
+    '''
+    returndict = dict()
+
+    returndict['source'] = 'cloudtrail'
+    returndict['details'] = {}
+    returndict['category'] = 'cloudtrail'
+    returndict['processid'] = str(os.getpid())
+    returndict['processname'] = sys.argv[0]
+    returndict['severity'] = 'INFO'
+    if 'sourceIPAddress' in aDict and 'eventName' in aDict and 'eventSource' in aDict:
+        summary_str = "{0} performed {1} in {2}".format(
+            aDict['sourceIPAddress'],
+            aDict['eventName'],
+            aDict['eventSource']
+        )
+        returndict['summary'] = summary_str
+
+    if 'eventName' in aDict:
+        returndict['details']['eventVerb'] = CLOUDTRAIL_VERB_REGEX.findall(aDict['eventName'])[0]
+        returndict['details']['eventReadOnly'] = (returndict['details']['eventVerb'] in ['Describe', 'Get', 'List'])
+    # set the timestamp when we received it, i.e. now
+    returndict['receivedtimestamp'] = toUTC(datetime.now()).isoformat()
+    returndict['mozdefhostname'] = options.mozdefhostname
+    try:
+        for k, v in aDict.iteritems():
+            k = removeAt(k).lower()
+
+            if k == 'sourceip':
+                returndict[u'details']['sourceipaddress'] = v
+
+            elif k == 'sourceipaddress':
+                returndict[u'details']['sourceipaddress'] = v
+
+            elif k == 'facility':
+                returndict[u'source'] = v
+
+            elif k in ('eventsource',):
+                returndict[u'hostname'] = v
+
+            elif k in ('message', 'summary'):
+                returndict[u'summary'] = toUnicode(v)
+
+            elif k in ('payload',) and 'summary' not in aDict.keys():
+                # special case for heka: if it sends a payload as well as a summary,
+                # keep both but move the payload to the details section
+                returndict[u'summary'] = toUnicode(v)
+            elif k in ('payload',):
+                returndict[u'details']['payload'] = toUnicode(v)
+
+            elif k in ('eventtime', 'timestamp', 'utctimestamp', 'date'):
+                returndict[u'utctimestamp'] = toUTC(v).isoformat()
+                returndict[u'timestamp'] = toUTC(v).isoformat()
+
+            elif k in ('hostname', 'source_host', 'host'):
+                returndict[u'hostname'] = toUnicode(v)
+
+            elif k in ('tags',):
+                if 'tags' not in returndict.keys():
+                    returndict[u'tags'] = []
+                if type(v) == list:
+                    returndict[u'tags'] += v
+                else:
+                    if len(v) > 0:
+                        returndict[u'tags'].append(v)
+
+            # nxlog keeps the severity name in syslogseverity; everyone else should use severity or level
+            elif k in ('syslogseverity', 'severity', 'severityvalue', 'level', 'priority'):
+                returndict[u'severity'] = toUnicode(v).upper()
+
+            elif k in ('facility', 'syslogfacility'):
+                returndict[u'facility'] = toUnicode(v)
+
+            elif k in ('pid', 'processid'):
+                returndict[u'processid'] = toUnicode(v)
+
+            # nxlog sets sourcename to the process name (i.e. sshd), everyone else should call it processname or pname
+            elif k in ('pname', 'processname', 'sourcename', 'program'):
+                returndict[u'processname'] = toUnicode(v)
+
+            # the file, or source
+            elif k in ('path', 'logger', 'file'):
+                returndict[u'eventsource'] = toUnicode(v)
+
+            elif k in ('type', 'eventtype', 'category'):
+                returndict[u'category'] = toUnicode(v)
+
+            # custom fields as a list/array
+            elif k in ('fields', 'details'):
+                if len(v) > 0:
+                    returndict[u'details'] = v
+
+            # custom fields/details as a one-off, not in an array
+            # i.e. fields.something=value or details.something=value
+            # move them to a dict for consistency in querying
+            elif k.startswith('fields.') or k.startswith('details.'):
+                newName = k.replace('fields.', '')
+                newName = newName.lower().replace('details.', '')
+                # add a dict to hold the details if it doesn't exist
+                if 'details' not in returndict.keys():
+                    returndict[u'details'] = dict()
+                # add the field, with a special case for shippers that don't send
+                # details in an array as ints/floats/strings: we let them dictate
+                # the data type with the field_datatype suffix convention
+                if newName.endswith('_int'):
+                    returndict[u'details'][unicode(newName)] = int(v)
+                elif newName.endswith('_float'):
+                    returndict[u'details'][unicode(newName)] = float(v)
+                else:
+                    returndict[u'details'][unicode(newName)] = toUnicode(v)
+            else:
+                returndict[u'details'][k] = v
+
+        if 'utctimestamp' not in returndict.keys():
+            # default in case we don't find a reasonable timestamp
+            returndict['utctimestamp'] = toUTC(datetime.now()).isoformat()
+
+    except Exception as e:
+        sys.stderr.write('esworker-cloudtrail exception normalizing the message %r\n' % e)
+        return None
+
+    return returndict
+
+
 def esConnect():
     '''open or re-open a connection to elastic search'''
     return ElasticsearchClient(
@@ -221,131 +367,83 @@ class taskConsumer(object):
             time.sleep(.1)
 
-    def on_message(self, message):
-        message['category'] = 'cloudtrail'
-        message['utctimestamp'] = toUTC(message['eventTime']).isoformat()
-        message['receivedtimestamp'] = toUTC(datetime.now()).isoformat()
-        message['mozdefhostname'] = socket.gethostname()
-        message['hostname'] = message['eventSource']
-        message['processid'] = os.getpid()
-        message['processname'] = sys.argv[0]
-        message['severity'] = 'INFO'
-        summary_str = "{0} performed {1} in {2}".format(
-            message['sourceIPAddress'],
-            message['eventName'],
-            message['eventSource']
-        )
-        message['summary'] = summary_str
-        message['eventVerb'] = CLOUDTRAIL_VERB_REGEX.findall(
-            message['eventName'])[0]
-        message['eventReadOnly'] = (
-            message['eventVerb'] in ['Describe', 'Get', 'List'])
-        es.save_event(body=message, doc_type='cloudtrail', bulk=True)
-
-
-def registerPlugins():
-    pluginList = list()   # tuple of module,registration dict,priority
-    if os.path.exists('plugins'):
-        modules = pynsive.list_modules('plugins')
-        for mname in modules:
-            module = pynsive.import_module(mname)
-            reload(module)
-            if not module:
-                raise ImportError('Unable to load module {}'.format(mname))
+    def on_message(self, body):
+        #print("RECEIVED MESSAGE: %r" % (body, ))
+        try:
+            # default elastic search metadata for an event
+            metadata = {
+                'index': 'events',
+                'doc_type': 'cloudtrail',
+                'id': None
+            }
+            # just to be safe, check what we were sent
+            if isinstance(body, dict):
+                bodyDict = body
+            elif isinstance(body, str) or isinstance(body, unicode):
+                try:
+                    bodyDict = json.loads(body)   # let's assume it's JSON
+                except ValueError as e:
+                    # not JSON; log the bad message and give up on it
+                    sys.stderr.write("esworker exception: unknown body type received %r\n" % body)
+                    return
             else:
-                if 'message' in dir(module):
-                    mclass = module.message()
-                    mreg = mclass.registration
-                    if 'priority' in dir(mclass):
-                        mpriority = mclass.priority
-                    else:
-                        mpriority = 100
-                    if isinstance(mreg, list):
-                        print('[*] plugin {0} registered to receive messages with {1}'.format(mname, mreg))
-                        pluginList.append((mclass, mreg, mpriority))
-    return pluginList
+                sys.stderr.write("esworker exception: unknown body type received %r\n" % body)
+                return
-
-def checkPlugins(pluginList, lastPluginCheck):
-    if abs(datetime.now() - lastPluginCheck).seconds > options.plugincheckfrequency:
-        # print('[*] checking plugins')
-        lastPluginCheck = datetime.now()
-        pluginList = registerPlugins()
-        return pluginList, lastPluginCheck
-    else:
-        return pluginList, lastPluginCheck
-
-
-def dict2List(inObj):
-    '''given a dictionary, potentially with multiple sub dictionaries
-    return a list of the dict keys and values
-    '''
-    if isinstance(inObj, dict):
-        for key, value in inObj.iteritems():
-            if isinstance(value, dict):
-                for d in dict2List(value):
-                    yield d
-            elif isinstance(value, list):
-                yield key.encode('ascii', 'ignore').lower()
-                for l in dict2List(value):
-                    yield l
+            if 'customendpoint' in bodyDict.keys() and bodyDict['customendpoint']:
+                # custom document
+                # send to plugins to allow them to modify it if needed
+                (normalizedDict, metadata) = sendEventToPlugins(bodyDict, metadata, pluginList)
             else:
-                yield key.encode('ascii', 'ignore').lower()
-                if isinstance(value, str):
-                    yield value.lower()
-                elif isinstance(value, unicode):
-                    yield value.encode('ascii', 'ignore').lower()
-                else:
-                    yield value
-    elif isinstance(inObj, list):
-        for v in inObj:
-            if isinstance(v, str):
-                yield v.lower()
-            elif isinstance(v, unicode):
-                yield v.encode('ascii', 'ignore').lower()
-            elif isinstance(v, list):
-                for l in dict2List(v):
-                    yield l
-            elif isinstance(v, dict):
-                for d in dict2List(v):
-                    yield d
-            else:
-                yield v
-    else:
-        yield ''
+                # normalize the dict
+                # to the mozdef events standard
+                normalizedDict = keyMapping(bodyDict)
+                # send to plugins to allow them to modify it if needed
+                if normalizedDict is not None and isinstance(normalizedDict, dict) and normalizedDict.keys():
+                    (normalizedDict, metadata) = sendEventToPlugins(normalizedDict, metadata, pluginList)
 
-def sendEventToPlugins(anevent, metadata, pluginList):
-    '''compare the event to the plugin registrations.
-       plugins register with a list of keys or values
-       or values they want to match on
-       this function compares that registration list
-       to the current event and sends the event to plugins
-       in order
-    '''
-    if not isinstance(anevent, dict):
-        raise TypeError('event is type {0}, should be a dict'.format(type(anevent)))
+            # drop the message if a plugin set it to None,
+            # signaling a discard
+            if normalizedDict is None:
+                return
+
+            # make a json version for posting to elastic search
+            jbody = json.JSONEncoder().encode(normalizedDict)
 
-    # expecting tuple of module,criteria,priority in pluginList
-    # sort the plugin list by priority
-    for plugin in sorted(pluginList, key=itemgetter(2), reverse=False):
-        # assume we don't run this event through the plugin
-        send = False
-        if isinstance(plugin[1], list):
            try:
-                if (set(plugin[1]).intersection([e for e in dict2List(anevent)])):
-                    send = True
-            except TypeError:
-                sys.stderr.write('TypeError on set intersection for dict {0}'.format(anevent))
-                return (anevent, metadata)
-        if send:
-            (anevent, metadata) = plugin[0].onMessage(anevent, metadata)
-            if anevent is None:
-                # plug-in is signalling to drop this message
-                # early exit
-                return (anevent, metadata)
+                bulk = False
+                if options.esbulksize != 0:
+                    bulk = True
 
-    return (anevent, metadata)
+                res = self.esConnection.save_event(
+                    index=metadata['index'],
+                    doc_id=metadata['id'],
+                    doc_type=metadata['doc_type'],
+                    body=jbody,
+                    bulk=bulk
+                )
+
+            except (ElasticsearchBadServer, ElasticsearchInvalidIndex) as e:
+                # handle loss of server or race condition with index rotation/creation/aliasing
+                try:
+                    self.esConnection = esConnect()
+                    return
+                except kombu.exceptions.MessageStateError:
+                    # state may be already set.
+                    return
+            except ElasticsearchException as e:
+                # exception target for queue capacity issues reported by elastic search,
+                # so catch the error, report it and retry the message
+                try:
+                    sys.stderr.write('ElasticSearchException: {0} reported while indexing event'.format(e))
+                    return
+                except kombu.exceptions.MessageStateError:
+                    # state may be already set.
+                    return
+
+        except ValueError as e:
+            sys.stderr.write("esworker exception in events queue %r\n" % e)
 
 
 def main():
@@ -370,6 +468,9 @@
 
 
 def initConfig():
+    # capture the hostname
+    options.mozdefhostname = getConfig('mozdefhostname', socket.gethostname(), options.configfile)
+
     # output our log to stdout or syslog
     options.output = getConfig('output', 'stdout', options.configfile)
     options.sysloghostname = getConfig('sysloghostname', 'localhost', options.configfile)
@@ -396,23 +497,11 @@ def initConfig():
     options.mqport = getConfig('mqport', 5672, options.configfile)
     options.mqvhost = getConfig('mqvhost', '/', options.configfile)
 
-    # rabbit: run with message acking?
-    # also toggles transient/persistant delivery (messages in memory only or stored on disk)
-    # ack=True sets persistant delivery, False sets transient delivery
-    options.mqack = getConfig('mqack', True, options.configfile)
-
     # aws options
     options.accesskey = getConfig('accesskey', '', options.configfile)
    options.secretkey = getConfig('secretkey', '', options.configfile)
     options.region = getConfig('region', 'us-west-1', options.configfile)
 
-    # plugin options
-    # secs to pass before checking for new/updated plugins
-    # seems to cause memory leaks..
-    # regular updates are disabled for now,
-    # though we set the frequency anyway.
-    options.plugincheckfrequency = getConfig('plugincheckfrequency', 120, options.configfile)
-
     # This is the full ARN that the s3 bucket lives under
     options.cloudtrail_arn = getConfig('cloudtrail_arn', 'cloudtrail_arn', options.configfile)
 
@@ -428,8 +517,5 @@ if __name__ == '__main__':
     # open ES connection globally so we don't waste time opening it per message
     es = esConnect()
 
-    # force a check for plugins and establish the plugin list
-    pluginList = list()
-    lastPluginCheck = datetime.now() - timedelta(minutes=60)
-    pluginList, lastPluginCheck = checkPlugins(pluginList, lastPluginCheck)
+    pluginList = registerPlugins()
 
     main()
diff --git a/tests/alerts/test_cloudtrail_deadman.py b/tests/alerts/test_cloudtrail_deadman.py
index e7c420b7..aef1db5c 100644
--- a/tests/alerts/test_cloudtrail_deadman.py
+++ b/tests/alerts/test_cloudtrail_deadman.py
@@ -13,7 +13,9 @@ class TestAlertCloudtrailDeadman(AlertTestSuite):
     default_event = {
         "_type": "cloudtrail",
         "_source": {
-            "eventName": "somename"
+            "details": {
+                "eventName": "somename"
+            }
         }
     }
 
diff --git a/tests/alerts/test_cloudtrail_logging_disabled.py b/tests/alerts/test_cloudtrail_logging_disabled.py
index 9f7b8ee2..056eacba 100644
--- a/tests/alerts/test_cloudtrail_logging_disabled.py
+++ b/tests/alerts/test_cloudtrail_logging_disabled.py
@@ -12,9 +12,11 @@ class TestAlertCloudtrailLoggingDisabled(AlertTestSuite):
     default_event = {
         "_type": "cloudtrail",
         "_source": {
-            "eventName": "StopLogging",
-            "requestParameters": {
-                "name": "cloudtrail_example_name"
+            "details": {
+                "eventName": "StopLogging",
+                "requestParameters": {
+                    "name": "cloudtrail_example_name"
+                }
             }
         }
     }
@@ -59,7 +61,7 @@ class TestAlertCloudtrailLoggingDisabled(AlertTestSuite):
         )
 
         event = AlertTestSuite.create_event(default_event)
-        event['_source']['eventName'] = 'Badeventname'
+        event['_source']['details']['eventName'] = 'Badeventname'
         test_cases.append(
             NegativeAlertTestCase(
                 description="Negative test case with bad eventName",
@@ -78,7 +80,7 @@ class TestAlertCloudtrailLoggingDisabled(AlertTestSuite):
         )
 
         event = AlertTestSuite.create_event(default_event)
-        event['_source']['errorCode'] = 'AccessDenied'
+        event['_source']['details']['errorCode'] = 'AccessDenied'
         test_cases.append(
             NegativeAlertTestCase(
                 description="Negative test case with excluding errorCode",
diff --git a/tests/mq/test_esworker_cloudtrail.py b/tests/mq/test_esworker_cloudtrail.py
new file mode 100644
index 00000000..d8ea0620
--- /dev/null
+++ b/tests/mq/test_esworker_cloudtrail.py
@@ -0,0 +1,134 @@
+#!/usr/bin/env python
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+# Copyright (c) 2017 Mozilla Corporation
+#
+# Contributors:
+# Brandon Myers bmyers@mozilla.com
+
+import pytz
+import tzlocal
+
+
+def utc_timezone():
+    return pytz.timezone('UTC')
+
+
+tzlocal.get_localzone = utc_timezone
+
+
+import os
+import sys
+sys.path.append(os.path.join(os.path.dirname(__file__), "../../mq"))
+from mq import esworker_cloudtrail
+
+
+class MockOptions():
+    @property
+    def mozdefhostname(self):
+        return 'sample'
+
+
+class TestKeyMapping():
+    def setup(self):
+        mock_options = MockOptions()
+        esworker_cloudtrail.options = mock_options
+        self.key_mapping = esworker_cloudtrail.keyMapping
+
+    def test_cloudtrail_dict(self):
+        cloudtrail_dict = {
+            u'apiVersion': u'20140328',
+            u'awsRegion': u'us-west-2',
+            u'eventID': u'd29avef1-1a81-4125-4cb1-3ddca313b6c3',
+            u'eventName': u'CreateLogStream',
+            u'eventSource': u'logs.amazonaws.com',
+            u'eventTime': u'2017-10-14T19:54:01Z',
+            u'eventType': u'AwsApiCall',
+            u'eventVersion': u'1.04',
+            u'recipientAccountId': u'125234098624',
+            u'requestID': u'6146b9c2-153e-31e2-6782-1fb937ca1c57',
+            u'requestParameters': {
+                u'logGroupName': u'/aws/lambda/abcd-webhooks-pulse',
+                u'logStreamName': u'2017/10/14/[$LATEST]a7918c9450164d3db2cef43f95bba7a7'
+            },
+            u'responseElements': None,
+            u'sourceIPAddress': u'1.2.3.4',
+            u'userAgent': u'awslambda-worker',
+            u'userIdentity': {
+                u'accessKeyId': u'ASBSDGLKHSDGBD2YXSGSLDHTJA',
+                u'accountId': u'125234098624',
+                u'arn': u'arn:aws:sts::125234098624:assumed-role/lambda-abcd-webhooks-pulse/abcd-webhooks-pulse',
+                u'principalId': u'AROABRMQYSEGL3VWEDW3K:abcd-webhooks-pulse',
+                u'sessionContext': {
+                    u'attributes': {
+                        u'creationDate': u'2017-10-14T19:47:02Z',
+                        u'mfaAuthenticated': u'false'
+                    },
+                    u'sessionIssuer': {
+                        u'accountId': u'125234098624',
+                        u'arn': u'arn:aws:iam::125234098624:role/lambda-abcd-webhooks-pulse',
+                        u'principalId': u'AROABRMQYSEGL3VWEDW3K',
+                        u'type': u'Role',
+                        u'userName': u'lambda-abcd-webhooks-pulse'
+                    }
+                },
+                u'type': u'AssumedRole'
+            }
+        }
+
+        result = self.key_mapping(cloudtrail_dict)
+
+        assert result['category'] == 'AwsApiCall'
+        assert result['hostname'] == 'logs.amazonaws.com'
+        assert result['mozdefhostname'] == 'sample'
+        assert type(result['processid']) is str
+        # verify processid is an integer inside of that string
+        assert int(result['processid'])
+        assert type(result['processname']) is str
+        assert result['severity'] == 'INFO'
+        assert result['summary'] == '1.2.3.4 performed CreateLogStream in logs.amazonaws.com'
+        assert result['timestamp'] == '2017-10-14T19:54:01+00:00'
+        assert result['utctimestamp'] == '2017-10-14T19:54:01+00:00'
+        assert result['receivedtimestamp'] != result['utctimestamp']
+        expected_details = {
+            u'apiversion': u'20140328',
+            u'awsregion': u'us-west-2',
+            'eventReadOnly': False,
+            'eventVerb': u'Create',
+            u'eventid': u'd29avef1-1a81-4125-4cb1-3ddca313b6c3',
+            u'eventname': u'CreateLogStream',
+            u'eventversion': u'1.04',
+            u'recipientaccountid': u'125234098624',
+            u'requestid': u'6146b9c2-153e-31e2-6782-1fb937ca1c57',
+            u'requestparameters': {
+                u'logGroupName': u'/aws/lambda/abcd-webhooks-pulse',
+                u'logStreamName': u'2017/10/14/[$LATEST]a7918c9450164d3db2cef43f95bba7a7'
+            },
+            u'responseelements': None,
+            'sourceipaddress': u'1.2.3.4',
+            u'useragent': u'awslambda-worker',
+            u'useridentity': {
+                u'accessKeyId': u'ASBSDGLKHSDGBD2YXSGSLDHTJA',
+                u'accountId': u'125234098624',
+                u'arn': u'arn:aws:sts::125234098624:assumed-role/lambda-abcd-webhooks-pulse/abcd-webhooks-pulse',
+                u'principalId': u'AROABRMQYSEGL3VWEDW3K:abcd-webhooks-pulse',
+                u'sessionContext': {
+                    u'attributes': {
+                        u'creationDate': u'2017-10-14T19:47:02Z',
+                        u'mfaAuthenticated': u'false'
+                    },
+                    u'sessionIssuer': {
+                        u'accountId': u'125234098624',
+                        u'arn': u'arn:aws:iam::125234098624:role/lambda-abcd-webhooks-pulse',
+                        u'principalId': u'AROABRMQYSEGL3VWEDW3K',
+                        u'type': u'Role',
+                        u'userName': u'lambda-abcd-webhooks-pulse'
+                    }
+                },
+                u'type': u'AssumedRole'
+            }
+        }
+
+        assert result['details'] == expected_details
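
Beyond the CloudTrail-shaped event covered by the new test, the reworked keyMapping() also honors the details.*_int / details.*_float suffix convention for one-off custom fields. A minimal usage sketch of that path (the event below, including the retention_days_int field, is hypothetical sample data, not part of this diff; the sys.path setup and options stub mirror the new test module):

    import os
    import sys
    sys.path.append(os.path.join(os.path.dirname(__file__), "../../mq"))
    from mq import esworker_cloudtrail


    class StubOptions():
        # stand-in for the options object initConfig() normally builds
        @property
        def mozdefhostname(self):
            return 'sample'


    # keyMapping() reads options.mozdefhostname, so wire up the stub first
    esworker_cloudtrail.options = StubOptions()

    event = {
        'eventName': 'StopLogging',
        'eventSource': 'cloudtrail.amazonaws.com',
        'sourceIPAddress': '1.2.3.4',
        'details.retention_days_int': '90',   # _int suffix requests integer typing
    }
    normalized = esworker_cloudtrail.keyMapping(event)

    # one-off details.* fields land under 'details', typed per their suffix
    assert normalized['details']['retention_days_int'] == 90
    # eventName/eventSource/sourceIPAddress drive the summary and verb extraction
    assert normalized['summary'] == '1.2.3.4 performed StopLogging in cloudtrail.amazonaws.com'
    assert normalized['details']['eventVerb'] == 'Stop'
    assert normalized['details']['eventReadOnly'] is False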