diff --git a/alerts/lib/celery_scheduler/alert_schedule_entry.py b/alerts/lib/celery_scheduler/alert_schedule_entry.py index d56bdf26..19827bba 100644 --- a/alerts/lib/celery_scheduler/alert_schedule_entry.py +++ b/alerts/lib/celery_scheduler/alert_schedule_entry.py @@ -33,7 +33,7 @@ class AlertScheduleEntry(ScheduleEntry): month_of_year=self._task['crontab']['month_of_year'] ) elif self._task['schedule_type'] == 'interval': - self.schedule = celery.schedules.schedule(datetime.timedelta(**{'seconds': self._task['interval']['every']})) + self.schedule = celery.schedules.schedule(datetime.timedelta(**{self._task['interval']['period']: self._task['interval']['every']})) self.args = self._task['args'] self.kwargs = self._task['kwargs'] diff --git a/alerts/lib/celery_scheduler/celery_rest_client.py b/alerts/lib/celery_scheduler/celery_rest_client.py index 5c945855..ca338e30 100644 --- a/alerts/lib/celery_scheduler/celery_rest_client.py +++ b/alerts/lib/celery_scheduler/celery_rest_client.py @@ -5,7 +5,6 @@ import requests import json from importlib import import_module -from bson.objectid import ObjectId from celery.schedules import crontab, timedelta from mozdef_util.utilities.logger import logger from lib.config import ALERTS, RESTAPI_URL @@ -16,6 +15,7 @@ class CeleryRestClient(object): if hasattr(current_app.conf, "CELERY_RESTAPI_JWT"): self._restapi_jwt = JWTAuth(current_app.conf.CELERY_RESTAPI_JWT) self._restapi_jwt.set_header_format('Bearer %s') + get_logger(__name__).info("setting JWT value") else: self._restapi_jwt = None @@ -26,9 +26,9 @@ class CeleryRestClient(object): raise Exception("Need to define CELERY_RESTAPI_URL") def fetch_schedule_dict(self): - resp = requests.get(self._restapi_url + "/alertsschedules", auth=self._restapi_jwt) + resp = requests.get(self._restapi_url + "/alertschedules", auth=self._restapi_jwt) if not resp.ok: - raise Exception("Received error {0} from rest api when fetching alerts schedules".format(resp.status_code)) + raise Exception("Received error {0} from rest api when fetching alert schedules".format(resp.status_code)) api_results = json.loads(resp.text) return api_results @@ -36,25 +36,11 @@ class CeleryRestClient(object): schedule = self.fetch_schedule_dict() get_logger(__name__).info("**** Current Alert Schedule ****") for alert_name, details in schedule.items(): - schedule_str = 'UNKNOWN' - if 'crontab' in details: - schedule_str = 'crontab: {0} {1} {2} {3} {4}'.format( - details['crontab']['minute'], - details['crontab']['hour'], - details['crontab']['day_of_week'], - details['crontab']['day_of_month'], - details['crontab']['month_of_year'], - ) - elif 'interval' in details: - schedule_str = 'interval: {0} {1}'.format( - details['interval']['every'], - details['interval']['period'], - ) - get_logger(__name__).info("\t{0}: {1}".format(alert_name, schedule_str)) + get_logger(__name__).info("\t{0}: {1}".format(alert_name, details['schedule_string'])) def load_and_register_alerts(self): - existing_alerts_schedules = self.fetch_schedule_dict() - alerts_schedules = {} + existing_alert_schedules = self.fetch_schedule_dict() + alert_schedules = {} for alert_name, params in ALERTS.items(): # Register alerts in celery try: @@ -77,13 +63,24 @@ class CeleryRestClient(object): "name": full_path_name, "task": full_path_name, "enabled": True, + "args": [], + "kwargs": [], } + if 'args' in params: + alert_schedule['args'] = params['args'] + if 'kwargs' in params: + alert_schedule['kwargs'] = params['kwargs'] + if isinstance(params['schedule'], timedelta): alert_schedule['schedule_type'] = 'interval' alert_schedule['interval'] = { "every": params['schedule'].total_seconds(), "period": "seconds" } + alert_schedule['schedule_string'] = "{0} {1}".format( + params['schedule'].total_seconds(), + "seconds" + ) elif isinstance(params['schedule'], crontab): alert_schedule['schedule_type'] = 'crontab' alert_schedule['crontab'] = { @@ -93,18 +90,27 @@ class CeleryRestClient(object): "day_of_month": params['schedule']._orig_day_of_month, "month_of_year": params['schedule']._orig_month_of_year, } + alert_schedule['schedule_string'] = "{0} {1} {2} {3} {4}".format( + params['schedule']._orig_minute, + params['schedule']._orig_hour, + params['schedule']._orig_day_of_week, + params['schedule']._orig_day_of_month, + params['schedule']._orig_month_of_year, + ) - if alert_name not in existing_alerts_schedules: - alert_schedule['_id'] = str(ObjectId()) + if alert_name not in existing_alert_schedules: logger.debug("Inserting schedule for {0} into mongodb".format(full_path_name)) updated_alert_schedule = alert_schedule else: # Update schedule if it differs from file to api - del existing_alerts_schedules[alert_name][existing_alerts_schedules[alert_name]['schedule_type']] - existing_alerts_schedules[alert_name]['schedule_type'] = alert_schedule['schedule_type'] - existing_alerts_schedules[alert_name][alert_schedule['schedule_type']] = alert_schedule[alert_schedule['schedule_type']] - updated_alert_schedule = existing_alerts_schedules[alert_name] - alerts_schedules[alert_name] = updated_alert_schedule - resp = requests.post(url=RESTAPI_URL + "/updatealertsschedules", data=json.dumps(alerts_schedules), auth=self._restapi_jwt) + del existing_alert_schedules[alert_name][existing_alert_schedules[alert_name]['schedule_type']] + existing_alert_schedules[alert_name]['schedule_type'] = alert_schedule['schedule_type'] + existing_alert_schedules[alert_name][alert_schedule['schedule_type']] = alert_schedule[alert_schedule['schedule_type']] + existing_alert_schedules[alert_name]['schedule_string'] = alert_schedule['schedule_string'] + existing_alert_schedules[alert_name]['enabled'] = alert_schedule['enabled'] + updated_alert_schedule = existing_alert_schedules[alert_name] + + alert_schedules[alert_name] = updated_alert_schedule + resp = requests.post(url=RESTAPI_URL + "/updatealertschedules", data=json.dumps(alert_schedules), auth=self._restapi_jwt) if not resp.ok: raise Exception("Received error {0} from rest api when updating alerts schedules".format(resp.status_code)) diff --git a/rest/index.py b/rest/index.py index eb0fb917..cf14b968 100644 --- a/rest/index.py +++ b/rest/index.py @@ -20,7 +20,6 @@ from ipwhois import IPWhois from operator import itemgetter from pymongo import MongoClient from bson import json_util -from bson.objectid import ObjectId from mozdef_util.elasticsearch_client import ElasticsearchClient, ElasticsearchInvalidIndex from mozdef_util.query_models import SearchQuery, TermMatch @@ -239,67 +238,47 @@ def index(): return response -@route('/alertsschedules') -@route('/alertsschedules/') +@route('/alertschedules') +@route('/alertschedules/') @enable_cors def index(): - '''an endpoint to return alerts schedules''' + '''an endpoint to return alert schedules''' if request.body: request.body.read() request.body.close() response.content_type = "application/json" mongoclient = MongoClient(options.mongohost, options.mongoport) - schedulers_db = mongoclient.meteor['alerts_schedules'] + schedulers_db = mongoclient.meteor['alertschedules'] mongodb_alerts = schedulers_db.find() - alerts_schedules_dict = {} + alert_schedules_dict = {} for mongodb_alert in mongodb_alerts: - alerts_schedules_dict[mongodb_alert['name']] = { - '_id': str(mongodb_alert['_id']), - 'name': mongodb_alert['name'], - 'task': mongodb_alert['task'], - 'enabled': mongodb_alert['enabled'], - 'args': [], - 'kwargs': [], - } - if 'args' in mongodb_alert: - alerts_schedules_dict[mongodb_alert['name']]['args'] = mongodb_alert['args'] - if 'kwargs' in mongodb_alert: - alerts_schedules_dict[mongodb_alert['name']]['kwargs'] = mongodb_alert['kwargs'] - if 'crontab' in mongodb_alert: - alerts_schedules_dict[mongodb_alert['name']]['schedule_type'] = 'crontab' - alerts_schedules_dict[mongodb_alert['name']]['crontab'] = mongodb_alert['crontab'] - elif 'interval' in mongodb_alert: - alerts_schedules_dict[mongodb_alert['name']]['schedule_type'] = 'interval' - alerts_schedules_dict[mongodb_alert['name']]['interval'] = mongodb_alert['interval'] + mongodb_alert['_id'] = str(mongodb_alert['_id']) + alert_schedules_dict[mongodb_alert['name']] = mongodb_alert - response.body = json.dumps(alerts_schedules_dict) + response.body = json.dumps(alert_schedules_dict) response.status = 200 return response -@post('/updatealertsschedules', methods=['POST']) -@post('/updatealertsschedules/', methods=['POST']) +@post('/updatealertschedules', methods=['POST']) +@post('/updatealertschedules/', methods=['POST']) @enable_cors -def update_alerts_schedules(): +def update_alert_schedules(): '''an endpoint to return alerts schedules''' if not request.body: response.status = 503 return response - alerts_schedules = json.loads(request.body.read()) + alert_schedules = json.loads(request.body.read()) request.body.close() response.content_type = "application/json" mongoclient = MongoClient(options.mongohost, options.mongoport) - schedulers_db = mongoclient.meteor['alerts_schedules'] + schedulers_db = mongoclient.meteor['alertschedules'] schedulers_db.remove() - for alert_name, alert_schedule in alerts_schedules.items(): - if '_id' not in alert_schedule: - alert_schedule['_id'] = ObjectId() - else: - alert_schedule['_id'] = ObjectId(alert_schedule['_id']) + for alert_name, alert_schedule in alert_schedules.items(): logger.debug("Inserting schedule for {0} into mongodb".format(alert_name)) schedulers_db.insert(alert_schedule)