Rename alert plugins to alert actions again

This commit is contained in:
Michal Purzynski 2019-03-12 15:53:17 -07:00
Родитель 6c2509bc40
Коммит ec5be706a3
12 изменённых файлов: 0 добавлений и 364 удалений

Просмотреть файл

Просмотреть файл

Просмотреть файл

@ -1,12 +0,0 @@
{
"risk": "high", // low,medium,high values used to control display in dashboard
"description": "This alert is created based on geo ip information about the last login of a user.", // will be used to store in future
"url": "https://www.mozilla.org", // the url of the button
"url_title": "Get Help", // the button text
"duplicate": true, // wether or not multiple alerts of the same type, will show up in dashboard, or just one
"auth_id_prefix": "",
"db_table": "<DYNAMODB TABLE NAME>",
"aws_region": "<AWS REGION>",
"aws_access_key_id": "<AWS ACCESS KEY ID>",
"aws_secret_access_key": "<AWS SECRET ACCESS KEY>"
}

Просмотреть файл

@ -1,129 +0,0 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# Copyright (c) 2014 Mozilla Corporation
import hjson
import os
from binascii import b2a_hex
import boto3
import datetime
import json
from ipwhois import IPWhois
from mozdef_util.utilities.logger import logger
from mozdef_util.utilities.toUTC import toUTC
class message(object):
def __init__(self):
'''
sends geomodel alert to SSO dashboard
'''
self.alert_classname = 'AlertGeomodel'
config_file_path = os.path.join(os.path.dirname(__file__), 'dashboard_geomodel.json')
json_obj = {}
with open(config_file_path, "r") as fd:
try:
json_obj = hjson.load(fd)
except ValueError:
logger.error("FAILED to open the configuration file" + str(config_file_path))
self.config = json_obj
self.connect_db()
self.registration = 'geomodel'
self.priority = 1
def connect_db(self):
boto_session = boto3.session.Session(
aws_access_key_id=self.config['aws_access_key_id'],
aws_secret_access_key=self.config['aws_secret_access_key'],
region_name=self.config['aws_region']
)
dynamodb_resource = boto_session.resource('dynamodb')
table = dynamodb_resource.Table(self.config['db_table'])
self.dynamodb = table
def write_db_entry(self, alert_record):
self.dynamodb.put_item(Item=alert_record)
def onMessage(self, message):
if 'details' not in message:
return message
if 'principal' not in message['details']:
return message
if 'category' not in message['details']:
return message
if message['details']['category'].lower() != 'newcountry':
return message
full_email = message['details']['principal']
username = full_email.split('@')[0]
auth_full_username = self.config['auth_id_prefix'] + username
if 'city' not in message['details']['locality_details']:
return message
if 'country' not in message['details']['locality_details']:
return message
if 'source_ip' not in message['details']:
return message
city = message['details']['locality_details']['city']
country = message['details']['locality_details']['country']
source_ip = message['details']['source_ip']
new_ip_info = ""
try:
whois = IPWhois(source_ip).lookup_whois()
whois_str = whois['nets'][0]['description']
source_ip_isp = whois_str.replace('\n', ', ').replace('\r', '')
new_ip_info = u'{} ({})'.format(source_ip, source_ip_isp)
except Exception:
new_ip_info = u'{}'.format(source_ip)
new_location_str = u""
if city.lower() == 'unknown':
new_location_str += u'{0}'.format(country)
else:
new_location_str += u'{0}, {1}'.format(city, country)
event_timestamp = toUTC(message['events'][0]['documentsource']['details']['event_time'])
event_day = event_timestamp.strftime('%B %d, %Y')
summary = u'On {0} (UTC), did you login from {1} ({2})?'.format(event_day, new_location_str, source_ip)
previous_city = message['details']['previous_locality_details']['city']
previous_country = message['details']['previous_locality_details']['country']
if previous_city.lower() == 'unknown':
previous_location_str = u'{0}'.format(previous_country)
else:
previous_location_str = u'{0}, {1}'.format(previous_city, previous_country)
alert_record = {
'alert_id': b2a_hex(os.urandom(15)),
'alert_code': b2a_hex(self.alert_classname),
'user_id': auth_full_username,
'risk': self.config['risk'],
'summary': summary,
'description': self.config['description'],
'date': str(datetime.date.today()),
'url': self.config['url'],
'url_title': self.config['url_title'],
'duplicate': self.config['duplicate'],
'alert_str_json': json.dumps(message),
'details': {
'Timestamp': event_timestamp.strftime('%A, %B %d %Y %H:%M UTC'),
'New Location': new_location_str,
'New IP': new_ip_info,
'Previous Location': previous_location_str
}
}
self.write_db_entry(alert_record)
return message

