Merge pull request #487 from mozilla/add_events_class

Create Event Class
This commit is contained in:
A Smith 2017-10-11 13:35:27 -05:00 коммит произвёл GitHub
Родитель d89c0eb6d0 c40a251695
Коммит 3f97a66a24
13 изменённых файлов: 232 добавлений и 42 удалений

Просмотреть файл

@ -9,6 +9,7 @@ from query_models import SearchQuery, TermMatch, AggregatedResults, SimpleResult
from bulk_queue import BulkQueue
from utilities.logger import logger, initLogger
from event import Event
class ElasticsearchBadServer(Exception):
@ -103,15 +104,20 @@ class ElasticsearchClient():
if not self.bulk_queue.started():
self.bulk_queue.start_timer()
def bulk_save_object(self, index, doc_type, body, doc_id=None):
self.start_bulk_timer()
self.bulk_queue.add(index=index, doc_type=doc_type, body=body, doc_id=doc_id)
def finish_bulk(self):
self.bulk_queue.stop_timer()
def save_object(self, index, doc_type, body, doc_id=None, bulk=False):
# Try and parse it as json if it's a string
def __bulk_save_document(self, index, doc_type, body, doc_id=None):
self.start_bulk_timer()
self.bulk_queue.add(index=index, doc_type=doc_type, body=body, doc_id=doc_id)
def __save_document(self, index, doc_type, body, doc_id=None, bulk=False):
if bulk:
self.__bulk_save_document(index=index, doc_type=doc_type, body=body, doc_id=doc_id)
else:
return self.es_connection.index(index=index, doc_type=doc_type, id=doc_id, body=body)
def __parse_document(self, body, doc_type):
if type(body) is str:
body = json.loads(body)
@ -121,17 +127,21 @@ class ElasticsearchClient():
doc_body = body
if '_source' in body:
doc_body = body['_source']
return doc_body, doc_type
if bulk:
self.bulk_save_object(index=index, doc_type=doc_type, body=doc_body, doc_id=doc_id)
else:
return self.es_connection.index(index=index, doc_type=doc_type, id=doc_id, body=doc_body)
def save_object(self, body, index, doc_type, doc_id=None, bulk=False):
doc_body, doc_type = self.__parse_document(body, doc_type)
return self.__save_document(index=index, doc_type=doc_type, body=doc_body, doc_id=doc_id, bulk=bulk)
def save_alert(self, body, index='alerts', doc_type='alert', doc_id=None, bulk=False):
return self.save_object(index=index, doc_type=doc_type, body=body, doc_id=doc_id, bulk=bulk)
doc_body, doc_type = self.__parse_document(body, doc_type)
return self.__save_document(index=index, doc_type=doc_type, body=doc_body, doc_id=doc_id, bulk=bulk)
def save_event(self, body, index='events', doc_type='event', doc_id=None, bulk=False):
return self.save_object(index=index, doc_type=doc_type, body=body, doc_id=doc_id, bulk=bulk)
doc_body, doc_type = self.__parse_document(body, doc_type)
event = Event(doc_body)
event.add_required_fields()
return self.__save_document(index=index, doc_type=doc_type, body=event, doc_id=doc_id, bulk=bulk)
def get_object_by_id(self, object_id, indices):
id_match = TermMatch('_id', object_id)

40
lib/event.py Normal file
Просмотреть файл

