diff --git a/convoy/batch.py b/convoy/batch.py
index e840c6a..8979b10 100644
--- a/convoy/batch.py
+++ b/convoy/batch.py
@@ -143,6 +143,28 @@ def _wait_for_pool_ready(batch_client, node_state, pool_id, reboot_on_failed):
         time.sleep(10)
 
 
+def check_pool_nodes_runnable(batch_client, config):
+    # type: (batch.BatchServiceClient, dict) -> bool
+    """Check that all pool nodes are in an idle or running state
+    :param batch_client: The batch client to use.
+    :type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
+    :param dict config: configuration dict
+    :rtype: bool
+    :return: all pool nodes are runnable
+    """
+    pool_id = config['pool_specification']['id']
+    node_state = frozenset(
+        (batchmodels.ComputeNodeState.idle,
+         batchmodels.ComputeNodeState.running)
+    )
+    pool = batch_client.pool.get(pool_id)
+    nodes = list(batch_client.compute_node.list(pool_id))
+    if (len(nodes) >= pool.target_dedicated and
+            all(node.state in node_state for node in nodes)):
+        return True
+    return False
+
+
 def create_pool(batch_client, config, pool):
     # type: (batch.BatchServiceClient, dict, batchmodels.PoolAddParameter) ->
     #        List[batchmodels.ComputeNode]
diff --git a/convoy/data.py b/convoy/data.py
index f74f258..b03f558 100644
--- a/convoy/data.py
+++ b/convoy/data.py
@@ -38,6 +38,7 @@ try:
 except ImportError:
     from pipes import quote as shellquote
 import threading
+import time
 # local imports
 import convoy.batch
 import convoy.util
@@ -412,6 +413,7 @@ def _azure_blob_storage_transfer(
         args=(storage_settings, container, src, src_incl, eo)
     )
     thr.start()
+    return thr
 
 
 def _wrap_blobxfer_subprocess(storage_settings, container, src, src_incl, eo):
@@ -428,10 +430,15 @@ def _wrap_blobxfer_subprocess(storage_settings, container, src, src_incl, eo):
     rsrc = psrc.relative_to(psrc.parent)
     env = os.environ.copy()
     env['BLOBXFER_STORAGEACCOUNTKEY'] = storage_settings['account_key']
-    cmd = ['blobxfer {} {} {} --upload --no-progressbar {} {}'.format(
-        storage_settings['account'], container, rsrc,
-        '--include \'{}\''.format(src_incl) if src_incl is not None else '',
-        eo)]
+    cmd = [
+        ('blobxfer {} {} {} --endpoint {} --upload --no-progressbar '
+         '{} {}').format(
+            storage_settings['account'], container, rsrc,
+            storage_settings['endpoint'],
+            '--include \'{}\''.format(src_incl)
+            if src_incl is not None else '',
+            eo)
+    ]
     logger.info('begin ingressing data from {} to container {}'.format(
         src, container))
     proc = convoy.util.subprocess_nowait_pipe_stdout(
@@ -445,13 +452,41 @@ def _wrap_blobxfer_subprocess(storage_settings, container, src, src_incl, eo):
         logger.info(stdout.decode('utf8'))
 
 
-def ingress_data(batch_client, config, rls=None):
-    # type: (batch.BatchServiceClient, dict, dict) -> None
+def wait_for_storage_threads(storage_threads):
+    # type: (list) -> None
+    """Wait for storage transfer threads to complete
+    :param list storage_threads: list of storage threads
+    """
+    i = 0
+    nthreads = len(storage_threads)
+    while nthreads > 0:
+        alive = sum(thr.is_alive() for thr in storage_threads)
+        if alive > 0:
+            i += 1
+            if i % 10 == 0:
+                i = 0
+                logger.debug(
+                    'waiting for Azure Blob Storage transfer processes '
+                    'to complete: {} active, {} completed'.format(
+                        alive, nthreads - alive))
+            time.sleep(1)
+        else:
+            for thr in storage_threads:
+                thr.join()
+            logger.info('Azure Blob Storage transfer completed')
+            break
+
+
+def ingress_data(batch_client, config, rls=None, kind=None):
+    # type: (batch.BatchServiceClient, dict, dict, str) -> list
     """Ingresses data into Azure Batch
     :param batch_client: The batch client to use.
     :type batch_client: `batchserviceclient.BatchServiceClient`
     :param dict config: configuration dict
     :param dict rls: remote login settings
+    :param str kind: 'all', 'shared', or 'storage'
+    :rtype: list
+    :return: list of storage threads
     """
     try:
         files = config['global_resources']['files']
@@ -462,6 +497,7 @@ def ingress_data(batch_client, config, rls=None):
         username = config['pool_specification']['ssh']['username']
     except KeyError:
         username = None
+    storage_threads = []
     for fdict in files:
         src = fdict['source']['path']
         try:
@@ -489,6 +525,17 @@ def ingress_data(batch_client, config, rls=None):
                 'cannot specify both shared data volume and storage for the '
                 'destination for source: {}'.format(src))
         if shared is not None:
+            if rls is None:
+                logger.warning(
+                    'skipping data ingress from {} to {} for pool with no '
+                    'remote login settings or non-existent pool'.format(
+                        src, shared))
+                continue
+            if kind == 'storage':
+                logger.warning(
+                    'skipping data ingress from {} to {} for pool as ingress '
+                    'to shared file system not specified'.format(src, shared))
+                continue
             if username is None:
                 raise RuntimeError(
                     'cannot ingress data to shared data volume without a '
@@ -563,9 +610,6 @@ def ingress_data(batch_client, config, rls=None):
             else:
                 raise RuntimeError(
                     'data ingress to {} not supported'.format(driver))
-            if rls is None:
-                rls = convoy.batch.get_remote_login_settings(
-                    batch_client, config)
             if method == 'scp' or method == 'rsync+ssh':
                 # split/source include/exclude will force multinode
                 # transfer with mpt=1
@@ -586,6 +630,11 @@ def ingress_data(batch_client, config, rls=None):
                 raise RuntimeError(
                     'unknown transfer method: {}'.format(method))
         elif storage is not None:
+            if kind == 'shared':
+                logger.warning(
+                    'skipping data ingress from {} to {} for pool as ingress '
+                    'to Azure Blob Storage not specified'.format(src, storage))
+                continue
             try:
                 container = fdict['destination']['data_transfer']['container']
                 if container is not None and len(container) == 0:
@@ -612,9 +661,11 @@ def ingress_data(batch_client, config, rls=None):
                     eo = ''
             except KeyError:
                 eo = ''
-            _azure_blob_storage_transfer(
+            thr = _azure_blob_storage_transfer(
                 config['credentials']['storage'][storage], container, src,
                 src_incl, eo)
+            storage_threads.append(thr)
         else:
             raise RuntimeError(
                 'invalid file transfer configuration: {}'.format(fdict))
+    return storage_threads
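A minimal, runnable sketch of the fire-and-collect threading pattern the convoy/data.py changes adopt: each transfer starts on its own thread, the thread handles are returned to the caller, and the join happens only when the caller is ready. The transfer body below is a hypothetical stand-in for the blobxfer subprocess wrapper, not the real transfer code.

```python
import threading
import time


def start_transfer(src):
    # stand-in for _azure_blob_storage_transfer: start the work on a
    # thread and return the handle instead of blocking on completion
    thr = threading.Thread(target=time.sleep, args=(0.1,), name=src)
    thr.start()
    return thr


def ingress(sources):
    # mirror ingress_data: collect every started thread so the caller
    # decides when (and whether) to wait
    return [start_transfer(src) for src in sources]


threads = ingress(['a.bin', 'b.bin'])
# ... overlap other work here, e.g., pool creation ...
for thr in threads:
    thr.join()
```

This is what allows `wait_for_storage_threads` to be deferred until after the shared file system ingress in `add_pool` below.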
diff --git a/docs/10-batch-shipyard-configuration.md b/docs/10-batch-shipyard-configuration.md
index 18374f6..d476e2c 100644
--- a/docs/10-batch-shipyard-configuration.md
+++ b/docs/10-batch-shipyard-configuration.md
@@ -412,8 +412,13 @@ from entering ready state until all Docker images are loaded. This defaults
 to `true`.
 * (optional) `transfer_files_on_pool_creation` will ingress all `files`
 specified in the `global_resources` section of the configuration json when
-the pool is created to the compute nodes of the pool. If this property is set
-to `true` then `block_until_all_global_resources_loaded` will be force
+the pool is created. If files are to be ingressed to Azure Blob Storage,
+then data movement operations are overlapped with the creation of the pool.
+If files are to be ingressed to a shared file system on the compute nodes,
+then the files are ingressed after the pool is created and the shared file
+system is ready. Files can be ingressed to both Azure Blob Storage and a
+shared file system during the same pool creation invocation. If this property
+is set to `true` then `block_until_all_global_resources_loaded` will be force
 disabled. If omitted, this property defaults to `false`.
 * (optional) `ssh` is the property for creating a user to accomodate SSH
 sessions to compute nodes. If this property is absent, then an SSH user is not
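For context, a hypothetical configuration excerpt (rendered as a Python dict rather than json) that would exercise both ingress paths during one pool creation. Only `transfer_files_on_pool_creation`, `global_resources`, `files`, `source`/`path`, and `destination`/`data_transfer`/`container` are visible in the diff itself; the remaining key names and all values are illustrative assumptions.

```python
config = {
    'pool_specification': {
        'transfer_files_on_pool_creation': True,  # enables ingress at add_pool
    },
    'global_resources': {
        'files': [
            {
                # uploaded to Azure Blob Storage, overlapped with pool creation
                'source': {'path': '/staging/inputs'},        # illustrative
                'destination': {
                    'storage_account_settings': 'mystorage',  # assumed key
                    'data_transfer': {'container': 'inputs'},
                },
            },
            {
                # copied to the shared file system once nodes are ready
                'source': {'path': '/staging/shared'},        # illustrative
                'destination': {
                    'shared_data_volume': 'glustervol',       # assumed key
                    'data_transfer': {'method': 'scp'},       # assumed key
                },
            },
        ],
    },
}
```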
diff --git a/shipyard.py b/shipyard.py
index 391612c..7cafb12 100755
--- a/shipyard.py
+++ b/shipyard.py
@@ -303,6 +303,16 @@ def add_pool(batch_client, blob_client, config):
     sku = config['pool_specification']['sku']
     vm_count = config['pool_specification']['vm_count']
     vm_size = config['pool_specification']['vm_size']
+    try:
+        ingress_files = config[
+            'pool_specification']['transfer_files_on_pool_creation']
+    except KeyError:
+        ingress_files = False
+    # ingress data to Azure Blob Storage if specified
+    storage_threads = []
+    if ingress_files:
+        storage_threads = convoy.data.ingress_data(
+            batch_client, config, rls=None, kind='storage')
     try:
         maxtasks = config['pool_specification']['max_tasks_per_node']
     except KeyError:
@@ -548,14 +558,11 @@ def add_pool(batch_client, blob_client, config):
     convoy.batch.add_ssh_user(batch_client, config, nodes)
     # log remote login settings
     rls = convoy.batch.get_remote_login_settings(batch_client, config, nodes)
-    # ingress data if specified
-    try:
-        ingress_files = config[
-            'pool_specification']['transfer_files_on_pool_creation']
-    except KeyError:
-        ingress_files = False
+    # ingress data to shared fs if specified
     if ingress_files:
-        convoy.data.ingress_data(batch_client, config, rls)
+        convoy.data.ingress_data(batch_client, config, rls=rls, kind='shared')
+    # wait for storage ingress processes
+    convoy.data.wait_for_storage_threads(storage_threads)
 
 
 def _setup_glusterfs(batch_client, blob_client, config, nodes):
@@ -889,7 +896,24 @@ def main():
     elif args.action == 'getnodefile':
         convoy.batch.get_file_via_node(batch_client, config, args.nodeid)
     elif args.action == 'ingressdata':
-        convoy.data.ingress_data(batch_client, config)
+        try:
+            # ensure there are remote login settings
+            rls = convoy.batch.get_remote_login_settings(
+                batch_client, config, nodes=None)
+            # ensure nodes are at least idle/running for shared ingress
+            kind = 'all'
+            if not convoy.batch.check_pool_nodes_runnable(
+                    batch_client, config):
+                kind = 'storage'
+        except batchmodels.BatchErrorException as ex:
+            if 'The specified pool does not exist' in ex.message.value:
+                rls = None
+                kind = 'storage'
+            else:
+                raise
+        storage_threads = convoy.data.ingress_data(
+            batch_client, config, rls=rls, kind=kind)
+        convoy.data.wait_for_storage_threads(storage_threads)
     elif args.action == 'listjobs':
         convoy.batch.list_jobs(batch_client, config)
     elif args.action == 'listtasks':
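The `ingressdata` action now has to decide which ingress kinds are safe before calling `ingress_data`. A runnable sketch of that decision, with `pool_exists` and `nodes_runnable` as hypothetical stand-ins for the `get_remote_login_settings` probe and the `check_pool_nodes_runnable` call in the diff:

```python
def select_ingress_kind(pool_exists, nodes_runnable):
    # type: (bool, bool) -> str
    if not pool_exists:
        # no pool at all: only Azure Blob Storage ingress can proceed
        return 'storage'
    # pool exists but nodes are not yet idle/running: shared file
    # system ingress would fail, so restrict to storage
    return 'all' if nodes_runnable else 'storage'


assert select_ingress_kind(False, False) == 'storage'
assert select_ingress_kind(True, False) == 'storage'
assert select_ingress_kind(True, True) == 'all'
```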