Integrate blob storage ingress with pool creation

This commit is contained in:
Fred Park 2016-10-14 09:36:38 -07:00
Parent b0d3b9ba69
Commit 9d0c6d3ca6
4 changed files with 122 additions and 20 deletions

View file

@@ -143,6 +143,28 @@ def _wait_for_pool_ready(batch_client, node_state, pool_id, reboot_on_failed):
         time.sleep(10)
 
 
+def check_pool_nodes_runnable(batch_client, config):
+    # type: (batch.BatchServiceClient, dict) -> bool
+    """Check that all pool nodes are in idle/running state
+    :param batch_client: The batch client to use.
+    :type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
+    :param dict config: configuration dict
+    :rtype: bool
+    :return: all pool nodes are runnable
+    """
+    pool_id = config['pool_specification']['id']
+    node_state = frozenset(
+        (batchmodels.ComputeNodeState.idle,
+         batchmodels.ComputeNodeState.running)
+    )
+    pool = batch_client.pool.get(pool_id)
+    nodes = list(batch_client.compute_node.list(pool_id))
+    if (len(nodes) >= pool.target_dedicated and
+            all(node.state in node_state for node in nodes)):
+        return True
+    return False
+
+
 def create_pool(batch_client, config, pool):
     # type: (batch.BatchServiceClient, dict, batchmodels.PoolAddParameter) ->
     #        List[batchmodels.ComputeNode]
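
A quick sketch of how this new check can be consumed by callers that must defer work until nodes can accept it; the wrapper name, timeout, and 10-second cadence below are illustrative assumptions, not part of this commit:

    import time

    def wait_until_pool_runnable(batch_client, config, timeout=1800):
        # Hypothetical polling wrapper around check_pool_nodes_runnable above;
        # returns True once the pool has reached target_dedicated and every
        # node is idle or running, or False if the timeout elapses first.
        deadline = time.time() + timeout
        while time.time() < deadline:
            if check_pool_nodes_runnable(batch_client, config):
                return True
            time.sleep(10)
        return False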

View file

@@ -38,6 +38,7 @@ try:
 except ImportError:
     from pipes import quote as shellquote
 import threading
+import time
 # local imports
 import convoy.batch
 import convoy.util
@@ -412,6 +413,7 @@ def _azure_blob_storage_transfer(
         args=(storage_settings, container, src, src_incl, eo)
     )
     thr.start()
+    return thr
 
 
 def _wrap_blobxfer_subprocess(storage_settings, container, src, src_incl, eo):
@@ -428,10 +430,15 @@ def _wrap_blobxfer_subprocess(storage_settings, container, src, src_incl, eo):
         rsrc = psrc.relative_to(psrc.parent)
     env = os.environ.copy()
     env['BLOBXFER_STORAGEACCOUNTKEY'] = storage_settings['account_key']
-    cmd = ['blobxfer {} {} {} --upload --no-progressbar {} {}'.format(
+    cmd = [
+        ('blobxfer {} {} {} --endpoint {} --upload --no-progressbar '
+         '{} {}').format(
             storage_settings['account'], container, rsrc,
-            '--include \'{}\''.format(src_incl) if src_incl is not None else '',
-            eo)]
+            storage_settings['endpoint'],
+            '--include \'{}\''.format(src_incl)
+            if src_incl is not None else '',
+            eo)
+    ]
     logger.info('begin ingressing data from {} to container {}'.format(
         src, container))
     proc = convoy.util.subprocess_nowait_pipe_stdout(
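
For illustration, rendering the revised command template standalone with made-up values shows the effect of the new `--endpoint` flag (all values below are assumptions, not part of this commit):

    # Assumed example values, for illustration only
    storage_settings = {'account': 'mysa', 'endpoint': 'core.windows.net'}
    container, rsrc, src_incl, eo = 'ingress', 'data', '*.bin', ''
    cmd = [
        ('blobxfer {} {} {} --endpoint {} --upload --no-progressbar '
         '{} {}').format(
            storage_settings['account'], container, rsrc,
            storage_settings['endpoint'],
            '--include \'{}\''.format(src_incl)
            if src_incl is not None else '',
            eo)
    ]
    print(cmd[0])
    # blobxfer mysa ingress data --endpoint core.windows.net --upload
    # --no-progressbar --include '*.bin'
    # (one line when printed; a trailing space remains where the empty
    # extra-options slot was)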
@@ -445,13 +452,41 @@ def _wrap_blobxfer_subprocess(storage_settings, container, src, src_incl, eo):
         logger.info(stdout.decode('utf8'))
 
 
-def ingress_data(batch_client, config, rls=None):
-    # type: (batch.BatchServiceClient, dict, dict) -> None
+def wait_for_storage_threads(storage_threads):
+    # type: (list) -> None
+    """Wait for storage processes to complete
+    :param list storage_threads: list of storage threads
+    """
+    i = 0
+    nthreads = len(storage_threads)
+    while nthreads > 0:
+        alive = sum(thr.is_alive() for thr in storage_threads)
+        if alive > 0:
+            i += 1
+            if i % 10 == 0:
+                i = 0
+                logger.debug(
+                    'waiting for Azure Blob Storage transfer processes '
+                    'to complete: {} active, {} completed'.format(
+                        alive, nthreads - alive))
+            time.sleep(1)
+        else:
+            for thr in storage_threads:
+                thr.join()
+            logger.info('Azure Blob Storage transfer completed')
+            break
+
+
+def ingress_data(batch_client, config, rls=None, kind=None):
+    # type: (batch.BatchServiceClient, dict, dict, str) -> list
     """Ingresses data into Azure Batch
     :param batch_client: The batch client to use.
     :type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
     :param dict config: configuration dict
     :param dict rls: remote login settings
+    :param str kind: 'all', 'shared', or 'storage'
+    :rtype: list
+    :return: list of storage threads
     """
     try:
         files = config['global_resources']['files']
@@ -462,6 +497,7 @@ def ingress_data(batch_client, config, rls=None):
         username = config['pool_specification']['ssh']['username']
     except KeyError:
         username = None
+    storage_threads = []
     for fdict in files:
         src = fdict['source']['path']
         try:
@@ -489,6 +525,17 @@ def ingress_data(batch_client, config, rls=None):
                 'cannot specify both shared data volume and storage for the '
                 'destination for source: {}'.format(src))
         if shared is not None:
+            if rls is None:
+                logger.warning(
+                    'skipping data ingress from {} to {} for pool with no '
+                    'remote login settings or non-existent pool'.format(
+                        src, shared))
+                continue
+            if kind == 'storage':
+                logger.warning(
+                    'skipping data ingress from {} to {} for pool as ingress '
+                    'to shared file system not specified'.format(src, shared))
+                continue
             if username is None:
                 raise RuntimeError(
                     'cannot ingress data to shared data volume without a '
@@ -563,9 +610,6 @@ def ingress_data(batch_client, config, rls=None):
             else:
                 raise RuntimeError(
                     'data ingress to {} not supported'.format(driver))
-            if rls is None:
-                rls = convoy.batch.get_remote_login_settings(
-                    batch_client, config)
             if method == 'scp' or method == 'rsync+ssh':
                 # split/source include/exclude will force multinode
                 # transfer with mpt=1
@@ -586,6 +630,11 @@ def ingress_data(batch_client, config, rls=None):
                 raise RuntimeError(
                     'unknown transfer method: {}'.format(method))
         elif storage is not None:
+            if kind == 'shared':
+                logger.warning(
+                    'skipping data ingress from {} to {} for pool as ingress '
+                    'to Azure Blob Storage not specified'.format(src, storage))
+                continue
             try:
                 container = fdict['destination']['data_transfer']['container']
                 if container is not None and len(container) == 0:
@@ -612,9 +661,11 @@ def ingress_data(batch_client, config, rls=None):
                     eo = ''
             except KeyError:
                 eo = ''
-            _azure_blob_storage_transfer(
+            thr = _azure_blob_storage_transfer(
                 config['credentials']['storage'][storage], container, src,
                 src_incl, eo)
+            storage_threads.append(thr)
         else:
             raise RuntimeError(
                 'invalid file transfer configuration: {}'.format(fdict))
+    return storage_threads
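
Condensed, the per-file routing that ingress_data now performs can be read as the sketch below; this is an illustrative summary with a hypothetical helper name, not code from the commit:

    def _route(shared, storage, rls, kind):
        # Mirrors the branch structure above: `kind` restricts ingress to one
        # destination class, and shared-fs ingress requires remote login
        # settings for live nodes.
        if shared is not None:
            if rls is None or kind == 'storage':
                return 'skip'            # logged as a warning above
            return 'shared-fs ingress'   # scp / rsync+ssh to compute nodes
        if storage is not None:
            if kind == 'shared':
                return 'skip'
            return 'blob ingress'        # blobxfer thread, appended to list
        raise RuntimeError('invalid file transfer configuration')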

View file

@@ -412,8 +412,13 @@ from entering ready state until all Docker images are loaded. This defaults
 to `true`.
 * (optional) `transfer_files_on_pool_creation` will ingress all `files`
 specified in the `global_resources` section of the configuration json when
-the pool is created to the compute nodes of the pool. If this property is set
-to `true` then `block_until_all_global_resources_loaded` will be force
+the pool is created. If files are to be ingressed to Azure Blob Storage,
+then data movement operations are overlapped with the creation of the pool.
+If files are to be ingressed to a shared file system on the compute nodes,
+then the files are ingressed after the pool is created and the shared file
+system is ready. Files can be ingressed to both Azure Blob Storage and a
+shared file system during the same pool creation invocation. If this property
+is set to `true` then `block_until_all_global_resources_loaded` will be forcibly
 disabled. If omitted, this property defaults to `false`.
 * (optional) `ssh` is the property for creating a user to accommodate SSH
 sessions to compute nodes. If this property is absent, then an SSH user is not
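
For the `transfer_files_on_pool_creation` property documented in the excerpt above, a minimal illustrative configuration snippet follows; surrounding required pool properties are abbreviated and all values are examples only:

    {
      "pool_specification": {
        "id": "mypool",
        "vm_size": "STANDARD_D1_V2",
        "vm_count": 4,
        "transfer_files_on_pool_creation": true
      }
    }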

View file

@@ -303,6 +303,16 @@ def add_pool(batch_client, blob_client, config):
     sku = config['pool_specification']['sku']
     vm_count = config['pool_specification']['vm_count']
     vm_size = config['pool_specification']['vm_size']
+    try:
+        ingress_files = config[
+            'pool_specification']['transfer_files_on_pool_creation']
+    except KeyError:
+        ingress_files = False
+    # ingress data to Azure Blob Storage if specified
+    storage_threads = []
+    if ingress_files:
+        storage_threads = convoy.data.ingress_data(
+            batch_client, config, rls=None, kind='storage')
     try:
         maxtasks = config['pool_specification']['max_tasks_per_node']
     except KeyError:
@@ -548,14 +558,11 @@ def add_pool(batch_client, blob_client, config):
         convoy.batch.add_ssh_user(batch_client, config, nodes)
     # log remote login settings
     rls = convoy.batch.get_remote_login_settings(batch_client, config, nodes)
-    # ingress data if specified
-    try:
-        ingress_files = config[
-            'pool_specification']['transfer_files_on_pool_creation']
-    except KeyError:
-        ingress_files = False
+    # ingress data to shared fs if specified
     if ingress_files:
-        convoy.data.ingress_data(batch_client, config, rls)
+        convoy.data.ingress_data(batch_client, config, rls=rls, kind='shared')
+    # wait for storage ingress processes
+    convoy.data.wait_for_storage_threads(storage_threads)
def _setup_glusterfs(batch_client, blob_client, config, nodes):
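
Taken together, the add_pool changes arrange the following timeline; this sketch compresses the unchanged pool creation and wait logic into a hypothetical `_create_pool_and_wait` placeholder:

    # 1. start blob transfers before the (slow) pool allocation begins
    storage_threads = []
    if ingress_files:
        storage_threads = convoy.data.ingress_data(
            batch_client, config, rls=None, kind='storage')
    # 2. create the pool and wait for nodes; transfers proceed meanwhile
    nodes = _create_pool_and_wait(batch_client, blob_client, config)
    rls = convoy.batch.get_remote_login_settings(batch_client, config, nodes)
    # 3. shared-fs ingress needs live nodes, so it runs after the wait
    if ingress_files:
        convoy.data.ingress_data(batch_client, config, rls=rls, kind='shared')
    # 4. finally join any outstanding blob transfer threads
    convoy.data.wait_for_storage_threads(storage_threads)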
@@ -889,7 +896,24 @@ def main():
     elif args.action == 'getnodefile':
         convoy.batch.get_file_via_node(batch_client, config, args.nodeid)
     elif args.action == 'ingressdata':
-        convoy.data.ingress_data(batch_client, config)
+        try:
+            # ensure there are remote login settings
+            rls = convoy.batch.get_remote_login_settings(
+                batch_client, config, nodes=None)
+            # ensure nodes are at least idle/running for shared ingress
+            kind = 'all'
+            if not convoy.batch.check_pool_nodes_runnable(
+                    batch_client, config):
+                kind = 'storage'
+        except batchmodels.BatchErrorException as ex:
+            if 'The specified pool does not exist' in ex.message.value:
+                rls = None
+                kind = 'storage'
+            else:
+                raise
+        storage_threads = convoy.data.ingress_data(
+            batch_client, config, rls=rls, kind=kind)
+        convoy.data.wait_for_storage_threads(storage_threads)
     elif args.action == 'listjobs':
         convoy.batch.list_jobs(batch_client, config)
     elif args.action == 'listtasks':
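
The rls/kind selection in the ingressdata action above reads as a small decision ladder; a hypothetical refactor (not part of the commit) makes the fallback explicit:

    def _select_ingress_mode(batch_client, config):
        # Full ingress only when the pool exists and all nodes are runnable;
        # otherwise fall back to Azure Blob Storage ingress only.
        try:
            rls = convoy.batch.get_remote_login_settings(
                batch_client, config, nodes=None)
            if convoy.batch.check_pool_nodes_runnable(batch_client, config):
                return rls, 'all'
            return rls, 'storage'
        except batchmodels.BatchErrorException as ex:
            if 'The specified pool does not exist' in ex.message.value:
                return None, 'storage'
            raise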