diff --git a/CHANGELOG.md b/CHANGELOG.md index 776857b..dde49cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ ### Added - `random` and `file` task factories. See task factory guide for more information. +- Summary statistics: `pool stats` and `jobs stats`. See the usage doc for +more information. - Delete unusable nodes from pool with `--all-unusable` option for `pool delnode` - CNTK-GPU-Infiniband-IntelMPI recipe @@ -34,6 +36,8 @@ guide for more information. - Job priority support - Job migration support - Compute node fill type support +- New commands: `jobs enable` and `jobs disable`. Please see the usage doc +for more information. - From Scratch: Step-by-Step guide - Azure Cloud Shell information diff --git a/convoy/batch.py b/convoy/batch.py index dd8dc16..b28803e 100644 --- a/convoy/batch.py +++ b/convoy/batch.py @@ -45,6 +45,7 @@ import tempfile import time # non-stdlib imports import azure.batch.models as batchmodels +import dateutil.tz # local imports from . import autoscale from . import crypto @@ -820,6 +821,178 @@ def del_pool(batch_client, config, pool_id=None): return True +def pool_stats(batch_client, config, pool_id=None): + # type: (azure.batch.batch_service_client.BatchServiceClient, dict, + # str) -> None + """Get pool stats + :param batch_client: The batch client to use. 
+ :type batch_client: `azure.batch.batch_service_client.BatchServiceClient` + :param dict config: configuration dict + :param str pool_id: pool id + """ + if util.is_none_or_empty(pool_id): + pool_id = settings.pool_id(config) + try: + pool = batch_client.pool.get( + pool_id=pool_id, + pool_get_options=batchmodels.PoolGetOptions(expand='stats'), + ) + except batchmodels.batch_error.BatchErrorException as ex: + if 'The specified pool does not exist' in ex.message.value: + logger.error('pool {} does not exist'.format(pool_id)) + return + if pool.stats is not None and pool.stats.usage_stats is not None: + usage_stats = '{} * Total core hours: {} (last updated: {})'.format( + os.linesep, + pool.stats.usage_stats.dedicated_core_time, + pool.stats.usage_stats.last_update_time, + ) + else: + usage_stats = '' + nodes = list(batch_client.compute_node.list(pool_id)) + nsc = [] + runnable_nodes = 0 + for key, value in _node_state_counts(nodes)._asdict().items(): + if key == 'running' or key == 'idle': + runnable_nodes += value + nsc.append(' * {}: {}'.format(key, value)) + node_up_times = [] + node_alloc_times = [] + node_start_times = [] + tasks_run = [] + tasks_running = [] + now = datetime.datetime.now(dateutil.tz.tzutc()) + for node in nodes: + if node.last_boot_time is not None: + node_up_times.append((now - node.last_boot_time).total_seconds()) + if (node.start_task_info is not None and + node.start_task_info.end_time is not None): + node_alloc_times.append( + (node.start_task_info.end_time - + node.allocation_time).total_seconds() + ) + node_start_times.append( + (node.start_task_info.end_time - + node.last_boot_time).total_seconds() + ) + tasks_run.append(node.total_tasks_run) + tasks_running.append(node.running_tasks_count) + total_running_tasks = sum(tasks_running) + runnable_task_slots = runnable_nodes * pool.max_tasks_per_node + total_task_slots = ( + pool.current_dedicated_nodes + pool.current_low_priority_nodes + ) * pool.max_tasks_per_node + 
busy_task_slots_fraction = (
+        0 if runnable_task_slots == 0 else
+        total_running_tasks / runnable_task_slots
+    )
+    version = 'N/A'
+    for md in pool.metadata:
+        if md.name == settings.get_metadata_version_name():
+            version = md.value
+            break
+    log = [
+        '* Batch Shipyard version: {}'.format(version),
+        '* Total nodes: {}'.format(
+            pool.current_dedicated_nodes + pool.current_low_priority_nodes
+        ),
+        ' * Dedicated nodes: {0} ({1:.1f}% of target){2}'.format(
+            pool.current_dedicated_nodes,
+            100 * (
+                1 if pool.target_dedicated_nodes == 0 else
+                pool.current_dedicated_nodes / pool.target_dedicated_nodes),
+            usage_stats,
+        ),
+        ' * Low Priority nodes: {0} ({1:.1f}% of target)'.format(
+            pool.current_low_priority_nodes,
+            100 * (
+                1 if pool.target_low_priority_nodes == 0 else
+                pool.current_low_priority_nodes /
+                pool.target_low_priority_nodes)
+        ),
+        '* Node states:',
+        os.linesep.join(nsc),
+    ]
+    if len(node_up_times) > 0:
+        log.extend([
+            '* Node uptime:',
+            ' * Mean: {}'.format(
+                datetime.timedelta(
+                    seconds=(sum(node_up_times) / len(node_up_times)))
+            ),
+            ' * Min: {}'.format(
+                datetime.timedelta(seconds=min(node_up_times))
+            ),
+            ' * Max: {}'.format(
+                datetime.timedelta(seconds=max(node_up_times))
+            ),
+        ])
+    if len(node_alloc_times) > 0:
+        log.extend([
+            '* Time taken for node creation to ready:',
+            ' * Mean: {}'.format(
+                datetime.timedelta(
+                    seconds=(sum(node_alloc_times) / len(node_alloc_times)))
+            ),
+            ' * Min: {}'.format(
+                datetime.timedelta(seconds=min(node_alloc_times))
+            ),
+            ' * Max: {}'.format(
+                datetime.timedelta(seconds=max(node_alloc_times))
+            ),
+        ])
+    if len(node_start_times) > 0:
+        log.extend([
+            '* Time taken for last boot startup (includes prep):',
+            ' * Mean: {}'.format(
+                datetime.timedelta(
+                    seconds=(sum(node_start_times) / len(node_start_times)))
+            ),
+            ' * Min: {}'.format(
+                datetime.timedelta(seconds=min(node_start_times))
+            ),
+            ' * Max: {}'.format(
+                datetime.timedelta(seconds=max(node_start_times))
+            ),
+        ])
+    if
len(tasks_running) > 0: + log.extend([ + '* Running tasks:', + ' * Sum: {}'.format(total_running_tasks), + ' * Mean: {}'.format(total_running_tasks / len(tasks_running)), + ' * Min: {}'.format(min(tasks_running)), + ' * Max: {}'.format(max(tasks_running)), + ]) + if len(tasks_run) > 0: + log.extend([ + '* Total tasks run:', + ' * Sum: {}'.format(sum(tasks_run)), + ' * Mean: {}'.format(sum(tasks_run) / len(tasks_run)), + ' * Min: {}'.format(min(tasks_run)), + ' * Max: {}'.format(max(tasks_run)), + ]) + log.extend([ + '* Task scheduling slots:', + ' * Busy: {0} ({1:.2f}% of runnable)'.format( + total_running_tasks, 100 * busy_task_slots_fraction + ), + ' * Available: {0} ({1:.2f}% of runnable)'.format( + runnable_task_slots - total_running_tasks, + 100 * (1 - busy_task_slots_fraction) + ), + ' * Runnable: {0} ({1:.2f}% of total)'.format( + runnable_task_slots, + 100 * ( + runnable_task_slots / total_task_slots + if total_task_slots > 0 else 0 + ), + ), + ' * Total: {}'.format(total_task_slots), + ]) + logger.info('statistics summary for pool {}{}{}'.format( + pool_id, os.linesep, os.linesep.join(log))) + + def pool_autoscale_disable(batch_client, config): # type: (batch.BatchServiceClient, dict) -> None """Enable autoscale formula @@ -1069,6 +1242,131 @@ def update_job_with_pool(batch_client, config, jobid=None, poolid=None): job_id, poolid)) +def job_stats(batch_client, config, jobid=None): + # type: (azure.batch.batch_service_client.BatchServiceClient, dict, + # str) -> None + """Job stats + :param batch_client: The batch client to use. 
+ :type batch_client: `azure.batch.batch_service_client.BatchServiceClient` + :param dict config: configuration dict + :param str jobid: job id to query + """ + if jobid is not None: + try: + job = batch_client.job.get( + job_id=jobid, + job_get_options=batchmodels.JobGetOptions(expand='stats'), + ) + except batchmodels.batch_error.BatchErrorException as ex: + if 'The specified job does not exist' in ex.message.value: + raise RuntimeError('job {} does not exist'.format(jobid)) + jobs = [job] + else: + jobs = list(batch_client.job.list( + job_list_options=batchmodels.JobListOptions(expand='stats'))) + job_count = 0 + job_times = [] + task_times = [] + task_wall_times = [] + task_counts = batchmodels.TaskCounts(0, 0, 0, 0, 0, 'validated') + total_tasks = 0 + for job in jobs: + job_count += 1 + # get task counts + tc = batch_client.job.get_task_counts(job_id=job.id) + task_counts.active += tc.active + task_counts.running += tc.running + task_counts.completed += tc.completed + task_counts.succeeded += tc.succeeded + task_counts.failed += tc.failed + total_tasks += tc.active + tc.running + tc.completed + if (tc.validation_status != + batchmodels.TaskCountValidationStatus.validated): + task_counts.validation_status = tc.validation_status + if job.execution_info.end_time is not None: + job_times.append( + (job.execution_info.end_time - + job.execution_info.start_time).total_seconds()) + # get task-level execution info + tasks = batch_client.task.list( + job_id=job.id, + task_list_options=batchmodels.TaskListOptions( + filter='(state eq \'running\') or (state eq \'completed\')', + select='id,state,stats,executionInfo', + )) + for task in tasks: + if task.stats is not None: + task_wall_times.append( + task.stats.wall_clock_time.total_seconds()) + if (task.execution_info is not None and + task.execution_info.end_time is not None): + task_times.append( + (task.execution_info.end_time - + task.execution_info.start_time).total_seconds()) + log = [ + '* Total jobs: 
{}'.format(job_count), + '* Total tasks: {} ({})'.format( + total_tasks, task_counts.validation_status + ), + ' * Active: {}'.format(task_counts.active), + ' * Running: {}'.format(task_counts.running), + ' * Completed: {}'.format(task_counts.completed), + ' * Succeeded: {0} ({1:.2f}% of completed)'.format( + task_counts.succeeded, + 100 * task_counts.succeeded / task_counts.completed + if task_counts.completed > 0 else 0 + ), + ' * Failed: {0} ({1:.2f}% of completed)'.format( + task_counts.failed, + 100 * task_counts.failed / task_counts.completed + if task_counts.completed > 0 else 0 + ), + ] + if len(job_times) > 0: + log.extend([ + '* Job creation to completion time:', + ' * Mean: {}'.format( + datetime.timedelta(seconds=(sum(job_times) / len(job_times))) + ), + ' * Min: {}'.format( + datetime.timedelta(seconds=min(job_times)) + ), + ' * Max: {}'.format( + datetime.timedelta(seconds=max(job_times)) + ), + ]) + if len(task_times) > 0: + log.extend([ + '* Task end-to-end time:', + ' * Mean: {}'.format( + datetime.timedelta(seconds=(sum(task_times) / len(task_times))) + ), + ' * Min: {}'.format( + datetime.timedelta(seconds=min(task_times)) + ), + ' * Max: {}'.format( + datetime.timedelta(seconds=max(task_times)) + ), + ]) + if len(task_wall_times) > 0: + log.extend([ + '* Task command walltime (running and completed):', + ' * Mean: {}'.format( + datetime.timedelta( + seconds=(sum(task_wall_times) / len(task_wall_times))) + ), + ' * Min: {}'.format( + datetime.timedelta(seconds=min(task_wall_times)) + ), + ' * Max: {}'.format( + datetime.timedelta(seconds=max(task_wall_times)) + ), + ]) + logger.info('statistics summary for {}{}{}'.format( + 'job {}'.format(jobid) if jobid is not None else 'all jobs', + os.linesep, os.linesep.join(log))) + + def disable_jobs( batch_client, config, disable_tasks_action, jobid=None, disabling_state_ok=False, term_tasks=False): @@ -1661,15 +1959,17 @@ def terminate_tasks( raise -def list_nodes(batch_client, config, nodes=None): - # 
type: (batch.BatchServiceClient, dict, list) -> None +def list_nodes(batch_client, config, pool_id=None, nodes=None): + # type: (batch.BatchServiceClient, dict, str, list) -> None """Get a list of nodes :param batch_client: The batch client to use. :type batch_client: `azure.batch.batch_service_client.BatchServiceClient` :param dict config: configuration dict - :param lsit nodes: list of nodes + :param str pool_id: pool id + :param list nodes: list of nodes """ - pool_id = settings.pool_id(config) + if util.is_none_or_empty(pool_id): + pool_id = settings.pool_id(config) logger.debug('listing nodes for pool {}'.format(pool_id)) if nodes is None: nodes = batch_client.compute_node.list(pool_id) @@ -2325,7 +2625,6 @@ def check_jobs_for_auto_pool(config): else: autopool.append(True) if autopool.count(False) == len(autopool): - logger.debug('autopool not detected for jobs') return False elif autopool.count(True) == len(autopool): logger.debug('autopool detected for jobs') diff --git a/convoy/fleet.py b/convoy/fleet.py index cddaaa2..8596317 100644 --- a/convoy/fleet.py +++ b/convoy/fleet.py @@ -2509,6 +2509,17 @@ def action_pool_listimages(batch_client, config): _list_docker_images(batch_client, config) +def action_pool_stats(batch_client, config, pool_id): + # type: (batchsc.BatchServiceClient, dict, str) -> None + """Action: Pool Stats + :param azure.batch.batch_service_client.BatchServiceClient batch_client: + batch client + :param dict config: configuration dict + :param str pool_id: pool id + """ + batch.pool_stats(batch_client, config, pool_id=pool_id) + + def action_pool_autoscale_disable(batch_client, config): # type: (batchsc.BatchServiceClient, dict, str, str, bool) -> None """Action: Pool Autoscale Disable @@ -2867,6 +2878,17 @@ def action_jobs_enable(batch_client, config, jobid): batch.enable_jobs(batch_client, config, jobid=jobid) +def action_jobs_stats(batch_client, config, job_id): + # type: (batchsc.BatchServiceClient, dict, str) -> None + """Action: Jobs 
Stats + :param azure.batch.batch_service_client.BatchServiceClient batch_client: + batch client + :param dict config: configuration dict + :param str job_id: job id + """ + batch.job_stats(batch_client, config, jobid=job_id) + + def action_storage_del( blob_client, queue_client, table_client, config, clear_tables, poolid): # type: (azureblob.BlockBlobService, azurequeue.QueueService, diff --git a/docs/20-batch-shipyard-usage.md b/docs/20-batch-shipyard-usage.md index 7a082a6..965e7ff 100644 --- a/docs/20-batch-shipyard-usage.md +++ b/docs/20-batch-shipyard-usage.md @@ -388,6 +388,8 @@ file * `--requeue` requeue running tasks * `--terminate` terminate running tasks * `--wait` wait for running tasks to complete +* `stats` will generate a statistics summary of a job or jobs + * `--jobid` will query the specified job instead of all jobs * `term` will terminate jobs found in the jobs configuration file. If an autopool is specified for all jobs and a jobid option is not specified, the storage associated with the autopool will be cleaned up. @@ -456,12 +458,13 @@ The `pool` command has the following sub-commands: dsu Delete an SSH user from all nodes in pool grls Get remote login settings for all nodes in... list List all pools in the Batch account - listimages List Docker images in the pool + listimages List Docker images in a pool listnodes List nodes in pool listskus List available VM configurations available to... rebootnode Reboot a node or nodes in a pool resize Resize a pool - ssh Interactively login via SSH to a node in the... + ssh Interactively login via SSH to a node in a... 
+ stats Get statistics about a pool udi Update Docker images in a pool ``` * `add` will add the pool defined in the pool configuration file to the @@ -514,6 +517,9 @@ configuration file the pool to connect to as listed by `grls` * `--nodeid` is the node id to connect to in the pool * `--tty` allocates a pseudo-terminal +* `stats` will generate a statistics summary of the pool + * `--poolid` will query the specified pool instead of the pool from the + pool configuration file * `udi` will update Docker images on all compute nodes of the pool. This command requires a valid SSH user. * `--image` will restrict the update to just the image or image:tag diff --git a/shipyard.py b/shipyard.py index 8f90f87..4a2eafe 100755 --- a/shipyard.py +++ b/shipyard.py @@ -1178,7 +1178,7 @@ def pool_dsu(ctx): @aad_options @pass_cli_context def pool_ssh(ctx, cardinal, nodeid, tty, command): - """Interactively login via SSH to a node in the pool""" + """Interactively login via SSH to a node in a pool""" ctx.initialize_for_batch() convoy.fleet.action_pool_ssh( ctx.batch_client, ctx.config, cardinal, nodeid, tty, command) @@ -1253,11 +1253,25 @@ def pool_udi(ctx, image, digest, ssh): @aad_options @pass_cli_context def pool_listimages(ctx): - """List Docker images in the pool""" + """List Docker images in a pool""" ctx.initialize_for_batch() convoy.fleet.action_pool_listimages(ctx.batch_client, ctx.config) +@pool.command('stats') +@click.option('--poolid', help='Get stats on specified pool') +@common_options +@batch_options +@keyvault_options +@aad_options +@pass_cli_context +def pool_stats(ctx, poolid): + """Get statistics about a pool""" + ctx.initialize_for_batch() + convoy.fleet.action_pool_stats( + ctx.batch_client, ctx.config, pool_id=poolid) + + @pool.group() @pass_cli_context def autoscale(ctx): @@ -1530,6 +1544,19 @@ def jobs_enable(ctx, jobid): convoy.fleet.action_jobs_enable(ctx.batch_client, ctx.config, jobid) +@jobs.command('stats') +@click.option('--jobid', help='Get stats 
only on the specified job id') +@common_options +@batch_options +@keyvault_options +@aad_options +@pass_cli_context +def jobs_stats(ctx, jobid): + """Get statistics about jobs""" + ctx.initialize_for_batch() + convoy.fleet.action_jobs_stats(ctx.batch_client, ctx.config, job_id=jobid) + + @cli.group() @pass_cli_context def data(ctx):