diff --git a/src/tasks/dockerDeploy/acs-dcos/marathon.py b/src/tasks/dockerDeploy/acs-dcos/marathon.py index 075b047..38255fd 100644 --- a/src/tasks/dockerDeploy/acs-dcos/marathon.py +++ b/src/tasks/dockerDeploy/acs-dcos/marathon.py @@ -3,6 +3,8 @@ import logging import os import time +from dcos.mesos import Mesos + class Marathon(object): """ @@ -13,6 +15,7 @@ class Marathon(object): def __init__(self, acs_client): self.acs_client = acs_client + self.mesos = Mesos(self.acs_client) def get_request(self, path, endpoint='marathon/v2'): """ @@ -94,7 +97,6 @@ class Marathon(object): data = json.load(json_file) return data - def deploy_app(self, app_json): """ Deploys an app to marathon @@ -104,12 +106,7 @@ class Marathon(object): start_timestamp = time.time() response = self.post_request('apps', post_data=app_json) - response_json = response.json() - - if 'deployments' not in response_json: - raise Exception('Key "deployments" is missing from response: {}'.format(response_json)) - deployment_id = response_json['deployments'][0]['id'] - self._wait_for_deployment_complete(deployment_id, start_timestamp) + self._wait_for_deployment_complete(response, start_timestamp) def update_group(self, marathon_json): """ @@ -138,12 +135,7 @@ class Marathon(object): else: raise ValueError('Invalid method "{}"'.format(method)) - response_json = response.json() - - if 'deploymentId' not in response_json: - raise Exception('Key "deploymentId" is missing from response: {}'.format(response_json)) - deployment_id = response_json['deploymentId'] - self._wait_for_deployment_complete(deployment_id, start_timestamp) + self._wait_for_deployment_complete(response, start_timestamp) return response def _get_all_group_ids(self, data): @@ -183,8 +175,7 @@ class Marathon(object): """ start_timestamp = time.time() response = self.put_request('groups/{}'.format(group_id), json={'scaleBy': scale_factor}) - deployment_id = response.json().get('deploymentId') - self._wait_for_deployment_complete(deployment_id, start_timestamp) + self._wait_for_deployment_complete(response, start_timestamp) return response.json() def is_group_id_unique(self, group_id): @@ -199,32 +190,83 @@ class Marathon(object): return False - def _wait_for_deployment_complete(self, deployment_id, start_timestamp): + def _wait_for_deployment_complete(self, deployment_response, start_timestamp): """ Waits for the deployment to complete. """ + sleep_time = 5 other_deployment_in_progress = False timeout_exceeded = True + task_failed = False + deployment_json = deployment_response.json() + + if 'deploymentId' in deployment_json: + deployment_id = deployment_json['deploymentId'] + elif 'deployments' in deployment_json: + deployment_id = deployment_json['deployments'][0]['id'] + else: + raise Exception( + 'Could not find "deploymentId" in {}'.format(deployment_json)) + + service_states = {} + get_deployments_response = self.get_deployments().json() + while not self._wait_time_exceeded(self.deployment_max_wait_time, start_timestamp) \ - and not other_deployment_in_progress: - response = self.get_deployments().json() - if response: - for a_deployment in response: - if deployment_id in a_deployment['id']: - logging.info('Waiting for deployment "%s" to complete ...', deployment_id) - time.sleep(5) - else: - logging.info('Another service is being deployed. Continuing ...') - other_deployment_in_progress = True - timeout_exceeded = False - break - else: + and not other_deployment_in_progress and not task_failed: + if not get_deployments_response: timeout_exceeded = False break - if timeout_exceeded: - raise Exception('Timeout exceeded waiting for deployment "{}" to complete'.format( - deployment_id)) + a_deployment = [dep for dep in get_deployments_response if dep['id'] == deployment_id] + + if len(a_deployment) == 0: + logging.info('Another service is being deployed. Continuing ...') + other_deployment_in_progress = True + break + + a_deployment = a_deployment[0] + + for affected_app in a_deployment['affectedApps']: + service_id = affected_app.strip('/').replace('/', '_') + + if not service_id in service_states: + service_states[service_id] = None + + current_task = self.mesos.get_latest_task(service_id) + + if not current_task: + time.sleep(sleep_time) + continue + + current_state = current_task.state + if service_states[service_id] != current_state: + logging.info('Service "%s" is in state: "%s"', + service_id, current_state) + + if current_task.is_failed() or\ + current_task.is_killed(): + logging.error('Service "%s" failed with status "%s".', + service_id, current_state) + + # Write out app logs + stdout = self.mesos.get_task_log_file( + current_task, 'stdout') + stderr = self.mesos.get_task_log_file( + current_task, 'stderr') + + logging.info('stdout:\n%s', stdout) + logging.info('stderr:\n%s', stderr) + task_failed = True + break + + service_states[service_id] = current_task.get_state() + + time.sleep(sleep_time) + get_deployments_response = self.get_deployments().json() + + if task_failed or timeout_exceeded: + raise Exception('Deployment failed to complete') + return def _wait_time_exceeded(self, max_wait, timestamp): diff --git a/src/tasks/dockerDeploy/acs-dcos/mesos.py b/src/tasks/dockerDeploy/acs-dcos/mesos.py new file mode 100644 index 0000000..91eab66 --- /dev/null +++ b/src/tasks/dockerDeploy/acs-dcos/mesos.py @@ -0,0 +1,89 @@ +from mesos_task import MesosTask + +class Mesos(object): + def __init__(self, acs_client): + self.acs_client = acs_client + + def _get_request(self, endpoint, path): + """ + Makes a GET request to ACS + """ + return self.acs_client.get_request('{}/{}'.format(endpoint, path)) + + def get_task_log_file(self, task, filename): + """ + Gets the contents of a log file from the tasks sandbox + """ + url_path = task.get_sandbox_path(filename) + try: + log_file_response = self._get_request('slave', url_path) + log_json = log_file_response.json() + except: + return '' + + return '' if not log_json['data'].strip() else log_json['data'].strip() + + def _get_slave_ids(self): + """ + Gets all slave IDs in the cluster + """ + # GET /mesos/slaves/state.json + response = self._get_request('mesos/slaves', 'state.json') + response.raise_for_status() + + all_slaves = response.json() + return [slave['id'] for slave in all_slaves['slaves']] + + def _get_slave_state(self, slave_id): + """ + Gets the state.json for specified slave + """ + slave_state_response = self._get_request( + 'slave', '{}/state.json'.format(slave_id)) + slave_state_response.raise_for_status() + + slave_state_json = slave_state_response.json() + return slave_state_json + + def get_latest_task(self, service_id): + """ + Go through all frameworks and executors and get all tasks that + start with the service_id. Returns the latest task with information + needed to get the files from the sandbox + """ + framework_name = 'marathon' + slave_ids = self._get_slave_ids() + found_tasks = [] + + for slave_id in slave_ids: + slave_state_json = self._get_slave_state(slave_id) + + # Get all 'marathon' frameworks + marathon_frameworks = [] + marathon_frameworks.extend( + [f for f in slave_state_json['frameworks'] if f['name'] == framework_name]) + marathon_frameworks.extend( + [f for f in slave_state_json['completed_frameworks'] if f['name'] == framework_name]) + + # Get all executors and completed executors where 'id' of the task + # starts with the service_id + executors = [] + for framework in marathon_frameworks: + executors.extend( + [e for e in framework['executors'] if e['id'].startswith(service_id)]) + executors.extend( + [e for e in framework['completed_executors'] if e['id'].startswith(service_id)]) + + for executor in executors: + for task in executor['tasks']: + found_tasks.append(MesosTask(task, executor['directory'])) + + for task in executor['completed_tasks']: + found_tasks.append(MesosTask(task, executor['directory'])) + + # Sort the tasks, so the newest are on top + found_tasks.sort(key=lambda task: task.timestamp, reverse=True) + if len(found_tasks) == 0: + return None + + return found_tasks[0] diff --git a/src/tasks/dockerDeploy/acs-dcos/mesos_task.py b/src/tasks/dockerDeploy/acs-dcos/mesos_task.py new file mode 100644 index 0000000..2489e20 --- /dev/null +++ b/src/tasks/dockerDeploy/acs-dcos/mesos_task.py @@ -0,0 +1,53 @@ + +class MesosTask(object): + """ + Class represents a Mesos task + """ + def __init__(self, task, directory): + if not 'id' in task: + raise ValueError('Task is missing "id" ') + if not 'slave_id' in task: + raise ValueError('Task is missing "slave_id"') + if not 'framework_id' in task: + raise ValueError('Task is missing "framework_id"') + if not 'state' in task: + raise ValueError('Task is missing "state"') + if not 'statuses' in task: + raise ValueError('Task is missing "statuses"') + + self.task_id = task['id'] + self.slave_id = task['slave_id'] + self.framework_id = task['framework_id'] + self.directory = directory + self.state = task['state'] + + statuses = [ts for ts in task['statuses']] + if len(statuses) == 0: + timestamp = -1 + else: + statuses.sort(key=lambda s: s['timestamp'], reverse=True) + timestamp = statuses[0]['timestamp'] + self.timestamp = timestamp + + def get_sandbox_path(self, filename): + """ + Gets the path to the sandbox + """ + if not filename: + raise ValueError('Filename is not set') + + url_template = '{}/files/read.json?path={}/{}&length=999999&offset=0' + return url_template.format( + self.slave_id, self.directory, filename) + + def is_failed(self): + """ + Returns True if task failed, False otherwise + """ + return self.state == 'TASK_FAILED' + + def is_killed(self): + """ + Returns True if task is killed or being killed, false otherwise + """ + return self.state == 'TASK_KILLED' or self.state == 'TASK_KILLING' diff --git a/src/tasks/dockerDeploy/acs-dcos/test_mesos_task.py b/src/tasks/dockerDeploy/acs-dcos/test_mesos_task.py new file mode 100644 index 0000000..41d3358 --- /dev/null +++ b/src/tasks/dockerDeploy/acs-dcos/test_mesos_task.py @@ -0,0 +1,156 @@ +import unittest +from mesos_task import MesosTask + +class MesosTaskTest(unittest.TestCase): + def test_not_none(self): + base_task = { + 'id': 'mytask_id', + 'slave_id': 'myslave_id', + 'framework_id': 'myframework_id', + 'state': 'mystate', + 'statuses': [] + } + + task = MesosTask(base_task, 'directory') + self.assertIsNotNone(task) + + def test_missing_id(self): + base_task = { + 'slave_id': 'myslave_id', + 'framework_id': 'myframework_id', + 'state': 'mystate', + 'statuses': [] + } + self.assertRaises(ValueError, MesosTask, base_task, 'directory') + + def test_missing_slave_id(self): + base_task = { + 'id': 'mytask_id', + 'framework_id': 'myframework_id', + 'state': 'mystate', + 'statuses': [] + } + self.assertRaises(ValueError, MesosTask, base_task, 'directory') + + def test_missing_framework_id(self): + base_task = { + 'id': 'mytask_id', + 'slave_id': 'myslave_id', + 'state': 'mystate', + 'statuses': [] + } + self.assertRaises(ValueError, MesosTask, base_task, 'directory') + + def test_missing_state(self): + base_task = { + 'id': 'mytask_id', + 'slave_id': 'myslave_id', + 'framework_id': 'myframework_id', + 'statuses': [] + } + self.assertRaises(ValueError, MesosTask, base_task, 'directory') + + def test_missing_statuses(self): + base_task = { + 'id': 'mytask_id', + 'slave_id': 'myslave_id', + 'framework_id': 'myframework_id', + 'state': 'mystate' + } + self.assertRaises(ValueError, MesosTask, base_task, 'directory') + + def test_values_set(self): + base_task = { + 'id': 'mytask_id', + 'slave_id': 'myslave_id', + 'framework_id': 'myframework_id', + 'state': 'mystate', + 'statuses': [] + } + + task = MesosTask(base_task, 'directory') + self.assertEqual(task.task_id, 'mytask_id') + self.assertEqual(task.slave_id, 'myslave_id') + self.assertEqual(task.framework_id, 'myframework_id') + self.assertEqual(task.state, 'mystate') + self.assertEqual(task.timestamp, -1) + + def test_sandbox_path(self): + base_task = { + 'id': 'mytask_id', + 'slave_id': 'myslave_id', + 'framework_id': 'myframework_id', + 'state': 'mystate', + 'statuses': [] + } + task = MesosTask(base_task, 'directory') + expected = 'myslave_id/files/read.json?path=directory/myfile&length=999999&offset=0' + actual = task.get_sandbox_path('myfile') + + self.assertEqual(actual, expected) + + def test_sandbox_path_empty_filename(self): + base_task = { + 'id': 'mytask_id', + 'slave_id': 'myslave_id', + 'framework_id': 'myframework_id', + 'state': 'mystate', + 'statuses': [] + } + task = MesosTask(base_task, 'directory') + self.assertRaises(ValueError, task.get_sandbox_path, None) + + def test_is_failed(self): + base_task = { + 'id': 'mytask_id', + 'slave_id': 'myslave_id', + 'framework_id': 'myframework_id', + 'state': 'TASK_FAILED', + 'statuses': [] + } + task = MesosTask(base_task, 'directory') + self.assertTrue(task.is_failed()) + + def test_is_failed_false(self): + base_task = { + 'id': 'mytask_id', + 'slave_id': 'myslave_id', + 'framework_id': 'myframework_id', + 'state': 'TASK_SOMETHING', + 'statuses': [] + } + task = MesosTask(base_task, 'directory') + self.assertFalse(task.is_failed()) + + def test_is_killed(self): + base_task = { + 'id': 'mytask_id', + 'slave_id': 'myslave_id', + 'framework_id': 'myframework_id', + 'state': 'TASK_KILLED', + 'statuses': [] + } + task = MesosTask(base_task, 'directory') + self.assertTrue(task.is_killed()) + + def test_is_killing(self): + base_task = { + 'id': 'mytask_id', + 'slave_id': 'myslave_id', + 'framework_id': 'myframework_id', + 'state': 'TASK_KILLING', + 'statuses': [] + } + task = MesosTask(base_task, 'directory') + self.assertTrue(task.is_killed()) + + def test_is_killed_false(self): + base_task = { + 'id': 'mytask_id', + 'slave_id': 'myslave_id', + 'framework_id': 'myframework_id', + 'state': 'TASK_FALSE', + 'statuses': [] + } + task = MesosTask(base_task, 'directory') + self.assertFalse(task.is_killed()) \ No newline at end of file