Update alert scheduler to be cleaner

This commit is contained in:
Brandon Myers 2019-08-19 15:10:13 -05:00
Родитель 1a041cf36d
Коммит b4d13f78ea
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 8AA79AD83045BBC7
3 изменённых файлов: 49 добавлений и 64 удалений

Просмотреть файл

@ -33,7 +33,7 @@ class AlertScheduleEntry(ScheduleEntry):
month_of_year=self._task['crontab']['month_of_year']
)
elif self._task['schedule_type'] == 'interval':
self.schedule = celery.schedules.schedule(datetime.timedelta(**{'seconds': self._task['interval']['every']}))
self.schedule = celery.schedules.schedule(datetime.timedelta(**{self._task['interval']['period']: self._task['interval']['every']}))
self.args = self._task['args']
self.kwargs = self._task['kwargs']

Просмотреть файл

@ -5,7 +5,6 @@ import requests
import json
from importlib import import_module
from bson.objectid import ObjectId
from celery.schedules import crontab, timedelta
from mozdef_util.utilities.logger import logger
from lib.config import ALERTS, RESTAPI_URL
@ -16,6 +15,7 @@ class CeleryRestClient(object):
if hasattr(current_app.conf, "CELERY_RESTAPI_JWT"):
self._restapi_jwt = JWTAuth(current_app.conf.CELERY_RESTAPI_JWT)
self._restapi_jwt.set_header_format('Bearer %s')
get_logger(__name__).info("setting JWT value")
else:
self._restapi_jwt = None
@ -26,9 +26,9 @@ class CeleryRestClient(object):
raise Exception("Need to define CELERY_RESTAPI_URL")
def fetch_schedule_dict(self):
resp = requests.get(self._restapi_url + "/alertsschedules", auth=self._restapi_jwt)
resp = requests.get(self._restapi_url + "/alertschedules", auth=self._restapi_jwt)
if not resp.ok:
raise Exception("Received error {0} from rest api when fetching alerts schedules".format(resp.status_code))
raise Exception("Received error {0} from rest api when fetching alert schedules".format(resp.status_code))
api_results = json.loads(resp.text)
return api_results
@ -36,25 +36,11 @@ class CeleryRestClient(object):
schedule = self.fetch_schedule_dict()
get_logger(__name__).info("**** Current Alert Schedule ****")
for alert_name, details in schedule.items():
schedule_str = 'UNKNOWN'
if 'crontab' in details:
schedule_str = 'crontab: {0} {1} {2} {3} {4}'.format(
details['crontab']['minute'],
details['crontab']['hour'],
details['crontab']['day_of_week'],
details['crontab']['day_of_month'],
details['crontab']['month_of_year'],
)
elif 'interval' in details:
schedule_str = 'interval: {0} {1}'.format(
details['interval']['every'],
details['interval']['period'],
)
get_logger(__name__).info("\t{0}: {1}".format(alert_name, schedule_str))
get_logger(__name__).info("\t{0}: {1}".format(alert_name, details['schedule_string']))
def load_and_register_alerts(self):
existing_alerts_schedules = self.fetch_schedule_dict()
alerts_schedules = {}
existing_alert_schedules = self.fetch_schedule_dict()
alert_schedules = {}
for alert_name, params in ALERTS.items():
# Register alerts in celery
try:
@ -77,13 +63,24 @@ class CeleryRestClient(object):
"name": full_path_name,
"task": full_path_name,
"enabled": True,
"args": [],
"kwargs": [],
}
if 'args' in params:
alert_schedule['args'] = params['args']
if 'kwargs' in params:
alert_schedule['kwargs'] = params['kwargs']
if isinstance(params['schedule'], timedelta):
alert_schedule['schedule_type'] = 'interval'
alert_schedule['interval'] = {
"every": params['schedule'].total_seconds(),
"period": "seconds"
}
alert_schedule['schedule_string'] = "{0} {1}".format(
params['schedule'].total_seconds(),
"seconds"
)
elif isinstance(params['schedule'], crontab):
alert_schedule['schedule_type'] = 'crontab'
alert_schedule['crontab'] = {
@ -93,18 +90,27 @@ class CeleryRestClient(object):
"day_of_month": params['schedule']._orig_day_of_month,
"month_of_year": params['schedule']._orig_month_of_year,
}
alert_schedule['schedule_string'] = "{0} {1} {2} {3} {4}".format(
params['schedule']._orig_minute,
params['schedule']._orig_hour,
params['schedule']._orig_day_of_week,
params['schedule']._orig_day_of_month,
params['schedule']._orig_month_of_year,
)
if alert_name not in existing_alerts_schedules:
alert_schedule['_id'] = str(ObjectId())
if alert_name not in existing_alert_schedules:
logger.debug("Inserting schedule for {0} into mongodb".format(full_path_name))
updated_alert_schedule = alert_schedule
else:
# Update schedule if it differs from file to api
del existing_alerts_schedules[alert_name][existing_alerts_schedules[alert_name]['schedule_type']]
existing_alerts_schedules[alert_name]['schedule_type'] = alert_schedule['schedule_type']
existing_alerts_schedules[alert_name][alert_schedule['schedule_type']] = alert_schedule[alert_schedule['schedule_type']]
updated_alert_schedule = existing_alerts_schedules[alert_name]
alerts_schedules[alert_name] = updated_alert_schedule
resp = requests.post(url=RESTAPI_URL + "/updatealertsschedules", data=json.dumps(alerts_schedules), auth=self._restapi_jwt)
del existing_alert_schedules[alert_name][existing_alert_schedules[alert_name]['schedule_type']]
existing_alert_schedules[alert_name]['schedule_type'] = alert_schedule['schedule_type']
existing_alert_schedules[alert_name][alert_schedule['schedule_type']] = alert_schedule[alert_schedule['schedule_type']]
existing_alert_schedules[alert_name]['schedule_string'] = alert_schedule['schedule_string']
existing_alert_schedules[alert_name]['enabled'] = alert_schedule['enabled']
updated_alert_schedule = existing_alert_schedules[alert_name]
alert_schedules[alert_name] = updated_alert_schedule
resp = requests.post(url=RESTAPI_URL + "/updatealertschedules", data=json.dumps(alert_schedules), auth=self._restapi_jwt)
if not resp.ok:
raise Exception("Received error {0} from rest api when updating alerts schedules".format(resp.status_code))

