Showing service statuses as they change during deployment. Logging the stdout and stderr in case any task fails during deployment
This commit is contained in:
Родитель
660af9dd64
Коммит
2b33770ca6
|
@ -3,6 +3,8 @@ import logging
|
|||
import os
|
||||
import time
|
||||
|
||||
from dcos.mesos import Mesos
|
||||
|
||||
|
||||
class Marathon(object):
|
||||
"""
|
||||
|
@ -13,6 +15,7 @@ class Marathon(object):
|
|||
|
||||
def __init__(self, acs_client):
|
||||
self.acs_client = acs_client
|
||||
self.mesos = Mesos(self.acs_client)
|
||||
|
||||
def get_request(self, path, endpoint='marathon/v2'):
|
||||
"""
|
||||
|
@ -94,7 +97,6 @@ class Marathon(object):
|
|||
data = json.load(json_file)
|
||||
return data
|
||||
|
||||
|
||||
def deploy_app(self, app_json):
|
||||
"""
|
||||
Deploys an app to marathon
|
||||
|
@ -104,12 +106,7 @@ class Marathon(object):
|
|||
|
||||
start_timestamp = time.time()
|
||||
response = self.post_request('apps', post_data=app_json)
|
||||
response_json = response.json()
|
||||
|
||||
if 'deployments' not in response_json:
|
||||
raise Exception('Key "deployments" is missing from response: {}'.format(response_json))
|
||||
deployment_id = response_json['deployments'][0]['id']
|
||||
self._wait_for_deployment_complete(deployment_id, start_timestamp)
|
||||
self._wait_for_deployment_complete(response, start_timestamp)
|
||||
|
||||
def update_group(self, marathon_json):
|
||||
"""
|
||||
|
@ -138,12 +135,7 @@ class Marathon(object):
|
|||
else:
|
||||
raise ValueError('Invalid method "{}"'.format(method))
|
||||
|
||||
response_json = response.json()
|
||||
|
||||
if 'deploymentId' not in response_json:
|
||||
raise Exception('Key "deploymentId" is missing from response: {}'.format(response_json))
|
||||
deployment_id = response_json['deploymentId']
|
||||
self._wait_for_deployment_complete(deployment_id, start_timestamp)
|
||||
self._wait_for_deployment_complete(response, start_timestamp)
|
||||
return response
|
||||
|
||||
def _get_all_group_ids(self, data):
|
||||
|
@ -183,8 +175,7 @@ class Marathon(object):
|
|||
"""
|
||||
start_timestamp = time.time()
|
||||
response = self.put_request('groups/{}'.format(group_id), json={'scaleBy': scale_factor})
|
||||
deployment_id = response.json().get('deploymentId')
|
||||
self._wait_for_deployment_complete(deployment_id, start_timestamp)
|
||||
self._wait_for_deployment_complete(response, start_timestamp)
|
||||
return response.json()
|
||||
|
||||
def is_group_id_unique(self, group_id):
|
||||
|
@ -199,32 +190,83 @@ class Marathon(object):
|
|||
|
||||
return False
|
||||
|
||||
def _wait_for_deployment_complete(self, deployment_id, start_timestamp):
|
||||
def _wait_for_deployment_complete(self, deployment_response, start_timestamp):
|
||||
"""
|
||||
Waits for the deployment to complete.
|
||||
"""
|
||||
sleep_time = 5
|
||||
other_deployment_in_progress = False
|
||||
timeout_exceeded = True
|
||||
task_failed = False
|
||||
deployment_json = deployment_response.json()
|
||||
|
||||
if 'deploymentId' in deployment_json:
|
||||
deployment_id = deployment_json['deploymentId']
|
||||
elif 'deployments' in deployment_json:
|
||||
deployment_id = deployment_json['deployments'][0]['id']
|
||||
else:
|
||||
raise Exception(
|
||||
'Could not find "deploymentId" in {}'.format(deployment_json))
|
||||
|
||||
service_states = {}
|
||||
get_deployments_response = self.get_deployments().json()
|
||||
|
||||
while not self._wait_time_exceeded(self.deployment_max_wait_time, start_timestamp) \
|
||||
and not other_deployment_in_progress:
|
||||
response = self.get_deployments().json()
|
||||
if response:
|
||||
for a_deployment in response:
|
||||
if deployment_id in a_deployment['id']:
|
||||
logging.info('Waiting for deployment "%s" to complete ...', deployment_id)
|
||||
time.sleep(5)
|
||||
else:
|
||||
logging.info('Another service is being deployed. Continuing ...')
|
||||
other_deployment_in_progress = True
|
||||
timeout_exceeded = False
|
||||
break
|
||||
else:
|
||||
and not other_deployment_in_progress and not task_failed:
|
||||
if not get_deployments_response:
|
||||
timeout_exceeded = False
|
||||
break
|
||||
|
||||
if timeout_exceeded:
|
||||
raise Exception('Timeout exceeded waiting for deployment "{}" to complete'.format(
|
||||
deployment_id))
|
||||
a_deployment = [dep for dep in get_deployments_response if dep['id'] == deployment_id]
|
||||
|
||||
if len(a_deployment) == 0:
|
||||
logging.info('Another service is being deployed. Continuing ...')
|
||||
other_deployment_in_progress = True
|
||||
break
|
||||
|
||||
a_deployment = a_deployment[0]
|
||||
|
||||
for affected_app in a_deployment['affectedApps']:
|
||||
service_id = affected_app.strip('/').replace('/', '_')
|
||||
|
||||
if not service_id in service_states:
|
||||
service_states[service_id] = None
|
||||
|
||||
current_task = self.mesos.get_latest_task(service_id)
|
||||
|
||||
if not current_task:
|
||||
time.sleep(sleep_time)
|
||||
continue
|
||||
|
||||
current_state = current_task.state
|
||||
if service_states[service_id] != current_state:
|
||||
logging.info('Service "%s" is in state: "%s"',
|
||||
service_id, current_state)
|
||||
|
||||
if current_task.is_failed() or\
|
||||
current_task.is_killed():
|
||||
logging.error('Service "%s" failed with status "%s".',
|
||||
service_id, current_state)
|
||||
|
||||
# Write out app logs
|
||||
stdout = self.mesos.get_task_log_file(
|
||||
current_task, 'stdout')
|
||||
stderr = self.mesos.get_task_log_file(
|
||||
current_task, 'stderr')
|
||||
|
||||
logging.info('stdout:\n%s', stdout)
|
||||
logging.info('stderr:\n%s', stderr)
|
||||
task_failed = True
|
||||
break
|
||||
|
||||
service_states[service_id] = current_task.get_state()
|
||||
|
||||
time.sleep(sleep_time)
|
||||
get_deployments_response = self.get_deployments().json()
|
||||
|
||||
if task_failed or timeout_exceeded:
|
||||
raise Exception('Deployment failed to complete')
|
||||
|
||||
return
|
||||
|
||||
def _wait_time_exceeded(self, max_wait, timestamp):
|
||||
|
|
|
@ -0,0 +1,89 @@
|
|||
from mesos_task import MesosTask
|
||||
|
||||
class Mesos(object):
|
||||
def __init__(self, acs_client):
|
||||
self.acs_client = acs_client
|
||||
|
||||
def _get_request(self, endpoint, path):
|
||||
"""
|
||||
Makes a GET request to ACS
|
||||
"""
|
||||
return self.acs_client.get_request('{}/{}'.format(endpoint, path))
|
||||
|
||||
def get_task_log_file(self, task, filename):
|
||||
"""
|
||||
Gets the contents of a log file from the tasks sandbox
|
||||
"""
|
||||
url_path = task.get_sandbox_path(filename)
|
||||
try:
|
||||
log_file_response = self._get_request('slave', url_path)
|
||||
log_json = log_file_response.json()
|
||||
except:
|
||||
return '<empty>'
|
||||
|
||||
return '<empty>' if not log_json['data'].strip() else log_json['data'].strip()
|
||||
|
||||
def _get_slave_ids(self):
|
||||
"""
|
||||
Gets all slave IDs in the cluster
|
||||
"""
|
||||
# GET /mesos/slaves/state.json
|
||||
response = self._get_request('mesos/slaves', 'state.json')
|
||||
response.raise_for_status()
|
||||
|
||||
all_slaves = response.json()
|
||||
return [slave['id'] for slave in all_slaves['slaves']]
|
||||
|
||||
def _get_slave_state(self, slave_id):
|
||||
"""
|
||||
Gets the state.json for specified slave
|
||||
"""
|
||||
slave_state_response = self._get_request(
|
||||
'slave', '{}/state.json'.format(slave_id))
|
||||
slave_state_response.raise_for_status()
|
||||
|
||||
slave_state_json = slave_state_response.json()
|
||||
return slave_state_json
|
||||
|
||||
def get_latest_task(self, service_id):
|
||||
"""
|
||||
Go through all frameworks and executors and get all tasks that
|
||||
start with the service_id. Returns the latest task with information
|
||||
needed to get the files from the sandbox
|
||||
"""
|
||||
framework_name = 'marathon'
|
||||
slave_ids = self._get_slave_ids()
|
||||
found_tasks = []
|
||||
|
||||
for slave_id in slave_ids:
|
||||
slave_state_json = self._get_slave_state(slave_id)
|
||||
|
||||
# Get all 'marathon' frameworks
|
||||
marathon_frameworks = []
|
||||
marathon_frameworks.extend(
|
||||
[f for f in slave_state_json['frameworks'] if f['name'] == framework_name])
|
||||
marathon_frameworks.extend(
|
||||
[f for f in slave_state_json['completed_frameworks'] if f['name'] == framework_name])
|
||||
|
||||
# Get all executors and completed executors where 'id' of the task
|
||||
# starts with the service_id
|
||||
executors = []
|
||||
for framework in marathon_frameworks:
|
||||
executors.extend(
|
||||
[e for e in framework['executors'] if e['id'].startswith(service_id)])
|
||||
executors.extend(
|
||||
[e for e in framework['completed_executors'] if e['id'].startswith(service_id)])
|
||||
|
||||
for executor in executors:
|
||||
for task in executor['tasks']:
|
||||
found_tasks.append(MesosTask(task, executor['directory']))
|
||||
|
||||
for task in executor['completed_tasks']:
|
||||
found_tasks.append(MesosTask(task, executor['directory']))
|
||||
|
||||
# Sort the tasks, so the newest are on top
|
||||
found_tasks.sort(key=lambda task: task.timestamp, reverse=True)
|
||||
if len(found_tasks) == 0:
|
||||
return None
|
||||
|
||||
return found_tasks[0]
|
|
@ -0,0 +1,53 @@
|
|||
|
||||
class MesosTask(object):
|
||||
"""
|
||||
Class represents a Mesos task
|
||||
"""
|
||||
def __init__(self, task, directory):
|
||||
if not 'id' in task:
|
||||
raise ValueError('Task is missing "id" ')
|
||||
if not 'slave_id' in task:
|
||||
raise ValueError('Task is missing "slave_id"')
|
||||
if not 'framework_id' in task:
|
||||
raise ValueError('Task is missing "framework_id"')
|
||||
if not 'state' in task:
|
||||
raise ValueError('Task is missing "state"')
|
||||
if not 'statuses' in task:
|
||||
raise ValueError('Task is missing "statuses"')
|
||||
|
||||
self.task_id = task['id']
|
||||
self.slave_id = task['slave_id']
|
||||
self.framework_id = task['framework_id']
|
||||
self.directory = directory
|
||||
self.state = task['state']
|
||||
|
||||
statuses = [ts for ts in task['statuses']]
|
||||
if len(statuses) == 0:
|
||||
timestamp = -1
|
||||
else:
|
||||
statuses.sort(key=lambda s: s['timestamp'], reverse=True)
|
||||
timestamp = statuses[0]['timestamp']
|
||||
self.timestamp = timestamp
|
||||
|
||||
def get_sandbox_path(self, filename):
|
||||
"""
|
||||
Gets the path to the sandbox
|
||||
"""
|
||||
if not filename:
|
||||
raise ValueError('Filename is not set')
|
||||
|
||||
url_template = '{}/files/read.json?path={}/{}&length=999999&offset=0'
|
||||
return url_template.format(
|
||||
self.slave_id, self.directory, filename)
|
||||
|
||||
def is_failed(self):
|
||||
"""
|
||||
Returns True if task failed, False otherwise
|
||||
"""
|
||||
return self.state == 'TASK_FAILED'
|
||||
|
||||
def is_killed(self):
|
||||
"""
|
||||
Returns True if task is killed or being killed, false otherwise
|
||||
"""
|
||||
return self.state == 'TASK_KILLED' or self.state == 'TASK_KILLING'
|
|
@ -0,0 +1,156 @@
|
|||
import unittest
|
||||
from mesos_task import MesosTask
|
||||
|
||||
class MesosTaskTest(unittest.TestCase):
|
||||
def test_not_none(self):
|
||||
base_task = {
|
||||
'id': 'mytask_id',
|
||||
'slave_id': 'myslave_id',
|
||||
'framework_id': 'myframework_id',
|
||||
'state': 'mystate',
|
||||
'statuses': []
|
||||
}
|
||||
|
||||
task = MesosTask(base_task, 'directory')
|
||||
self.assertIsNotNone(task)
|
||||
|
||||
def test_missing_id(self):
|
||||
base_task = {
|
||||
'slave_id': 'myslave_id',
|
||||
'framework_id': 'myframework_id',
|
||||
'state': 'mystate',
|
||||
'statuses': []
|
||||
}
|
||||
self.assertRaises(ValueError, MesosTask, base_task, 'directory')
|
||||
|
||||
def test_missing_slave_id(self):
|
||||
base_task = {
|
||||
'id': 'mytask_id',
|
||||
'framework_id': 'myframework_id',
|
||||
'state': 'mystate',
|
||||
'statuses': []
|
||||
}
|
||||
self.assertRaises(ValueError, MesosTask, base_task, 'directory')
|
||||
|
||||
def test_missing_framework_id(self):
|
||||
base_task = {
|
||||
'id': 'mytask_id',
|
||||
'slave_id': 'myslave_id',
|
||||
'state': 'mystate',
|
||||
'statuses': []
|
||||
}
|
||||
self.assertRaises(ValueError, MesosTask, base_task, 'directory')
|
||||
|
||||
def test_missing_state(self):
|
||||
base_task = {
|
||||
'id': 'mytask_id',
|
||||
'slave_id': 'myslave_id',
|
||||
'framework_id': 'myframework_id',
|
||||
'statuses': []
|
||||
}
|
||||
self.assertRaises(ValueError, MesosTask, base_task, 'directory')
|
||||
|
||||
def test_missing_statuses(self):
|
||||
base_task = {
|
||||
'id': 'mytask_id',
|
||||
'slave_id': 'myslave_id',
|
||||
'framework_id': 'myframework_id',
|
||||
'state': 'mystate'
|
||||
}
|
||||
self.assertRaises(ValueError, MesosTask, base_task, 'directory')
|
||||
|
||||
def test_values_set(self):
|
||||
base_task = {
|
||||
'id': 'mytask_id',
|
||||
'slave_id': 'myslave_id',
|
||||
'framework_id': 'myframework_id',
|
||||
'state': 'mystate',
|
||||
'statuses': []
|
||||
}
|
||||
|
||||
task = MesosTask(base_task, 'directory')
|
||||
self.assertEqual(task.task_id, 'mytask_id')
|
||||
self.assertEqual(task.slave_id, 'myslave_id')
|
||||
self.assertEqual(task.framework_id, 'myframework_id')
|
||||
self.assertEqual(task.state, 'mystate')
|
||||
self.assertEqual(task.timestamp, -1)
|
||||
|
||||
def test_sandbox_path(self):
|
||||
base_task = {
|
||||
'id': 'mytask_id',
|
||||
'slave_id': 'myslave_id',
|
||||
'framework_id': 'myframework_id',
|
||||
'state': 'mystate',
|
||||
'statuses': []
|
||||
}
|
||||
task = MesosTask(base_task, 'directory')
|
||||
expected = 'myslave_id/files/read.json?path=directory/myfile&length=999999&offset=0'
|
||||
actual = task.get_sandbox_path('myfile')
|
||||
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def test_sandbox_path_empty_filename(self):
|
||||
base_task = {
|
||||
'id': 'mytask_id',
|
||||
'slave_id': 'myslave_id',
|
||||
'framework_id': 'myframework_id',
|
||||
'state': 'mystate',
|
||||
'statuses': []
|
||||
}
|
||||
task = MesosTask(base_task, 'directory')
|
||||
self.assertRaises(ValueError, task.get_sandbox_path, None)
|
||||
|
||||
def test_is_failed(self):
|
||||
base_task = {
|
||||
'id': 'mytask_id',
|
||||
'slave_id': 'myslave_id',
|
||||
'framework_id': 'myframework_id',
|
||||
'state': 'TASK_FAILED',
|
||||
'statuses': []
|
||||
}
|
||||
task = MesosTask(base_task, 'directory')
|
||||
self.assertTrue(task.is_failed())
|
||||
|
||||
def test_is_failed_false(self):
|
||||
base_task = {
|
||||
'id': 'mytask_id',
|
||||
'slave_id': 'myslave_id',
|
||||
'framework_id': 'myframework_id',
|
||||
'state': 'TASK_SOMETHING',
|
||||
'statuses': []
|
||||
}
|
||||
task = MesosTask(base_task, 'directory')
|
||||
self.assertFalse(task.is_failed())
|
||||
|
||||
def test_is_killed(self):
|
||||
base_task = {
|
||||
'id': 'mytask_id',
|
||||
'slave_id': 'myslave_id',
|
||||
'framework_id': 'myframework_id',
|
||||
'state': 'TASK_KILLED',
|
||||
'statuses': []
|
||||
}
|
||||
task = MesosTask(base_task, 'directory')
|
||||
self.assertTrue(task.is_killed())
|
||||
|
||||
def test_is_killing(self):
|
||||
base_task = {
|
||||
'id': 'mytask_id',
|
||||
'slave_id': 'myslave_id',
|
||||
'framework_id': 'myframework_id',
|
||||
'state': 'TASK_KILLING',
|
||||
'statuses': []
|
||||
}
|
||||
task = MesosTask(base_task, 'directory')
|
||||
self.assertTrue(task.is_killed())
|
||||
|
||||
def test_is_killed_false(self):
|
||||
base_task = {
|
||||
'id': 'mytask_id',
|
||||
'slave_id': 'myslave_id',
|
||||
'framework_id': 'myframework_id',
|
||||
'state': 'TASK_FALSE',
|
||||
'statuses': []
|
||||
}
|
||||
task = MesosTask(base_task, 'directory')
|
||||
self.assertFalse(task.is_killed())
|
Загрузка…
Ссылка в новой задаче