# MozDef/alerts/plugins/ip_source_enrichment.py
#
# 153 lines
# 4.5 KiB
# Python

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# Copyright (c) 2014 Mozilla Corporation
import json
from operator import add
import os
import re
import functools
import netaddr
# Path of this plugin's JSON configuration file, which is expected to sit
# in the same directory as this module.
CONFIG_FILE = os.path.join(os.path.dirname(__file__),
                           'ip_source_enrichment.json')
# Dotted-quad IPv4 addresses; each octet is limited to at most 255.
_IPV4_RX = re.compile(
    r'(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}'
    r'(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)')

# Building blocks of the IPv6 pattern.
_IPV6_GROUP = r'[0-9a-fA-F]{1,4}'            # one 16-bit hex group
_IPV6_LEAD = r'(?:' + _IPV6_GROUP + r':)'    # a group before the '::'
_IPV6_TAIL = r'(?::' + _IPV6_GROUP + r')'    # a group after the '::'
_IPV6_EMBEDDED_IPV4 = (
    r'(?:(?:25[0-5]|(?:2[0-4]|1?[0-9])?[0-9])\.){3}'
    r'(?:25[0-5]|(?:2[0-4]|1?[0-9])?[0-9])')

# The regex engine accepts the FIRST alternative that matches, not the
# longest one, so the alternatives are ordered from the form that can consume
# the most text to the form that consumes the least. With the shorter forms
# first, '2001:db8::1' would be reported as the truncated '2001:db8::'.
_IPV6_RX = re.compile('|'.join((
    # IPv4-mapped / IPv4-embedded forms, e.g. '::ffff:192.0.2.1'.
    r'::(?:ffff(?::0{1,4})?:)?' + _IPV6_EMBEDDED_IPV4,
    _IPV6_LEAD + r'{1,4}:' + _IPV6_EMBEDDED_IPV4,
    # Full, uncompressed eight-group form.
    _IPV6_LEAD + r'{7}' + _IPV6_GROUP,
    # Compressed forms, most trailing groups first.
    _IPV6_GROUP + r':' + _IPV6_TAIL + r'{1,6}',
    _IPV6_LEAD + r'{1,2}' + _IPV6_TAIL + r'{1,5}',
    _IPV6_LEAD + r'{1,3}' + _IPV6_TAIL + r'{1,4}',
    _IPV6_LEAD + r'{1,4}' + _IPV6_TAIL + r'{1,3}',
    _IPV6_LEAD + r'{1,5}' + _IPV6_TAIL + r'{1,2}',
    _IPV6_LEAD + r'{1,6}:' + _IPV6_GROUP,
    # Trailing '::' (e.g. '1a2b:3c4d:123::') and leading '::' (e.g. '::1').
    _IPV6_LEAD + r'{1,7}:',
    r':(?:' + _IPV6_TAIL + r'{1,7}|:)',
    # Link-local address with a zone index. Deliberately kept last: any text
    # starting with 'fe80::' is already matched by an alternative above, so
    # a '%zone' suffix never becomes part of a result.
    r'fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]+',
)))


def _find_ip_addresses(string):
    '''List all of the IPv4 and IPv6 addresses found in a string.

    The string is scanned twice: once for IPv4 addresses and once for IPv6
    addresses. Matching is unanchored, so addresses embedded in longer text
    are found as well.

    Parameters:
        string: the text to search.

    Returns a list of address strings: every IPv4 match in order of
    appearance, followed by every IPv6 match in order of appearance.
    Duplicates are kept. An IPv4 address embedded in an IPv6 address
    (e.g. '::ffff:192.0.2.1') shows up in both parts of the list.
    '''

    # Neither pattern contains capturing groups, so findall() yields the
    # matched text itself rather than tuples of groups.
    return _IPV4_RX.findall(string) + _IPV6_RX.findall(string)
