- Resolves #110
This commit is contained in:
Fred Park 2017-08-04 14:47:16 -07:00
Родитель c5fa85adcb
Коммит 8a396f0e18
5 изменённых файлов: 367 добавлений и 9 удалений

Просмотреть файл

@ -5,6 +5,8 @@
### Added
- `random` and `file` task factories. See task factory guide for more
information.
- Summary statistics: `pool stats` and `jobs stats`. See the usage doc for
more information.
- Delete unusable nodes from pool with `--all-unusable` option for
`pool delnode`
- CNTK-GPU-Infiniband-IntelMPI recipe
@ -34,6 +36,8 @@ guide for more information.
- Job priority support
- Job migration support
- Compute node fill type support
- New commands: `jobs enable` and `jobs disable`. Please see the usage doc
for more information.
- From Scratch: Step-by-Step guide
- Azure Cloud Shell information

Просмотреть файл

@ -45,6 +45,7 @@ import tempfile
import time
# non-stdlib imports
import azure.batch.models as batchmodels
import dateutil.tz
# local imports
from . import autoscale
from . import crypto
@ -820,6 +821,178 @@ def del_pool(batch_client, config, pool_id=None):
return True
def pool_stats(batch_client, config, pool_id=None):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict,
# str) -> None
"""Get pool stats
:param batch_client: The batch client to use.
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
:param str pool_id: pool id
"""
if util.is_none_or_empty(pool_id):
pool_id = settings.pool_id(config)
try:
pool = batch_client.pool.get(
pool_id=pool_id,
pool_get_options=batchmodels.PoolGetOptions(expand='stats'),
)
except batchmodels.batch_error.BatchErrorException as ex:
if 'The specified pool does not exist' in ex.message.value:
logger.error('pool {} does not exist'.format(pool_id))
return
if pool.stats is not None and pool.stats.usage_stats is not None:
usage_stats = '{} * Total core hours: {} (last updated: {})'.format(
os.linesep,
pool.stats.usage_stats.dedicated_core_time,
pool.stats.usage_stats.last_update_time,
)
else:
usage_stats = ''
nodes = list(batch_client.compute_node.list(pool_id))
nsc = []
runnable_nodes = 0
for key, value in _node_state_counts(nodes)._asdict().items():
if key == 'running' or key == 'idle':
runnable_nodes += value
nsc.append(' * {}: {}'.format(key, value))
node_up_times = []
node_alloc_times = []
node_start_times = []
tasks_run = []
tasks_running = []
now = datetime.datetime.now(dateutil.tz.tzutc())
for node in nodes:
if node.last_boot_time is not None:
node_up_times.append((now - node.last_boot_time).total_seconds())
if (node.start_task_info is not None and
node.start_task_info.end_time is not None):
node_alloc_times.append(
(node.start_task_info.end_time -
node.allocation_time).total_seconds()
)
node_start_times.append(
(node.start_task_info.end_time -
node.last_boot_time).total_seconds()
)
tasks_run.append(node.total_tasks_run)
tasks_running.append(node.running_tasks_count)
total_running_tasks = sum(tasks_running)
runnable_task_slots = runnable_nodes * pool.max_tasks_per_node
total_task_slots = (
pool.current_dedicated_nodes + pool.current_low_priority_nodes
) * pool.max_tasks_per_node
busy_task_slots_fraction = (
0 if total_task_slots == 0 else
total_running_tasks / runnable_task_slots
)
version = 'N/A'
for md in pool.metadata:
if md.name == settings.get_metadata_version_name():
version = md.value
break
log = [
'* Batch Shipyard version: {}'.format(version),
'* Total nodes: {}'.format(
pool.current_dedicated_nodes + pool.current_low_priority_nodes
),
' * Dedicated nodes: {0} ({1:.1f}% of target){2}'.format(
pool.current_dedicated_nodes,
100 * (
1 if pool.target_dedicated_nodes == 0 else
pool.current_dedicated_nodes / pool.target_dedicated_nodes),
usage_stats,
),
' * Low Priority nodes: {0} ({1:.1f}% of target)'.format(
pool.current_low_priority_nodes,
100 * (
1 if pool.target_low_priority_nodes == 0 else
pool.current_low_priority_nodes /
pool.target_low_priority_nodes)
),
'* Node states:',
os.linesep.join(nsc),
]
if len(node_up_times) > 0:
log.extend([
'* Node uptime:',
' * Mean: {}'.format(
datetime.timedelta(
seconds=(sum(node_up_times) / len(node_up_times)))
),
' * Min: {}'.format(
datetime.timedelta(seconds=min(node_up_times))
),
' * Max: {}'.format(
datetime.timedelta(seconds=max(node_up_times))
),
])
if len(node_alloc_times) > 0:
log.extend([
'* Time taken for node creation to ready:',
' * Mean: {}'.format(
datetime.timedelta(
seconds=(sum(node_alloc_times) / len(node_alloc_times)))
),
' * Min: {}'.format(
datetime.timedelta(seconds=min(node_alloc_times))
),
' * Max: {}'.format(
datetime.timedelta(seconds=max(node_alloc_times))
),
])
if len(node_start_times) > 0:
log.extend([
'* Time taken for last boot startup (includes prep):',
' * Mean: {}'.format(
datetime.timedelta(
seconds=(sum(node_start_times) / len(node_start_times)))
),
' * Min: {}'.format(
datetime.timedelta(seconds=min(node_start_times))
),
' * Max: {}'.format(
datetime.timedelta(seconds=max(node_start_times))
),
])
if len(tasks_running) > 0:
log.extend([
'* Running tasks:',
' * Sum: {}'.format(total_running_tasks),
' * Mean: {}'.format(total_running_tasks / len(tasks_running)),
' * Min: {}'.format(min(tasks_running)),
' * Max: {}'.format(max(tasks_running)),
])
if len(tasks_run) > 0:
log.extend([
'* Total tasks run:',
' * Sum: {}'.format(sum(tasks_run)),
' * Mean: {}'.format(sum(tasks_run) / len(tasks_run)),
' * Min: {}'.format(min(tasks_run)),
' * Max: {}'.format(max(tasks_run)),
])
log.extend([
'* Task scheduling slots:',
' * Busy: {0} ({1:.2f}% of runnable)'.format(
total_running_tasks, 100 * busy_task_slots_fraction
),
' * Available: {0} ({1:.2f}% of runnable)'.format(
runnable_task_slots - total_running_tasks,
100 * (1 - busy_task_slots_fraction)
),
' * Runnable: {0} ({1:.2f}% of total)'.format(
runnable_task_slots,
100 * (
runnable_task_slots / total_task_slots
if total_task_slots > 0 else 0
),
),
' * Total: {}'.format(total_task_slots),
])
logger.info('statistics summary for pool {}{}{}'.format(
pool_id, os.linesep, os.linesep.join(log)))
def pool_autoscale_disable(batch_client, config):
# type: (batch.BatchServiceClient, dict) -> None
"""Enable autoscale formula
@ -1069,6 +1242,131 @@ def update_job_with_pool(batch_client, config, jobid=None, poolid=None):
job_id, poolid))
def job_stats(batch_client, config, jobid=None):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict,
# str) -> None
"""Job stats
:param batch_client: The batch client to use.
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
:param str jobid: job id to query
"""
if jobid is not None:
try:
job = batch_client.job.get(
job_id=jobid,
job_get_options=batchmodels.JobGetOptions(expand='stats'),
)
except batchmodels.batch_error.BatchErrorException as ex:
if 'The specified job does not exist' in ex.message.value:
raise RuntimeError('job {} does not exist'.format(jobid))
jobs = [job]
else:
jobs = list(batch_client.job.list(
job_list_options=batchmodels.JobListOptions(expand='stats')))
job_count = 0
job_times = []
task_times = []
task_wall_times = []
task_counts = batchmodels.TaskCounts(0, 0, 0, 0, 0, 'validated')
total_tasks = 0
for job in jobs:
job_count += 1
# get task counts
tc = batch_client.job.get_task_counts(job_id=job.id)
task_counts.active += tc.active
task_counts.running += tc.running
task_counts.completed += tc.completed
task_counts.succeeded += tc.succeeded
task_counts.failed += tc.failed
total_tasks += tc.active + tc.running + tc.completed
if (tc.validation_status !=
batchmodels.TaskCountValidationStatus.validated):
task_counts.validation_status = tc.validation_status
if job.execution_info.end_time is not None:
job_times.append(
(job.execution_info.end_time -
job.execution_info.start_time).total_seconds())
# get task-level execution info
tasks = batch_client.task.list(
job_id=job.id,
task_list_options=batchmodels.TaskListOptions(
filter='(state eq \'running\') or (state eq \'completed\')',
select='id,state,stats,executionInfo',
))
for task in tasks:
if task.stats is not None:
task_wall_times.append(
task.stats.wall_clock_time.total_seconds())
if (task.execution_info is not None and
task.execution_info.end_time is not None):
task_times.append(
(task.execution_info.end_time -
task.execution_info.start_time).total_seconds())
log = [
'* Total jobs: {}'.format(job_count),
'* Total tasks: {} ({})'.format(
total_tasks, task_counts.validation_status
),
' * Active: {}'.format(task_counts.active),
' * Running: {}'.format(task_counts.running),
' * Completed: {}'.format(task_counts.completed),
' * Succeeded: {0} ({1:.2f}% of completed)'.format(
task_counts.succeeded,
100 * task_counts.succeeded / task_counts.completed
if task_counts.completed > 0 else 0
),
' * Failed: {0} ({1:.2f}% of completed)'.format(
task_counts.failed,
100 * task_counts.failed / task_counts.completed
if task_counts.completed > 0 else 0
),
]
if len(job_times) > 0:
log.extend([
'* Job creation to completion time:',
' * Mean: {}'.format(
datetime.timedelta(seconds=(sum(job_times) / len(job_times)))
),
' * Min: {}'.format(
datetime.timedelta(seconds=min(job_times))
),
' * Max: {}'.format(
datetime.timedelta(seconds=max(job_times))
),
])
if len(task_times) > 0:
log.extend([
'* Task end-to-end time:',
' * Mean: {}'.format(
datetime.timedelta(seconds=(sum(task_times) / len(task_times)))
),
' * Min: {}'.format(
datetime.timedelta(seconds=min(task_times))
),
' * Max: {}'.format(
datetime.timedelta(seconds=max(task_times))
),
])
if len(task_wall_times) > 0:
log.extend([
'* Task command walltime (running and completed):',
' * Mean: {}'.format(
datetime.timedelta(
seconds=(sum(task_wall_times) / len(task_wall_times)))
),
' * Min: {}'.format(
datetime.timedelta(seconds=min(task_wall_times))
),
' * Max: {}'.format(
datetime.timedelta(seconds=max(task_wall_times))
),
])
logger.info('statistics summary for {}{}{}'.format(
'job {}'.format(jobid) if jobid is not None else 'all jobs',
os.linesep, os.linesep.join(log)))
def disable_jobs(
batch_client, config, disable_tasks_action, jobid=None,
disabling_state_ok=False, term_tasks=False):
@ -1661,15 +1959,17 @@ def terminate_tasks(
raise
def list_nodes(batch_client, config, nodes=None):
# type: (batch.BatchServiceClient, dict, list) -> None
def list_nodes(batch_client, config, pool_id=None, nodes=None):
# type: (batch.BatchServiceClient, dict, str, list) -> None
"""Get a list of nodes
:param batch_client: The batch client to use.
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
:param lsit nodes: list of nodes
:param str pool_id: pool id
:param list nodes: list of nodes
"""
pool_id = settings.pool_id(config)
if util.is_none_or_empty(pool_id):
pool_id = settings.pool_id(config)
logger.debug('listing nodes for pool {}'.format(pool_id))
if nodes is None:
nodes = batch_client.compute_node.list(pool_id)
@ -2325,7 +2625,6 @@ def check_jobs_for_auto_pool(config):
else:
autopool.append(True)
if autopool.count(False) == len(autopool):
logger.debug('autopool not detected for jobs')
return False
elif autopool.count(True) == len(autopool):
logger.debug('autopool detected for jobs')

Просмотреть файл

@ -2509,6 +2509,17 @@ def action_pool_listimages(batch_client, config):
_list_docker_images(batch_client, config)
def action_pool_stats(batch_client, config, pool_id):
# type: (batchsc.BatchServiceClient, dict, str) -> None
"""Action: Pool Stats
:param azure.batch.batch_service_client.BatchServiceClient batch_client:
batch client
:param dict config: configuration dict
:param str pool_id: pool id
"""
batch.pool_stats(batch_client, config, pool_id=pool_id)
def action_pool_autoscale_disable(batch_client, config):
# type: (batchsc.BatchServiceClient, dict, str, str, bool) -> None
"""Action: Pool Autoscale Disable
@ -2867,6 +2878,17 @@ def action_jobs_enable(batch_client, config, jobid):
batch.enable_jobs(batch_client, config, jobid=jobid)
def action_jobs_stats(batch_client, config, job_id):
# type: (batchsc.BatchServiceClient, dict, str) -> None
"""Action: Jobs Stats
:param azure.batch.batch_service_client.BatchServiceClient batch_client:
batch client
:param dict config: configuration dict
:param str job_id: job id
"""
batch.job_stats(batch_client, config, jobid=job_id)
def action_storage_del(
blob_client, queue_client, table_client, config, clear_tables, poolid):
# type: (azureblob.BlockBlobService, azurequeue.QueueService,