@ -0,0 +1,40 @@
from datetime import datetime
import socket
from utilities.toUTC import toUTC
class Event(dict):
    """Dictionary-backed event that can fill in the fields every event needs.

    An ``Event`` behaves exactly like a ``dict``; the only addition is
    ``add_required_fields``, which inserts a default for each required key
    that the caller did not supply.
    """

    # We set default values so that we can later create an alert around
    # these, and know when events have to use defaults.
    DEFAULT_STRING = 'UNKNOWN'

    # Required time fields; a missing one is set to the current time.
    _TIMESTAMP_FIELDS = ('receivedtimestamp', 'utctimestamp', 'timestamp')

    # Required fields that fall back to DEFAULT_STRING when missing.
    _STRING_FIELDS = (
        'category',
        'hostname',
        'processid',
        'processname',
        'severity',
        'source',
        'summary',
    )

    def add_required_fields(self):
        """Insert a default value for every required field that is missing.

        Keys that are already present are never overwritten, whatever their
        value. The event is modified in place and nothing is returned.

        Defaults applied:
          * ``receivedtimestamp``, ``utctimestamp``, ``timestamp``: the
            result of ``toUTC(datetime.now()).isoformat()``
            (NOTE(review): presumably a UTC ISO-8601 string; confirm
            against ``utilities.toUTC``).
          * ``mozdefhostname``: ``socket.gethostname()``.
          * ``tags``: a new empty list.
          * ``category``, ``hostname``, ``processid``, ``processname``,
            ``severity``, ``source``, ``summary``: ``DEFAULT_STRING``.
          * ``details``: a new empty dict.
        """
        # Work out "now" at most once, and only if some time field is
        # actually missing, so that every defaulted time field of a single
        # event carries the same instant instead of drifting by microseconds.
        now_iso = None
        for field in self._TIMESTAMP_FIELDS:
            if field not in self:
                if now_iso is None:
                    now_iso = toUTC(datetime.now()).isoformat()
                self[field] = now_iso

        # Only ask the OS for the host name when it is really needed.
        if 'mozdefhostname' not in self:
            self['mozdefhostname'] = socket.gethostname()

        # A fresh list per event: defaults must never be shared objects.
        if 'tags' not in self:
            self['tags'] = []

        for field in self._STRING_FIELDS:
            if field not in self:
                self[field] = self.DEFAULT_STRING

        # A fresh dict per event, for the same reason as 'tags'.
        if 'details' not in self:
            self['details'] = {}

Просмотреть файл