Просмотреть файл

@ -1,5 +0,0 @@
[options]
serviceKey = <yourapiservicekey>
docs = {"alertcategory1": "https://website.with.runbooks.com/alert-runbook"}
keywords = keyword1 keyword2
clienturl = https://a.link.to.your.siem.com/alert

Просмотреть файл

@ -1,77 +0,0 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# Copyright (c) 2014 Mozilla Corporation
import requests
import json
import os
from configlib import getConfig, OptionParser
class message(object):
def __init__(self):
'''
takes an incoming alert
and uses it to trigger an event using
the pager duty event api
'''
# set my own conf file
# relative path to the rest index.py file
self.configfile = os.path.join(os.path.dirname(__file__), 'pagerDutyTriggerEvent.conf')
self.options = None
if os.path.exists(self.configfile):
self.initConfiguration()
self.registration = self.options.keywords.split(" ")
self.priority = 1
def initConfiguration(self):
myparser = OptionParser()
# setup self.options by sending empty list [] to parse_args
(self.options, args) = myparser.parse_args([])
# fill self.options with plugin-specific options
# change this to your default zone for when it's not specified
self.options.serviceKey = getConfig('serviceKey', 'APIKEYHERE', self.configfile)
self.options.keywords = getConfig('keywords', 'KEYWORDS', self.configfile)
self.options.clienturl = getConfig('clienturl', 'CLIENTURL', self.configfile)
try:
self.options.docs = json.loads(getConfig('docs', {}, self.configfile))
except:
self.options.docs = {}
def onMessage(self, message):
# here is where you do something with the incoming alert message
doclink = 'unknown'
if message['category'] in self.options.docs:
doclink = self.options.docs[message['category']]
if 'summary' in message:
headers = {
'Content-type': 'application/json',
}
payload = json.dumps({
"service_key": "{0}".format(self.options.serviceKey),
"event_type": "trigger",
"description": "{0}".format(message['summary']),
"client": "MozDef",
"client_url": "https://" + self.options.clienturl + "/{0}".format(message['events'][0]['documentsource']['alerts'][0]['id']),
"contexts": [
{
"type": "link",
"href": "https://" + "{0}".format(doclink),
"text": "View runbook on mana"
}
]
})
requests.post(
'https://events.pagerduty.com/generic/2010-04-15/create_event.json',
headers=headers,
data=payload,
)
# you can modify the message if needed
# plugins registered with lower (>2) priority
# will receive the message and can also act on it
# but even if not modified, you must return it
return message

Просмотреть файл