Просмотреть файл

@ -20,7 +20,6 @@ from ipwhois import IPWhois
from operator import itemgetter
from pymongo import MongoClient
from bson import json_util
from bson.objectid import ObjectId
from mozdef_util.elasticsearch_client import ElasticsearchClient, ElasticsearchInvalidIndex
from mozdef_util.query_models import SearchQuery, TermMatch
@ -239,67 +238,47 @@ def index():
return response
@route('/alertsschedules')
@route('/alertsschedules/')
@route('/alertschedules')
@route('/alertschedules/')
@enable_cors
def index():
'''an endpoint to return alerts schedules'''
'''an endpoint to return alert schedules'''
if request.body:
request.body.read()
request.body.close()
response.content_type = "application/json"
mongoclient = MongoClient(options.mongohost, options.mongoport)
schedulers_db = mongoclient.meteor['alerts_schedules']
schedulers_db = mongoclient.meteor['alertschedules']
mongodb_alerts = schedulers_db.find()
alerts_schedules_dict = {}
alert_schedules_dict = {}
for mongodb_alert in mongodb_alerts:
alerts_schedules_dict[mongodb_alert['name']] = {
'_id': str(mongodb_alert['_id']),
'name': mongodb_alert['name'],
'task': mongodb_alert['task'],
'enabled': mongodb_alert['enabled'],
'args': [],
'kwargs': [],
}
if 'args' in mongodb_alert:
alerts_schedules_dict[mongodb_alert['name']]['args'] = mongodb_alert['args']
if 'kwargs' in mongodb_alert:
alerts_schedules_dict[mongodb_alert['name']]['kwargs'] = mongodb_alert['kwargs']
if 'crontab' in mongodb_alert:
alerts_schedules_dict[mongodb_alert['name']]['schedule_type'] = 'crontab'
alerts_schedules_dict[mongodb_alert['name']]['crontab'] = mongodb_alert['crontab']
elif 'interval' in mongodb_alert:
alerts_schedules_dict[mongodb_alert['name']]['schedule_type'] = 'interval'
alerts_schedules_dict[mongodb_alert['name']]['interval'] = mongodb_alert['interval']
mongodb_alert['_id'] = str(mongodb_alert['_id'])
alert_schedules_dict[mongodb_alert['name']] = mongodb_alert
response.body = json.dumps(alerts_schedules_dict)
response.body = json.dumps(alert_schedules_dict)
response.status = 200
return response
@post('/updatealertsschedules', methods=['POST'])
@post('/updatealertsschedules/', methods=['POST'])
@post('/updatealertschedules', methods=['POST'])
@post('/updatealertschedules/', methods=['POST'])
@enable_cors
def update_alerts_schedules():
def update_alert_schedules():
'''an endpoint to return alerts schedules'''
if not request.body:
response.status = 503
return response
alerts_schedules = json.loads(request.body.read())
alert_schedules = json.loads(request.body.read())
request.body.close()
response.content_type = "application/json"
mongoclient = MongoClient(options.mongohost, options.mongoport)
schedulers_db = mongoclient.meteor['alerts_schedules']
schedulers_db = mongoclient.meteor['alertschedules']
schedulers_db.remove()
for alert_name, alert_schedule in alerts_schedules.items():
if '_id' not in alert_schedule:
alert_schedule['_id'] = ObjectId()
else:
alert_schedule['_id'] = ObjectId(alert_schedule['_id'])
for alert_name, alert_schedule in alert_schedules.items():
logger.debug("Inserting schedule for {0} into mongodb".format(alert_name))
schedulers_db.insert(alert_schedule)