@ -269,7 +269,7 @@ class taskConsumer(ConsumerMixin):
if options.esbulksize != 0:
bulk = True
res = self.esConnection.save_object(
res = self.esConnection.save_event(
index=metadata['index'],
doc_id=metadata['id'],
doc_type=metadata['doc_type'],

Просмотреть файл

@ -355,7 +355,7 @@ class taskConsumer(object):
if options.esbulksize != 0:
bulk = True
res = self.esConnection.save_object(
res = self.esConnection.save_event(
index=metadata['index'],
doc_id=metadata['id'],
doc_type=metadata['doc_type'],

Просмотреть файл

@ -154,7 +154,7 @@ class taskConsumer(object):
if self.options.esbulksize != 0:
bulk = True
self.esConnection.save_object(
self.esConnection.save_event(
index=metadata['index'],
doc_id=metadata['id'],
doc_type=metadata['doc_type'],

Просмотреть файл

@ -327,7 +327,7 @@ class taskConsumer(object):
if options.esbulksize != 0:
bulk = True
res = self.esConnection.save_object(
res = self.esConnection.save_event(
index=metadata['index'],
doc_id=metadata['id'],
doc_type=metadata['doc_type'],

Просмотреть файл

@ -35,7 +35,7 @@ class QueryTestSuite(UnitTestSuite):
self.reset_elasticsearch()
self.setup_elasticsearch()
self.populate_test_event(event)
self.populate_test_object(event)
self.flush(self.event_index_name)
# Testing must

Просмотреть файл

@ -27,7 +27,7 @@ class TestAggregation(UnitTestSuite):
{"test": "value", "summary": "abvc space line"},
]
for event in events:
self.populate_test_event(event)
self.populate_test_object(event)
self.flush(self.event_index_name)
search_query = SearchQuery()
@ -62,7 +62,7 @@ class TestAggregation(UnitTestSuite):
{"test": "value", "note": "abvc space line"},
]
for event in events:
self.populate_test_event(event)
self.populate_test_object(event)
self.flush(self.event_index_name)
search_query = SearchQuery()
@ -93,8 +93,7 @@ class TestAggregation(UnitTestSuite):
{"test": "value", "summary": "think"},
]
for event in events:
self.populate_test_event(event)
self.populate_test_object(event)
self.flush(self.event_index_name)
search_query = SearchQuery()
@ -132,8 +131,7 @@ class TestAggregation(UnitTestSuite):
{"test": "value", "summary": "think"},
]
for event in events:
self.populate_test_event(event)
self.populate_test_object(event)
self.flush(self.event_index_name)
search_query = SearchQuery()
@ -165,8 +163,7 @@ class TestAggregation(UnitTestSuite):
]
for event in events:
self.populate_test_event(event)
self.populate_test_object(event)
self.flush(self.event_index_name)
search_query = SearchQuery()
@ -192,8 +189,7 @@ class TestAggregation(UnitTestSuite):
{"test": "value", "summary": "think"},
]
for event in events:
self.populate_test_event(event)
self.populate_test_object(event)
self.flush(self.event_index_name)
search_query = SearchQuery()
@ -208,8 +204,7 @@ class TestAggregation(UnitTestSuite):
def test_aggregation_with_default_size(self):
for num in range(0, 100):
event = {'keyname': 'value' + str(num)}
self.populate_test_event(event)
self.populate_test_object(event)
self.flush(self.event_index_name)
search_query = SearchQuery()
@ -221,8 +216,7 @@ class TestAggregation(UnitTestSuite):
def test_aggregation_with_aggregation_size(self):
for num in range(0, 100):
event = {'keyname': 'value' + str(num)}
self.populate_test_event(event)
self.populate_test_object(event)
self.flush(self.event_index_name)
search_query = SearchQuery()

Просмотреть файл

@ -156,7 +156,6 @@ class TestExecute(SearchQueryUnitTest):
assert sorted_hits[0]['_index'] == datetime.now().strftime("events-%Y%m%d")
assert sorted_hits[0]['_source'].keys() == ['ip', 'details', 'summary']
assert sorted_hits[0]['_source']['ip'] == '1.2.3.4'
assert sorted_hits[0]['_source']['summary'] == 'Test Summary'
@ -169,20 +168,17 @@ class TestExecute(SearchQueryUnitTest):
assert sorted_hits[1]['_index'] == datetime.now().strftime("events-%Y%m%d")
assert sorted_hits[1]['_source'].keys() == ['ip', 'details', 'summary']
assert sorted_hits[1]['_source']['ip'] == '1.2.3.4'
assert sorted_hits[1]['_source']['summary'] == 'Test Summary'
assert sorted_hits[1]['_source']['details'].keys() == ['information']
assert sorted_hits[1]['_source']['details']['information'] == 'Example information'
assert sorted_hits[2].keys() == ['_score', '_type', '_id', '_source', '_index']
assert type(sorted_hits[2]['_id']) == unicode
assert sorted_hits[2]['_type'] == 'event'
assert sorted_hits[2]['_index'] == datetime.now().strftime("events-%Y%m%d")
assert sorted_hits[2]['_source'].keys() == ['ip', 'details', 'summary']
assert sorted_hits[2]['_source']['ip'] == '127.0.0.1'
assert sorted_hits[2]['_source']['summary'] == 'Test Summary'
@ -238,7 +234,6 @@ class TestExecute(SearchQueryUnitTest):
assert results['hits'][0]['_index'] == datetime.now().strftime("events-%Y%m%d")
assert results['hits'][0]['_source'].keys() == ['note', 'details', 'summary']
assert results['hits'][0]['_source']['note'] == 'Example note'
assert results['hits'][0]['_source']['summary'] == 'Test Summary'
@ -251,7 +246,6 @@ class TestExecute(SearchQueryUnitTest):
assert results['hits'][1]['_index'] == datetime.now().strftime("events-%Y%m%d")
assert results['hits'][1]['_source'].keys() == ['note', 'details', 'summary']
assert results['hits'][1]['_source']['note'] == 'Example note'
assert results['hits'][1]['_source']['summary'] == 'Test Summary'
@ -289,7 +283,6 @@ class TestExecute(SearchQueryUnitTest):
assert results['hits'][0]['_index'] == datetime.now().strftime("events-%Y%m%d")
assert results['hits'][0]['_source'].keys() == ['note', 'details', 'summary']
assert results['hits'][0]['_source']['note'] == 'Example note'
assert results['hits'][0]['_source']['summary'] == 'Test Summary'
@ -318,10 +311,12 @@ class TestExecute(SearchQueryUnitTest):
too_old_event = default_event
too_old_event['utctimestamp'] = UnitTestSuite.subtract_from_timestamp({'seconds': 11})
too_old_event['receivedtimestamp'] = UnitTestSuite.subtract_from_timestamp({'seconds': 11})
self.populate_test_event(too_old_event)
not_old_event = default_event
not_old_event['utctimestamp'] = UnitTestSuite.subtract_from_timestamp({'seconds': 9})
not_old_event['receivedtimestamp'] = UnitTestSuite.subtract_from_timestamp({'seconds': 9})
self.populate_test_event(not_old_event)
self.flush(self.event_index_name)
@ -344,10 +339,12 @@ class TestExecute(SearchQueryUnitTest):
self.populate_test_event(default_event)
default_event['utctimestamp'] = UnitTestSuite.subtract_from_timestamp({'minutes': 11})
default_event['receivedtimestamp'] = UnitTestSuite.subtract_from_timestamp({'minutes': 11})
self.populate_test_event(default_event)
not_old_event = default_event
not_old_event['utctimestamp'] = UnitTestSuite.subtract_from_timestamp({'minutes': 9})
not_old_event['receivedtimestamp'] = UnitTestSuite.subtract_from_timestamp({'minutes': 9})
self.populate_test_event(not_old_event)
self.flush(self.event_index_name)
@ -370,10 +367,12 @@ class TestExecute(SearchQueryUnitTest):
self.populate_test_event(default_event)
default_event['utctimestamp'] = UnitTestSuite.subtract_from_timestamp({'hours': 11})
default_event['receivedtimestamp'] = UnitTestSuite.subtract_from_timestamp({'hours': 11})
self.populate_test_event(default_event)
not_old_event = default_event
not_old_event['utctimestamp'] = UnitTestSuite.subtract_from_timestamp({'hours': 9})
not_old_event['receivedtimestamp'] = UnitTestSuite.subtract_from_timestamp({'hours': 9})
self.populate_test_event(not_old_event)
self.flush(self.event_index_name)
@ -396,10 +395,12 @@ class TestExecute(SearchQueryUnitTest):
self.populate_test_event(default_event)
default_event['utctimestamp'] = UnitTestSuite.subtract_from_timestamp({'days': 11})
default_event['receivedtimestamp'] = UnitTestSuite.subtract_from_timestamp({'days': 11})
self.populate_test_event(default_event)
not_old_event = default_event
not_old_event['utctimestamp'] = UnitTestSuite.subtract_from_timestamp({'days': 9})
not_old_event['receivedtimestamp'] = UnitTestSuite.subtract_from_timestamp({'days': 9})
self.populate_test_event(not_old_event)
self.flush(self.event_index_name)
@ -422,10 +423,12 @@ class TestExecute(SearchQueryUnitTest):
self.populate_test_event(default_event)
default_event['utctimestamp'] = UnitTestSuite.subtract_from_timestamp({'days': 11})
default_event['receivedtimestamp'] = UnitTestSuite.subtract_from_timestamp({'days': 11})
self.populate_test_event(default_event)
not_old_event = default_event
not_old_event['utctimestamp'] = UnitTestSuite.subtract_from_timestamp({'days': 9})
not_old_event['receivedtimestamp'] = UnitTestSuite.subtract_from_timestamp({'days': 9})
self.populate_test_event(not_old_event)
self.flush(self.event_index_name)
@ -446,7 +449,7 @@ class TestExecute(SearchQueryUnitTest):
}
}
self.populate_test_event(default_event)
self.populate_test_object(default_event)
self.flush(self.event_index_name)
results = query.execute(self.es_client)
@ -504,10 +507,12 @@ class TestExecute(SearchQueryUnitTest):
too_old_event = default_event
too_old_event['receivedtimestamp'] = UnitTestSuite.subtract_from_timestamp({'seconds': 11})
too_old_event['utctimestamp'] = UnitTestSuite.subtract_from_timestamp({'seconds': 11})
self.populate_test_event(too_old_event)
not_old_event = default_event
not_old_event['receivedtimestamp'] = UnitTestSuite.subtract_from_timestamp({'seconds': 9})
not_old_event['utctimestamp'] = UnitTestSuite.subtract_from_timestamp({'seconds': 9})
self.populate_test_event(not_old_event)
self.flush(self.event_index_name)

