update aggregation mechanisms to allow specifying the dict path as key.subkey.subkey.etc, closes #275

This commit is contained in:
Jeff Bryner 2015-05-27 13:23:05 -07:00
Родитель 3e90b2dc03
Коммит dda10eca82
1 изменённых файлов: 44 добавлений и 18 удалений

Просмотреть файл

@ -28,7 +28,7 @@ def toUTC(suspectedDate, localTimeZone=None):
utc = pytz.UTC
objDate = None
if localTimeZone is None:
localTimeZone= OPTIONS['defaulttimezone']
localTimeZone= OPTIONS['defaulttimezone']
if type(suspectedDate) in (str, unicode):
objDate = parse(suspectedDate, fuzzy=True)
elif type(suspectedDate) == datetime:
@ -68,6 +68,17 @@ def dictpath(path):
yield '{0}'.format(i)
def getValueByPath(input_dict, path_string):
    """
    Gets data/value from a dictionary using a dotted accessor-string
    http://stackoverflow.com/a/7534478
    path_string can be key.subkey.subkey.subkey

    Each dot-separated chunk of path_string is looked up in turn, starting
    at input_dict and descending into the value found for the previous chunk.

    Returns the value found at the end of the path. Returns an empty dict
    when the path cannot be resolved: either a key is missing, or an
    intermediate value has no .get() (a string, None, a number...) and so
    cannot be descended into.
    """
    return_data = input_dict
    for chunk in path_string.split('.'):
        # Anything exposing .get() (dict, dict subclasses, mapping-like
        # objects) can be descended into; a scalar reached before the path
        # is exhausted means the path does not exist in this document.
        getter = getattr(return_data, 'get', None)
        if getter is None:
            return {}
        return_data = getter(chunk, {})
    return return_data
class AlertTask(Task):
@ -145,7 +156,7 @@ class AlertTask(Task):
for k in list(keypaths(i)):
if not (set(k[0]).symmetric_difference(path)):
inspectlist.append(k[1])
return Counter(inspectlist).most_common()
@ -200,7 +211,7 @@ class AlertTask(Task):
must, should and must_not are pyes filter objects lists
see http://pyes.readthedocs.org/en/latest/references/pyes.filters.html
"""
self.begindateUTC = toUTC(datetime.now() - timedelta(**date_timedelta))
@ -303,9 +314,20 @@ class AlertTask(Task):
self.log.error('Error while searching events in ES: {0}'.format(e))
def searchEventsAggreg(self, aggregField, samplesLimit=5):
def searchEventsAggregated(self, aggregationPath, samplesLimit=5):
"""
Search aggregations matching filters by aggregField, store them in self.aggregations
Search events, aggregate matching ES filters by aggregationPath,
store them in self.aggregations as a list of dictionaries
keys:
value: the text value that was found in the aggregationPath
count: the hitcount of the text value
events: the sampled list of events that matched
allevents: the unsampled, total list of matching events
aggregationPath can be key.subkey.subkey to specify a path to a dictionary value
relative to the _source that's returned from elastic search.
ex: details.sourceipaddress
"""
try:
pyesresults = self.es.search(
@ -316,13 +338,14 @@ class AlertTask(Task):
# List of aggregation values that can be counted/summarized by Counter
# Example: ['evil@evil.com','haxoor@noob.com', 'evil@evil.com'] for an email aggregField
aggregValues = []
aggregationValues = []
for r in results:
aggregValues.append(r['_source']['details'][aggregField])
aggregationValues.append(getValueByPath(r['_source'], aggregationPath))
# [{value:'evil@evil.com',count:1337,events:[...]}, ...]
aggregList = []
for i in Counter(aggregValues).most_common():
aggregationList = []
for i in Counter(aggregationValues).most_common():
idict = {
'value': i[0],
'count': i[1],
@ -330,16 +353,16 @@ class AlertTask(Task):
'allevents': []
}
for r in results:
if r['_source']['details'][aggregField].encode('ascii', 'ignore') == i[0]:
if getValueByPath(r['_source'], aggregationPath).encode('ascii', 'ignore') == i[0]:
# copy events detail into this aggregation up to our samples limit
if len(idict['events']) < samplesLimit:
idict['events'].append(r)
# also copy all events to a non-sampled list
# so we mark all events as alerted and don't re-alert
idict['allevents'].append(r)
aggregList.append(idict)
aggregationList.append(idict)
self.aggregations = aggregList
self.aggregations = aggregationList
self.log.debug(self.aggregations)
except Exception as e:
self.log.error('Error while searching events in ES: {0}'.format(e))
@ -366,7 +389,7 @@ class AlertTask(Task):
self.log.debug(alert)
alertResultES = self.alertToES(alert)
self.alertToMessageQueue(alert)
self.hookAfterInsertion(alert)
self.hookAfterInsertion(alert)
def walkAggregations(self, threshold):
@ -374,13 +397,16 @@ class AlertTask(Task):
Walk through aggregations, provide some methods to hook in alerts
"""
if len(self.aggregations) > 0:
for aggreg in self.aggregations:
if aggreg['count'] >= threshold:
alert = self.onAggreg(aggreg)
for aggregation in self.aggregations:
if aggregation['count'] >= threshold:
alert = self.onAggregation(aggregation)
self.log.debug(alert)
if alert:
alertResultES = self.alertToES(alert)
self.tagEventsAlert(aggreg['allevents'], alertResultES)
# even though we only sample events in the alert
# tag all events as alerted to avoid re-alerting
# on events we've already processed.
self.tagEventsAlert(aggregation['allevents'], alertResultES)
self.alertToMessageQueue(alert)
@ -429,7 +455,7 @@ class AlertTask(Task):
"""
pass
def onAggreg(self, aggreg):
def onAggregation(self, aggregation):
"""
To be overridden by children to run their code
to be used when creating an alert using an aggregation