def enrich(alert, known_ips):
    '''Add information to alerts containing IP addresses that describes
    the source location of the IP address if it can be determined based
    on a configured mapping.

    Every string value in the alert is searched for IP addresses; nested
    dicts, lists and tuples are walked recursively (dict keys are not
    searched). For each distinct address that lies inside the `range` of an
    entry of `known_ips`, the entry's `format` string is filled in with the
    address and the entry's `site`, appended to the alert's `summary`
    after a '; ' separator, and an `{'ip': ..., 'site': ...}` record is
    appended to `details.sites`. Addresses are processed in the order in
    which they are first found, so the result is deterministic.

    Parameters:
        alert: dict describing the alert.
        known_ips: iterable of dicts, each with the keys `range` (passed
            to `netaddr.IPSet`, e.g. a CIDR string), `site` and `format`.

    Returns a copy of the alert whose `details.sites` is a (possibly empty)
    list. The `alert` passed in, including its own `details` dict, is
    not modified.

    Raises KeyError if an address matches a known range and the alert has
    no `summary` key.
    '''

    def find_ips(value):
        # Collect the addresses of every string reachable from `value`.
        if isinstance(value, str):
            return _find_ip_addresses(value)

        if isinstance(value, dict):
            value = value.values()
        elif not isinstance(value, (list, tuple)):
            return []

        found = []
        for item in value:
            found.extend(find_ips(item))
        return found

    ips = find_ips(alert)

    alert = alert.copy()

    # alert.copy() is shallow: copy `details` as well, otherwise adding
    # `sites` below would write into the caller's dictionary.
    details = alert['details'].copy() if 'details' in alert else {}
    details['sites'] = []
    alert['details'] = details

    # Without any address there is nothing to look up, so the configured
    # ranges do not need to be parsed at all.
    if not ips:
        return alert

    # Parse every configured range once instead of once per address.
    known_sets = [
        (netaddr.IPSet([known['range']]), known)
        for known in known_ips
    ]

    # dict.fromkeys() drops duplicates while keeping first-seen order; a
    # plain set() would make the order of the summary additions depend on
    # string hashing.
    for ip in dict.fromkeys(ips):
        for ip_set, desc in known_sets:
            if netaddr.IPAddress(ip) not in ip_set:
                continue

            enriched = desc['format'].format(ip, desc['site'])

            alert['summary'] += '; ' + enriched

            details['sites'].append({
                'ip': ip,
                'site': desc['site'],
            })

    return alert
def _load_config(file_path):
    '''Private

    Read and parse a file from disk as JSON into a dictionary.

    Parameters:
        file_path: path of the JSON file to read.

    Returns whatever value the JSON document describes; for the plugin
    configuration this is a dictionary.

    Raises OSError if the file cannot be opened, and ValueError
    (`json.JSONDecodeError`) if its content is not valid JSON.
    '''

    # Decode explicitly as UTF-8 so that non-ASCII site names are read the
    # same way regardless of the platform's default locale encoding.
    with open(file_path, encoding='utf-8') as config_file:
        return json.load(config_file)
class message(object):
    '''Alert plugin that annotates alerts with the origin of IP addresses.

    All values of an alert dictionary are searched for IP addresses.
    Whenever an address is recognized, i.e. it lies inside one of the
    configured ranges, a short text describing where the address
    originates from is appended to the summary of the alert.

    The configuration is loaded from `CONFIG_FILE`, the file
    `ip_source_enrichment.json` in the same directory as this module.
    It is expected to have the following format:

    ```json
    {
      "known": [
        {
          "range": "1.2.3.4/8",
          "site": "office1",
          "format": "IPv4 {0} is from {1}"
        },
        {
          "range": "1a2b:3c4d:123::/48",
          "site": "office2",
          "format": "IPv6 {0} is from {1}"
        }
      ]
    }
    ```

    A format string may use zero, one or two parameters: `{0}` is
    replaced by the IP address that was found and `{1}` by the value of
    the matching entry's 'site'.

    The alert returned by the plugin carries an additional
    `details.sites` field of the following form:

    ```json
    {
      "details": {
        "sites": [
          {
            "ip": "1.2.3.4",
            "site": "office1"
          },
          {
            "ip": "1a2b:3c4d:123::",
            "site": "office2"
          }
        ]
      }
    }
    ```
    '''

    def __init__(self):
        # The configuration is read once, when the plugin is created.
        self._config = _load_config(CONFIG_FILE)

        # Run plugin on all alerts
        self.registration = ['*']

    def onMessage(self, message):
        # A configuration without a 'known' list enriches nothing.
        return enrich(message, self._config.get('known', []))