Просмотреть файл

@ -21,6 +21,7 @@ sys.path.append(os.path.join(os.path.dirname(__file__), "../"))
from unit_test_suite import UnitTestSuite
import time
import json
from elasticsearch_client import ElasticsearchClient, ElasticsearchInvalidIndex
import pytest
@ -123,7 +124,7 @@ class TestWithBadIndex(ElasticsearchClientTest):
class TestSimpleWrites(ElasticsearchClientTest):
def test_simple_writing(self):
def test_simple_writing_event_dict(self):
mock_class = MockTransportClass()
mock_class.backup_function(self.es_client.es_connection.transport.perform_request)
self.es_client.es_connection.transport.perform_request = mock_class.perform_request
@ -141,6 +142,49 @@ class TestSimpleWrites(ElasticsearchClientTest):
num_events = self.get_num_events()
assert num_events == 100
def test_simple_writing_event_string(self):
event = json.dumps({"key": "example value for string of json test"})
self.es_client.save_event(body=event)
self.flush(self.event_index_name)
num_events = self.get_num_events()
assert num_events == 1
query = SearchQuery()
query.add_must(ExistsMatch('key'))
results = query.execute(self.es_client)
assert sorted(results['hits'][0].keys()) == ['_id', '_index', '_score', '_source', '_type']
assert results['hits'][0]['_source']['key'] == 'example value for string of json test'
assert len(results['hits']) == 1
assert results['hits'][0]['_type'] == 'event'
def test_writing_event_defaults(self):
query = SearchQuery()
default_event = {}
self.populate_test_event(default_event)
self.flush(self.event_index_name)
query.add_must(ExistsMatch('summary'))
results = query.execute(self.es_client)
assert len(results['hits']) == 1
assert sorted(results['hits'][0].keys()) == ['_id', '_index', '_score', '_source', '_type']
saved_event = results['hits'][0]['_source']
assert 'category' in saved_event
assert 'details' in saved_event
assert 'hostname' in saved_event
assert 'mozdefhostname' in saved_event
assert 'processid' in saved_event
assert 'processname' in saved_event
assert 'receivedtimestamp' in saved_event
assert 'severity' in saved_event
assert 'source' in saved_event
assert 'summary' in saved_event
assert 'tags' in saved_event
assert 'timestamp' in saved_event
assert 'utctimestamp' in saved_event
assert 'category' in saved_event
def test_writing_with_type(self):
query = SearchQuery()
default_event = {
@ -159,8 +203,10 @@ class TestSimpleWrites(ElasticsearchClientTest):
query.add_must(ExistsMatch('summary'))
results = query.execute(self.es_client)
assert len(results['hits']) == 1
assert sorted(results['hits'][0].keys()) == ['_id', '_index', '_score', '_source', '_type']
assert results['hits'][0]['_type'] == 'example'
assert results['hits'][0]['_source'] == default_event['_source']
assert results['hits'][0]['_source']['summary'] == 'Test summary'
assert results['hits'][0]['_source']['details'] == {"note": "Example note"}
def test_writing_with_source(self):
query = SearchQuery()
@ -179,6 +225,7 @@ class TestSimpleWrites(ElasticsearchClientTest):
query.add_must(ExistsMatch('summary'))
results = query.execute(self.es_client)
assert len(results['hits']) == 1
assert sorted(results['hits'][0].keys()) == ['_id', '_index', '_score', '_source', '_type']
assert results['hits'][0]['_type'] == 'event'
@ -287,7 +334,6 @@ class TestWriteWithIDExists(ElasticsearchClientTest):
assert saved_event['_id'] == event_id
self.flush(self.event_index_name)
fetched_event = self.es_client.get_event_by_id(event_id)
assert fetched_event['_source'] == event
class TestGetIndices(ElasticsearchClientTest):

91
tests/lib/test_event.py Normal file
Просмотреть файл

@ -0,0 +1,91 @@
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), "../../lib"))
from event import Event
from utilities.toUTC import toUTC
import socket
class MockHostname(object):
    """Supplies a fixed host name so tests do not depend on the real machine."""

    # The constant value handed back by hostname().
    _FIXED_NAME = 'randomhostname'

    def hostname(self):
        """Return the fixed fake host name."""
        return self._FIXED_NAME
