Add function to append hostname to ip for ssh lateral alert

This commit is contained in:
Brandon Myers 2018-11-05 13:04:01 -06:00
Родитель 80878c7780
Коммит 866ad7798a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 8AA79AD83045BBC7
5 изменённых файлов: 142 добавлений и 13 удалений

Просмотреть файл

@ -10,6 +10,8 @@ import json
import kombu
import os
import sys
import socket
import netaddr
from configlib import getConfig, OptionParser
from datetime import datetime
@ -58,6 +60,17 @@ def getValueByPath(input_dict, path_string):
return return_data
def add_hostname_to_ip(ip, output_format, require_internal=True):
ip_obj = netaddr.IPNetwork(ip)[0]
if require_internal and not ip_obj.is_private():
return ip
try:
reversed_dns = socket.gethostbyaddr(ip)
return output_format.format(ip, reversed_dns[0])
except socket.herror:
return ip
class AlertTask(Task):
abstract = True

Просмотреть файл

@ -5,10 +5,8 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# Copyright (c) 2017 Mozilla Corporation
from lib.alerttask import AlertTask
from mozdef_util.query_models import SearchQuery, TermMatch, QueryStringMatch, PhraseMatch
import json
import sys
from lib.alerttask import AlertTask, add_hostname_to_ip
from mozdef_util.query_models import SearchQuery, TermMatch, PhraseMatch
import re
import netaddr
@ -78,7 +76,7 @@ class SshLateral(AlertTask):
self._config = self.parse_json_alert_config('ssh_lateral.json')
def main(self):
search_query = SearchQuery(minutes=2)
search_query = SearchQuery(minutes=15)
search_query.add_must([
TermMatch('category', 'syslog'),
TermMatch('details.program', 'sshd'),
@ -123,8 +121,8 @@ class SshLateral(AlertTask):
# Determine if the origin of the connection was from a source outside
# of the exception policy, and in our address scope
candidates = []
sampleip = None
sampleuser = None
source_ips = []
users = []
for x in aggreg['events']:
m = re.match('Accepted publickey for (\S+) from (\S+).*', x['_source']['summary'])
if m is not None and len(m.groups()) == 2:
@ -150,14 +148,16 @@ class SshLateral(AlertTask):
# Check our exception list
if self.exception_check(m.group(1), srchost, m.group(2)):
continue
if sampleip is None:
sampleip = m.group(2)
if sampleuser is None:
sampleuser = m.group(1)
source_ips.append(m.group(2))
users.append(m.group(1))
candidates.append(x)
if len(candidates) == 0:
return None
summary = 'SSH lateral movement outside policy: access to {} from {} as {}'.format(srchost, sampleip, sampleuser)
src_hosts_info = []
for source_ip in source_ips:
src_hosts_info.append(add_hostname_to_ip(source_ip, '{0} ({1})'))
summary = 'SSH lateral movement outside policy: access to {} from {} as {}'.format(srchost, ','.join(src_hosts_info), ','.join(users))
return self.createAlertDict(summary, category, tags, aggreg['events'], severity)

Просмотреть файл

@ -13,6 +13,7 @@ sys.path.append(os.path.join(os.path.dirname(__file__), "../"))
from unit_test_suite import UnitTestSuite
from freezegun import freeze_time
import mock
import copy
import re
@ -22,6 +23,13 @@ sys.path.append(os.path.join(os.path.dirname(__file__), "../../alerts/lib"))
from lib import alerttask
def mock_add_hostname_to_ip(ip):
if ip == '10.2.3.4':
return ['mock_hostname1.mozilla.org', ip]
else:
return ['mock.mozilla.org', ip]
class AlertTestSuite(UnitTestSuite):
def teardown(self):
os.chdir(self.orig_path)
@ -142,7 +150,8 @@ class AlertTestSuite(UnitTestSuite):
self.flush('events')
alert_task = test_case.run(alert_filename=self.alert_filename, alert_classname=self.alert_classname)
with mock.patch("socket.gethostbyaddr", side_effect=mock_add_hostname_to_ip):
alert_task = test_case.run(alert_filename=self.alert_filename, alert_classname=self.alert_classname)
self.verify_alert_task(alert_task, test_case)
def verify_rabbitmq_alert(self, found_alert, test_case):

Просмотреть файл

@ -0,0 +1,29 @@
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), "../../../alerts/lib"))
from alerttask import add_hostname_to_ip
import mock
import socket
def reverse_lookup(ip):
if ip == '10.1.1.1':
return ('test.domain.com', ip)
if ip == '8.8.8.8':
raise socket.herror
class TestAddHostnameToIP(object):
def setup(self):
self.formatted_string = '{0} ({1})'
def test_internal_hostname(self):
with mock.patch("socket.gethostbyaddr", side_effect=reverse_lookup):
hostname_info = add_hostname_to_ip('10.1.1.1', self.formatted_string)
assert hostname_info == '10.1.1.1 (test.domain.com)'
def test_external_hostname(self):
with mock.patch("socket.gethostbyaddr", side_effect=reverse_lookup):
hostname_info = add_hostname_to_ip('8.8.8.8', self.formatted_string)
assert hostname_info == '8.8.8.8'

Просмотреть файл

@ -0,0 +1,78 @@
from positive_alert_test_case import PositiveAlertTestCase
from negative_alert_test_case import NegativeAlertTestCase
from alert_test_suite import AlertTestSuite
class TestSSHLateral(AlertTestSuite):
alert_filename = 'ssh_lateral'
alert_classname = 'SshLateral'
# This event is the default positive event that will cause the
# alert to trigger
default_event = {
'_type': 'event',
'_source': {
'category': 'syslog',
'hostname': 'test-host.enterprise.mozilla.com',
'summary': 'Accepted publickey for user1 from 10.2.3.4 port 19936 ssh2: RSA SHA256:ET72afGGbxabDersgSdQ+xJYB6ILXOFSDsLsTqDs',
'details': {
'program': 'sshd'
}
}
}
# This alert is the expected result from running this task
default_alert = {
'category': 'session',
'severity': 'WARNING',
'summary': 'SSH lateral movement outside policy: access to test-host.enterprise.mozilla.com from 10.2.3.4 (mock_hostname1.mozilla.org) as user1',
'tags': ['sshd', 'syslog'],
}
test_cases = []
test_cases.append(
PositiveAlertTestCase(
description='Positive test case with good event',
events=[AlertTestSuite.create_event(default_event)],
expected_alert=default_alert
)
)
event = AlertTestSuite.create_event(default_event)
event['_source']['category'] = 'bad'
test_cases.append(
NegativeAlertTestCase(
description="Negative test case with bad event category",
events=[event],
)
)
event = AlertTestSuite.create_event(default_event)
event['_source']['summary'] = 'some bad summary'
test_cases.append(
NegativeAlertTestCase(
description="Negative test case with bad event summary",
events=[event],
)
)
event = AlertTestSuite.create_event(default_event)
event['_source']['details']['program'] = 'ftpd'
test_cases.append(
NegativeAlertTestCase(
description="Negative test case with bad event details.program",
events=[event],
)
)
event = AlertTestSuite.create_event(default_event)
event['_source']['utctimestamp'] = AlertTestSuite.subtract_from_timestamp_lambda({'minutes': 16})
event['_source']['receivedtimestamp'] = AlertTestSuite.subtract_from_timestamp_lambda({'minutes': 16})
test_cases.append(
NegativeAlertTestCase(
description="Negative test case with an event with old timestamp",
events=[event],
)
)