Просмотреть файл

@ -388,6 +388,8 @@ file
* `--requeue` requeue running tasks
* `--terminate` terminate running tasks
* `--wait` wait for running tasks to complete
* `stats` will generate a statistics summary of a job or jobs
* `--jobid` will query the specified job instead of all jobs
* `term` will terminate jobs found in the jobs configuration file. If an
autopool is specified for all jobs and a jobid option is not specified,
the storage associated with the autopool will be cleaned up.
@ -456,12 +458,13 @@ The `pool` command has the following sub-commands:
dsu Delete an SSH user from all nodes in pool
grls Get remote login settings for all nodes in...
list List all pools in the Batch account
listimages List Docker images in the pool
listimages List Docker images in a pool
listnodes List nodes in pool
listskus List available VM configurations available to...
rebootnode Reboot a node or nodes in a pool
resize Resize a pool
ssh Interactively login via SSH to a node in the...
ssh Interactively login via SSH to a node in a...
stats Get statistics about a pool
udi Update Docker images in a pool
```
* `add` will add the pool defined in the pool configuration file to the
@ -514,6 +517,9 @@ configuration file
the pool to connect to as listed by `grls`
* `--nodeid` is the node id to connect to in the pool
* `--tty` allocates a pseudo-terminal
* `stats` will generate a statistics summary of the pool
* `--poolid` will query the specified pool instead of the pool from the
pool configuration file
* `udi` will update Docker images on all compute nodes of the pool. This
command requires a valid SSH user.
* `--image` will restrict the update to just the image or image:tag

Просмотреть файл

@ -1178,7 +1178,7 @@ def pool_dsu(ctx):
@aad_options
@pass_cli_context
def pool_ssh(ctx, cardinal, nodeid, tty, command):
"""Interactively login via SSH to a node in the pool"""
"""Interactively login via SSH to a node in a pool"""
ctx.initialize_for_batch()
convoy.fleet.action_pool_ssh(
ctx.batch_client, ctx.config, cardinal, nodeid, tty, command)
@ -1253,11 +1253,25 @@ def pool_udi(ctx, image, digest, ssh):
@aad_options
@pass_cli_context
def pool_listimages(ctx):
"""List Docker images in the pool"""
"""List Docker images in a pool"""
ctx.initialize_for_batch()
convoy.fleet.action_pool_listimages(ctx.batch_client, ctx.config)
@pool.command('stats')
@click.option('--poolid', help='Get stats on specified pool')
@common_options
@batch_options
@keyvault_options
@aad_options
@pass_cli_context
def pool_stats(ctx, poolid):
"""Get statistics about a pool"""
ctx.initialize_for_batch()
convoy.fleet.action_pool_stats(
ctx.batch_client, ctx.config, pool_id=poolid)
@pool.group()
@pass_cli_context
def autoscale(ctx):
@ -1530,6 +1544,19 @@ def jobs_enable(ctx, jobid):
convoy.fleet.action_jobs_enable(ctx.batch_client, ctx.config, jobid)
@jobs.command('stats')
@click.option('--jobid', help='Get stats only on the specified job id')
@common_options
@batch_options
@keyvault_options
@aad_options
@pass_cli_context
def jobs_stats(ctx, jobid):
"""Get statistics about jobs"""
ctx.initialize_for_batch()
convoy.fleet.action_jobs_stats(ctx.batch_client, ctx.config, job_id=jobid)
@cli.group()
@pass_cli_context
def data(ctx):