class TestEvent(object):
    """Tests for the dict behaviour of Event and for add_required_fields."""

    def setup(self):
        """Build an event holding one required field and one extra field."""
        self.params = {
            'summary': 'example summary',
            'somefield': 'HIGH',
        }
        self.event = Event(self.params)

    def test_basic_init(self):
        """An Event can be created without any arguments."""
        event = Event()
        assert type(event) is Event

    def test_getitem(self):
        """Values passed to the constructor are readable by key."""
        assert self.event['summary'] == self.params['summary']
        assert self.event['somefield'] == self.params['somefield']

    def test_setitem(self):
        """Assigning to a key replaces the stored value."""
        assert self.event['summary'] == self.params['summary']
        self.event['summary'] = 'other summary value'
        assert self.event['summary'] == 'other summary value'

    def test_add_required_fields_default(self):
        """Missing required fields receive defaults; supplied ones are kept."""
        # Swap in a predictable host name, and always put the real function
        # back so the replacement cannot leak into tests that run afterwards.
        original_gethostname = socket.gethostname
        socket.gethostname = MockHostname().hostname
        try:
            self.event.add_required_fields()
        finally:
            socket.gethostname = original_gethostname

        # Each defaulted time field must survive a round trip through toUTC
        # unchanged.
        assert self.event['receivedtimestamp'] is not None
        assert toUTC(self.event['receivedtimestamp']).isoformat() == self.event['receivedtimestamp']
        assert self.event['utctimestamp'] is not None
        assert toUTC(self.event['utctimestamp']).isoformat() == self.event['utctimestamp']
        assert self.event['timestamp'] is not None
        assert toUTC(self.event['timestamp']).isoformat() == self.event['timestamp']
        assert self.event['mozdefhostname'] == 'randomhostname'
        assert self.event['tags'] == []
        assert self.event['category'] == 'UNKNOWN'
        assert self.event['hostname'] == 'UNKNOWN'
        assert self.event['processid'] == 'UNKNOWN'
        assert self.event['processname'] == 'UNKNOWN'
        assert self.event['severity'] == 'UNKNOWN'
        assert self.event['source'] == 'UNKNOWN'
        # 'summary' was supplied in setup, so it must not be defaulted.
        assert self.event['summary'] == 'example summary'
        assert self.event['details'] == {}

    def test_add_required_fields(self):
        """Fields that are already present are left exactly as supplied."""
        params = {
            'receivedtimestamp': '2017-09-14T20:05:20.779595+00:00',
            'utctimestamp': '2017-09-14T20:05:20.299387+00:00',
            'timestamp': '2017-09-14T20:05:19.116195+00:00',
            'mozdefhostname': 'randomhostname',
            'category': 'Authentication',
            'hostname': 'host.domain.com',
            'processid': 12345,
            'processname': '/bin/testproc',
            'severity': 'HIGH',
            'source': '/var/log/syslog/mozdef.log',
            'summary': 'example summary',
            'tags': ['example'],
            'details': {
                'firstkey': 'firstvalue',
            },
        }
        event = Event(params)
        event.add_required_fields()
        assert event['receivedtimestamp'] == '2017-09-14T20:05:20.779595+00:00'
        assert event['utctimestamp'] == '2017-09-14T20:05:20.299387+00:00'
        assert event['timestamp'] == '2017-09-14T20:05:19.116195+00:00'
        assert event['mozdefhostname'] == 'randomhostname'
        assert event['category'] == 'Authentication'
        assert event['hostname'] == 'host.domain.com'
        assert event['processid'] == 12345
        assert event['processname'] == '/bin/testproc'
        assert event['severity'] == 'HIGH'
        assert event['source'] == '/var/log/syslog/mozdef.log'
        assert event['summary'] == 'example summary'
        assert event['tags'] == ['example']
        assert event['details'] == {'firstkey': 'firstvalue'}

Просмотреть файл

@ -70,6 +70,7 @@ class TestEsworkerSNSSQS(UnitTestSuite):
u'processname': u'dhclient',
u'receivedtimestamp': u'2017-05-26T17:47:17.813876+00:00',
u'severity': u'INFO',
u'source': u'UNKNOWN',
u'summary': u'DHCPREQUEST of 1.2.3.4 on eth0 to 5.6.7.8 port 67 (xid=0x123456)',
u'tags': [u'example-logs-mozdef'],
u'timestamp': u'2017-05-25T07:14:15+00:00',

Просмотреть файл

@ -100,6 +100,9 @@ class UnitTestSuite(object):
def populate_test_event(self, event, event_type='event'):
self.es_client.save_event(body=event, doc_type=event_type)
def populate_test_object(self, event, event_type='event'):
self.es_client.save_object(index='events', body=event, doc_type=event_type)
def setup_elasticsearch(self):
default_mapping_file = os.path.join(os.path.dirname(__file__), "../config/defaultMappingTemplate.json")
mapping_str = ''