зеркало из https://github.com/mozilla/MozDef.git
Merge pull request #1468 from mpurzynski/esworker_exceptions_fixups
Esworker exceptions fixups
This commit is contained in:
Коммит
4f0bafed14
|
@ -17,12 +17,16 @@ import gzip
|
|||
from io import BytesIO
|
||||
import re
|
||||
import time
|
||||
import kombu
|
||||
from ssl import SSLEOFError, SSLError
|
||||
from threading import Thread
|
||||
|
||||
from mozdef_util.utilities.toUTC import toUTC
|
||||
from mozdef_util.elasticsearch_client import ElasticsearchClient, ElasticsearchBadServer, ElasticsearchInvalidIndex, ElasticsearchException
|
||||
from mozdef_util.elasticsearch_client import (
|
||||
ElasticsearchClient,
|
||||
ElasticsearchBadServer,
|
||||
ElasticsearchInvalidIndex,
|
||||
ElasticsearchException,
|
||||
)
|
||||
from mozdef_util.utilities.logger import logger, initLogger
|
||||
from mozdef_util.utilities.to_unicode import toUnicode
|
||||
from mozdef_util.utilities.remove_at import removeAt
|
||||
|
@ -32,165 +36,163 @@ from lib.plugins import sendEventToPlugins, registerPlugins
|
|||
from lib.sqs import connect_sqs
|
||||
|
||||
|
||||
CLOUDTRAIL_VERB_REGEX = re.compile(r'^([A-Z][^A-Z]*)')
|
||||
CLOUDTRAIL_VERB_REGEX = re.compile(r"^([A-Z][^A-Z]*)")
|
||||
|
||||
# running under uwsgi?
|
||||
try:
|
||||
import uwsgi
|
||||
|
||||
hasUWSGI = True
|
||||
except ImportError as e:
|
||||
hasUWSGI = False
|
||||
|
||||
|
||||
def keyMapping(aDict):
|
||||
'''map common key/fields to a normalized structure,
|
||||
"""map common key/fields to a normalized structure,
|
||||
explicitly typed when possible to avoid schema changes for upsteam consumers
|
||||
Special accomodations made for logstash,nxlog, beaver, heka and CEF
|
||||
Some shippers attempt to conform to logstash-style @fieldname convention.
|
||||
This strips the leading at symbol since it breaks some elastic search
|
||||
libraries like elasticutils.
|
||||
'''
|
||||
"""
|
||||
returndict = dict()
|
||||
|
||||
returndict['source'] = 'cloudtrail'
|
||||
returndict['details'] = {}
|
||||
returndict['category'] = 'cloudtrail'
|
||||
returndict['processid'] = str(os.getpid())
|
||||
returndict['processname'] = sys.argv[0]
|
||||
returndict['severity'] = 'INFO'
|
||||
if 'sourceIPAddress' in aDict and 'eventName' in aDict and 'eventSource' in aDict:
|
||||
returndict["source"] = "cloudtrail"
|
||||
returndict["details"] = {}
|
||||
returndict["category"] = "cloudtrail"
|
||||
returndict["processid"] = str(os.getpid())
|
||||
returndict["processname"] = sys.argv[0]
|
||||
returndict["severity"] = "INFO"
|
||||
if "sourceIPAddress" in aDict and "eventName" in aDict and "eventSource" in aDict:
|
||||
summary_str = "{0} performed {1} in {2}".format(
|
||||
aDict['sourceIPAddress'],
|
||||
aDict['eventName'],
|
||||
aDict['eventSource']
|
||||
aDict["sourceIPAddress"], aDict["eventName"], aDict["eventSource"]
|
||||
)
|
||||
returndict['summary'] = summary_str
|
||||
returndict["summary"] = summary_str
|
||||
|
||||
if 'eventName' in aDict:
|
||||
if "eventName" in aDict:
|
||||
# Uppercase first character
|
||||
aDict['eventName'] = aDict['eventName'][0].upper() + aDict['eventName'][1:]
|
||||
returndict['details']['eventVerb'] = CLOUDTRAIL_VERB_REGEX.findall(aDict['eventName'])[0]
|
||||
returndict['details']['eventReadOnly'] = (returndict['details']['eventVerb'] in ['Describe', 'Get', 'List'])
|
||||
aDict["eventName"] = aDict["eventName"][0].upper() + aDict["eventName"][1:]
|
||||
returndict["details"]["eventVerb"] = CLOUDTRAIL_VERB_REGEX.findall(aDict["eventName"])[0]
|
||||
returndict["details"]["eventReadOnly"] = returndict["details"]["eventVerb"] in ["Describe", "Get", "List"]
|
||||
# set the timestamp when we received it, i.e. now
|
||||
returndict['receivedtimestamp'] = toUTC(datetime.now()).isoformat()
|
||||
returndict['mozdefhostname'] = options.mozdefhostname
|
||||
returndict["receivedtimestamp"] = toUTC(datetime.now()).isoformat()
|
||||
returndict["mozdefhostname"] = options.mozdefhostname
|
||||
try:
|
||||
for k, v in aDict.items():
|
||||
k = removeAt(k).lower()
|
||||
|
||||
if k == 'sourceip':
|
||||
returndict['details']['sourceipaddress'] = v
|
||||
if k == "sourceip":
|
||||
returndict["details"]["sourceipaddress"] = v
|
||||
|
||||
elif k == 'sourceipaddress':
|
||||
returndict['details']['sourceipaddress'] = v
|
||||
elif k == "sourceipaddress":
|
||||
returndict["details"]["sourceipaddress"] = v
|
||||
|
||||
elif k in ('facility', 'source'):
|
||||
returndict['source'] = v
|
||||
elif k in ("facility", "source"):
|
||||
returndict["source"] = v
|
||||
|
||||
elif k in ('eventsource'):
|
||||
returndict['hostname'] = v
|
||||
elif k in ("eventsource"):
|
||||
returndict["hostname"] = v
|
||||
|
||||
elif k in ('message', 'summary'):
|
||||
returndict['summary'] = toUnicode(v)
|
||||
elif k in ("message", "summary"):
|
||||
returndict["summary"] = toUnicode(v)
|
||||
|
||||
elif k in ('payload') and 'summary' not in aDict:
|
||||
elif k in ("payload") and "summary" not in aDict:
|
||||
# special case for heka if it sends payload as well as a summary, keep both but move payload to the details section.
|
||||
returndict['summary'] = toUnicode(v)
|
||||
elif k in ('payload'):
|
||||
returndict['details']['payload'] = toUnicode(v)
|
||||
returndict["summary"] = toUnicode(v)
|
||||
elif k in ("payload"):
|
||||
returndict["details"]["payload"] = toUnicode(v)
|
||||
|
||||
elif k in ('eventtime', 'timestamp', 'utctimestamp', 'date'):
|
||||
returndict['utctimestamp'] = toUTC(v).isoformat()
|
||||
returndict['timestamp'] = toUTC(v).isoformat()
|
||||
elif k in ("eventtime", "timestamp", "utctimestamp", "date"):
|
||||
returndict["utctimestamp"] = toUTC(v).isoformat()
|
||||
returndict["timestamp"] = toUTC(v).isoformat()
|
||||
|
||||
elif k in ('hostname', 'source_host', 'host'):
|
||||
returndict['hostname'] = toUnicode(v)
|
||||
elif k in ("hostname", "source_host", "host"):
|
||||
returndict["hostname"] = toUnicode(v)
|
||||
|
||||
elif k in ('tags'):
|
||||
if 'tags' not in returndict:
|
||||
returndict['tags'] = []
|
||||
elif k in ("tags"):
|
||||
if "tags" not in returndict:
|
||||
returndict["tags"] = []
|
||||
if type(v) == list:
|
||||
returndict['tags'] += v
|
||||
returndict["tags"] += v
|
||||
else:
|
||||
if len(v) > 0:
|
||||
returndict['tags'].append(v)
|
||||
returndict["tags"].append(v)
|
||||
|
||||
# nxlog keeps the severity name in syslogseverity,everyone else should use severity or level.
|
||||
elif k in ('syslogseverity', 'severity', 'severityvalue', 'level', 'priority'):
|
||||
returndict['severity'] = toUnicode(v).upper()
|
||||
elif k in ("syslogseverity", "severity", "severityvalue", "level", "priority"):
|
||||
returndict["severity"] = toUnicode(v).upper()
|
||||
|
||||
elif k in ('facility', 'syslogfacility'):
|
||||
returndict['facility'] = toUnicode(v)
|
||||
elif k in ("facility", "syslogfacility"):
|
||||
returndict["facility"] = toUnicode(v)
|
||||
|
||||
elif k in ('pid', 'processid'):
|
||||
returndict['processid'] = toUnicode(v)
|
||||
elif k in ("pid", "processid"):
|
||||
returndict["processid"] = toUnicode(v)
|
||||
|
||||
# nxlog sets sourcename to the processname (i.e. sshd), everyone else should call it process name or pname
|
||||
elif k in ('pname', 'processname', 'sourcename', 'program'):
|
||||
returndict['processname'] = toUnicode(v)
|
||||
elif k in ("pname", "processname", "sourcename", "program"):
|
||||
returndict["processname"] = toUnicode(v)
|
||||
|
||||
# the file, or source
|
||||
elif k in ('path', 'logger', 'file'):
|
||||
returndict['eventsource'] = toUnicode(v)
|
||||
elif k in ("path", "logger", "file"):
|
||||
returndict["eventsource"] = toUnicode(v)
|
||||
|
||||
elif k in ('type', 'eventtype', 'category'):
|
||||
returndict['category'] = toUnicode(v)
|
||||
returndict['type'] = 'cloudtrail'
|
||||
elif k in ("type", "eventtype", "category"):
|
||||
returndict["category"] = toUnicode(v)
|
||||
returndict["type"] = "cloudtrail"
|
||||
|
||||
# custom fields as a list/array
|
||||
elif k in ('fields', 'details'):
|
||||
elif k in ("fields", "details"):
|
||||
if type(v) is not dict:
|
||||
returndict['details']['message'] = v
|
||||
returndict["details"]["message"] = v
|
||||
else:
|
||||
if len(v) > 0:
|
||||
for details_key, details_value in v.items():
|
||||
returndict['details'][details_key] = details_value
|
||||
returndict["details"][details_key] = details_value
|
||||
|
||||
# custom fields/details as a one off, not in an array
|
||||
# i.e. fields.something=value or details.something=value
|
||||
# move them to a dict for consistency in querying
|
||||
elif k.startswith('fields.') or k.startswith('details.'):
|
||||
newName = k.replace('fields.', '')
|
||||
newName = newName.lower().replace('details.', '')
|
||||
elif k.startswith("fields.") or k.startswith("details."):
|
||||
newName = k.replace("fields.", "")
|
||||
newName = newName.lower().replace("details.", "")
|
||||
# add a dict to hold the details if it doesn't exist
|
||||
if 'details' not in returndict:
|
||||
returndict['details'] = dict()
|
||||
if "details" not in returndict:
|
||||
returndict["details"] = dict()
|
||||
# add field with a special case for shippers that
|
||||
# don't send details
|
||||
# in an array as int/floats/strings
|
||||
# we let them dictate the data type with field_datatype
|
||||
# convention
|
||||
if newName.endswith('_int'):
|
||||
returndict['details'][str(newName)] = int(v)
|
||||
elif newName.endswith('_float'):
|
||||
returndict['details'][str(newName)] = float(v)
|
||||
if newName.endswith("_int"):
|
||||
returndict["details"][str(newName)] = int(v)
|
||||
elif newName.endswith("_float"):
|
||||
returndict["details"][str(newName)] = float(v)
|
||||
else:
|
||||
returndict['details'][str(newName)] = toUnicode(v)
|
||||
returndict["details"][str(newName)] = toUnicode(v)
|
||||
else:
|
||||
returndict['details'][k] = v
|
||||
returndict["details"][k] = v
|
||||
|
||||
if 'utctimestamp' not in returndict:
|
||||
if "utctimestamp" not in returndict:
|
||||
# default in case we don't find a reasonable timestamp
|
||||
returndict['utctimestamp'] = toUTC(datetime.now()).isoformat()
|
||||
returndict["utctimestamp"] = toUTC(datetime.now()).isoformat()
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
logger.error('Malformed message: %r' % aDict)
|
||||
logger.error("Malformed message: %r" % aDict)
|
||||
|
||||
return returndict
|
||||
|
||||
|
||||
def esConnect():
|
||||
'''open or re-open a connection to elastic search'''
|
||||
"""open or re-open a connection to elastic search"""
|
||||
return ElasticsearchClient(
|
||||
(list('{0}'.format(s) for s in options.esservers)),
|
||||
(list("{0}".format(s) for s in options.esservers)),
|
||||
bulk_amount=options.esbulksize,
|
||||
bulk_refresh_time=options.esbulktimeout
|
||||
bulk_refresh_time=options.esbulktimeout,
|
||||
)
|
||||
|
||||
|
||||
class taskConsumer(object):
|
||||
|
||||
def __init__(self, queue, esConnection):
|
||||
self.sqs_queue = queue
|
||||
self.esConnection = esConnection
|
||||
|
@ -207,44 +209,34 @@ class taskConsumer(object):
|
|||
# between reauthenticating and getting a new set of creds
|
||||
# eventually this gets set by aws response
|
||||
self.flush_wait_time = 1800
|
||||
if options.cloudtrail_arn not in ['<cloudtrail_arn>', 'cloudtrail_arn']:
|
||||
client = boto3.client(
|
||||
'sts',
|
||||
aws_access_key_id=options.accesskey,
|
||||
aws_secret_access_key=options.secretkey
|
||||
)
|
||||
response = client.assume_role(
|
||||
RoleArn=options.cloudtrail_arn,
|
||||
RoleSessionName='MozDef-CloudTrail-Reader',
|
||||
)
|
||||
if options.cloudtrail_arn not in ["<cloudtrail_arn>", "cloudtrail_arn"]:
|
||||
client = boto3.client("sts", aws_access_key_id=options.accesskey, aws_secret_access_key=options.secretkey)
|
||||
response = client.assume_role(RoleArn=options.cloudtrail_arn, RoleSessionName="MozDef-CloudTrail-Reader")
|
||||
role_creds = {
|
||||
'aws_access_key_id': response['Credentials']['AccessKeyId'],
|
||||
'aws_secret_access_key': response['Credentials']['SecretAccessKey'],
|
||||
'aws_session_token': response['Credentials']['SessionToken']
|
||||
"aws_access_key_id": response["Credentials"]["AccessKeyId"],
|
||||
"aws_secret_access_key": response["Credentials"]["SecretAccessKey"],
|
||||
"aws_session_token": response["Credentials"]["SessionToken"],
|
||||
}
|
||||
current_time = toUTC(datetime.now())
|
||||
# Let's remove 3 seconds from the flush wait time just in case
|
||||
self.flush_wait_time = (response['Credentials']['Expiration'] - current_time).seconds - 3
|
||||
self.flush_wait_time = (response["Credentials"]["Expiration"] - current_time).seconds - 3
|
||||
else:
|
||||
role_creds = {}
|
||||
role_creds['region_name'] = options.region
|
||||
self.s3_client = boto3.client(
|
||||
's3',
|
||||
**get_aws_credentials(**role_creds)
|
||||
)
|
||||
role_creds["region_name"] = options.region
|
||||
self.s3_client = boto3.client("s3", **get_aws_credentials(**role_creds))
|
||||
|
||||
def reauth_timer(self):
|
||||
while True:
|
||||
time.sleep(self.flush_wait_time)
|
||||
logger.debug('Recycling credentials and reassuming role')
|
||||
logger.debug("Recycling credentials and reassuming role")
|
||||
self.authenticate()
|
||||
|
||||
def parse_s3_file(self, s3_obj):
|
||||
compressed_data = s3_obj['Body'].read()
|
||||
compressed_data = s3_obj["Body"].read()
|
||||
databuf = BytesIO(compressed_data)
|
||||
gzip_file = gzip.GzipFile(fileobj=databuf)
|
||||
json_logs = json.loads(gzip_file.read())
|
||||
return json_logs['Records']
|
||||
return json_logs["Records"]
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
|
@ -254,39 +246,39 @@ class taskConsumer(object):
|
|||
body_message = msg.body
|
||||
event = json.loads(body_message)
|
||||
|
||||
if not event['Message']:
|
||||
logger.error('Invalid message format for cloudtrail SQS messages')
|
||||
logger.error('Malformed Message: %r' % body_message)
|
||||
if not event["Message"]:
|
||||
logger.error("Invalid message format for cloudtrail SQS messages")
|
||||
logger.error("Malformed Message: %r" % body_message)
|
||||
continue
|
||||
|
||||
if event['Message'] == 'CloudTrail validation message.':
|
||||
if event["Message"] == "CloudTrail validation message.":
|
||||
# We don't care about these messages
|
||||
continue
|
||||
|
||||
message_json = json.loads(event['Message'])
|
||||
message_json = json.loads(event["Message"])
|
||||
|
||||
if 's3ObjectKey' not in message_json:
|
||||
logger.error('Invalid message format, expecting an s3ObjectKey in Message')
|
||||
logger.error('Malformed Message: %r' % body_message)
|
||||
if "s3ObjectKey" not in message_json:
|
||||
logger.error("Invalid message format, expecting an s3ObjectKey in Message")
|
||||
logger.error("Malformed Message: %r" % body_message)
|
||||
continue
|
||||
|
||||
s3_log_files = message_json['s3ObjectKey']
|
||||
s3_log_files = message_json["s3ObjectKey"]
|
||||
for log_file in s3_log_files:
|
||||
logger.debug('Downloading and parsing ' + log_file)
|
||||
s3_obj = self.s3_client.get_object(Bucket=message_json['s3Bucket'], Key=log_file)
|
||||
logger.debug("Downloading and parsing " + log_file)
|
||||
s3_obj = self.s3_client.get_object(Bucket=message_json["s3Bucket"], Key=log_file)
|
||||
events = self.parse_s3_file(s3_obj)
|
||||
for event in events:
|
||||
self.on_message(event)
|
||||
|
||||
msg.delete()
|
||||
except (SSLEOFError, SSLError, socket.error):
|
||||
logger.info('Received network related error...reconnecting')
|
||||
logger.info("Received network related error...reconnecting")
|
||||
time.sleep(5)
|
||||
self.sqs_queue = connect_sqs(
|
||||
region_name=options.region,
|
||||
aws_access_key_id=options.accesskey,
|
||||
aws_secret_access_key=options.secretkey,
|
||||
task_exchange=options.taskexchange
|
||||
task_exchange=options.taskexchange,
|
||||
)
|
||||
time.sleep(options.sleep_time)
|
||||
|
||||
|
@ -294,16 +286,13 @@ class taskConsumer(object):
|
|||
# print("RECEIVED MESSAGE: %r" % (body, ))
|
||||
try:
|
||||
# default elastic search metadata for an event
|
||||
metadata = {
|
||||
'index': 'events',
|
||||
'id': None
|
||||
}
|
||||
metadata = {"index": "events", "id": None}
|
||||
# just to be safe..check what we were sent.
|
||||
if isinstance(body, dict):
|
||||
bodyDict = body
|
||||
elif isinstance(body, str):
|
||||
try:
|
||||
bodyDict = json.loads(body) # lets assume it's json
|
||||
bodyDict = json.loads(body) # lets assume it's json
|
||||
except ValueError as e:
|
||||
# not json..ack but log the message
|
||||
logger.error("Unknown body type received %r" % body)
|
||||
|
@ -312,7 +301,7 @@ class taskConsumer(object):
|
|||
logger.error("Unknown body type received %r\n" % body)
|
||||
return
|
||||
|
||||
if 'customendpoint' in bodyDict and bodyDict['customendpoint']:
|
||||
if "customendpoint" in bodyDict and bodyDict["customendpoint"]:
|
||||
# custom document
|
||||
# send to plugins to allow them to modify it if needed
|
||||
(normalizedDict, metadata) = sendEventToPlugins(bodyDict, metadata, pluginList)
|
||||
|
@ -339,32 +328,25 @@ class taskConsumer(object):
|
|||
bulk = True
|
||||
|
||||
bulk = False
|
||||
self.esConnection.save_event(
|
||||
index=metadata['index'],
|
||||
doc_id=metadata['id'],
|
||||
body=jbody,
|
||||
bulk=bulk
|
||||
)
|
||||
self.esConnection.save_event(index=metadata["index"], doc_id=metadata["id"], body=jbody, bulk=bulk)
|
||||
|
||||
except (ElasticsearchBadServer, ElasticsearchInvalidIndex) as e:
|
||||
# handle loss of server or race condition with index rotation/creation/aliasing
|
||||
try:
|
||||
self.esConnection = esConnect()
|
||||
return
|
||||
except kombu.exceptions.MessageStateError:
|
||||
# state may be already set.
|
||||
except (ElasticsearchBadServer, ElasticsearchInvalidIndex, ElasticsearchException):
|
||||
logger.exception(
|
||||
"ElasticSearchException: {0} reported while indexing event, messages lost".format(e)
|
||||
)
|
||||
return
|
||||
except ElasticsearchException as e:
|
||||
# exception target for queue capacity issues reported by elastic search so catch the error, report it and retry the message
|
||||
try:
|
||||
logger.exception('ElasticSearchException: {0} reported while indexing event'.format(e))
|
||||
return
|
||||
except kombu.exceptions.MessageStateError:
|
||||
# state may be already set.
|
||||
return
|
||||
logger.exception("ElasticSearchException: {0} reported while indexing event, messages lost".format(e))
|
||||
return
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
logger.error('Malformed message: %r' % body)
|
||||
logger.error("Malformed message: %r" % body)
|
||||
|
||||
|
||||
def main():
|
||||
|
@ -374,17 +356,17 @@ def main():
|
|||
if hasUWSGI:
|
||||
logger.info("started as uwsgi mule {0}".format(uwsgi.mule_id()))
|
||||
else:
|
||||
logger.info('started without uwsgi')
|
||||
logger.info("started without uwsgi")
|
||||
|
||||
if options.mqprotocol not in ('sqs'):
|
||||
logger.error('Can only process SQS queues, terminating')
|
||||
if options.mqprotocol not in ("sqs"):
|
||||
logger.error("Can only process SQS queues, terminating")
|
||||
sys.exit(1)
|
||||
|
||||
sqs_queue = connect_sqs(
|
||||
region_name=options.region,
|
||||
aws_access_key_id=options.accesskey,
|
||||
aws_secret_access_key=options.secretkey,
|
||||
task_exchange=options.taskexchange
|
||||
task_exchange=options.taskexchange,
|
||||
)
|
||||
# consume our queue
|
||||
taskConsumer(sqs_queue, es).run()
|
||||
|
@ -392,49 +374,51 @@ def main():
|
|||
|
||||
def initConfig():
|
||||
# capture the hostname
|
||||
options.mozdefhostname = getConfig('mozdefhostname', socket.gethostname(), options.configfile)
|
||||
options.mozdefhostname = getConfig("mozdefhostname", socket.gethostname(), options.configfile)
|
||||
|
||||
# output our log to stdout or syslog
|
||||
options.output = getConfig('output', 'stdout', options.configfile)
|
||||
options.sysloghostname = getConfig('sysloghostname', 'localhost', options.configfile)
|
||||
options.syslogport = getConfig('syslogport', 514, options.configfile)
|
||||
options.output = getConfig("output", "stdout", options.configfile)
|
||||
options.sysloghostname = getConfig("sysloghostname", "localhost", options.configfile)
|
||||
options.syslogport = getConfig("syslogport", 514, options.configfile)
|
||||
|
||||
# elastic search options. set esbulksize to a non-zero value to enable bulk posting, set timeout to post no matter how many events after X seconds.
|
||||
options.esservers = list(getConfig('esservers', 'http://localhost:9200', options.configfile).split(','))
|
||||
options.esbulksize = getConfig('esbulksize', 0, options.configfile)
|
||||
options.esbulktimeout = getConfig('esbulktimeout', 30, options.configfile)
|
||||
options.esservers = list(getConfig("esservers", "http://localhost:9200", options.configfile).split(","))
|
||||
options.esbulksize = getConfig("esbulksize", 0, options.configfile)
|
||||
options.esbulktimeout = getConfig("esbulktimeout", 30, options.configfile)
|
||||
|
||||
# set to sqs for Amazon
|
||||
options.mqprotocol = getConfig('mqprotocol', 'sqs', options.configfile)
|
||||
options.mqprotocol = getConfig("mqprotocol", "sqs", options.configfile)
|
||||
|
||||
# rabbit message queue options
|
||||
options.mqserver = getConfig('mqserver', 'localhost', options.configfile)
|
||||
options.taskexchange = getConfig('taskexchange', 'eventtask', options.configfile)
|
||||
options.mqserver = getConfig("mqserver", "localhost", options.configfile)
|
||||
options.taskexchange = getConfig("taskexchange", "eventtask", options.configfile)
|
||||
# rabbit: how many messages to ask for at once from the message queue
|
||||
options.prefetch = getConfig('prefetch', 10, options.configfile)
|
||||
options.prefetch = getConfig("prefetch", 10, options.configfile)
|
||||
# rabbit: user creds
|
||||
options.mquser = getConfig('mquser', 'guest', options.configfile)
|
||||
options.mqpassword = getConfig('mqpassword', 'guest', options.configfile)
|
||||
options.mquser = getConfig("mquser", "guest", options.configfile)
|
||||
options.mqpassword = getConfig("mqpassword", "guest", options.configfile)
|
||||
# rabbit: port/vhost
|
||||
options.mqport = getConfig('mqport', 5672, options.configfile)
|
||||
options.mqvhost = getConfig('mqvhost', '/', options.configfile)
|
||||
options.mqport = getConfig("mqport", 5672, options.configfile)
|
||||
options.mqvhost = getConfig("mqvhost", "/", options.configfile)
|
||||
|
||||
# aws options
|
||||
options.accesskey = getConfig('accesskey', '', options.configfile)
|
||||
options.secretkey = getConfig('secretkey', '', options.configfile)
|
||||
options.region = getConfig('region', '', options.configfile)
|
||||
options.accesskey = getConfig("accesskey", "", options.configfile)
|
||||
options.secretkey = getConfig("secretkey", "", options.configfile)
|
||||
options.region = getConfig("region", "", options.configfile)
|
||||
|
||||
# This is the full ARN that the s3 bucket lives under
|
||||
options.cloudtrail_arn = getConfig('cloudtrail_arn', 'cloudtrail_arn', options.configfile)
|
||||
options.cloudtrail_arn = getConfig("cloudtrail_arn", "cloudtrail_arn", options.configfile)
|
||||
|
||||
# How long to sleep between iterations of querying AWS
|
||||
options.sleep_time = getConfig('sleep_time', 0.1, options.configfile)
|
||||
options.sleep_time = getConfig("sleep_time", 0.1, options.configfile)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
# configure ourselves
|
||||
parser = OptionParser()
|
||||
parser.add_option("-c", dest='configfile', default=sys.argv[0].replace('.py', '.conf'), help="configuration file to use")
|
||||
parser.add_option(
|
||||
"-c", dest="configfile", default=sys.argv[0].replace(".py", ".conf"), help="configuration file to use"
|
||||
)
|
||||
(options, args) = parser.parse_args()
|
||||
initConfig()
|
||||
initLogger(options)
|
||||
|
|
|
@ -15,7 +15,12 @@ from datetime import datetime
|
|||
from kombu import Connection, Queue, Exchange
|
||||
from kombu.mixins import ConsumerMixin
|
||||
|
||||
from mozdef_util.elasticsearch_client import ElasticsearchClient, ElasticsearchBadServer, ElasticsearchInvalidIndex, ElasticsearchException
|
||||
from mozdef_util.elasticsearch_client import (
|
||||
ElasticsearchClient,
|
||||
ElasticsearchBadServer,
|
||||
ElasticsearchInvalidIndex,
|
||||
ElasticsearchException,
|
||||
)
|
||||
|
||||
from mozdef_util.utilities.toUTC import toUTC
|
||||
from mozdef_util.utilities.logger import logger, initLogger
|
||||
|
@ -28,154 +33,156 @@ from lib.plugins import sendEventToPlugins, registerPlugins
|
|||
# running under uwsgi?
|
||||
try:
|
||||
import uwsgi
|
||||
|
||||
hasUWSGI = True
|
||||
except ImportError as e:
|
||||
hasUWSGI = False
|
||||
|
||||
|
||||
def keyMapping(aDict):
|
||||
'''map common key/fields to a normalized structure,
|
||||
"""map common key/fields to a normalized structure,
|
||||
explicitly typed when possible to avoid schema changes for upsteam consumers
|
||||
Special accomodations made for logstash,nxlog, beaver, heka and CEF
|
||||
Some shippers attempt to conform to logstash-style @fieldname convention.
|
||||
This strips the leading at symbol since it breaks some elastic search
|
||||
libraries like elasticutils.
|
||||
'''
|
||||
"""
|
||||
returndict = dict()
|
||||
|
||||
# uncomment to save the source event for debugging, or chain of custody/forensics
|
||||
# returndict['original']=aDict
|
||||
|
||||
# set the timestamp when we received it, i.e. now
|
||||
returndict['receivedtimestamp'] = toUTC(datetime.now()).isoformat()
|
||||
returndict['mozdefhostname'] = options.mozdefhostname
|
||||
returndict['details'] = {}
|
||||
returndict["receivedtimestamp"] = toUTC(datetime.now()).isoformat()
|
||||
returndict["mozdefhostname"] = options.mozdefhostname
|
||||
returndict["details"] = {}
|
||||
try:
|
||||
for k, v in aDict.items():
|
||||
k = removeAt(k).lower()
|
||||
|
||||
if k == 'sourceip':
|
||||
returndict['details']['eventsourceipaddress'] = v
|
||||
if k == "sourceip":
|
||||
returndict["details"]["eventsourceipaddress"] = v
|
||||
|
||||
if k in ('facility', 'source'):
|
||||
returndict['source'] = v
|
||||
if k in ("facility", "source"):
|
||||
returndict["source"] = v
|
||||
|
||||
if k in ('message', 'summary'):
|
||||
returndict['summary'] = toUnicode(v)
|
||||
if k in ("message", "summary"):
|
||||
returndict["summary"] = toUnicode(v)
|
||||
|
||||
if k in ('payload') and 'summary' not in aDict:
|
||||
if k in ("payload") and "summary" not in aDict:
|
||||
# special case for heka if it sends payload as well as a summary, keep both but move payload to the details section.
|
||||
returndict['summary'] = toUnicode(v)
|
||||
elif k in ('payload'):
|
||||
returndict['details']['payload'] = toUnicode(v)
|
||||
returndict["summary"] = toUnicode(v)
|
||||
elif k in ("payload"):
|
||||
returndict["details"]["payload"] = toUnicode(v)
|
||||
|
||||
if k in ('eventtime', 'timestamp', 'utctimestamp', 'date'):
|
||||
returndict['utctimestamp'] = toUTC(v).isoformat()
|
||||
returndict['timestamp'] = toUTC(v).isoformat()
|
||||
if k in ("eventtime", "timestamp", "utctimestamp", "date"):
|
||||
returndict["utctimestamp"] = toUTC(v).isoformat()
|
||||
returndict["timestamp"] = toUTC(v).isoformat()
|
||||
|
||||
if k in ('hostname', 'source_host', 'host'):
|
||||
returndict['hostname'] = toUnicode(v)
|
||||
if k in ("hostname", "source_host", "host"):
|
||||
returndict["hostname"] = toUnicode(v)
|
||||
|
||||
if k in ('tags'):
|
||||
if 'tags' not in returndict:
|
||||
returndict['tags'] = []
|
||||
if k in ("tags"):
|
||||
if "tags" not in returndict:
|
||||
returndict["tags"] = []
|
||||
if type(v) == list:
|
||||
returndict['tags'] += v
|
||||
returndict["tags"] += v
|
||||
else:
|
||||
if len(v) > 0:
|
||||
returndict['tags'].append(v)
|
||||
returndict["tags"].append(v)
|
||||
|
||||
# nxlog keeps the severity name in syslogseverity,everyone else should use severity or level.
|
||||
if k in ('syslogseverity', 'severity', 'severityvalue', 'level', 'priority'):
|
||||
returndict['severity'] = toUnicode(v).upper()
|
||||
if k in ("syslogseverity", "severity", "severityvalue", "level", "priority"):
|
||||
returndict["severity"] = toUnicode(v).upper()
|
||||
|
||||
if k in ('facility', 'syslogfacility'):
|
||||
returndict['facility'] = toUnicode(v)
|
||||
if k in ("facility", "syslogfacility"):
|
||||
returndict["facility"] = toUnicode(v)
|
||||
|
||||
if k in ('pid', 'processid'):
|
||||
returndict['processid'] = toUnicode(v)
|
||||
if k in ("pid", "processid"):
|
||||
returndict["processid"] = toUnicode(v)
|
||||
|
||||
# nxlog sets sourcename to the processname (i.e. sshd), everyone else should call it process name or pname
|
||||
if k in ('pname', 'processname', 'sourcename', 'program'):
|
||||
returndict['processname'] = toUnicode(v)
|
||||
if k in ("pname", "processname", "sourcename", "program"):
|
||||
returndict["processname"] = toUnicode(v)
|
||||
|
||||
# the file, or source
|
||||
if k in ('path', 'logger', 'file'):
|
||||
returndict['eventsource'] = toUnicode(v)
|
||||
if k in ("path", "logger", "file"):
|
||||
returndict["eventsource"] = toUnicode(v)
|
||||
|
||||
if k in ('type', 'eventtype', 'category'):
|
||||
returndict['category'] = toUnicode(v)
|
||||
if k in ("type", "eventtype", "category"):
|
||||
returndict["category"] = toUnicode(v)
|
||||
|
||||
# custom fields as a list/array
|
||||
if k in ('fields', 'details'):
|
||||
if k in ("fields", "details"):
|
||||
if type(v) is not dict:
|
||||
returndict['details']['message'] = v
|
||||
returndict["details"]["message"] = v
|
||||
else:
|
||||
if len(v) > 0:
|
||||
for details_key, details_value in v.items():
|
||||
returndict['details'][details_key] = details_value
|
||||
returndict["details"][details_key] = details_value
|
||||
|
||||
# custom fields/details as a one off, not in an array
|
||||
# i.e. fields.something=value or details.something=value
|
||||
# move them to a dict for consistency in querying
|
||||
if k.startswith('fields.') or k.startswith('details.'):
|
||||
newName = k.replace('fields.', '')
|
||||
newName = newName.lower().replace('details.', '')
|
||||
if k.startswith("fields.") or k.startswith("details."):
|
||||
newName = k.replace("fields.", "")
|
||||
newName = newName.lower().replace("details.", "")
|
||||
# add field with a special case for shippers that
|
||||
# don't send details
|
||||
# in an array as int/floats/strings
|
||||
# we let them dictate the data type with field_datatype
|
||||
# convention
|
||||
if newName.endswith('_int'):
|
||||
returndict['details'][str(newName)] = int(v)
|
||||
elif newName.endswith('_float'):
|
||||
returndict['details'][str(newName)] = float(v)
|
||||
if newName.endswith("_int"):
|
||||
returndict["details"][str(newName)] = int(v)
|
||||
elif newName.endswith("_float"):
|
||||
returndict["details"][str(newName)] = float(v)
|
||||
else:
|
||||
returndict['details'][str(newName)] = toUnicode(v)
|
||||
returndict["details"][str(newName)] = toUnicode(v)
|
||||
|
||||
# nxlog windows log handling
|
||||
if 'Domain' in aDict and 'SourceModuleType' in aDict:
|
||||
if "Domain" in aDict and "SourceModuleType" in aDict:
|
||||
# nxlog parses all windows event fields very well
|
||||
# copy all fields to details
|
||||
returndict['details'][k] = v
|
||||
returndict["details"][k] = v
|
||||
|
||||
if 'utctimestamp' not in returndict:
|
||||
if "utctimestamp" not in returndict:
|
||||
# default in case we don't find a reasonable timestamp
|
||||
returndict['utctimestamp'] = toUTC(datetime.now()).isoformat()
|
||||
returndict["utctimestamp"] = toUTC(datetime.now()).isoformat()
|
||||
|
||||
if 'type' not in returndict:
|
||||
if "type" not in returndict:
|
||||
# default replacement for old _type subcategory.
|
||||
# to preserve filtering capabilities
|
||||
returndict['type'] = 'event'
|
||||
returndict["type"] = "event"
|
||||
|
||||
except Exception as e:
|
||||
logger.exception('Received exception while normalizing message: %r' % e)
|
||||
logger.error('Malformed message: %r' % aDict)
|
||||
logger.exception("Received exception while normalizing message: %r" % e)
|
||||
logger.error("Malformed message: %r" % aDict)
|
||||
return None
|
||||
|
||||
return returndict
|
||||
|
||||
|
||||
def esConnect():
|
||||
'''open or re-open a connection to elastic search'''
|
||||
return ElasticsearchClient((list('{0}'.format(s) for s in options.esservers)), options.esbulksize)
|
||||
"""open or re-open a connection to elastic search"""
|
||||
return ElasticsearchClient((list("{0}".format(s) for s in options.esservers)), options.esbulksize)
|
||||
|
||||
|
||||
class taskConsumer(ConsumerMixin):
|
||||
|
||||
def __init__(self, mqConnection, taskQueue, topicExchange, esConnection):
|
||||
self.connection = mqConnection
|
||||
self.esConnection = esConnection
|
||||
self.taskQueue = taskQueue
|
||||
self.topicExchange = topicExchange
|
||||
self.mqproducer = self.connection.Producer(serializer='json')
|
||||
self.mqproducer = self.connection.Producer(serializer="json")
|
||||
if hasUWSGI:
|
||||
self.muleid = uwsgi.mule_id()
|
||||
else:
|
||||
self.muleid = 0
|
||||
|
||||
def get_consumers(self, Consumer, channel):
|
||||
consumer = Consumer(self.taskQueue, callbacks=[self.on_message], accept=['json', 'text/plain'], no_ack=(not options.mqack))
|
||||
consumer = Consumer(
|
||||
self.taskQueue, callbacks=[self.on_message], accept=["json", "text/plain"], no_ack=(not options.mqack)
|
||||
)
|
||||
consumer.qos(prefetch_count=options.prefetch)
|
||||
return [consumer]
|
||||
|
||||
|
@ -183,16 +190,13 @@ class taskConsumer(ConsumerMixin):
|
|||
# print("RECEIVED MESSAGE: %r" % (body, ))
|
||||
try:
|
||||
# default elastic search metadata for an event
|
||||
metadata = {
|
||||
'index': 'events',
|
||||
'id': None
|
||||
}
|
||||
metadata = {"index": "events", "id": None}
|
||||
# just to be safe..check what we were sent.
|
||||
if isinstance(body, dict):
|
||||
bodyDict = body
|
||||
elif isinstance(body, str):
|
||||
try:
|
||||
bodyDict = json.loads(body) # lets assume it's json
|
||||
bodyDict = json.loads(body) # lets assume it's json
|
||||
except ValueError as e:
|
||||
# not json..ack but log the message
|
||||
logger.error("Exception: unknown body type received: %r" % body)
|
||||
|
@ -203,7 +207,7 @@ class taskConsumer(ConsumerMixin):
|
|||
message.ack()
|
||||
return
|
||||
|
||||
if 'customendpoint' in bodyDict and bodyDict['customendpoint']:
|
||||
if "customendpoint" in bodyDict and bodyDict["customendpoint"]:
|
||||
# custom document
|
||||
# send to plugins to allow them to modify it if needed
|
||||
(normalizedDict, metadata) = sendEventToPlugins(bodyDict, metadata, pluginList)
|
||||
|
@ -230,12 +234,7 @@ class taskConsumer(ConsumerMixin):
|
|||
if options.esbulksize != 0:
|
||||
bulk = True
|
||||
|
||||
self.esConnection.save_event(
|
||||
index=metadata['index'],
|
||||
doc_id=metadata['id'],
|
||||
body=jbody,
|
||||
bulk=bulk
|
||||
)
|
||||
self.esConnection.save_event(index=metadata["index"], doc_id=metadata["id"], body=jbody, bulk=bulk)
|
||||
|
||||
except (ElasticsearchBadServer, ElasticsearchInvalidIndex) as e:
|
||||
# handle loss of server or race condition with index rotation/creation/aliasing
|
||||
|
@ -245,16 +244,22 @@ class taskConsumer(ConsumerMixin):
|
|||
return
|
||||
except kombu.exceptions.MessageStateError:
|
||||
# state may be already set.
|
||||
logger.exception(
|
||||
"Elastic Search and RabbitMQ exception (messages lost) while indexing event: %r" % e
|
||||
)
|
||||
return
|
||||
except ElasticsearchException as e:
|
||||
# exception target for queue capacity issues reported by elastic search so catch the error, report it and retry the message
|
||||
try:
|
||||
logger.exception('ElasticSearchException while indexing event: %r' % e)
|
||||
logger.error('Malformed message body: %r' % body)
|
||||
logger.exception("ElasticSearchException while indexing event: %r" % e)
|
||||
logger.error("Malformed message body: %r" % body)
|
||||
message.requeue()
|
||||
return
|
||||
except kombu.exceptions.MessageStateError:
|
||||
# state may be already set.
|
||||
logger.exception(
|
||||
"Elastic Search and RabbitMQ exception (messages lost) while indexing event: %r" % e
|
||||
)
|
||||
return
|
||||
# post the dict (kombu serializes it to json) to the events topic queue
|
||||
# using the ensure function to shortcut connection/queue drops/stalls, etc.
|
||||
|
@ -263,15 +268,17 @@ class taskConsumer(ConsumerMixin):
|
|||
message.ack()
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
logger.error('Malformed message body: %r' % body)
|
||||
logger.error("Malformed message body: %r" % body)
|
||||
|
||||
|
||||
def main():
|
||||
# connect and declare the message queue/kombu objects.
|
||||
# only py-amqp supports ssl and doesn't recognize amqps
|
||||
# so fix up the connection string accordingly
|
||||
connString = 'amqp://{0}:{1}@{2}:{3}/{4}'.format(options.mquser, options.mqpassword, options.mqserver, options.mqport, options.mqvhost)
|
||||
if options.mqprotocol == 'amqps':
|
||||
connString = "amqp://{0}:{1}@{2}:{3}/{4}".format(
|
||||
options.mquser, options.mqpassword, options.mqserver, options.mqport, options.mqvhost
|
||||
)
|
||||
if options.mqprotocol == "amqps":
|
||||
mqSSL = True
|
||||
else:
|
||||
mqSSL = False
|
||||
|
@ -279,61 +286,75 @@ def main():
|
|||
# Task Exchange for events sent via http for us to normalize and post to elastic search
|
||||
if options.mqack:
|
||||
# conservative, store msgs to disk, ack each message
|
||||
eventTaskExchange = Exchange(name=options.taskexchange, type='direct', durable=True, delivery_mode=2)
|
||||
eventTaskExchange = Exchange(name=options.taskexchange, type="direct", durable=True, delivery_mode=2)
|
||||
else:
|
||||
# fast, transient delivery, store in memory only, auto-ack messages
|
||||
eventTaskExchange = Exchange(name=options.taskexchange, type='direct', durable=True, delivery_mode=1)
|
||||
eventTaskExchange = Exchange(name=options.taskexchange, type="direct", durable=True, delivery_mode=1)
|
||||
eventTaskExchange(mqConn).declare()
|
||||
# Queue for the exchange
|
||||
if options.mqack:
|
||||
eventTaskQueue = Queue(options.taskexchange, exchange=eventTaskExchange, routing_key=options.taskexchange, durable=True, no_ack=False)
|
||||
eventTaskQueue = Queue(
|
||||
options.taskexchange,
|
||||
exchange=eventTaskExchange,
|
||||
routing_key=options.taskexchange,
|
||||
durable=True,
|
||||
no_ack=False,
|
||||
)
|
||||
else:
|
||||
eventTaskQueue = Queue(options.taskexchange, exchange=eventTaskExchange, routing_key=options.taskexchange, durable=True, no_ack=True)
|
||||
eventTaskQueue = Queue(
|
||||
options.taskexchange,
|
||||
exchange=eventTaskExchange,
|
||||
routing_key=options.taskexchange,
|
||||
durable=True,
|
||||
no_ack=True,
|
||||
)
|
||||
eventTaskQueue(mqConn).declare()
|
||||
|
||||
# topic exchange for anyone who wants to queue and listen for mozdef.event
|
||||
eventTopicExchange = Exchange(name=options.eventexchange, type='topic', durable=False, delivery_mode=1)
|
||||
eventTopicExchange = Exchange(name=options.eventexchange, type="topic", durable=False, delivery_mode=1)
|
||||
eventTopicExchange(mqConn).declare()
|
||||
|
||||
if hasUWSGI:
|
||||
logger.info("started as uwsgi mule {0}".format(uwsgi.mule_id()))
|
||||
else:
|
||||
logger.info('started without uwsgi')
|
||||
logger.info("started without uwsgi")
|
||||
# consume our queue and publish on the topic exchange
|
||||
taskConsumer(mqConn, eventTaskQueue, eventTopicExchange, es).run()
|
||||
|
||||
|
||||
def initConfig():
|
||||
# capture the hostname
|
||||
options.mozdefhostname = getConfig('mozdefhostname', socket.gethostname(), options.configfile)
|
||||
options.mozdefhostname = getConfig("mozdefhostname", socket.gethostname(), options.configfile)
|
||||
|
||||
# elastic search options. set esbulksize to a non-zero value to enable bulk posting, set timeout to post no matter how many events after X seconds.
|
||||
options.esservers = list(getConfig('esservers', 'http://localhost:9200', options.configfile).split(','))
|
||||
options.esbulksize = getConfig('esbulksize', 0, options.configfile)
|
||||
options.esbulktimeout = getConfig('esbulktimeout', 30, options.configfile)
|
||||
options.esservers = list(getConfig("esservers", "http://localhost:9200", options.configfile).split(","))
|
||||
options.esbulksize = getConfig("esbulksize", 0, options.configfile)
|
||||
options.esbulktimeout = getConfig("esbulktimeout", 30, options.configfile)
|
||||
|
||||
# message queue options
|
||||
options.mqserver = getConfig('mqserver', 'localhost', options.configfile)
|
||||
options.taskexchange = getConfig('taskexchange', 'eventtask', options.configfile)
|
||||
options.eventexchange = getConfig('eventexchange', 'events', options.configfile)
|
||||
options.mqserver = getConfig("mqserver", "localhost", options.configfile)
|
||||
options.taskexchange = getConfig("taskexchange", "eventtask", options.configfile)
|
||||
options.eventexchange = getConfig("eventexchange", "events", options.configfile)
|
||||
# how many messages to ask for at once from the message queue
|
||||
options.prefetch = getConfig('prefetch', 50, options.configfile)
|
||||
options.mquser = getConfig('mquser', 'guest', options.configfile)
|
||||
options.mqpassword = getConfig('mqpassword', 'guest', options.configfile)
|
||||
options.mqport = getConfig('mqport', 5672, options.configfile)
|
||||
options.mqvhost = getConfig('mqvhost', '/', options.configfile)
|
||||
options.prefetch = getConfig("prefetch", 50, options.configfile)
|
||||
options.mquser = getConfig("mquser", "guest", options.configfile)
|
||||
options.mqpassword = getConfig("mqpassword", "guest", options.configfile)
|
||||
options.mqport = getConfig("mqport", 5672, options.configfile)
|
||||
options.mqvhost = getConfig("mqvhost", "/", options.configfile)
|
||||
# set to either amqp or amqps for ssl
|
||||
options.mqprotocol = getConfig('mqprotocol', 'amqp', options.configfile)
|
||||
options.mqprotocol = getConfig("mqprotocol", "amqp", options.configfile)
|
||||
# run with message acking?
|
||||
# also toggles transient/persistant delivery (messages in memory only or stored on disk)
|
||||
# ack=True sets persistant delivery, False sets transient delivery
|
||||
options.mqack = getConfig('mqack', True, options.configfile)
|
||||
options.mqack = getConfig("mqack", True, options.configfile)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
# configure ourselves
|
||||
parser = OptionParser()
|
||||
parser.add_option("-c", dest='configfile', default=sys.argv[0].replace('.py', '.conf'), help="configuration file to use")
|
||||
parser.add_option(
|
||||
"-c", dest="configfile", default=sys.argv[0].replace(".py", ".conf"), help="configuration file to use"
|
||||
)
|
||||
(options, args) = parser.parse_args()
|
||||
initConfig()
|
||||
initLogger(options)
|
||||
|
|
|
@ -10,7 +10,6 @@
|
|||
|
||||
|
||||
import json
|
||||
import kombu
|
||||
import sys
|
||||
import socket
|
||||
import time
|
||||
|
@ -19,7 +18,12 @@ from datetime import datetime, timedelta
|
|||
import calendar
|
||||
import requests
|
||||
|
||||
from mozdef_util.elasticsearch_client import ElasticsearchClient, ElasticsearchBadServer, ElasticsearchInvalidIndex, ElasticsearchException
|
||||
from mozdef_util.elasticsearch_client import (
|
||||
ElasticsearchClient,
|
||||
ElasticsearchBadServer,
|
||||
ElasticsearchInvalidIndex,
|
||||
ElasticsearchException,
|
||||
)
|
||||
|
||||
from mozdef_util.utilities.toUTC import toUTC
|
||||
from mozdef_util.utilities.to_unicode import toUnicode
|
||||
|
@ -32,39 +36,39 @@ from lib.plugins import sendEventToPlugins, registerPlugins
|
|||
# running under uwsgi?
|
||||
try:
|
||||
import uwsgi
|
||||
|
||||
hasUWSGI = True
|
||||
except ImportError as e:
|
||||
hasUWSGI = False
|
||||
|
||||
|
||||
class PTRequestor(object):
|
||||
|
||||
def __init__(self, apikey, evmax=2000):
|
||||
self._papertrail_api = 'https://papertrailapp.com/api/v1/events/search.json'
|
||||
self._papertrail_api = "https://papertrailapp.com/api/v1/events/search.json"
|
||||
self._apikey = apikey
|
||||
self._events = {}
|
||||
self._evmax = evmax
|
||||
self._evidcache = []
|
||||
|
||||
def parse_events(self, resp):
|
||||
for x in resp['events']:
|
||||
if x['id'] in self._evidcache:
|
||||
for x in resp["events"]:
|
||||
if x["id"] in self._evidcache:
|
||||
# saw this event last time, just ignore it
|
||||
continue
|
||||
self._events[x['id']] = x
|
||||
if 'reached_record_limit' in resp and resp['reached_record_limit']:
|
||||
return resp['min_id']
|
||||
self._events[x["id"]] = x
|
||||
if "reached_record_limit" in resp and resp["reached_record_limit"]:
|
||||
return resp["min_id"]
|
||||
return None
|
||||
|
||||
def makerequest(self, query, stime, etime, maxid):
|
||||
payload = {
|
||||
'min_time': calendar.timegm(stime.utctimetuple()),
|
||||
'max_time': calendar.timegm(etime.utctimetuple()),
|
||||
'q': query
|
||||
"min_time": calendar.timegm(stime.utctimetuple()),
|
||||
"max_time": calendar.timegm(etime.utctimetuple()),
|
||||
"q": query,
|
||||
}
|
||||
if maxid is not None:
|
||||
payload['max_id'] = maxid
|
||||
hdrs = {'X-Papertrail-Token': self._apikey}
|
||||
payload["max_id"] = maxid
|
||||
hdrs = {"X-Papertrail-Token": self._apikey}
|
||||
|
||||
max_retries = 3
|
||||
total_retries = 0
|
||||
|
@ -94,7 +98,7 @@ class PTRequestor(object):
|
|||
if maxid is None:
|
||||
break
|
||||
if len(self._events.keys()) > self._evmax:
|
||||
logger.warning('papertrail esworker hitting event request limit')
|
||||
logger.warning("papertrail esworker hitting event request limit")
|
||||
break
|
||||
# cache event ids we return to allow for some duplicate filtering checks
|
||||
# during next run
|
||||
|
@ -103,130 +107,130 @@ class PTRequestor(object):
|
|||
|
||||
|
||||
def keyMapping(aDict):
|
||||
'''map common key/fields to a normalized structure,
|
||||
"""map common key/fields to a normalized structure,
|
||||
explicitly typed when possible to avoid schema changes for upsteam consumers
|
||||
Special accomodations made for logstash,nxlog, beaver, heka and CEF
|
||||
Some shippers attempt to conform to logstash-style @fieldname convention.
|
||||
This strips the leading at symbol since it breaks some elastic search
|
||||
libraries like elasticutils.
|
||||
'''
|
||||
"""
|
||||
returndict = dict()
|
||||
|
||||
# uncomment to save the source event for debugging, or chain of custody/forensics
|
||||
# returndict['original']=aDict
|
||||
|
||||
# set the timestamp when we received it, i.e. now
|
||||
returndict['receivedtimestamp'] = toUTC(datetime.now()).isoformat()
|
||||
returndict['mozdefhostname'] = options.mozdefhostname
|
||||
returndict['details'] = {}
|
||||
returndict["receivedtimestamp"] = toUTC(datetime.now()).isoformat()
|
||||
returndict["mozdefhostname"] = options.mozdefhostname
|
||||
returndict["details"] = {}
|
||||
try:
|
||||
for k, v in aDict.items():
|
||||
k = removeAt(k).lower()
|
||||
|
||||
if k in ('message', 'summary'):
|
||||
returndict['summary'] = toUnicode(v)
|
||||
if k in ("message", "summary"):
|
||||
returndict["summary"] = toUnicode(v)
|
||||
|
||||
if k in ('payload') and 'summary' not in aDict:
|
||||
if k in ("payload") and "summary" not in aDict:
|
||||
# special case for heka if it sends payload as well as a summary, keep both but move payload to the details section.
|
||||
returndict['summary'] = toUnicode(v)
|
||||
elif k in ('payload'):
|
||||
returndict['details']['payload'] = toUnicode(v)
|
||||
returndict["summary"] = toUnicode(v)
|
||||
elif k in ("payload"):
|
||||
returndict["details"]["payload"] = toUnicode(v)
|
||||
|
||||
if k in ('eventtime', 'timestamp', 'utctimestamp'):
|
||||
returndict['utctimestamp'] = toUTC(v).isoformat()
|
||||
returndict['timestamp'] = toUTC(v).isoformat()
|
||||
if k in ("eventtime", "timestamp", "utctimestamp"):
|
||||
returndict["utctimestamp"] = toUTC(v).isoformat()
|
||||
returndict["timestamp"] = toUTC(v).isoformat()
|
||||
|
||||
if k in ('hostname', 'source_host', 'host'):
|
||||
returndict['hostname'] = toUnicode(v)
|
||||
if k in ("hostname", "source_host", "host"):
|
||||
returndict["hostname"] = toUnicode(v)
|
||||
|
||||
if k in ('tags'):
|
||||
if k in ("tags"):
|
||||
if len(v) > 0:
|
||||
returndict['tags'] = v
|
||||
returndict["tags"] = v
|
||||
|
||||
# nxlog keeps the severity name in syslogseverity,everyone else should use severity or level.
|
||||
if k in ('syslogseverity', 'severity', 'severityvalue', 'level'):
|
||||
returndict['severity'] = toUnicode(v).upper()
|
||||
if k in ("syslogseverity", "severity", "severityvalue", "level"):
|
||||
returndict["severity"] = toUnicode(v).upper()
|
||||
|
||||
if k in ('facility', 'syslogfacility','source'):
|
||||
returndict['source'] = toUnicode(v)
|
||||
if k in ("facility", "syslogfacility", "source"):
|
||||
returndict["source"] = toUnicode(v)
|
||||
|
||||
if k in ('pid', 'processid'):
|
||||
returndict['processid'] = toUnicode(v)
|
||||
if k in ("pid", "processid"):
|
||||
returndict["processid"] = toUnicode(v)
|
||||
|
||||
# nxlog sets sourcename to the processname (i.e. sshd), everyone else should call it process name or pname
|
||||
if k in ('pname', 'processname', 'sourcename'):
|
||||
returndict['processname'] = toUnicode(v)
|
||||
if k in ("pname", "processname", "sourcename"):
|
||||
returndict["processname"] = toUnicode(v)
|
||||
|
||||
# the file, or source
|
||||
if k in ('path', 'logger', 'file'):
|
||||
returndict['eventsource'] = toUnicode(v)
|
||||
if k in ("path", "logger", "file"):
|
||||
returndict["eventsource"] = toUnicode(v)
|
||||
|
||||
if k in ('type', 'eventtype', 'category'):
|
||||
returndict['category'] = toUnicode(v)
|
||||
if k in ("type", "eventtype", "category"):
|
||||
returndict["category"] = toUnicode(v)
|
||||
|
||||
# custom fields as a list/array
|
||||
if k in ('fields', 'details'):
|
||||
if k in ("fields", "details"):
|
||||
if type(v) is not dict:
|
||||
returndict['details']['message'] = v
|
||||
returndict["details"]["message"] = v
|
||||
else:
|
||||
if len(v) > 0:
|
||||
for details_key, details_value in v.items():
|
||||
returndict['details'][details_key] = details_value
|
||||
returndict["details"][details_key] = details_value
|
||||
|
||||
# custom fields/details as a one off, not in an array
|
||||
# i.e. fields.something=value or details.something=value
|
||||
# move them to a dict for consistency in querying
|
||||
if k.startswith('fields.') or k.startswith('details.'):
|
||||
newName = k.replace('fields.', '')
|
||||
newName = newName.lower().replace('details.', '')
|
||||
if k.startswith("fields.") or k.startswith("details."):
|
||||
newName = k.replace("fields.", "")
|
||||
newName = newName.lower().replace("details.", "")
|
||||
# add field with a special case for shippers that
|
||||
# don't send details
|
||||
# in an array as int/floats/strings
|
||||
# we let them dictate the data type with field_datatype
|
||||
# convention
|
||||
if newName.endswith('_int'):
|
||||
returndict['details'][str(newName)] = int(v)
|
||||
elif newName.endswith('_float'):
|
||||
returndict['details'][str(newName)] = float(v)
|
||||
if newName.endswith("_int"):
|
||||
returndict["details"][str(newName)] = int(v)
|
||||
elif newName.endswith("_float"):
|
||||
returndict["details"][str(newName)] = float(v)
|
||||
else:
|
||||
returndict['details'][str(newName)] = toUnicode(v)
|
||||
returndict["details"][str(newName)] = toUnicode(v)
|
||||
|
||||
# nxlog windows log handling
|
||||
if 'Domain' in aDict and 'SourceModuleType' in aDict:
|
||||
if "Domain" in aDict and "SourceModuleType" in aDict:
|
||||
# nxlog parses all windows event fields very well
|
||||
# copy all fields to details
|
||||
returndict['details'][k] = v
|
||||
returndict["details"][k] = v
|
||||
|
||||
if 'utctimestamp' not in returndict:
|
||||
if "utctimestamp" not in returndict:
|
||||
# default in case we don't find a reasonable timestamp
|
||||
returndict['utctimestamp'] = toUTC(datetime.now()).isoformat()
|
||||
returndict["utctimestamp"] = toUTC(datetime.now()).isoformat()
|
||||
|
||||
if 'type' not in returndict:
|
||||
if "type" not in returndict:
|
||||
# default replacement for old _type subcategory.
|
||||
# to preserve filtering capabilities
|
||||
returndict['type'] = 'event'
|
||||
returndict["type"] = "event"
|
||||
|
||||
except Exception as e:
|
||||
logger.exception('Received exception while normalizing message: %r' % e)
|
||||
logger.error('Malformed message: %r' % aDict)
|
||||
logger.exception("Received exception while normalizing message: %r" % e)
|
||||
logger.error("Malformed message: %r" % aDict)
|
||||
return None
|
||||
|
||||
return returndict
|
||||
|
||||
|
||||
def esConnect():
|
||||
'''open or re-open a connection to elastic search'''
|
||||
return ElasticsearchClient((list('{0}'.format(s) for s in options.esservers)), options.esbulksize)
|
||||
"""open or re-open a connection to elastic search"""
|
||||
return ElasticsearchClient((list("{0}".format(s) for s in options.esservers)), options.esbulksize)
|
||||
|
||||
|
||||
class taskConsumer(object):
|
||||
|
||||
def __init__(self, ptRequestor, esConnection):
|
||||
self.ptrequestor = ptRequestor
|
||||
self.esConnection = esConnection
|
||||
# calculate our initial request window
|
||||
self.lastRequestTime = toUTC(datetime.now()) - timedelta(seconds=options.sleep_time) - \
|
||||
timedelta(seconds=options.ptbackoff)
|
||||
self.lastRequestTime = (
|
||||
toUTC(datetime.now()) - timedelta(seconds=options.sleep_time) - timedelta(seconds=options.ptbackoff)
|
||||
)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
|
@ -240,25 +244,25 @@ class taskConsumer(object):
|
|||
|
||||
# strip any line feeds from the message itself, we just convert them
|
||||
# into spaces
|
||||
msgdict['message'] = msgdict['message'].replace('\n', ' ').replace('\r', '')
|
||||
msgdict["message"] = msgdict["message"].replace("\n", " ").replace("\r", "")
|
||||
|
||||
event = dict()
|
||||
event['tags'] = ['papertrail', options.ptacctname]
|
||||
event['details'] = msgdict
|
||||
event["tags"] = ["papertrail", options.ptacctname]
|
||||
event["details"] = msgdict
|
||||
|
||||
if 'generated_at' in event['details']:
|
||||
event['utctimestamp'] = toUTC(event['details']['generated_at']).isoformat()
|
||||
if 'hostname' in event['details']:
|
||||
event['hostname'] = event['details']['hostname']
|
||||
if 'message' in event['details']:
|
||||
event['summary'] = event['details']['message']
|
||||
if 'severity' in event['details']:
|
||||
event['severity'] = event['details']['severity']
|
||||
if 'source_ip' in event['details']:
|
||||
event['sourceipaddress'] = event['details']['source_ip']
|
||||
if "generated_at" in event["details"]:
|
||||
event["utctimestamp"] = toUTC(event["details"]["generated_at"]).isoformat()
|
||||
if "hostname" in event["details"]:
|
||||
event["hostname"] = event["details"]["hostname"]
|
||||
if "message" in event["details"]:
|
||||
event["summary"] = event["details"]["message"]
|
||||
if "severity" in event["details"]:
|
||||
event["severity"] = event["details"]["severity"]
|
||||
if "source_ip" in event["details"]:
|
||||
event["sourceipaddress"] = event["details"]["source_ip"]
|
||||
else:
|
||||
event['severity'] = 'INFO'
|
||||
event['category'] = 'syslog'
|
||||
event["severity"] = "INFO"
|
||||
event["category"] = "syslog"
|
||||
|
||||
# process message
|
||||
self.on_message(event, msgdict)
|
||||
|
@ -266,22 +270,19 @@ class taskConsumer(object):
|
|||
time.sleep(options.sleep_time)
|
||||
|
||||
except ValueError as e:
|
||||
logger.exception('Exception while handling message: %r' % e)
|
||||
logger.exception("Exception while handling message: %r" % e)
|
||||
|
||||
def on_message(self, body, message):
|
||||
# print("RECEIVED MESSAGE: %r" % (body, ))
|
||||
try:
|
||||
# default elastic search metadata for an event
|
||||
metadata = {
|
||||
'index': 'events',
|
||||
'id': None
|
||||
}
|
||||
metadata = {"index": "events", "id": None}
|
||||
# just to be safe..check what we were sent.
|
||||
if isinstance(body, dict):
|
||||
bodyDict = body
|
||||
elif isinstance(body, str):
|
||||
try:
|
||||
bodyDict = json.loads(body) # lets assume it's json
|
||||
bodyDict = json.loads(body) # lets assume it's json
|
||||
except ValueError as e:
|
||||
# not json..ack but log the message
|
||||
logger.error("esworker exception: unknown body type received %r" % body)
|
||||
|
@ -292,7 +293,7 @@ class taskConsumer(object):
|
|||
# message.ack()
|
||||
return
|
||||
|
||||
if 'customendpoint' in bodyDict and bodyDict['customendpoint']:
|
||||
if "customendpoint" in bodyDict and bodyDict["customendpoint"]:
|
||||
# custom document
|
||||
# send to plugins to allow them to modify it if needed
|
||||
(normalizedDict, metadata) = sendEventToPlugins(bodyDict, metadata, pluginList)
|
||||
|
@ -319,43 +320,36 @@ class taskConsumer(object):
|
|||
if options.esbulksize != 0:
|
||||
bulk = True
|
||||
|
||||
self.esConnection.save_event(
|
||||
index=metadata['index'],
|
||||
doc_id=metadata['id'],
|
||||
body=jbody,
|
||||
bulk=bulk
|
||||
)
|
||||
self.esConnection.save_event(index=metadata["index"], doc_id=metadata["id"], body=jbody, bulk=bulk)
|
||||
|
||||
except (ElasticsearchBadServer, ElasticsearchInvalidIndex) as e:
|
||||
# handle loss of server or race condition with index rotation/creation/aliasing
|
||||
try:
|
||||
self.esConnection = esConnect()
|
||||
# messages are dropped here
|
||||
# message.requeue()
|
||||
return
|
||||
except kombu.exceptions.MessageStateError:
|
||||
# state may be already set.
|
||||
except (ElasticsearchBadServer, ElasticsearchInvalidIndex, ElasticsearchException):
|
||||
logger.exception(
|
||||
"ElasticSearchException: {0} reported while indexing event, messages lost".format(e)
|
||||
)
|
||||
return
|
||||
except ElasticsearchException as e:
|
||||
# exception target for queue capacity issues reported by elastic search so catch the error, report it and retry the message
|
||||
try:
|
||||
logger.exception('ElasticSearchException: {0} reported while indexing event'.format(e))
|
||||
# message.requeue()
|
||||
return
|
||||
except kombu.exceptions.MessageStateError:
|
||||
# state may be already set.
|
||||
return
|
||||
|
||||
# message.ack()
|
||||
logger.exception("ElasticSearchException: {0} reported while indexing event, messages lost".format(e))
|
||||
# messages are dropped here
|
||||
# message.requeue()
|
||||
return
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
logger.error('Malformed message body: %r' % body)
|
||||
logger.error("Malformed message body: %r" % body)
|
||||
|
||||
|
||||
def main():
|
||||
if hasUWSGI:
|
||||
logger.info("started as uwsgi mule {0}".format(uwsgi.mule_id()))
|
||||
else:
|
||||
logger.info('started without uwsgi')
|
||||
logger.info("started without uwsgi")
|
||||
|
||||
# establish api interface with papertrail
|
||||
ptRequestor = PTRequestor(options.ptapikey, evmax=options.ptquerymax)
|
||||
|
@ -366,28 +360,30 @@ def main():
|
|||
|
||||
def initConfig():
|
||||
# capture the hostname
|
||||
options.mozdefhostname = getConfig('mozdefhostname', socket.gethostname(), options.configfile)
|
||||
options.mozdefhostname = getConfig("mozdefhostname", socket.gethostname(), options.configfile)
|
||||
|
||||
# elastic search options. set esbulksize to a non-zero value to enable bulk posting, set timeout to post no matter how many events after X seconds.
|
||||
options.esservers = list(getConfig('esservers', 'http://localhost:9200', options.configfile).split(','))
|
||||
options.esbulksize = getConfig('esbulksize', 0, options.configfile)
|
||||
options.esbulktimeout = getConfig('esbulktimeout', 30, options.configfile)
|
||||
options.esservers = list(getConfig("esservers", "http://localhost:9200", options.configfile).split(","))
|
||||
options.esbulksize = getConfig("esbulksize", 0, options.configfile)
|
||||
options.esbulktimeout = getConfig("esbulktimeout", 30, options.configfile)
|
||||
|
||||
# papertrail configuration
|
||||
options.ptapikey = getConfig('papertrailapikey', 'none', options.configfile)
|
||||
options.ptquery = getConfig('papertrailquery', '', options.configfile)
|
||||
options.ptbackoff = getConfig('papertrailbackoff', 300, options.configfile)
|
||||
options.ptacctname = getConfig('papertrailaccount', 'unset', options.configfile)
|
||||
options.ptquerymax = getConfig('papertrailmaxevents', 2000, options.configfile)
|
||||
options.ptapikey = getConfig("papertrailapikey", "none", options.configfile)
|
||||
options.ptquery = getConfig("papertrailquery", "", options.configfile)
|
||||
options.ptbackoff = getConfig("papertrailbackoff", 300, options.configfile)
|
||||
options.ptacctname = getConfig("papertrailaccount", "unset", options.configfile)
|
||||
options.ptquerymax = getConfig("papertrailmaxevents", 2000, options.configfile)
|
||||
|
||||
# How long to sleep between polling
|
||||
options.sleep_time = getConfig('sleep_time', 60, options.configfile)
|
||||
options.sleep_time = getConfig("sleep_time", 60, options.configfile)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
# configure ourselves
|
||||
parser = OptionParser()
|
||||
parser.add_option("-c", dest='configfile', default=sys.argv[0].replace('.py', '.conf'), help="configuration file to use")
|
||||
parser.add_option(
|
||||
"-c", dest="configfile", default=sys.argv[0].replace(".py", ".conf"), help="configuration file to use"
|
||||
)
|
||||
(options, args) = parser.parse_args()
|
||||
initConfig()
|
||||
initLogger(options)
|
||||
|
|
|
@ -7,20 +7,23 @@
|
|||
|
||||
|
||||
import json
|
||||
|
||||
import sys
|
||||
import socket
|
||||
import time
|
||||
from configlib import getConfig, OptionParser
|
||||
from datetime import datetime
|
||||
import pytz
|
||||
|
||||
import kombu
|
||||
from configlib import getConfig, OptionParser
|
||||
from datetime import datetime
|
||||
from ssl import SSLEOFError, SSLError
|
||||
|
||||
from mozdef_util.utilities.toUTC import toUTC
|
||||
from mozdef_util.utilities.logger import logger, initLogger
|
||||
from mozdef_util.elasticsearch_client import ElasticsearchClient, ElasticsearchBadServer, ElasticsearchInvalidIndex, ElasticsearchException
|
||||
from mozdef_util.elasticsearch_client import (
|
||||
ElasticsearchClient,
|
||||
ElasticsearchBadServer,
|
||||
ElasticsearchInvalidIndex,
|
||||
ElasticsearchException,
|
||||
)
|
||||
|
||||
from lib.plugins import sendEventToPlugins, registerPlugins
|
||||
from lib.sqs import connect_sqs
|
||||
|
@ -29,18 +32,18 @@ from lib.sqs import connect_sqs
|
|||
# running under uwsgi?
|
||||
try:
|
||||
import uwsgi
|
||||
|
||||
hasUWSGI = True
|
||||
except ImportError as e:
|
||||
hasUWSGI = False
|
||||
|
||||
|
||||
def esConnect():
|
||||
'''open or re-open a connection to elastic search'''
|
||||
return ElasticsearchClient((list('{0}'.format(s) for s in options.esservers)), options.esbulksize)
|
||||
"""open or re-open a connection to elastic search"""
|
||||
return ElasticsearchClient((list("{0}".format(s) for s in options.esservers)), options.esbulksize)
|
||||
|
||||
|
||||
class taskConsumer(object):
|
||||
|
||||
def __init__(self, queue, esConnection, options):
|
||||
self.sqs_queue = queue
|
||||
self.esConnection = esConnection
|
||||
|
@ -60,77 +63,69 @@ class taskConsumer(object):
|
|||
# delete message from queue
|
||||
msg.delete()
|
||||
except ValueError:
|
||||
logger.error('Invalid message, not JSON <dropping message and continuing>: %r' % msg_body)
|
||||
logger.error("Invalid message, not JSON <dropping message and continuing>: %r" % msg_body)
|
||||
msg.delete()
|
||||
continue
|
||||
time.sleep(options.sleep_time)
|
||||
except (SSLEOFError, SSLError, socket.error):
|
||||
logger.info('Received network related error...reconnecting')
|
||||
logger.info("Received network related error...reconnecting")
|
||||
time.sleep(5)
|
||||
self.sqs_queue = connect_sqs(
|
||||
options.region,
|
||||
options.accesskey,
|
||||
options.secretkey,
|
||||
options.taskexchange
|
||||
)
|
||||
self.sqs_queue = connect_sqs(options.region, options.accesskey, options.secretkey, options.taskexchange)
|
||||
|
||||
def on_message(self, message):
|
||||
try:
|
||||
# default elastic search metadata for an event
|
||||
metadata = {
|
||||
'index': 'events',
|
||||
'id': None
|
||||
}
|
||||
metadata = {"index": "events", "id": None}
|
||||
event = {}
|
||||
|
||||
event['receivedtimestamp'] = toUTC(datetime.now()).isoformat()
|
||||
event['mozdefhostname'] = self.options.mozdefhostname
|
||||
event["receivedtimestamp"] = toUTC(datetime.now()).isoformat()
|
||||
event["mozdefhostname"] = self.options.mozdefhostname
|
||||
|
||||
if 'tags' in event:
|
||||
event['tags'].extend([self.options.taskexchange])
|
||||
if "tags" in event:
|
||||
event["tags"].extend([self.options.taskexchange])
|
||||
else:
|
||||
event['tags'] = [self.options.taskexchange]
|
||||
event["tags"] = [self.options.taskexchange]
|
||||
|
||||
event['severity'] = 'INFO'
|
||||
event['details'] = {}
|
||||
event["severity"] = "INFO"
|
||||
event["details"] = {}
|
||||
|
||||
for message_key, message_value in message.items():
|
||||
if 'Message' == message_key:
|
||||
if "Message" == message_key:
|
||||
try:
|
||||
message_json = json.loads(message_value)
|
||||
for inside_message_key, inside_message_value in message_json.items():
|
||||
if inside_message_key in ('type', 'category'):
|
||||
event['category'] = inside_message_value
|
||||
for (inside_message_key, inside_message_value) in message_json.items():
|
||||
if inside_message_key in ("type", "category"):
|
||||
event["category"] = inside_message_value
|
||||
# add type subcategory for filtering after
|
||||
# original type field is rewritten as category
|
||||
event['type'] = 'event'
|
||||
elif inside_message_key in ('processid', 'pid'):
|
||||
event["type"] = "event"
|
||||
elif inside_message_key in ("processid", "pid"):
|
||||
processid = str(inside_message_value)
|
||||
processid = processid.replace('[', '')
|
||||
processid = processid.replace(']', '')
|
||||
event['processid'] = processid
|
||||
elif inside_message_key in ('processname','pname'):
|
||||
event['processname'] = inside_message_value
|
||||
elif inside_message_key in ('hostname'):
|
||||
event['hostname'] = inside_message_value
|
||||
elif inside_message_key in ('time', 'timestamp'):
|
||||
event['timestamp'] = toUTC(inside_message_value).isoformat()
|
||||
event['utctimestamp'] = toUTC(event['timestamp']).astimezone(pytz.utc).isoformat()
|
||||
elif inside_message_key in ('summary','payload', 'message'):
|
||||
event['summary'] = inside_message_value.lstrip()
|
||||
elif inside_message_key in ('source'):
|
||||
event['source'] = inside_message_value
|
||||
elif inside_message_key in ('fields', 'details'):
|
||||
processid = processid.replace("[", "")
|
||||
processid = processid.replace("]", "")
|
||||
event["processid"] = processid
|
||||
elif inside_message_key in ("processname", "pname"):
|
||||
event["processname"] = inside_message_value
|
||||
elif inside_message_key in ("hostname"):
|
||||
event["hostname"] = inside_message_value
|
||||
elif inside_message_key in ("time", "timestamp"):
|
||||
event["timestamp"] = toUTC(inside_message_value).isoformat()
|
||||
event["utctimestamp"] = toUTC(event["timestamp"]).astimezone(pytz.utc).isoformat()
|
||||
elif inside_message_key in ("summary", "payload", "message"):
|
||||
event["summary"] = inside_message_value.lstrip()
|
||||
elif inside_message_key in ("source"):
|
||||
event["source"] = inside_message_value
|
||||
elif inside_message_key in ("fields", "details"):
|
||||
if type(inside_message_value) is not dict:
|
||||
event['details']['message'] = inside_message_value
|
||||
event["details"]["message"] = inside_message_value
|
||||
else:
|
||||
if len(inside_message_value) > 0:
|
||||
for details_key, details_value in inside_message_value.items():
|
||||
event['details'][details_key] = details_value
|
||||
for (details_key, details_value) in inside_message_value.items():
|
||||
event["details"][details_key] = details_value
|
||||
else:
|
||||
event['details'][inside_message_key] = inside_message_value
|
||||
event["details"][inside_message_key] = inside_message_value
|
||||
except ValueError:
|
||||
event['summary'] = message_value
|
||||
event["summary"] = message_value
|
||||
(event, metadata) = sendEventToPlugins(event, metadata, self.pluginList)
|
||||
# Drop message if plugins set to None
|
||||
if event is None:
|
||||
|
@ -138,7 +133,7 @@ class taskConsumer(object):
|
|||
self.save_event(event, metadata)
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
logger.error('Malformed message: %r' % message)
|
||||
logger.error("Malformed message: %r" % message)
|
||||
|
||||
def save_event(self, event, metadata):
|
||||
try:
|
||||
|
@ -155,44 +150,42 @@ class taskConsumer(object):
|
|||
if self.options.esbulksize != 0:
|
||||
bulk = True
|
||||
|
||||
self.esConnection.save_event(
|
||||
index=metadata['index'],
|
||||
doc_id=metadata['id'],
|
||||
body=jbody,
|
||||
bulk=bulk
|
||||
)
|
||||
self.esConnection.save_event(index=metadata["index"], doc_id=metadata["id"], body=jbody, bulk=bulk)
|
||||
|
||||
except (ElasticsearchBadServer, ElasticsearchInvalidIndex) as e:
|
||||
# handle loss of server or race condition with index rotation/creation/aliasing
|
||||
try:
|
||||
self.esConnection = esConnect()
|
||||
return
|
||||
except kombu.exceptions.MessageStateError:
|
||||
except (ElasticsearchBadServer, ElasticsearchInvalidIndex, ElasticsearchException):
|
||||
logger.exception(
|
||||
"ElasticSearchException: {0} reported while indexing event, messages lost".format(e)
|
||||
)
|
||||
return
|
||||
except ElasticsearchException as e:
|
||||
logger.exception('ElasticSearchException: {0} reported while indexing event'.format(e))
|
||||
logger.error('Malformed jbody: %r' % jbody)
|
||||
logger.exception("ElasticSearchException: {0} reported while indexing event, messages lost".format(e))
|
||||
logger.error("Malformed jbody: %r" % jbody)
|
||||
return
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
logger.error('Malformed message: %r' % event)
|
||||
logger.error("Malformed message: %r" % event)
|
||||
|
||||
|
||||
def main():
|
||||
if hasUWSGI:
|
||||
logger.info("started as uwsgi mule {0}".format(uwsgi.mule_id()))
|
||||
else:
|
||||
logger.info('started without uwsgi')
|
||||
logger.info("started without uwsgi")
|
||||
|
||||
if options.mqprotocol not in ('sqs'):
|
||||
logger.error('Can only process SQS queues, terminating')
|
||||
if options.mqprotocol not in ("sqs"):
|
||||
logger.error("Can only process SQS queues, terminating")
|
||||
sys.exit(1)
|
||||
|
||||
sqs_queue = connect_sqs(
|
||||
region_name=options.region,
|
||||
aws_access_key_id=options.accesskey,
|
||||
aws_secret_access_key=options.secretkey,
|
||||
task_exchange=options.taskexchange
|
||||
task_exchange=options.taskexchange,
|
||||
)
|
||||
# consume our queue
|
||||
taskConsumer(sqs_queue, es, options).run()
|
||||
|
@ -200,34 +193,36 @@ def main():
|
|||
|
||||
def initConfig():
|
||||
# capture the hostname
|
||||
options.mozdefhostname = getConfig('mozdefhostname', socket.gethostname(), options.configfile)
|
||||
options.mozdefhostname = getConfig("mozdefhostname", socket.gethostname(), options.configfile)
|
||||
|
||||
# elastic search options. set esbulksize to a non-zero value to enable bulk posting, set timeout to post no matter how many events after X seconds.
|
||||
options.esservers = list(getConfig('esservers', 'http://localhost:9200', options.configfile).split(','))
|
||||
options.esbulksize = getConfig('esbulksize', 0, options.configfile)
|
||||
options.esbulktimeout = getConfig('esbulktimeout', 30, options.configfile)
|
||||
options.esservers = list(getConfig("esservers", "http://localhost:9200", options.configfile).split(","))
|
||||
options.esbulksize = getConfig("esbulksize", 0, options.configfile)
|
||||
options.esbulktimeout = getConfig("esbulktimeout", 30, options.configfile)
|
||||
|
||||
# set to sqs for Amazon
|
||||
options.mqprotocol = getConfig('mqprotocol', 'sqs', options.configfile)
|
||||
options.mqprotocol = getConfig("mqprotocol", "sqs", options.configfile)
|
||||
|
||||
# rabbit message queue options
|
||||
options.taskexchange = getConfig('taskexchange', 'eventtask', options.configfile)
|
||||
options.taskexchange = getConfig("taskexchange", "eventtask", options.configfile)
|
||||
# rabbit: how many messages to ask for at once from the message queue
|
||||
options.prefetch = getConfig('prefetch', 10, options.configfile)
|
||||
options.prefetch = getConfig("prefetch", 10, options.configfile)
|
||||
|
||||
# aws options
|
||||
options.accesskey = getConfig('accesskey', '', options.configfile)
|
||||
options.secretkey = getConfig('secretkey', '', options.configfile)
|
||||
options.region = getConfig('region', '', options.configfile)
|
||||
options.accesskey = getConfig("accesskey", "", options.configfile)
|
||||
options.secretkey = getConfig("secretkey", "", options.configfile)
|
||||
options.region = getConfig("region", "", options.configfile)
|
||||
|
||||
# How long to sleep between polling
|
||||
options.sleep_time = getConfig('sleep_time', 0.1, options.configfile)
|
||||
options.sleep_time = getConfig("sleep_time", 0.1, options.configfile)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
# configure ourselves
|
||||
parser = OptionParser()
|
||||
parser.add_option("-c", dest='configfile', default=sys.argv[0].replace('.py', '.conf'), help="configuration file to use")
|
||||
parser.add_option(
|
||||
"-c", dest="configfile", default=sys.argv[0].replace(".py", ".conf"), help="configuration file to use"
|
||||
)
|
||||
(options, args) = parser.parse_args()
|
||||
initConfig()
|
||||
initLogger(options)
|
||||
|
|
|
@ -18,14 +18,18 @@ import time
|
|||
from configlib import getConfig, OptionParser
|
||||
from datetime import datetime
|
||||
import base64
|
||||
import kombu
|
||||
from ssl import SSLEOFError, SSLError
|
||||
|
||||
from mozdef_util.utilities.toUTC import toUTC
|
||||
from mozdef_util.utilities.to_unicode import toUnicode
|
||||
from mozdef_util.utilities.remove_at import removeAt
|
||||
from mozdef_util.utilities.logger import logger, initLogger
|
||||
from mozdef_util.elasticsearch_client import ElasticsearchClient, ElasticsearchBadServer, ElasticsearchInvalidIndex, ElasticsearchException
|
||||
from mozdef_util.elasticsearch_client import (
|
||||
ElasticsearchClient,
|
||||
ElasticsearchBadServer,
|
||||
ElasticsearchInvalidIndex,
|
||||
ElasticsearchException,
|
||||
)
|
||||
|
||||
from lib.plugins import sendEventToPlugins, registerPlugins
|
||||
from lib.sqs import connect_sqs
|
||||
|
@ -34,130 +38,130 @@ from lib.sqs import connect_sqs
|
|||
# running under uwsgi?
|
||||
try:
|
||||
import uwsgi
|
||||
|
||||
hasUWSGI = True
|
||||
except ImportError as e:
|
||||
hasUWSGI = False
|
||||
|
||||
|
||||
def keyMapping(aDict):
|
||||
'''map common key/fields to a normalized structure,
|
||||
"""map common key/fields to a normalized structure,
|
||||
explicitly typed when possible to avoid schema changes for upsteam consumers
|
||||
Special accomodations made for logstash,nxlog, beaver, heka and CEF
|
||||
Some shippers attempt to conform to logstash-style @fieldname convention.
|
||||
This strips the leading at symbol since it breaks some elastic search
|
||||
libraries like elasticutils.
|
||||
'''
|
||||
"""
|
||||
returndict = dict()
|
||||
|
||||
# uncomment to save the source event for debugging, or chain of custody/forensics
|
||||
# returndict['original']=aDict
|
||||
|
||||
# set the timestamp when we received it, i.e. now
|
||||
returndict['receivedtimestamp'] = toUTC(datetime.now()).isoformat()
|
||||
returndict['mozdefhostname'] = options.mozdefhostname
|
||||
returndict['details'] = {}
|
||||
returndict["receivedtimestamp"] = toUTC(datetime.now()).isoformat()
|
||||
returndict["mozdefhostname"] = options.mozdefhostname
|
||||
returndict["details"] = {}
|
||||
try:
|
||||
for k, v in aDict.items():
|
||||
k = removeAt(k).lower()
|
||||
|
||||
if k in ('message', 'summary'):
|
||||
returndict['summary'] = toUnicode(v)
|
||||
if k in ("message", "summary"):
|
||||
returndict["summary"] = toUnicode(v)
|
||||
|
||||
if k in ('payload') and 'summary' not in aDict:
|
||||
if k in ("payload") and "summary" not in aDict:
|
||||
# special case for heka if it sends payload as well as a summary, keep both but move payload to the details section.
|
||||
returndict['summary'] = toUnicode(v)
|
||||
elif k in ('payload'):
|
||||
returndict['details']['payload'] = toUnicode(v)
|
||||
returndict["summary"] = toUnicode(v)
|
||||
elif k in ("payload"):
|
||||
returndict["details"]["payload"] = toUnicode(v)
|
||||
|
||||
if k in ('eventtime', 'timestamp', 'utctimestamp'):
|
||||
returndict['utctimestamp'] = toUTC(v).isoformat()
|
||||
returndict['timestamp'] = toUTC(v).isoformat()
|
||||
if k in ("eventtime", "timestamp", "utctimestamp"):
|
||||
returndict["utctimestamp"] = toUTC(v).isoformat()
|
||||
returndict["timestamp"] = toUTC(v).isoformat()
|
||||
|
||||
if k in ('hostname', 'source_host', 'host'):
|
||||
returndict['hostname'] = toUnicode(v)
|
||||
if k in ("hostname", "source_host", "host"):
|
||||
returndict["hostname"] = toUnicode(v)
|
||||
|
||||
if k in ('tags'):
|
||||
if k in ("tags"):
|
||||
if len(v) > 0:
|
||||
returndict['tags'] = v
|
||||
returndict["tags"] = v
|
||||
|
||||
# nxlog keeps the severity name in syslogseverity,everyone else should use severity or level.
|
||||
if k in ('syslogseverity', 'severity', 'severityvalue', 'level'):
|
||||
returndict['severity'] = toUnicode(v).upper()
|
||||
if k in ("syslogseverity", "severity", "severityvalue", "level"):
|
||||
returndict["severity"] = toUnicode(v).upper()
|
||||
|
||||
if k in ('facility', 'syslogfacility','source'):
|
||||
returndict['source'] = toUnicode(v)
|
||||
if k in ("facility", "syslogfacility", "source"):
|
||||
returndict["source"] = toUnicode(v)
|
||||
|
||||
if k in ('pid', 'processid'):
|
||||
returndict['processid'] = toUnicode(v)
|
||||
if k in ("pid", "processid"):
|
||||
returndict["processid"] = toUnicode(v)
|
||||
|
||||
# nxlog sets sourcename to the processname (i.e. sshd), everyone else should call it process name or pname
|
||||
if k in ('pname', 'processname', 'sourcename'):
|
||||
returndict['processname'] = toUnicode(v)
|
||||
if k in ("pname", "processname", "sourcename"):
|
||||
returndict["processname"] = toUnicode(v)
|
||||
|
||||
# the file, or source
|
||||
if k in ('path', 'logger', 'file'):
|
||||
returndict['eventsource'] = toUnicode(v)
|
||||
if k in ("path", "logger", "file"):
|
||||
returndict["eventsource"] = toUnicode(v)
|
||||
|
||||
if k in ('type', 'eventtype', 'category'):
|
||||
returndict['category'] = toUnicode(v)
|
||||
if k in ("type", "eventtype", "category"):
|
||||
returndict["category"] = toUnicode(v)
|
||||
|
||||
# custom fields as a list/array
|
||||
if k in ('fields', 'details'):
|
||||
if k in ("fields", "details"):
|
||||
if type(v) is not dict:
|
||||
returndict['details']['message'] = v
|
||||
returndict["details"]["message"] = v
|
||||
else:
|
||||
if len(v) > 0:
|
||||
for details_key, details_value in v.items():
|
||||
returndict['details'][details_key] = details_value
|
||||
returndict["details"][details_key] = details_value
|
||||
|
||||
# custom fields/details as a one off, not in an array
|
||||
# i.e. fields.something=value or details.something=value
|
||||
# move them to a dict for consistency in querying
|
||||
if k.startswith('fields.') or k.startswith('details.'):
|
||||
newName = k.replace('fields.', '')
|
||||
newName = newName.lower().replace('details.', '')
|
||||
if k.startswith("fields.") or k.startswith("details."):
|
||||
newName = k.replace("fields.", "")
|
||||
newName = newName.lower().replace("details.", "")
|
||||
# add field with a special case for shippers that
|
||||
# don't send details
|
||||
# in an array as int/floats/strings
|
||||
# we let them dictate the data type with field_datatype
|
||||
# convention
|
||||
if newName.endswith('_int'):
|
||||
returndict['details'][str(newName)] = int(v)
|
||||
elif newName.endswith('_float'):
|
||||
returndict['details'][str(newName)] = float(v)
|
||||
if newName.endswith("_int"):
|
||||
returndict["details"][str(newName)] = int(v)
|
||||
elif newName.endswith("_float"):
|
||||
returndict["details"][str(newName)] = float(v)
|
||||
else:
|
||||
returndict['details'][str(newName)] = toUnicode(v)
|
||||
returndict["details"][str(newName)] = toUnicode(v)
|
||||
|
||||
# nxlog windows log handling
|
||||
if 'Domain' in aDict and 'SourceModuleType' in aDict:
|
||||
if "Domain" in aDict and "SourceModuleType" in aDict:
|
||||
# nxlog parses all windows event fields very well
|
||||
# copy all fields to details
|
||||
returndict['details'][k] = v
|
||||
returndict["details"][k] = v
|
||||
|
||||
if 'utctimestamp' not in returndict:
|
||||
if "utctimestamp" not in returndict:
|
||||
# default in case we don't find a reasonable timestamp
|
||||
returndict['utctimestamp'] = toUTC(datetime.now()).isoformat()
|
||||
returndict["utctimestamp"] = toUTC(datetime.now()).isoformat()
|
||||
|
||||
if 'type' not in returndict:
|
||||
if "type" not in returndict:
|
||||
# default replacement for old _type subcategory.
|
||||
# to preserve filtering capabilities
|
||||
returndict['type'] = 'event'
|
||||
returndict["type"] = "event"
|
||||
|
||||
except Exception as e:
|
||||
logger.exception('Exception normalizing the message %r' % e)
|
||||
logger.error('Malformed message dict: %r' % aDict)
|
||||
logger.exception("Exception normalizing the message %r" % e)
|
||||
logger.error("Malformed message dict: %r" % aDict)
|
||||
return None
|
||||
|
||||
return returndict
|
||||
|
||||
|
||||
def esConnect():
|
||||
'''open or re-open a connection to elastic search'''
|
||||
return ElasticsearchClient((list('{0}'.format(s) for s in options.esservers)), options.esbulksize)
|
||||
"""open or re-open a connection to elastic search"""
|
||||
return ElasticsearchClient((list("{0}".format(s) for s in options.esservers)), options.esbulksize)
|
||||
|
||||
|
||||
class taskConsumer(object):
|
||||
|
||||
def __init__(self, queue, esConnection):
|
||||
self.sqs_queue = queue
|
||||
self.esConnection = esConnection
|
||||
|
@ -180,7 +184,9 @@ class taskConsumer(object):
|
|||
tmp = base64.b64decode(tmp)
|
||||
msgbody = json.loads(tmp)
|
||||
except Exception as e:
|
||||
logger.error('Invalid message, not JSON <dropping message and continuing>: %r' % msg.get_body())
|
||||
logger.error(
|
||||
"Invalid message, not JSON <dropping message and continuing>: %r" % msg.get_body()
|
||||
)
|
||||
msg.delete()
|
||||
continue
|
||||
|
||||
|
@ -195,21 +201,19 @@ class taskConsumer(object):
|
|||
event = msgbody
|
||||
|
||||
# Was this message sent by fluentd-sqs
|
||||
fluentd_sqs_specific_fields = {
|
||||
'az', 'instance_id', '__tag'}
|
||||
if fluentd_sqs_specific_fields.issubset(
|
||||
set(msgbody.keys())):
|
||||
fluentd_sqs_specific_fields = {"az", "instance_id", "__tag"}
|
||||
if fluentd_sqs_specific_fields.issubset(set(msgbody.keys())):
|
||||
# Until we can influence fluentd-sqs to set the
|
||||
# 'customendpoint' key before submitting to SQS, we'll
|
||||
# need to do it here
|
||||
# TODO : Change nubis fluentd output to include
|
||||
# 'customendpoint'
|
||||
event['customendpoint'] = True
|
||||
event["customendpoint"] = True
|
||||
|
||||
if 'tags' in event:
|
||||
event['tags'].extend([options.taskexchange])
|
||||
if "tags" in event:
|
||||
event["tags"].extend([options.taskexchange])
|
||||
else:
|
||||
event['tags'] = [options.taskexchange]
|
||||
event["tags"] = [options.taskexchange]
|
||||
|
||||
# process message
|
||||
self.on_message(event, msg)
|
||||
|
@ -219,32 +223,24 @@ class taskConsumer(object):
|
|||
time.sleep(options.sleep_time)
|
||||
|
||||
except ValueError as e:
|
||||
logger.exception('Exception while handling message: %r' % e)
|
||||
logger.exception("Exception while handling message: %r" % e)
|
||||
msg.delete()
|
||||
except (SSLEOFError, SSLError, socket.error):
|
||||
logger.info('Received network related error...reconnecting')
|
||||
logger.info("Received network related error...reconnecting")
|
||||
time.sleep(5)
|
||||
self.sqs_queue = connect_sqs(
|
||||
options.region,
|
||||
options.accesskey,
|
||||
options.secretkey,
|
||||
options.taskexchange
|
||||
)
|
||||
self.sqs_queue = connect_sqs(options.region, options.accesskey, options.secretkey, options.taskexchange)
|
||||
|
||||
def on_message(self, body, message):
|
||||
# print("RECEIVED MESSAGE: %r" % (body, ))
|
||||
try:
|
||||
# default elastic search metadata for an event
|
||||
metadata = {
|
||||
'index': 'events',
|
||||
'id': None
|
||||
}
|
||||
metadata = {"index": "events", "id": None}
|
||||
# just to be safe..check what we were sent.
|
||||
if isinstance(body, dict):
|
||||
bodyDict = body
|
||||
elif isinstance(body, str):
|
||||
try:
|
||||
bodyDict = json.loads(body) # lets assume it's json
|
||||
bodyDict = json.loads(body) # lets assume it's json
|
||||
except ValueError as e:
|
||||
# not json..ack but log the message
|
||||
logger.error("Exception: unknown body type received %r" % body)
|
||||
|
@ -255,7 +251,7 @@ class taskConsumer(object):
|
|||
# message.ack()
|
||||
return
|
||||
|
||||
if 'customendpoint' in bodyDict and bodyDict['customendpoint']:
|
||||
if "customendpoint" in bodyDict and bodyDict["customendpoint"]:
|
||||
# custom document
|
||||
# send to plugins to allow them to modify it if needed
|
||||
(normalizedDict, metadata) = sendEventToPlugins(bodyDict, metadata, pluginList)
|
||||
|
@ -282,12 +278,7 @@ class taskConsumer(object):
|
|||
if options.esbulksize != 0:
|
||||
bulk = True
|
||||
|
||||
self.esConnection.save_event(
|
||||
index=metadata['index'],
|
||||
doc_id=metadata['id'],
|
||||
body=jbody,
|
||||
bulk=bulk
|
||||
)
|
||||
self.esConnection.save_event(index=metadata["index"], doc_id=metadata["id"], body=jbody, bulk=bulk)
|
||||
|
||||
except (ElasticsearchBadServer, ElasticsearchInvalidIndex) as e:
|
||||
# handle loss of server or race condition with index rotation/creation/aliasing
|
||||
|
@ -295,23 +286,23 @@ class taskConsumer(object):
|
|||
self.esConnection = esConnect()
|
||||
# message.requeue()
|
||||
return
|
||||
except kombu.exceptions.MessageStateError:
|
||||
except (ElasticsearchBadServer, ElasticsearchInvalidIndex, ElasticsearchException):
|
||||
# there's no requeue and we drop several messages
|
||||
# state may be already set.
|
||||
logger.exception(
|
||||
"ElasticSearchException: {0} reported while indexing event, messages lost".format(e)
|
||||
)
|
||||
return
|
||||
except ElasticsearchException as e:
|
||||
# exception target for queue capacity issues reported by elastic search so catch the error, report it and retry the message
|
||||
try:
|
||||
logger.exception('ElasticSearchException: {0} reported while indexing event'.format(e))
|
||||
# message.requeue()
|
||||
return
|
||||
except kombu.exceptions.MessageStateError:
|
||||
# state may be already set.
|
||||
return
|
||||
logger.exception("ElasticSearchException: {0} reported while indexing event, messages lost".format(e))
|
||||
# there's no requeue and we drop several messages
|
||||
# message.requeue()
|
||||
return
|
||||
|
||||
# message.ack()
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
logger.error('Malformed message body: %r' % body)
|
||||
logger.error("Malformed message body: %r" % body)
|
||||
|
||||
|
||||
def main():
|
||||
|
@ -321,17 +312,17 @@ def main():
|
|||
if hasUWSGI:
|
||||
logger.info("started as uwsgi mule {0}".format(uwsgi.mule_id()))
|
||||
else:
|
||||
logger.info('started without uwsgi')
|
||||
logger.info("started without uwsgi")
|
||||
|
||||
if options.mqprotocol not in ('sqs'):
|
||||
logger.error('Can only process SQS queues, terminating')
|
||||
if options.mqprotocol not in ("sqs"):
|
||||
logger.error("Can only process SQS queues, terminating")
|
||||
sys.exit(1)
|
||||
|
||||
sqs_queue = connect_sqs(
|
||||
region_name=options.region,
|
||||
aws_access_key_id=options.accesskey,
|
||||
aws_secret_access_key=options.secretkey,
|
||||
task_exchange=options.taskexchange
|
||||
task_exchange=options.taskexchange,
|
||||
)
|
||||
# consume our queue
|
||||
taskConsumer(sqs_queue, es).run()
|
||||
|
@ -339,46 +330,48 @@ def main():
|
|||
|
||||
def initConfig():
|
||||
# capture the hostname
|
||||
options.mozdefhostname = getConfig('mozdefhostname', socket.gethostname(), options.configfile)
|
||||
options.mozdefhostname = getConfig("mozdefhostname", socket.gethostname(), options.configfile)
|
||||
|
||||
# elastic search options. set esbulksize to a non-zero value to enable bulk posting, set timeout to post no matter how many events after X seconds.
|
||||
options.esservers = list(getConfig('esservers', 'http://localhost:9200', options.configfile).split(','))
|
||||
options.esbulksize = getConfig('esbulksize', 0, options.configfile)
|
||||
options.esbulktimeout = getConfig('esbulktimeout', 30, options.configfile)
|
||||
options.esservers = list(getConfig("esservers", "http://localhost:9200", options.configfile).split(","))
|
||||
options.esbulksize = getConfig("esbulksize", 0, options.configfile)
|
||||
options.esbulktimeout = getConfig("esbulktimeout", 30, options.configfile)
|
||||
|
||||
# set to sqs for Amazon
|
||||
options.mqprotocol = getConfig('mqprotocol', 'sqs', options.configfile)
|
||||
options.mqprotocol = getConfig("mqprotocol", "sqs", options.configfile)
|
||||
|
||||
# rabbit message queue options
|
||||
options.mqserver = getConfig('mqserver', 'localhost', options.configfile)
|
||||
options.taskexchange = getConfig('taskexchange', 'eventtask', options.configfile)
|
||||
options.mqserver = getConfig("mqserver", "localhost", options.configfile)
|
||||
options.taskexchange = getConfig("taskexchange", "eventtask", options.configfile)
|
||||
# rabbit: how many messages to ask for at once from the message queue
|
||||
options.prefetch = getConfig('prefetch', 10, options.configfile)
|
||||
options.prefetch = getConfig("prefetch", 10, options.configfile)
|
||||
# rabbit: user creds
|
||||
options.mquser = getConfig('mquser', 'guest', options.configfile)
|
||||
options.mqpassword = getConfig('mqpassword', 'guest', options.configfile)
|
||||
options.mquser = getConfig("mquser", "guest", options.configfile)
|
||||
options.mqpassword = getConfig("mqpassword", "guest", options.configfile)
|
||||
# rabbit: port/vhost
|
||||
options.mqport = getConfig('mqport', 5672, options.configfile)
|
||||
options.mqvhost = getConfig('mqvhost', '/', options.configfile)
|
||||
options.mqport = getConfig("mqport", 5672, options.configfile)
|
||||
options.mqvhost = getConfig("mqvhost", "/", options.configfile)
|
||||
|
||||
# rabbit: run with message acking?
|
||||
# also toggles transient/persistant delivery (messages in memory only or stored on disk)
|
||||
# ack=True sets persistant delivery, False sets transient delivery
|
||||
options.mqack = getConfig('mqack', True, options.configfile)
|
||||
options.mqack = getConfig("mqack", True, options.configfile)
|
||||
|
||||
# aws options
|
||||
options.accesskey = getConfig('accesskey', '', options.configfile)
|
||||
options.secretkey = getConfig('secretkey', '', options.configfile)
|
||||
options.region = getConfig('region', '', options.configfile)
|
||||
options.accesskey = getConfig("accesskey", "", options.configfile)
|
||||
options.secretkey = getConfig("secretkey", "", options.configfile)
|
||||
options.region = getConfig("region", "", options.configfile)
|
||||
|
||||
# How long to sleep between polling
|
||||
options.sleep_time = getConfig('sleep_time', 0.1, options.configfile)
|
||||
options.sleep_time = getConfig("sleep_time", 0.1, options.configfile)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
# configure ourselves
|
||||
parser = OptionParser()
|
||||
parser.add_option("-c", dest='configfile', default=sys.argv[0].replace('.py', '.conf'), help="configuration file to use")
|
||||
parser.add_option(
|
||||
"-c", dest="configfile", default=sys.argv[0].replace(".py", ".conf"), help="configuration file to use"
|
||||
)
|
||||
(options, args) = parser.parse_args()
|
||||
initConfig()
|
||||
initLogger(options)
|
||||
|
|
Загрузка…
Ссылка в новой задаче