@ -1,141 +0,0 @@
import json
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), "../../../alerts/plugins"))
from dashboard_geomodel import message
class TestDashboardGeomodel(object):
def mock_write_db_entry(self, alert_record):
self.test_result_record = alert_record
def mock_connect_db(self):
self.test_connect_called = True
def setup(self):
# Mock boto service required methods
message.connect_db = self.mock_connect_db
message.write_db_entry = self.mock_write_db_entry
self.test_result_record = None
self.test_connect_called = False
self.plugin = message()
self.good_message_dict = {
"category": "geomodel",
"tags": ['geomodel'],
"summary": u"ttesterson@mozilla.com NEWCOUNTRY Diamond Bar, United States access from 1.2.3.4 (duo) [deviation:12.07010770457331] last activity was from Ottawa, Canada (3763 km away) approx 23.43 hours before",
"events": [
{
u'documentsource': {
u'details': {
u'event_time': u'2018-08-08T02:11:41.85Z',
}
}
}
],
"details": {
"category": "NEWCOUNTRY",
'previous_locality_details': {
u'city': u'Oakland',
u'country': u'United States'
},
"locality_details": {
"city": "Diamond Bar",
"country": "United States"
},
'source_ip': '1.2.3.4',
"principal": "ttesterson@mozilla.com",
}
}
def test_message_good(self):
assert self.test_result_record is None
result_message = self.plugin.onMessage(self.good_message_dict)
assert result_message == self.good_message_dict
assert self.test_connect_called is True
result_db_entry = self.test_result_record
assert type(result_db_entry['alert_code']) is str
assert len(result_db_entry['alert_code']) == 26
assert type(result_db_entry['alert_id']) is str
assert len(result_db_entry['alert_id']) == 30
assert result_db_entry['alert_code'] != result_db_entry['alert_id']
assert type(result_db_entry['date']) == str
assert result_db_entry['description'] == 'This alert is created based on geo ip information about the last login of a user.'
assert result_db_entry['duplicate'] is True
assert result_db_entry['risk'] == 'high'
assert result_db_entry['summary'] == 'On August 08, 2018 (UTC), did you login from Diamond Bar, United States (1.2.3.4)?'
assert result_db_entry['url'] == 'https://www.mozilla.org'
assert result_db_entry['url_title'] == 'Get Help'
assert result_db_entry['user_id'] == 'ttesterson'
assert result_db_entry['alert_str_json'] == json.dumps(self.good_message_dict)
def test_unknown_new_city_message(self):
message_dict = self.good_message_dict
message_dict['details']['locality_details']['city'] = 'UNKNOWN'
assert self.test_result_record is None
result_message = self.plugin.onMessage(message_dict)
assert result_message == self.good_message_dict
assert self.test_connect_called is True
result_db_entry = self.test_result_record
assert type(result_db_entry['alert_code']) is str
assert result_db_entry['summary'] == 'On August 08, 2018 (UTC), did you login from United States (1.2.3.4)?'
def test_malformed_message_bad(self):
message_dict = {
"category": "geomodel",
"tags": ['geomodel'],
"summary": "ttesterson@mozilla.com MOVEMENT Diamond Bar, United States access from 1.2.3.4 (duo) [deviation:12.07010770457331] last activity was from Ottawa, Canada (3763 km away) approx 23.43 hours before",
"details": {
"category": "MOVEMENT",
"locality_details": {
"city": "Diamond Bar",
"country": "United States"
},
"principal": "ttesterson@mozilla.com",
}
}
assert self.test_result_record is None
result_message = self.plugin.onMessage(message_dict)
assert result_message == message_dict
assert self.test_connect_called is True
assert self.test_result_record is None
def test_unicode_location(self):
self.good_message_dict['summary'] = u"ttesterson@mozilla.com NEWCOUNTRY \u0107abcd, \xe4Spain access from 1.2.3.4 (duo) [deviation:12.07010770457331] last activity was from Ottawa, Canada (3763 km away) approx 23.43 hours before"
self.good_message_dict['details']['locality_details']['city'] = u'\u0107abcd'
self.good_message_dict['details']['locality_details']['country'] = u'\xe4Spain'
assert self.test_result_record is None
result_message = self.plugin.onMessage(self.good_message_dict)
assert result_message == self.good_message_dict
assert self.test_connect_called is True
assert self.test_result_record is not None
assert type(result_message['summary']) is unicode
assert type(result_message['details']['locality_details']['city']) is unicode
assert type(result_message['details']['locality_details']['country']) is unicode
def test_unicode_username(self):
self.good_message_dict['details']['principal'] = u'\xfcttesterson@mozilla.com'
assert self.test_result_record is None
result_message = self.plugin.onMessage(self.good_message_dict)
assert result_message == self.good_message_dict
assert self.test_connect_called is True
assert self.test_result_record is not None
assert type(result_message['summary']) is unicode
assert type(result_message['details']['principal']) is unicode
def test_written_details(self):
assert self.test_result_record is None
result_message = self.plugin.onMessage(self.good_message_dict)
assert result_message == self.good_message_dict
assert self.test_connect_called is True
assert self.test_result_record is not None
result_db_entry = self.test_result_record
assert result_db_entry['details'] == {
'New IP': u'1.2.3.4 (APNIC Debogon Project, APNIC Pty Ltd)',
'New Location': u'Diamond Bar, United States',
'Previous Location': u'Oakland, United States',
'Timestamp': 'Wednesday, August 08 2018 02:11 UTC'
}