зеркало из https://github.com/mozilla/MozDef.git
Update mq directory with search class
Signed-off-by: Brandon Myers <bmyers@mozilla.com>
This commit is contained in:
Родитель
5082d87f68
Коммит
76174add7d
|
@ -200,7 +200,7 @@ class AlertTask(Task):
|
|||
Send alert to elasticsearch
|
||||
"""
|
||||
try:
|
||||
res = self.es.save_alert(alertDict)
|
||||
res = self.es.save_alert(body=alertDict)
|
||||
self.log.debug('alert sent to ES')
|
||||
self.log.debug(res)
|
||||
return res
|
||||
|
@ -464,8 +464,7 @@ class AlertTask(Task):
|
|||
'id': alertResultES['_id']})
|
||||
event['_source']['alerttimestamp'] = toUTC(datetime.now()).isoformat()
|
||||
|
||||
|
||||
self.es.update_event(event['_index'], event['_type'], event['_id'], event['_source'])
|
||||
self.es.save_event(index=event['_index'], doc_type=event['_type'], body=event['_source'], doc_id=event['_id'])
|
||||
except Exception as e:
|
||||
self.log.error('Error while updating events in ES: {0}'.format(e))
|
||||
|
||||
|
|
|
@ -1,6 +1,3 @@
|
|||
from elasticsearch import Elasticsearch
|
||||
from elasticsearch_dsl import Search
|
||||
|
||||
from query_models import SearchQuery, TermMatch, AggregatedResults, SimpleResults
|
||||
|
||||
import json
|
||||
|
@ -9,115 +6,154 @@ import json
|
|||
import os
|
||||
import sys
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../alerts/lib"))
|
||||
import pyes
|
||||
import pyes_enabled
|
||||
from config import ESv134
|
||||
# Remove this code when pyes is gone!
|
||||
|
||||
if pyes_enabled.pyes_on is True:
|
||||
import pyes
|
||||
else:
|
||||
from elasticsearch import Elasticsearch
|
||||
from elasticsearch_dsl import Search
|
||||
|
||||
|
||||
class ElasticsearchBadServer(Exception):
|
||||
def __str__(self):
|
||||
return "Bad ES Server defined"
|
||||
|
||||
|
||||
class ElasticsearchException(Exception):
|
||||
def __str__(self):
|
||||
return "Exception in ES encountered"
|
||||
|
||||
|
||||
class ElasticsearchInvalidIndex(Exception):
|
||||
def __init__(self, index_name):
|
||||
self.index_name = index_name
|
||||
|
||||
def __str__(self):
|
||||
return "Invalid index: " + self.index_name
|
||||
|
||||
|
||||
class ElasticsearchClient():
|
||||
|
||||
def __init__(self, servers):
|
||||
def __init__(self, servers, bulk_amount=100):
|
||||
if pyes_enabled.pyes_on is True:
|
||||
# ES v1
|
||||
self.pyes_client = pyes.ES(ESv134['servers'])
|
||||
self.es_connection = pyes.ES(servers, bulk_size=bulk_amount)
|
||||
else:
|
||||
# ES v2 and up
|
||||
self.es = Elasticsearch(servers)
|
||||
self.es.ping()
|
||||
self.es_connection = Elasticsearch(servers)
|
||||
self.es_connection.ping()
|
||||
|
||||
def delete_index(self, index_name, ignore_fail=False):
|
||||
if pyes_enabled.pyes_on is True:
|
||||
if ignore_fail is True:
|
||||
self.pyes_client.indices.delete_index_if_exists(index_name)
|
||||
self.es_connection.indices.delete_index_if_exists(index_name)
|
||||
else:
|
||||
self.pyes_client.indices.delete_index(index_name)
|
||||
self.es_connection.indices.delete_index(index_name)
|
||||
else:
|
||||
ignore_codes = []
|
||||
if ignore_fail is True:
|
||||
ignore_codes = [400, 404]
|
||||
|
||||
self.es.indices.delete(index=index_name, ignore=ignore_codes)
|
||||
self.es_connection.indices.delete(index=index_name, ignore=ignore_codes)
|
||||
|
||||
def create_index(self, index_name, ignore_fail=False):
|
||||
if pyes_enabled.pyes_on is True:
|
||||
self.pyes_client.indices.create_index(index_name)
|
||||
self.es_connection.indices.create_index(index_name)
|
||||
else:
|
||||
mapping = '''
|
||||
{
|
||||
"mappings":{}
|
||||
}'''
|
||||
self.es.indices.create(index=index_name, update_all_types='true', body=mapping)
|
||||
self.es_connection.indices.create(index=index_name, update_all_types='true', body=mapping)
|
||||
|
||||
def create_alias(self, alias_name, index_name):
|
||||
if pyes_enabled.pyes_on is True:
|
||||
self.pyes_client.indices.add_alias(alias_name, index_name)
|
||||
self.es_connection.indices.add_alias(alias_name, index_name)
|
||||
else:
|
||||
self.es.indices.put_alias(index=index_name, name=alias_name)
|
||||
self.es_connection.indices.put_alias(index=index_name, name=alias_name)
|
||||
|
||||
def flush(self, index_name):
|
||||
if pyes_enabled.pyes_on is True:
|
||||
self.pyes_client.indices.flush()
|
||||
self.es_connection.indices.flush()
|
||||
else:
|
||||
self.es.indices.flush(index=index_name)
|
||||
|
||||
def save_event(self, event, event_type='event'):
|
||||
if pyes_enabled.pyes_on is True:
|
||||
return self.pyes_client.index(index='events', doc_type=event_type, doc=event)
|
||||
else:
|
||||
return self.es.index(index='events', doc_type=event_type, body=event)
|
||||
|
||||
def update_event(self, index, doc_type, event_id, event):
|
||||
if pyes_enabled.pyes_on is True:
|
||||
self.pyes_client.update(index, doc_type, event_id, document=event)
|
||||
else:
|
||||
self.es.index(index=index, doc_type=doc_type, id=event_id, body=event)
|
||||
self.es_connection.indices.flush(index=index_name)
|
||||
|
||||
def search(self, search_query, indices):
|
||||
results = []
|
||||
if pyes_enabled.pyes_on is True:
|
||||
esresults = self.pyes_client.search(search_query, size=1000, indices=','.join(map(str, indices)))
|
||||
# todo: update the size amount
|
||||
esresults = self.es_connection.search(search_query, size=1000, indices=','.join(map(str, indices)))
|
||||
results = esresults._search_raw()
|
||||
else:
|
||||
results = Search(using=self.es, index=indices).filter(search_query).execute()
|
||||
results = Search(using=self.es_connection, index=indices).filter(search_query).execute()
|
||||
|
||||
result_set = SimpleResults(results)
|
||||
return result_set
|
||||
|
||||
def aggregated_search(self, search_query, indices, aggregations):
|
||||
if pyes_enabled.pyes_on is True:
|
||||
query = search_query.search()
|
||||
for field_name in aggregations:
|
||||
query.facet.add_term_facet(field_name)
|
||||
query = search_query.search()
|
||||
for field_name in aggregations:
|
||||
query.facet.add_term_facet(field_name)
|
||||
|
||||
esresults = self.pyes_client.search(query, size=1000, indices=','.join(map(str, indices)))
|
||||
results = esresults._search_raw()
|
||||
# todo: change size here
|
||||
esresults = self.es_connection.search(query, size=1000, indices=','.join(map(str, indices)))
|
||||
results = esresults._search_raw()
|
||||
else:
|
||||
search_obj = Search(using=self.es, index=indices)
|
||||
query_obj = search_obj.filter(search_query)
|
||||
for field_name in aggregations:
|
||||
query_obj.aggs.bucket(field_name.to_dict()['terms']['field'], field_name)
|
||||
results = query_obj.execute()
|
||||
search_obj = Search(using=self.es_connection, index=indices)
|
||||
query_obj = search_obj.filter(search_query)
|
||||
for field_name in aggregations:
|
||||
query_obj.aggs.bucket(field_name.to_dict()['terms']['field'], field_name)
|
||||
results = query_obj.execute()
|
||||
|
||||
result_set = AggregatedResults(results)
|
||||
return result_set
|
||||
|
||||
def save_alert(self, alert):
|
||||
def save_object(self, index, doc_type, body, doc_id=None, bulk=False):
|
||||
if pyes_enabled.pyes_on is True:
|
||||
return self.pyes_client.index(index='alerts', doc_type='alert', doc=alert)
|
||||
else:
|
||||
return self.es.index(index='alerts', doc_type='alert', body=alert)
|
||||
try:
|
||||
if doc_id:
|
||||
return self.es_connection.index(index=index, doc_type=doc_type, doc=body, id=doc_id, bulk=bulk)
|
||||
else:
|
||||
return self.es_connection.index(index=index, doc_type=doc_type, doc=body, bulk=bulk)
|
||||
|
||||
def get_alert_by_id(self, alert_id):
|
||||
id_match = TermMatch('_id', alert_id)
|
||||
except pyes.exceptions.NoServerAvailable:
|
||||
raise ElasticsearchBadServer()
|
||||
except pyes.exceptions.InvalidIndexNameException:
|
||||
raise ElasticsearchInvalidIndex(index)
|
||||
except pyes.exceptions.ElasticSearchException:
|
||||
raise ElasticsearchException()
|
||||
else:
|
||||
if doc_id:
|
||||
return self.es_connection.index(index=index, doc_type=doc_type, id=doc_id, body=body)
|
||||
else:
|
||||
return self.es_connection.index(index=index, doc_type=doc_type, body=body)
|
||||
|
||||
|
||||
def save_alert(self, body, index='alerts', doc_type='alert', doc_id=None, bulk=False):
|
||||
return self.save_object(index=index, doc_type=doc_type, body=body, doc_id=doc_id, bulk=bulk)
|
||||
|
||||
def save_event(self, body, index='events', doc_type='event', doc_id=None, bulk=False):
|
||||
return self.save_object(index=index, doc_type=doc_type, body=body, doc_id=doc_id, bulk=bulk)
|
||||
|
||||
def get_object_by_id(self, object_id, indices):
|
||||
id_match = TermMatch('_id', object_id)
|
||||
search_query = SearchQuery()
|
||||
search_query.add_must(id_match)
|
||||
results = search_query.execute(self, indices=['alerts'])
|
||||
results = search_query.execute(self, indices=indices)
|
||||
if len(results['hits']) == 0:
|
||||
return None
|
||||
else:
|
||||
return results['hits'][0]
|
||||
|
||||
def get_alert_by_id(self, alert_id):
|
||||
return self.get_object_by_id(alert_id, ['alerts'])
|
||||
|
||||
def get_event_by_id(self, event_id):
|
||||
return self.get_object_by_id(event_id, ['events'])
|
||||
|
||||
def save_dashboard(self, dash_file, dash_name=None):
|
||||
f = open(dash_file)
|
||||
dashboardjson = json.load(f)
|
||||
|
@ -133,7 +169,10 @@ class ElasticsearchClient():
|
|||
}
|
||||
|
||||
if pyes_enabled.pyes_on is True:
|
||||
return self.pyes_client.index(index='kibana-int', doc_type='dashboard', doc=dashboarddata)
|
||||
return self.es_connection.index(index='kibana-int', doc_type='dashboard', doc=dashboarddata)
|
||||
else:
|
||||
return self.es.index(index='kibana-int', doc_type='dashboard', body=dashboarddata)
|
||||
return self.es_connection.index(index='kibana-int', doc_type='dashboard', body=dashboarddata)
|
||||
|
||||
def flush_bulk(input):
|
||||
print "NEED TO IMPLEMENT THIS!"
|
||||
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
import os
|
||||
|
||||
global pyes_on
|
||||
pyes_on = os.environ.get('PYES')
|
||||
pyes_off = os.environ.get('DSL')
|
||||
|
||||
if pyes_on == 'True':
|
||||
pyes_on = True
|
||||
print "\nUsing PYES\n"
|
||||
else:
|
||||
if pyes_off == 'True':
|
||||
pyes_on = False
|
||||
print "\nUsing Elasticsearch DSL\n"
|
||||
else:
|
||||
pyes_on = True
|
||||
print "\nUsing PYES\n"
|
|
@ -13,7 +13,6 @@ import dateutil.parser
|
|||
import json
|
||||
import kombu
|
||||
import logging
|
||||
import pyes
|
||||
import re
|
||||
import sys
|
||||
from configlib import getConfig, OptionParser
|
||||
|
@ -21,6 +20,10 @@ from kombu import Connection, Queue, Exchange
|
|||
from kombu.mixins import ConsumerMixin
|
||||
from logging.handlers import SysLogHandler
|
||||
|
||||
import os
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../lib"))
|
||||
from elasticsearch_client import ElasticsearchClient, ElasticsearchBadServer, ElasticsearchInvalidIndex
|
||||
|
||||
logger = logging.getLogger()
|
||||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
|
||||
|
@ -60,13 +63,8 @@ def splitTime(inString):
|
|||
return(' '.join(outString), ' '.join(outTime))
|
||||
|
||||
|
||||
def esConnect(conn):
|
||||
'''open or re-open a connection to elastic search'''
|
||||
if isinstance(conn, pyes.es.ES):
|
||||
return pyes.ES((list('{0}'.format(s) for s in options.esservers)))
|
||||
else:
|
||||
return pyes.ES((list('{0}'.format(s) for s in options.esservers)))
|
||||
|
||||
def esConnect():
|
||||
return ElasticsearchClient((list('{0}'.format(s) for s in options.esservers)))
|
||||
|
||||
class eventConsumer(ConsumerMixin):
|
||||
|
||||
|
@ -127,15 +125,13 @@ class eventConsumer(ConsumerMixin):
|
|||
msg, adate = splitTime(bodyDict['summary'])
|
||||
bodyDict['summary'] = msg
|
||||
try:
|
||||
self.esConnection.index(index='alerts',
|
||||
doc_type='alert',
|
||||
doc=bodyDict)
|
||||
except (pyes.exceptions.NoServerAvailable,
|
||||
pyes.exceptions.InvalidIndexNameException) as e:
|
||||
self.esConnection.save_alert(body=bodyDict)
|
||||
|
||||
except (ElasticsearchBadServer, ElasticsearchInvalidIndex) as e:
|
||||
# handle loss of server or race condition with index
|
||||
# rotation/creation/aliasing
|
||||
try:
|
||||
self.esConnection = esConnect(None)
|
||||
self.esConnection = esConnect()
|
||||
message.requeue()
|
||||
return
|
||||
except kombu.exceptions.MessageStateError:
|
||||
|
@ -280,5 +276,5 @@ if __name__ == '__main__':
|
|||
help="configuration file to use")
|
||||
(options, args) = parser.parse_args()
|
||||
initConfig()
|
||||
es = esConnect(None)
|
||||
es = esConnect()
|
||||
main()
|
||||
|
|
|
@ -8,60 +8,64 @@
|
|||
# Contributors:
|
||||
# Jeff Bryner jbryner@mozilla.com
|
||||
|
||||
import os
|
||||
import sys
|
||||
import pyes
|
||||
from kombu import Connection,Queue,Exchange
|
||||
from kombu import Connection, Queue, Exchange
|
||||
from kombu.mixins import ConsumerMixin
|
||||
from configlib import getConfig,OptionParser
|
||||
from configlib import getConfig, OptionParser
|
||||
|
||||
import os
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../lib"))
|
||||
from elasticsearch_client import ElasticsearchClient, ElasticsearchBadServer, ElasticsearchInvalidIndex
|
||||
|
||||
|
||||
def esConnect():
|
||||
'''open or re-open a connection to elastic search'''
|
||||
return pyes.ES((list('{0}'.format(s) for s in options.esservers)))
|
||||
|
||||
return ElasticsearchClient((list('{0}'.format(s) for s in options.esservers)))
|
||||
|
||||
|
||||
class eventConsumer(ConsumerMixin):
|
||||
'''kombu mixin to receive events and copy them to an elastic search server.
|
||||
Helpful when testing new clusters, for failover,etc.
|
||||
Does not ack messages, deletes queues on exit so not guaranteed to copy all messages
|
||||
'''
|
||||
def __init__(self, mqConnection,eventQueue,eventExchange,esConnection):
|
||||
def __init__(self, mqConnection, eventQueue, eventExchange, esConnection):
|
||||
self.connection = mqConnection
|
||||
self.esConnection=esConnection
|
||||
self.eventQueue=eventQueue
|
||||
self.eventExchange=eventExchange
|
||||
self.esConnection = esConnection
|
||||
self.eventQueue = eventQueue
|
||||
self.eventExchange = eventExchange
|
||||
|
||||
def get_consumers(self, Consumer, channel):
|
||||
consumer=Consumer(self.eventQueue, no_ack=True,callbacks=[self.on_message], accept=['json'])
|
||||
consumer = Consumer(self.eventQueue, no_ack=True, callbacks=[self.on_message], accept=['json'])
|
||||
consumer.qos(prefetch_count=options.prefetch)
|
||||
return [consumer]
|
||||
|
||||
def on_message(self, body, message):
|
||||
try:
|
||||
print("RECEIVED MESSAGE: %r" % (body, ))
|
||||
#copy event to es cluster
|
||||
# copy event to es cluster
|
||||
try:
|
||||
res=self.esConnection.index(index='events',doc_type='event',doc=body)
|
||||
#handle loss of server or race condition with index rotation/creation/aliasing
|
||||
except (pyes.exceptions.NoServerAvailable,pyes.exceptions.InvalidIndexNameException) as e:
|
||||
pass
|
||||
self.esConnection.save_event(body=body)
|
||||
# handle loss of server or race condition with index rotation/creation/aliasing
|
||||
except (ElasticsearchBadServer, ElasticsearchInvalidIndex) as e:
|
||||
pass
|
||||
except Exception as e:
|
||||
sys.stderr.write("exception in events queue %r\n"%e)
|
||||
|
||||
def main():
|
||||
def main():
|
||||
#connect and declare the message queue/kombu objects.
|
||||
connString='amqp://{0}:{1}@{2}:{3}//'.format(options.mquser,options.mqpassword,options.mqserver,options.mqport)
|
||||
mqConn=Connection(connString)
|
||||
|
||||
|
||||
#topic exchange for listening to mozdef.event
|
||||
eventExchange=Exchange(name=options.eventexchange,type='topic',durable=False,delivery_mode=1)
|
||||
eventExchange(mqConn).declare()
|
||||
#Queue for the exchange
|
||||
eventQueue=Queue('',exchange=eventExchange,routing_key=options.routingkey,durable=False,exclusive=True,auto_delete=True)
|
||||
#eventQueue(mqConn).declare()
|
||||
|
||||
|
||||
#consume our queue and publish on the topic exchange
|
||||
eventConsumer(mqConn,eventQueue,eventExchange,es).run()
|
||||
|
||||
|
||||
|
||||
def initConfig():
|
||||
options.mqserver=getConfig('mqserver','localhost',options.configfile)
|
||||
|
@ -72,7 +76,7 @@ def initConfig():
|
|||
options.prefetch=getConfig('prefetch',1,options.configfile)
|
||||
options.mquser=getConfig('mquser','guest',options.configfile)
|
||||
options.mqpassword=getConfig('mqpassword','guest',options.configfile)
|
||||
options.mqport=getConfig('mqport',5672,options.configfile)
|
||||
options.mqport=getConfig('mqport',5672,options.configfile)
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser=OptionParser()
|
||||
|
|
|
@ -12,7 +12,6 @@ import json
|
|||
import kombu
|
||||
import math
|
||||
import os
|
||||
import pyes
|
||||
import pytz
|
||||
import pynsive
|
||||
import re
|
||||
|
@ -26,6 +25,11 @@ from kombu import Connection, Queue, Exchange
|
|||
from kombu.mixins import ConsumerMixin
|
||||
from threading import Timer
|
||||
|
||||
import os
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../lib"))
|
||||
from elasticsearch_client import ElasticsearchClient, ElasticsearchBadServer, ElasticsearchInvalidIndex, ElasticsearchException
|
||||
|
||||
|
||||
# running under uwsgi?
|
||||
try:
|
||||
import uwsgi
|
||||
|
@ -223,9 +227,9 @@ def keyMapping(aDict):
|
|||
|
||||
return returndict
|
||||
|
||||
def esConnect(conn):
|
||||
def esConnect():
|
||||
'''open or re-open a connection to elastic search'''
|
||||
return pyes.ES(server=(list('{0}'.format(s) for s in options.esservers)), bulk_size=options.esbulksize)
|
||||
return ElasticsearchClient((list('{0}'.format(s) for s in options.esservers)), options.esbulksize)
|
||||
|
||||
|
||||
class taskConsumer(ConsumerMixin):
|
||||
|
@ -241,7 +245,7 @@ class taskConsumer(ConsumerMixin):
|
|||
else:
|
||||
self.muleid = 0
|
||||
if options.esbulksize != 0:
|
||||
# if we are bulk posting enable a timer to occasionally flush the pyes bulker even if it's not full
|
||||
# if we are bulk posting enable a timer to occasionally flush the bulker even if it's not full
|
||||
# to prevent events from sticking around an idle worker
|
||||
Timer(options.esbulktimeout, self.flush_es_bulk).start()
|
||||
|
||||
|
@ -307,21 +311,23 @@ class taskConsumer(ConsumerMixin):
|
|||
doctype = bodyDict['details']['deviceproduct']
|
||||
|
||||
try:
|
||||
bulk = False
|
||||
if options.esbulksize != 0:
|
||||
res = self.esConnection.index(index='events', doc_type=doctype, doc=jbody, bulk=True)
|
||||
else:
|
||||
res = self.esConnection.index(index='events', doc_type=doctype, doc=jbody, bulk=False)
|
||||
bulk = True
|
||||
|
||||
except (pyes.exceptions.NoServerAvailable, pyes.exceptions.InvalidIndexNameException) as e:
|
||||
self.esConnection.save_event(body=jbody, doc_type=doctype, bulk=bulk)
|
||||
|
||||
except (ElasticsearchBadServer, ElasticsearchInvalidIndex) as e:
|
||||
# handle loss of server or race condition with index rotation/creation/aliasing
|
||||
try:
|
||||
self.esConnection = esConnect(None)
|
||||
self.esConnection = esConnect()
|
||||
message.requeue()
|
||||
return
|
||||
except kombu.exceptions.MessageStateError:
|
||||
# state may be already set.
|
||||
return
|
||||
except pyes.exceptions.ElasticSearchException as e:
|
||||
# todo: fix the exception
|
||||
except ElasticsearchException as e:
|
||||
# exception target for queue capacity issues reported by elastic search so catch the error, report it and retry the message
|
||||
try:
|
||||
sys.stderr.write('ElasticSearchException: {0} reported while indexing event'.format(e))
|
||||
|
@ -521,7 +527,7 @@ if __name__ == '__main__':
|
|||
initConfig()
|
||||
|
||||
# open ES connection globally so we don't waste time opening it per message
|
||||
es = esConnect(None)
|
||||
es = esConnect()
|
||||
|
||||
# force a check for plugins and establish the plugin list
|
||||
pluginList = list()
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
#!/usr/bin/env python
|
||||
import time
|
||||
import pyes
|
||||
from datetime import datetime
|
||||
from dateutil.parser import parse
|
||||
import pytz
|
||||
|
@ -10,6 +9,10 @@ from configlib import getConfig,OptionParser
|
|||
from kombu import Connection,Queue
|
||||
from kombu.mixins import ConsumerMixin
|
||||
|
||||
import os
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../lib"))
|
||||
from elasticsearch_client import ElasticsearchClient
|
||||
|
||||
def toUTC(suspectedDate,localTimeZone=None):
|
||||
'''make a UTC date out of almost anything'''
|
||||
utc=pytz.UTC
|
||||
|
@ -20,7 +23,7 @@ def toUTC(suspectedDate,localTimeZone=None):
|
|||
objDate=parse(suspectedDate,fuzzy=True)
|
||||
elif type(suspectedDate)==datetime:
|
||||
objDate=suspectedDate
|
||||
|
||||
|
||||
if objDate.tzinfo is None:
|
||||
objDate=pytz.timezone(localTimeZone).localize(objDate)
|
||||
objDate=utc.normalize(objDate)
|
||||
|
@ -28,7 +31,7 @@ def toUTC(suspectedDate,localTimeZone=None):
|
|||
objDate=utc.normalize(objDate)
|
||||
if objDate is not None:
|
||||
objDate=utc.normalize(objDate)
|
||||
|
||||
|
||||
return objDate.isoformat()
|
||||
|
||||
def removeDictAt(aDict):
|
||||
|
@ -51,7 +54,7 @@ def isCEF(aDict):
|
|||
#maybe it snuck in some other way
|
||||
#check some key CEF indicators (the header fields)
|
||||
if 'fields' in aDict.keys():
|
||||
|
||||
|
||||
lowerKeys=[s.lower() for s in aDict['fields'].keys()]
|
||||
if 'devicevendor' in lowerKeys and 'deviceproduct' in lowerKeys and 'deviceversion' in lowerKeys:
|
||||
return True
|
||||
|
@ -67,19 +70,19 @@ def keyMapping(aDict):
|
|||
returndict=dict()
|
||||
#save the source event for chain of custody/forensics
|
||||
#returndict['original']=aDict
|
||||
|
||||
|
||||
for k,v in aDict.iteritems():
|
||||
|
||||
|
||||
if removeAt(k.lower()) in ('message','summary'):
|
||||
returndict[u'summary']=str(v)
|
||||
|
||||
|
||||
if removeAt(k.lower()) in ('eventtime','timestamp'):
|
||||
returndict[u'utctimestamp']=toUTC(v)
|
||||
returndict[u'timestamp']=parse(v,fuzzy=True).isoformat()
|
||||
|
||||
|
||||
if removeAt(k.lower()) in ('hostname','source_host','host'):
|
||||
returndict[u'hostname']=str(v)
|
||||
|
||||
|
||||
|
||||
if removeAt(k.lower()) in ('tags'):
|
||||
if len(v)>0:
|
||||
|
@ -88,17 +91,17 @@ def keyMapping(aDict):
|
|||
#nxlog keeps the severity name in syslogseverity,everyone else should use severity or level.
|
||||
if removeAt(k.lower()) in ('syslogseverity','severity','severityvalue','level'):
|
||||
returndict[u'severity']=str(v).upper()
|
||||
|
||||
|
||||
if removeAt(k.lower()) in ('facility','syslogfacility'):
|
||||
returndict[u'facility']=str(v)
|
||||
|
||||
if removeAt(k.lower()) in ('pid','processid'):
|
||||
returndict[u'processid']=str(v)
|
||||
|
||||
|
||||
#nxlog sets sourcename to the processname (i.e. sshd), everyone else should call it process name or pname
|
||||
if removeAt(k.lower()) in ('pname','processname','sourcename'):
|
||||
returndict[u'processname']=str(v)
|
||||
|
||||
|
||||
#the file, or source
|
||||
if removeAt(k.lower()) in ('path','logger','file'):
|
||||
returndict[u'eventsource']=str(v)
|
||||
|
@ -110,28 +113,28 @@ def keyMapping(aDict):
|
|||
if removeAt(k.lower()) in ('fields','details'):
|
||||
if len(v)>0:
|
||||
returndict[u'details']=v
|
||||
|
||||
|
||||
#custom fields/details as a one off, not in an array fields.something=value or details.something=value
|
||||
if removeAt(k.lower()).startswith('fields.') or removeAt(k.lower()).startswith('details.'):
|
||||
#custom/parsed field
|
||||
returndict[unicode(k.lower().replace('fields','details'))]=str(v)
|
||||
|
||||
|
||||
if 'utctimestamp' not in returndict.keys():
|
||||
#we didn't find a reasonable timestamp, so lets set it to now:
|
||||
returndict['utctimestamp']=toUTC(datetime.now())
|
||||
|
||||
|
||||
#set the timestamp when we received it, i.e. now
|
||||
returndict['receivedtimestatmp']=toUTC(datetime.now())
|
||||
|
||||
|
||||
|
||||
return returndict
|
||||
|
||||
|
||||
def normaliseJSON(jsonStringIn):
|
||||
try:
|
||||
j=json.loads(jsonStringIn)
|
||||
j=keyMapping(j)
|
||||
return j
|
||||
|
||||
|
||||
except ValueError as ve:
|
||||
sys.stderr.write("Invalid json %r\n"%jsonStringIn)
|
||||
return None
|
||||
|
@ -142,15 +145,13 @@ def normaliseJSON(jsonStringIn):
|
|||
def callback(ch, method, properties, body):
|
||||
#print(" [*] Received %r" % (body))
|
||||
#declare elastic search connection
|
||||
#es=pyes.ES(("http",options.esserver,options.esport))
|
||||
#es=pyes.ES((list('{0}'.format(s) for s in options.esservers)))
|
||||
try:
|
||||
bodydict=json.loads(body) #raw body we were sent
|
||||
jbody=normaliseJSON(body) #normalized body with validated fields to be sent to elastic search
|
||||
|
||||
|
||||
if jbody is not None: #could be empty,or invalid
|
||||
jbody=json.JSONEncoder().encode(jbody)
|
||||
|
||||
|
||||
#figure out what type of document we are indexing and post to the elastic search index.
|
||||
doctype='event'
|
||||
if isCEF(bodydict):
|
||||
|
@ -158,12 +159,12 @@ def callback(ch, method, properties, body):
|
|||
doctype='cef'
|
||||
if 'deviceproduct' in bodydict['fields'].keys():
|
||||
doctype=bodydict['fields']['deviceproduct']
|
||||
|
||||
res=es.index(index='events',doc_type=doctype,doc=jbody)
|
||||
|
||||
res = es.save_event(body=jbody, doc_type=doctype)
|
||||
#print(' [*] elasticsearch:{0}'.format(res))
|
||||
#publish on the events topic queue
|
||||
ch.basic_publish(exchange='events',routing_key='mozdef.event',body=jbody)
|
||||
|
||||
ch.basic_publish(exchange='events',routing_key='mozdef.event',body=jbody)
|
||||
|
||||
ch.basic_ack(delivery_tag = method.delivery_tag)
|
||||
except Exception as e:
|
||||
sys.stderr.write("esworker exception in events queue %r\n"%e)
|
||||
|
@ -191,10 +192,10 @@ class kConsumer(ConsumerMixin):
|
|||
jbody=normaliseJSON(body) #normalized body with validated fields to be sent to elastic search
|
||||
else:
|
||||
sys.stderr.write("esworker exception: unknown body type received %r\n"%body)
|
||||
|
||||
|
||||
if jbody is not None: #could be empty,or invalid
|
||||
jbody=json.JSONEncoder().encode(jbody)
|
||||
|
||||
|
||||
#figure out what type of document we are indexing and post to the elastic search index.
|
||||
doctype='event'
|
||||
if isCEF(bodydict):
|
||||
|
@ -202,25 +203,24 @@ class kConsumer(ConsumerMixin):
|
|||
doctype='cef'
|
||||
if 'deviceproduct' in bodydict['fields'].keys():
|
||||
doctype=bodydict['fields']['deviceproduct']
|
||||
|
||||
res=self.esconnection.index(index='events',doc_type=doctype,doc=jbody)
|
||||
|
||||
res = self.esconnection.save_event(body=jbody, doc_type=doctype)
|
||||
#print(' [*] elasticsearch:{0}'.format(res))
|
||||
#TODO publish on the events topic queue
|
||||
|
||||
|
||||
message.ack()
|
||||
except OSError as e:
|
||||
sys.stderr.write("esworker exception in events queue %r\n"%e)
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
#connect and declare the queues
|
||||
connString='amqp://guest:guest@{0}:5672//'.format(options.mqserver)
|
||||
mqConn=Connection(connString)
|
||||
eventTaskQueue=Queue(options.taskqueue)
|
||||
eventTaskQueue(mqConn).declare()
|
||||
kConsumer(mqConn,eventTaskQueue,es).run()
|
||||
|
||||
|
||||
|
||||
def initConfig():
|
||||
#change this to your default zone for when it's not specified
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
import json
|
||||
import math
|
||||
import os
|
||||
import pyes
|
||||
import pytz
|
||||
import pynsive
|
||||
import re
|
||||
|
@ -32,6 +31,11 @@ import base64
|
|||
import requests
|
||||
from threading import Timer
|
||||
|
||||
import os
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../lib"))
|
||||
from elasticsearch_client import ElasticsearchClient, ElasticsearchBadServer, ElasticsearchInvalidIndex, ElasticsearchException
|
||||
|
||||
|
||||
# running under uwsgi?
|
||||
try:
|
||||
import uwsgi
|
||||
|
@ -303,9 +307,9 @@ def keyMapping(aDict):
|
|||
return returndict
|
||||
|
||||
|
||||
def esConnect(conn):
|
||||
def esConnect():
|
||||
'''open or re-open a connection to elastic search'''
|
||||
return pyes.ES(server=(list('{0}'.format(s) for s in options.esservers)), bulk_size=options.esbulksize)
|
||||
return ElasticsearchClient((list('{0}'.format(s) for s in options.esservers)), options.esbulksize)
|
||||
|
||||
|
||||
class taskConsumer(object):
|
||||
|
@ -318,7 +322,7 @@ class taskConsumer(object):
|
|||
timedelta(seconds=options.ptbackoff)
|
||||
|
||||
if options.esbulksize != 0:
|
||||
# if we are bulk posting enable a timer to occasionally flush the pyes bulker even if it's not full
|
||||
# if we are bulk posting enable a timer to occasionally flush the bulker even if it's not full
|
||||
# to prevent events from sticking around an idle worker
|
||||
Timer(options.esbulktimeout, self.flush_es_bulk).start()
|
||||
|
||||
|
@ -433,33 +437,28 @@ class taskConsumer(object):
|
|||
metadata['doc_type'] = normalizedDict['details']['deviceproduct']
|
||||
|
||||
try:
|
||||
bulk = False
|
||||
if options.esbulksize != 0:
|
||||
res = self.esConnection.index(
|
||||
index=metadata['index'],
|
||||
id=metadata['id'],
|
||||
doc_type=metadata['doc_type'],
|
||||
doc=jbody,
|
||||
bulk=True
|
||||
)
|
||||
else:
|
||||
res = self.esConnection.index(
|
||||
index=metadata['index'],
|
||||
id=metadata['id'],
|
||||
doc_type=metadata['doc_type'],
|
||||
doc=jbody,
|
||||
bulk=False
|
||||
)
|
||||
bulk = True
|
||||
|
||||
except (pyes.exceptions.NoServerAvailable, pyes.exceptions.InvalidIndexNameException) as e:
|
||||
res = self.esConnection.save_object(
|
||||
index=metadata['index'],
|
||||
doc_id=metadata['id'],
|
||||
doc_type=metadata['doc_type'],
|
||||
body=jbody,
|
||||
bulk=bulk
|
||||
)
|
||||
|
||||
except (ElasticsearchBadServer, ElasticsearchInvalidIndex) as e:
|
||||
# handle loss of server or race condition with index rotation/creation/aliasing
|
||||
try:
|
||||
self.esConnection = esConnect(None)
|
||||
self.esConnection = esConnect()
|
||||
#message.requeue()
|
||||
return
|
||||
except kombu.exceptions.MessageStateError:
|
||||
# state may be already set.
|
||||
return
|
||||
except pyes.exceptions.ElasticSearchException as e:
|
||||
except ElasticsearchException as e:
|
||||
# exception target for queue capacity issues reported by elastic search so catch the error, report it and retry the message
|
||||
try:
|
||||
sys.stderr.write('ElasticSearchException: {0} reported while indexing event'.format(e))
|
||||
|
@ -667,7 +666,7 @@ if __name__ == '__main__':
|
|||
initConfig()
|
||||
|
||||
# open ES connection globally so we don't waste time opening it per message
|
||||
es = esConnect(None)
|
||||
es = esConnect()
|
||||
|
||||
# force a check for plugins and establish the plugin list
|
||||
pluginList = list()
|
||||
|
|
|
@ -14,7 +14,6 @@ import json
|
|||
import kombu
|
||||
import math
|
||||
import os
|
||||
import pyes
|
||||
import pytz
|
||||
import pynsive
|
||||
import re
|
||||
|
@ -29,6 +28,10 @@ from kombu import Connection, Queue, Exchange
|
|||
from kombu.mixins import ConsumerMixin
|
||||
from threading import Timer
|
||||
|
||||
import os
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../lib"))
|
||||
from elasticsearch_client import ElasticsearchClient, ElasticsearchBadServer, ElasticsearchInvalidIndex, ElasticsearchException
|
||||
|
||||
|
||||
# running under uwsgi?
|
||||
try:
|
||||
|
@ -251,9 +254,9 @@ def keyMapping(aDict):
|
|||
return returndict
|
||||
|
||||
|
||||
def esConnect(conn):
|
||||
def esConnect():
|
||||
'''open or re-open a connection to elastic search'''
|
||||
return pyes.ES(server=(list('{0}'.format(s) for s in options.esservers)), bulk_size=options.esbulksize)
|
||||
return ElasticsearchClient((list('{0}'.format(s) for s in options.esservers)), options.esbulksize)
|
||||
|
||||
|
||||
class taskConsumer(ConsumerMixin):
|
||||
|
@ -269,7 +272,7 @@ class taskConsumer(ConsumerMixin):
|
|||
else:
|
||||
self.muleid = 0
|
||||
if options.esbulksize != 0:
|
||||
# if we are bulk posting enable a timer to occasionally flush the pyes bulker even if it's not full
|
||||
# if we are bulk posting enable a timer to occasionally flush the bulker even if it's not full
|
||||
# to prevent events from sticking around an idle worker
|
||||
Timer(options.esbulktimeout, self.flush_es_bulk).start()
|
||||
|
||||
|
@ -345,33 +348,28 @@ class taskConsumer(ConsumerMixin):
|
|||
metadata['doc_type'] = normalizedDict['details']['deviceproduct']
|
||||
|
||||
try:
|
||||
bulk = False
|
||||
if options.esbulksize != 0:
|
||||
res = self.esConnection.index(
|
||||
index=metadata['index'],
|
||||
id=metadata['id'],
|
||||
doc_type=metadata['doc_type'],
|
||||
doc=jbody,
|
||||
bulk=True
|
||||
)
|
||||
else:
|
||||
res = self.esConnection.index(
|
||||
index=metadata['index'],
|
||||
id=metadata['id'],
|
||||
doc_type=metadata['doc_type'],
|
||||
doc=jbody,
|
||||
bulk=False
|
||||
)
|
||||
bulk = True
|
||||
|
||||
except (pyes.exceptions.NoServerAvailable, pyes.exceptions.InvalidIndexNameException) as e:
|
||||
res = self.esConnection.save_object(
|
||||
index=metadata['index'],
|
||||
doc_id=metadata['id'],
|
||||
doc_type=metadata['doc_type'],
|
||||
body=jbody,
|
||||
bulk=bulk
|
||||
)
|
||||
|
||||
except (ElasticsearchBadServer, ElasticsearchInvalidIndex) as e:
|
||||
# handle loss of server or race condition with index rotation/creation/aliasing
|
||||
try:
|
||||
self.esConnection = esConnect(None)
|
||||
self.esConnection = esConnect()
|
||||
message.requeue()
|
||||
return
|
||||
except kombu.exceptions.MessageStateError:
|
||||
# state may be already set.
|
||||
return
|
||||
except pyes.exceptions.ElasticSearchException as e:
|
||||
except ElasticsearchException as e:
|
||||
# exception target for queue capacity issues reported by elastic search so catch the error, report it and retry the message
|
||||
try:
|
||||
sys.stderr.write('ElasticSearchException: {0} reported while indexing event'.format(e))
|
||||
|
@ -614,7 +612,7 @@ if __name__ == '__main__':
|
|||
initConfig()
|
||||
|
||||
# open ES connection globally so we don't waste time opening it per message
|
||||
es = esConnect(None)
|
||||
es = esConnect()
|
||||
|
||||
# force a check for plugins and establish the plugin list
|
||||
pluginList = list()
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
import json
|
||||
import math
|
||||
import os
|
||||
import pyes
|
||||
import pytz
|
||||
import pynsive
|
||||
import re
|
||||
|
@ -257,9 +256,9 @@ def keyMapping(aDict):
|
|||
return returndict
|
||||
|
||||
|
||||
def esConnect(conn):
|
||||
def esConnect():
|
||||
'''open or re-open a connection to elastic search'''
|
||||
return pyes.ES(server=(list('{0}'.format(s) for s in options.esservers)), bulk_size=options.esbulksize)
|
||||
return ElasticsearchClient((list('{0}'.format(s) for s in options.esservers)), options.esbulksize)
|
||||
|
||||
|
||||
class taskConsumer(object):
|
||||
|
@ -270,7 +269,7 @@ class taskConsumer(object):
|
|||
self.taskQueue = taskQueue
|
||||
|
||||
if options.esbulksize != 0:
|
||||
# if we are bulk posting enable a timer to occasionally flush the pyes bulker even if it's not full
|
||||
# if we are bulk posting enable a timer to occasionally flush the bulker even if it's not full
|
||||
# to prevent events from sticking around an idle worker
|
||||
Timer(options.esbulktimeout, self.flush_es_bulk).start()
|
||||
|
||||
|
@ -389,33 +388,28 @@ class taskConsumer(object):
|
|||
metadata['doc_type'] = normalizedDict['details']['deviceproduct']
|
||||
|
||||
try:
|
||||
bulk = False
|
||||
if options.esbulksize != 0:
|
||||
res = self.esConnection.index(
|
||||
index=metadata['index'],
|
||||
id=metadata['id'],
|
||||
doc_type=metadata['doc_type'],
|
||||
doc=jbody,
|
||||
bulk=True
|
||||
)
|
||||
else:
|
||||
res = self.esConnection.index(
|
||||
index=metadata['index'],
|
||||
id=metadata['id'],
|
||||
doc_type=metadata['doc_type'],
|
||||
doc=jbody,
|
||||
bulk=False
|
||||
)
|
||||
bulk = True
|
||||
|
||||
except (pyes.exceptions.NoServerAvailable, pyes.exceptions.InvalidIndexNameException) as e:
|
||||
res = self.esConnection.save_object(
|
||||
index=metadata['index'],
|
||||
doc_id=metadata['id'],
|
||||
doc_type=metadata['doc_type'],
|
||||
body=jbody,
|
||||
bulk=bulk
|
||||
)
|
||||
|
||||
except (ElasticsearchBadServer, ElasticsearchInvalidIndex) as e:
|
||||
# handle loss of server or race condition with index rotation/creation/aliasing
|
||||
try:
|
||||
self.esConnection = esConnect(None)
|
||||
self.esConnection = esConnect()
|
||||
#message.requeue()
|
||||
return
|
||||
except kombu.exceptions.MessageStateError:
|
||||
# state may be already set.
|
||||
return
|
||||
except pyes.exceptions.ElasticSearchException as e:
|
||||
except ElasticsearchException as e:
|
||||
# exception target for queue capacity issues reported by elastic search so catch the error, report it and retry the message
|
||||
try:
|
||||
sys.stderr.write('ElasticSearchException: {0} reported while indexing event'.format(e))
|
||||
|
@ -651,7 +645,7 @@ if __name__ == '__main__':
|
|||
initConfig()
|
||||
|
||||
# open ES connection globally so we don't waste time opening it per message
|
||||
es = esConnect(None)
|
||||
es = esConnect()
|
||||
|
||||
# force a check for plugins and establish the plugin list
|
||||
pluginList = list()
|
||||
|
|
|
@ -1,15 +1,30 @@
|
|||
import os
|
||||
import sys
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../lib"))
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../alerts/lib"))
|
||||
|
||||
from query_models import SearchQuery, TermMatch, Aggregation
|
||||
|
||||
from unit_test_suite import UnitTestSuite
|
||||
|
||||
# Remove this code when pyes is gone!
|
||||
import os
|
||||
import sys
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), "../lib"))
|
||||
import pyes_enabled
|
||||
# Remove this code when pyes is gone!
|
||||
|
||||
|
||||
class ElasticsearchClientTest(UnitTestSuite):
|
||||
def setup(self):
|
||||
super(ElasticsearchClientTest, self).setup()
|
||||
|
||||
def get_num_events(self):
|
||||
search_query = SearchQuery()
|
||||
search_query.add_must(TermMatch('_type', 'event'))
|
||||
search_query.add_aggregation(Aggregation('_type'))
|
||||
results = search_query.execute(self.es_client)
|
||||
return results['aggregations']['_type']['terms'][0]['count']
|
||||
|
||||
|
||||
class TestWriteWithRead(ElasticsearchClientTest):
|
||||
def setup(self):
|
||||
|
@ -39,7 +54,7 @@ class TestWriteWithRead(ElasticsearchClientTest):
|
|||
'tags': ['nsm,bro,correlated'],
|
||||
'url': 'https://mana.mozilla.org/wiki/display/SECURITY/NSM+IR+procedures',
|
||||
'utctimestamp': '2016-08-19T16:40:57.851092+00:00'}
|
||||
self.saved_alert = self.es_client.save_alert(self.alert)
|
||||
self.saved_alert = self.es_client.save_alert(body=self.alert)
|
||||
self.es_client.flush('alerts')
|
||||
|
||||
def test_saved_type(self):
|
||||
|
@ -54,3 +69,154 @@ class TestWriteWithRead(ElasticsearchClientTest):
|
|||
|
||||
def test_bad_id(self):
|
||||
assert self.es_client.get_alert_by_id("123") is None
|
||||
|
||||
|
||||
class MockTransportClass:
|
||||
|
||||
def __init__(self):
|
||||
self.request_counts = 0
|
||||
self.original_function = None
|
||||
|
||||
def _send_request(self, method, path, body=None, params=None, headers=None, raw=False, return_response=False):
|
||||
self.request_counts += 1
|
||||
return self.original_function(method, path, body, params)
|
||||
|
||||
def backup_function(self, orig_function):
|
||||
self.original_function = orig_function
|
||||
|
||||
def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=()):
|
||||
self.request_counts += 1
|
||||
return self.original_function(method, url, params=params, body=body)
|
||||
|
||||
|
||||
class TestSimpleWrites(ElasticsearchClientTest):
|
||||
|
||||
def test_simple_writing(self):
|
||||
mock_class = MockTransportClass()
|
||||
|
||||
if pyes_enabled.pyes_on is True:
|
||||
mock_class.backup_function(self.es_client.es_connection._send_request)
|
||||
self.es_client.es_connection._send_request = mock_class._send_request
|
||||
else:
|
||||
mock_class.backup_function(self.es_client.es_connection.transport.perform_request)
|
||||
self.es_client.es_connection.transport.perform_request = mock_class.perform_request
|
||||
|
||||
event_length = 10000
|
||||
events = []
|
||||
for num in range(event_length):
|
||||
events.append({"key": "value" + str(num)})
|
||||
|
||||
for event in events:
|
||||
self.es_client.save_event(body=event)
|
||||
self.es_client.flush('events')
|
||||
|
||||
assert mock_class.request_counts == 10001
|
||||
num_events = self.get_num_events()
|
||||
assert num_events == 10000
|
||||
|
||||
|
||||
class TestBulkWrites(ElasticsearchClientTest):
|
||||
|
||||
def test_bulk_writing(self):
|
||||
mock_class = MockTransportClass()
|
||||
|
||||
if pyes_enabled.pyes_on is True:
|
||||
mock_class.backup_function(self.es_client.es_connection._send_request)
|
||||
self.es_client.es_connection._send_request = mock_class._send_request
|
||||
else:
|
||||
mock_class.backup_function(self.es_client.es_connection.transport.perform_request)
|
||||
self.es_client.es_connection.transport.perform_request = mock_class.perform_request
|
||||
|
||||
event_length = 10000
|
||||
events = []
|
||||
for num in range(event_length):
|
||||
events.append({"key": "value" + str(num)})
|
||||
|
||||
for event in events:
|
||||
self.es_client.save_event(body=event, bulk=True)
|
||||
self.es_client.flush('events')
|
||||
|
||||
assert mock_class.request_counts == 101
|
||||
num_events = self.get_num_events()
|
||||
assert num_events == 10000
|
||||
|
||||
|
||||
class TestBulkWritesWithLessThanThreshold(ElasticsearchClientTest):
|
||||
|
||||
def test_bulk_writing(self):
|
||||
mock_class = MockTransportClass()
|
||||
|
||||
if pyes_enabled.pyes_on is True:
|
||||
mock_class.backup_function(self.es_client.es_connection._send_request)
|
||||
self.es_client.es_connection._send_request = mock_class._send_request
|
||||
else:
|
||||
mock_class.backup_function(self.es_client.es_connection.transport.perform_request)
|
||||
self.es_client.es_connection.transport.perform_request = mock_class.perform_request
|
||||
|
||||
event_length = 9995
|
||||
events = []
|
||||
for num in range(event_length):
|
||||
events.append({"key": "value" + str(num)})
|
||||
|
||||
for event in events:
|
||||
self.es_client.save_event(body=event, bulk=True)
|
||||
self.es_client.flush('events')
|
||||
|
||||
assert mock_class.request_counts == 101
|
||||
num_events = self.get_num_events()
|
||||
assert num_events == 9995
|
||||
|
||||
|
||||
class TestBulkWritesWithLessThanThreshold(ElasticsearchClientTest):
|
||||
|
||||
def test_bulk_writing(self):
|
||||
self.es_client.save_event(body={'key': 'value'}, bulk=True)
|
||||
id_match = TermMatch('_type', 'event')
|
||||
search_query = SearchQuery()
|
||||
search_query.add_must(id_match)
|
||||
results = search_query.execute(self.es_client, indices=['events'])
|
||||
assert len(results['hits']) == 0
|
||||
|
||||
event_length = 100
|
||||
events = []
|
||||
for num in range(event_length):
|
||||
events.append({"key": "value" + str(num)})
|
||||
|
||||
for event in events:
|
||||
self.es_client.save_event(body=event, bulk=True)
|
||||
self.es_client.flush('events')
|
||||
|
||||
id_match = TermMatch('_type', 'event')
|
||||
search_query = SearchQuery()
|
||||
search_query.add_must(id_match)
|
||||
results = search_query.execute(self.es_client, indices=['events'])
|
||||
assert len(results['hits']) == 101
|
||||
|
||||
# set bulk amount to 20 events
|
||||
# add 100 events and make sure requests sent is 5 and events is 100
|
||||
# add 90 events and make sure requests sent is 4 and events is 80
|
||||
# todo: add unit test for writing stuff to ES, then making sure it gets purged/flushed correctly
|
||||
# todo: add unit tests for verifying number of entries obtained via search is 1000
|
||||
# todo: verify that when you flush bulk, it writes all of the events in the queue
|
||||
|
||||
class TestWriteWithID(ElasticsearchClientTest):
|
||||
|
||||
def test_write_with_id(self):
|
||||
event = {'key': 'value'}
|
||||
saved_event = self.es_client.save_event(body=event, doc_id="12345")
|
||||
assert saved_event['_id'] == '12345'
|
||||
|
||||
|
||||
class TestWriteWithIDExists(ElasticsearchClientTest):
|
||||
|
||||
def test_write_with_id(self):
|
||||
event_id = "12345"
|
||||
event = {'key': 'value'}
|
||||
saved_event = self.es_client.save_event(body=event, doc_id=event_id)
|
||||
assert saved_event['_id'] == event_id
|
||||
event['new_key'] = 'updated_value'
|
||||
saved_event = self.es_client.save_event(body=event, doc_id=event_id)
|
||||
assert saved_event['_id'] == event_id
|
||||
self.es_client.flush('events')
|
||||
fetched_event = self.es_client.get_event_by_id(event_id)
|
||||
assert fetched_event['_source'] == event
|
||||
|
|
|
@ -24,7 +24,7 @@ class UnitTestSuite(object):
|
|||
self.reset_elasticsearch()
|
||||
|
||||
def populate_test_event(self, event, event_type='event'):
|
||||
self.es_client.save_event(event, event_type)
|
||||
self.es_client.save_event(body=event, doc_type=event_type)
|
||||
self.es_client.flush(self.index_name)
|
||||
|
||||
def setup_elasticsearch(self):
|
||||
|
|
Загрузка…
Ссылка в новой задаче