зеркало из https://github.com/mozilla/MozDef.git
Fixup
This commit is contained in:
Родитель
58d6da7d31
Коммит
3148479ffd
|
@ -3,12 +3,12 @@
|
|||
{
|
||||
"ipVersion": 4,
|
||||
"range": "8.32.0.0/16",
|
||||
"format": "{1} is in OFFICE1."
|
||||
"format": "{0} is in OFFICE1."
|
||||
},
|
||||
{
|
||||
"ipVersion": 6,
|
||||
"range": "4a00:7a49:232::/48",
|
||||
"format": "{1} is in OFFICE2."
|
||||
"format": "{0} is in OFFICE2."
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
# Copyright (c) 2014 Mozilla Corporation
|
||||
|
||||
from operator import add
|
||||
import os
|
||||
import re
|
||||
|
||||
|
@ -14,20 +15,6 @@ CONFIG_FILE = os.path.join(
|
|||
'ip_source_enrichment.json.conf')
|
||||
|
||||
|
||||
def _isIPv4(ip):
|
||||
try:
|
||||
return netaddr.valid_ipv4(ip)
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
def _isIPv6(ip):
|
||||
try:
|
||||
return netaddr.valid_ipv6(ip)
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
def _find_ip_addresses(string):
|
||||
'''List all of the IPv4 and IPv6 addresses found in a string.'''
|
||||
|
||||
|
@ -43,6 +30,42 @@ def enrich(alert, known_ips):
|
|||
on a configured mapping.
|
||||
'''
|
||||
|
||||
def find_ips(value):
|
||||
if isinstance(value, str):
|
||||
return _find_ip_addresses(value)
|
||||
|
||||
if isinstance(value, list) or isinstance(value, tuple):
|
||||
found = [find_ips(item) for item in value]
|
||||
return reduce(add, found, [])
|
||||
|
||||
if isinstance(value, dict):
|
||||
found = [find_ips(item) for item in value.values()]
|
||||
return reduce(add, found, [])
|
||||
|
||||
return []
|
||||
|
||||
ips = find_ips(alert)
|
||||
|
||||
alert = alert.copy()
|
||||
|
||||
for ip in set(ips):
|
||||
if netaddr.valid_ipv6(ip):
|
||||
ip = ip[0]
|
||||
|
||||
ip_address = netaddr.IPAddress(ip)
|
||||
|
||||
if isinstance(ip_address, tuple):
|
||||
ip_address = netaddr.IPAddress(ip_address[0])
|
||||
|
||||
matching_descriptions = filter(
|
||||
lambda known: ip_address in netaddr.IPSet([known['range']]),
|
||||
known_ips)
|
||||
|
||||
for desc in matching_descriptions:
|
||||
enriched = desc['format'].format(ip)
|
||||
|
||||
alert['summary'] += '; ' + enriched
|
||||
|
||||
return alert
|
||||
|
||||
|
||||
|
|
|
@ -6,16 +6,22 @@ sys.path.append(plugin_path)
|
|||
|
||||
from ip_source_enrichment import enrich
|
||||
|
||||
|
||||
good_ipv4 = '255.0.1.2'
|
||||
good_ipv6 = '3001:4d9c:b29:12f0::1'
|
||||
bad_ipv4 = '192.168.0.1'
|
||||
bad_ipv6 = '2001:db8:a0b:12f0::1'
|
||||
|
||||
known_ips = [
|
||||
{
|
||||
'ipVersion': 4,
|
||||
'range': '255.0.1.0/8',
|
||||
'format': '{1} known',
|
||||
'range': good_ipv4 + '/8',
|
||||
'format': '{0} known',
|
||||
},
|
||||
{
|
||||
'ipVersion': 6,
|
||||
'range': 'a02b:0db8:beef::/48',
|
||||
'format': '{1} known',
|
||||
'range': good_ipv6 + '/64',
|
||||
'format': '{0} known',
|
||||
}
|
||||
]
|
||||
|
||||
|
@ -24,8 +30,8 @@ alert_with_ipv4 = {
|
|||
'tags': ['portscan'],
|
||||
'summary': 'this is a test alert',
|
||||
'details': {
|
||||
'sourceipaddress': '255.0.1.2',
|
||||
'destinationipaddress': '192.168.0.1',
|
||||
'sourceipaddress': good_ipv4,
|
||||
'destinationipaddress': bad_ipv4,
|
||||
'ports': [22, 9001, 25505, 65534]
|
||||
}
|
||||
}
|
||||
|
@ -35,8 +41,8 @@ alert_with_ipv6 = {
|
|||
'tags': ['test'],
|
||||
'summary': 'Another test alert',
|
||||
'deails': {
|
||||
'sourceipaddress': 'a02b:0db8:beef:32cc:4122:0000',
|
||||
'destinationipaddress': 'abcd:beef:3232:9001:0000:1234',
|
||||
'sourceipaddress': good_ipv6,
|
||||
'destinationipaddress': bad_ipv6,
|
||||
'port': [22, 9001, 24404, 65532]
|
||||
}
|
||||
}
|
||||
|
@ -44,14 +50,14 @@ alert_with_ipv6 = {
|
|||
alert_with_ipv4_in_summary = {
|
||||
'category': 'test',
|
||||
'tags': ['ip', 'in', 'summary'],
|
||||
'summary': 'Testing:255.0.1.232 is a random IP in a poorly formatted string',
|
||||
'summary': 'Testing:{0} is a random IP in a poorly formatted string'.format(good_ipv4),
|
||||
'details': {}
|
||||
}
|
||||
|
||||
alert_with_ipv6_in_summary = {
|
||||
'category': 'test',
|
||||
'tags': ['ip', 'in', 'summary'],
|
||||
'summary': 'Found IPs ["a02b:0db8:beef:32cc:4122:0000"]',
|
||||
'summary': 'Found IPs ["{0}"]'.format(good_ipv6),
|
||||
'details': {}
|
||||
}
|
||||
|
||||
|
@ -60,29 +66,29 @@ class TestIPSourceEnrichment(object):
|
|||
def test_ipv4_addrs_enriched(self):
|
||||
enriched = enrich(alert_with_ipv4, known_ips)
|
||||
|
||||
assert '255.0.1.2 known' in enriched['summary']
|
||||
assert '{0} known'.format(good_ipv4) in enriched['summary']
|
||||
|
||||
def test_ipv6_addrs_enriched(self):
|
||||
enriched = enrich(alert_with_ipv6, known_ips)
|
||||
|
||||
assert 'a02b:0db8:beef:32cc:4122:0000 known' in enriched['summary']
|
||||
assert '{0} known'.format(good_ipv6) in enriched['summary']
|
||||
|
||||
def test_ipv4_addrs_in_summary_enriched(self):
|
||||
enriched = enrich(alert_with_ipv4_in_summary, known_ips)
|
||||
|
||||
assert '255.0.1.232 known' in enriched['summary']
|
||||
assert '{0} known'.format(good_ipv4) in enriched['summary']
|
||||
|
||||
def test_ipv6_addrs_in_summary_enriched(self):
|
||||
enriched = enrich(alert_with_ipv6_in_summary, known_ips)
|
||||
|
||||
assert 'a02b:0db8:beef:32cc:4122:0000 known' in enriched['summary']
|
||||
assert '{0} known'.format(good_ipv6) in enriched['summary']
|
||||
|
||||
def test_unrecognized_ipv4_addrs_not_enriched(self):
|
||||
enriched = enrich(alert_with_ipv4, known_ips)
|
||||
|
||||
assert '192.168.0.1 known' not in enriched['summary']
|
||||
assert '{0} known'.format(bad_ipv4) not in enriched['summary']
|
||||
|
||||
def test_unrecognized_ipv6_addrs_not_enriched(self):
|
||||
enriched = enrich(alert_with_ipv6, known_ips)
|
||||
|
||||
assert 'abcd:beef:3232:9001:0000:1234 known' not in enriched['summary']
|
||||
assert '{0} known'.format(bad_ipv6) not in enriched['summary']
|
||||
|
|
Загрузка…
Ссылка в новой задаче