Merge pull request #27 from Microsoft/peterj/handle_killing_finished_events
Using deploymentMonitor only for logging events and not making any de…
This commit is contained in:
Коммит
c8889e54c6
|
@ -205,7 +205,6 @@ class Marathon(object):
|
|||
DeploymentMonitor that streams events from Marathon endpoint and monitors when
|
||||
apps fail or succeed to deploy. Monitor also logs any app status changes.
|
||||
"""
|
||||
|
||||
# Get the deploymentId, so we can uniquely identify deployment
|
||||
# we want to monitor
|
||||
deployment_json = deployment_response.json()
|
||||
|
@ -228,22 +227,24 @@ class Marathon(object):
|
|||
return
|
||||
|
||||
deployment_completed = False
|
||||
timeout_exceeded = False
|
||||
processor_catchup = False # Did we already give processor an extra second to finish up or not?
|
||||
processor = DeploymentMonitor(self, app_ids, deployment_id)
|
||||
processor.start()
|
||||
|
||||
while processor.is_running() and not deployment_completed:
|
||||
while not deployment_completed:
|
||||
if self._wait_time_exceeded(self.deployment_max_wait_time, start_timestamp):
|
||||
raise Exception('Timeout exceeded waiting for deployment to complete')
|
||||
timeout_exceeded = True
|
||||
break
|
||||
get_deployments_response = self.get_deployments().json()
|
||||
a_deployment = [dep for dep in get_deployments_response if dep['id'] == deployment_id]
|
||||
if len(a_deployment) == 0:
|
||||
# If deployment completed, but we don't know if it was a failure or
|
||||
# success, we give the processor another second to catch up on events.
|
||||
if not processor.deployment_failed() and \
|
||||
not processor.deployment_succeeded() and not processor_catchup:
|
||||
logging.debug('Giving deployment monitor another second to catch-up on events')
|
||||
time.sleep(1)
|
||||
if not processor_catchup:
|
||||
logging.info('Giving deployment monitor more time to catch-up on events')
|
||||
for _ in range(0, 5):
|
||||
if not processor.deployment_succeeded():
|
||||
time.sleep(1)
|
||||
# TODO:Check that the group was deployed correctly (instance count, healthcheck)
|
||||
processor_catchup = True
|
||||
continue
|
||||
else:
|
||||
|
@ -251,29 +252,12 @@ class Marathon(object):
|
|||
break
|
||||
time.sleep(1)
|
||||
|
||||
if processor.deployment_failed():
|
||||
failed_event = processor.get_failed_event()
|
||||
failed_task = self.mesos.get_task(
|
||||
failed_event.task_id(), failed_event.slave_id())
|
||||
stderr_contents = self.mesos.get_task_log_file(failed_task, 'stderr')
|
||||
logging.error(stderr_contents)
|
||||
raise Exception('Deployment failed to complete')
|
||||
|
||||
if processor.deployment_succeeded():
|
||||
logging.info('Deployment succeeded')
|
||||
return
|
||||
processor.stopped = True
|
||||
if timeout_exceeded:
|
||||
raise Exception('Timeout exceeded waiting for deployment to complete')
|
||||
|
||||
if deployment_completed:
|
||||
# If deployment completed but we didn't catch the succeeded/failed event
|
||||
# we need to check for the failed_event or failure message
|
||||
if not processor.deployment_failed() and not processor.deployment_succeeded():
|
||||
failure_message = processor.get_failure_message()
|
||||
if failure_message:
|
||||
logging.error(failure_message)
|
||||
raise Exception('Deployment failed to complete')
|
||||
else:
|
||||
logging.info('Deployment ended')
|
||||
return
|
||||
logging.info('Deployment ended')
|
||||
|
||||
def _wait_time_exceeded(self, max_wait, timestamp):
|
||||
"""
|
||||
|
|
|
@ -51,6 +51,18 @@ class MarathonEvent(object):
|
|||
"""
|
||||
return self._get_event_type() == 'status_update_event'
|
||||
|
||||
def is_group_change_success(self):
|
||||
"""
|
||||
True if event represents a group change success
|
||||
"""
|
||||
return self._get_event_type() == 'group_change_success'
|
||||
|
||||
def is_app_terminated(self):
|
||||
"""
|
||||
True if event represents an app terminated event
|
||||
"""
|
||||
return self._get_event_type() == 'app_terminated_event'
|
||||
|
||||
def is_task_failed(self):
|
||||
"""
|
||||
True if task is failed, false otherwise
|
||||
|
@ -126,6 +138,9 @@ class MarathonEvent(object):
|
|||
else:
|
||||
event_status = 'Service "{}" task is finished: {}'.format(
|
||||
self.app_id(), self.data['message'])
|
||||
elif self.is_app_terminated():
|
||||
event_status = 'Service "{}" was terminated.'.format(self.app_id())
|
||||
|
||||
return event_status
|
||||
|
||||
class DeploymentMonitor(object):
|
||||
|
@ -135,13 +150,10 @@ class DeploymentMonitor(object):
|
|||
"""
|
||||
def __init__(self, marathon, app_ids, deployment_id):
|
||||
self._marathon = marathon
|
||||
self._deployment_failed = False
|
||||
self._deployment_succeeded = False
|
||||
self._app_ids = app_ids
|
||||
self._deployment_id = deployment_id
|
||||
self.stopped = False
|
||||
self._failed_event = None
|
||||
self._failure_message = None
|
||||
self._thread = threading.Thread(
|
||||
target=DeploymentMonitor._process_events, args=(self,))
|
||||
|
||||
|
@ -152,30 +164,6 @@ class DeploymentMonitor(object):
|
|||
self._thread.daemon = True
|
||||
self._thread.start()
|
||||
|
||||
def is_running(self):
|
||||
"""
|
||||
True if monitor is running, false otherwise.
|
||||
"""
|
||||
return not self.stopped
|
||||
|
||||
def get_failed_event(self):
|
||||
"""
|
||||
Gets the last failed event
|
||||
"""
|
||||
return self._failed_event
|
||||
|
||||
def get_failure_message(self):
|
||||
"""
|
||||
Gets the failure message
|
||||
"""
|
||||
return self._failure_message
|
||||
|
||||
def deployment_failed(self):
|
||||
"""
|
||||
True if deployment failed, false otherwise
|
||||
"""
|
||||
return self._deployment_failed
|
||||
|
||||
def deployment_succeeded(self):
|
||||
"""
|
||||
True if deployment succeeded, false otherwise
|
||||
|
@ -189,39 +177,32 @@ class DeploymentMonitor(object):
|
|||
events = self._get_event_stream()
|
||||
for event in events:
|
||||
try:
|
||||
if self.stopped:
|
||||
break
|
||||
if self._handle_event(event):
|
||||
self.stopped = True
|
||||
break
|
||||
self._log_event(event)
|
||||
except:
|
||||
# Ignore any exceptions
|
||||
pass
|
||||
|
||||
def _handle_event(self, event):
|
||||
def _log_event(self, event):
|
||||
"""
|
||||
Handles single event from Marathon by logging it
|
||||
and/or signaling to stop the deployment monitor
|
||||
Logs events from Marathon
|
||||
"""
|
||||
deployment_finished = False
|
||||
if event.is_status_update():
|
||||
if event.is_status_update() or event.is_app_terminated():
|
||||
if event.app_id() in self._app_ids:
|
||||
# Log the event information
|
||||
logging.info(event.status())
|
||||
if event.is_task_failed() or event.is_task_killed():
|
||||
self._deployment_failed = True
|
||||
self._failed_event = event
|
||||
deployment_finished = True
|
||||
self._log_stderr(event)
|
||||
elif event.is_deployment_succeeded():
|
||||
if self._deployment_id == event.data['id']:
|
||||
self._deployment_succeeded = True
|
||||
deployment_finished = True
|
||||
elif event.is_deployment_failed():
|
||||
if self._deployment_id == event.data['id']:
|
||||
self._deployment_failed = True
|
||||
self._failure_message = 'Deployment failed.'
|
||||
deployment_finished = True
|
||||
return deployment_finished
|
||||
|
||||
def _log_stderr(self, event):
|
||||
"""
|
||||
Logs the stderr of the failed event
|
||||
"""
|
||||
failed_task = self._marathon.mesos.get_task(
|
||||
event.task_id(), event.slave_id())
|
||||
stderr = self._marathon.mesos.get_task_log_file(failed_task, 'stderr')
|
||||
logging.error(stderr)
|
||||
|
||||
def _get_event_stream(self):
|
||||
"""
|
||||
|
@ -231,5 +212,7 @@ class DeploymentMonitor(object):
|
|||
events_url = self._marathon.get_url('service/marathon/v2/events')
|
||||
messages = sseclient.SSEClient(events_url)
|
||||
for msg in messages:
|
||||
if self.stopped:
|
||||
break
|
||||
event = MarathonEvent(json.loads(msg.data))
|
||||
yield event
|
||||
|
|
Загрузка…
Ссылка в новой задаче