MozDef/tests/mq/test_esworker_eventtask.py

101 строка
3.2 KiB
Python

#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# Copyright (c) 2017 Mozilla Corporation
import pytz
import tzlocal
import datetime
import os
import sys
def utc_timezone():
    """Return the pytz time zone object for UTC.

    Takes no arguments so that it can be assigned in place of
    ``tzlocal.get_localzone`` (see the module-level assignment that follows).
    """
    utc_zone = pytz.timezone('UTC')
    return utc_zone
# Monkeypatch tzlocal at import time: any later call that looks up
# ``tzlocal.get_localzone`` through the module gets the UTC zone returned by
# ``utc_timezone`` instead of the host's zone.
# NOTE(review): presumably this keeps the timestamp assertions below
# independent of the machine's local time zone -- confirm against the worker.
tzlocal.get_localzone = utc_timezone
class MockOptions:
    """Stand-in options object exposing only ``mozdefhostname``."""

    @property
    def mozdefhostname(self):
        """Fixed host name that the tests expect to see in mapped events."""
        hostname = 'sample'
        return hostname
class TestKeyMapping:
    """Exercise ``esworker_eventtask.keyMapping`` with representative messages.

    The per-test fixtures use pytest's native ``setup_method`` /
    ``teardown_method`` hooks. The nose-style names ``setup`` / ``teardown``
    were only honoured through pytest's nose compatibility layer, which was
    removed in pytest 8; with those names the fixtures silently stop running
    and every test fails on the missing ``self.key_mapping``.
    """

    def setup_method(self):
        """Make the ``mq`` directory importable and bind ``keyMapping``."""
        # Drop any cached top-level ``lib`` module before importing the worker.
        # NOTE(review): presumably so the worker picks up its own ``lib``
        # package rather than one cached by another test module -- confirm.
        sys.modules.pop('lib', None)

        self.mq_path = os.path.join(os.path.dirname(__file__), "../../mq/")
        sys.path.insert(0, self.mq_path)

        # Imported here, not at module level, because the import relies on
        # the sys.path entry added just above.
        from mq import esworker_eventtask

        # Install the mock as the worker module's global ``options``;
        # test_syslog_dict expects the mock's host name in the mapped event.
        esworker_eventtask.options = MockOptions()
        self.key_mapping = esworker_eventtask.keyMapping

    def teardown_method(self):
        """Undo the ``sys.path`` change made by ``setup_method``."""
        sys.path.remove(self.mq_path)

    def test_syslog_dict(self):
        """An upper-case-keyed syslog dict is mapped onto the event fields."""
        syslog_dict = {
            'CATEGORY': 'syslog',
            'DATE': 'Oct 27 14:01:12',
            'FACILITY': 'daemon',
            'HOST': 'ub_server',
            'HOST_FROM': '10.1.20.139',
            'LEGACY_MSGHDR': 'systemd[1]: ',
            'MESSAGE': 'Stopped Getty on tty1.',
            'PID': '1',
            'PRIORITY': 'info',
            'PROGRAM': 'systemd',
            'SEQNUM': '8',
            'SOURCE': 'syslog_tcp',
            'SOURCEIP': '10.1.20.139',
            'TAGS': '.source.syslog_tcp',
        }
        result = self.key_mapping(syslog_dict)

        assert result['processid'] == '1'
        assert result['processname'] == 'systemd'
        assert result['severity'] == 'INFO'
        assert result['mozdefhostname'] == 'sample'
        assert result['hostname'] == 'ub_server'
        assert result['summary'] == 'Stopped Getty on tty1.'
        assert result['source'] == 'syslog_tcp'
        assert result['receivedtimestamp'] != result['utctimestamp']

        # The DATE value carries no year, so the expected timestamps are
        # built from the year in which the test is running.
        expected_year = datetime.datetime.now().year
        expected_timestamp = str(expected_year) + '-10-27T14:01:12+00:00'
        assert result['utctimestamp'] == expected_timestamp
        assert result['timestamp'] == expected_timestamp

        assert result['details']['eventsourceipaddress'] == '10.1.20.139'
        assert result['tags'] == ['.source.syslog_tcp']
        assert result['category'] == 'syslog'

    def test_tags_list(self):
        """A ``tags`` value that is already a list comes back unchanged."""
        tags_dict = {
            'tags': ['example1'],
        }
        result = self.key_mapping(tags_dict)
        assert result['tags'] == ['example1']

    def test_details_nondict(self):
        """A non-dict ``details`` value ends up under ``details.message``."""
        message = {
            'summary': 'example summary',
            'payload': 'examplepayload',
            'details': 'somestring',
        }
        result = self.key_mapping(message)
        assert result['summary'] == 'example summary'
        # ``details`` must hold exactly these two keys and nothing else.
        assert sorted(result['details']) == ['message', 'payload']
        assert result['details']['message'] == 'somestring'
        assert result['details']['payload'] == 'examplepayload'

    def test_no_details(self):
        """A message without ``details`` yields an empty ``details`` dict."""
        message = {
            'summary': 'example summary',
        }
        result = self.key_mapping(message)
        assert result['details'] == {}