Enable data ingress to remote storage clusters
- Refactor constants into accessor functions exposed from the proper modules
Parent: e0490cf0b4
Commit: 9dc4673530
@@ -485,7 +485,8 @@ def generate_ssh_tunnel_script(batch_client, pool, ssh_priv_key, nodes):
     nodes = batch_client.compute_node.list(pool.id)
     if ssh_priv_key is None:
         ssh_priv_key = pathlib.Path(
-            pool.ssh.generated_file_export_path, crypto._SSH_KEY_PREFIX)
+            pool.ssh.generated_file_export_path,
+            crypto.get_ssh_key_prefix())
     ssh_args = [
         'ssh', '-o', 'StrictHostKeyChecking=no',
         '-o', 'UserKnownHostsFile={}'.format(os.devnull),
@@ -48,6 +48,7 @@ logger = logging.getLogger(__name__)
 util.setup_logger(logger)
 # global defines
 _SSH_KEY_PREFIX = 'id_rsa_shipyard'
+_REMOTEFS_SSH_KEY_PREFIX = '{}_remotefs'.format(_SSH_KEY_PREFIX)
 # named tuples
 PfxSettings = collections.namedtuple(
     'PfxSettings', ['filename', 'passphrase', 'sha1'])
@@ -62,6 +63,15 @@ def get_ssh_key_prefix():
     return _SSH_KEY_PREFIX


+def get_remotefs_ssh_key_prefix():
+    # type: (None) -> str
+    """Get remote fs SSH key prefix
+    :rtype: str
+    :return: ssh key prefix for remote fs
+    """
+    return _REMOTEFS_SSH_KEY_PREFIX
+
+
 def generate_ssh_keypair(export_path, prefix=None):
     # type: (str, str) -> tuple
     """Generate an ssh keypair for use with user logins
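
The hunks above replace direct reads of the private _SSH_KEY_PREFIX constant with the public accessors defined in this module. A minimal, self-contained sketch of the resulting pattern (constant values copied from the defines above; the export path is a made-up example, not a real configuration value):

import pathlib

_SSH_KEY_PREFIX = 'id_rsa_shipyard'
_REMOTEFS_SSH_KEY_PREFIX = '{}_remotefs'.format(_SSH_KEY_PREFIX)


def get_ssh_key_prefix():
    # public accessor for the pool SSH key file prefix
    return _SSH_KEY_PREFIX


def get_remotefs_ssh_key_prefix():
    # public accessor for the remote fs SSH key file prefix
    return _REMOTEFS_SSH_KEY_PREFIX


# example caller, mirroring generate_ssh_tunnel_script above
export_path = '/tmp/shipyard-keys'  # made-up export path
ssh_priv_key = pathlib.Path(export_path, get_ssh_key_prefix())
print(ssh_priv_key)                   # /tmp/shipyard-keys/id_rsa_shipyard
print(get_remotefs_ssh_key_prefix())  # id_rsa_shipyard_remotefs
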
convoy/data.py
@@ -45,8 +45,11 @@ except ImportError:
     from pipes import quote as shellquote
 import threading
 import time
+# non-stdlib imports
+import azure.batch.models as batchmodels
 # local imports
 from . import crypto
+from . import resource
 from . import settings
 from . import storage
 from . import util
@@ -75,7 +78,7 @@ def _get_gluster_paths(config):
         for sdvkey in sdv:
             if settings.is_shared_data_volume_gluster_on_compute(sdv, sdvkey):
                 gluster_host = '$AZ_BATCH_NODE_SHARED_DIR/{}'.format(
-                    settings.get_gluster_volume())
+                    settings.get_gluster_on_compute_volume())
                 gluster_container = settings.shared_data_volume_container_path(
                     sdv, sdvkey).rstrip('/')
                 break
@@ -775,14 +778,21 @@ def wait_for_storage_threads(storage_threads):


 def ingress_data(
-        batch_client, config, rls=None, kind=None, current_dedicated=None):
-    # type: (batch.BatchServiceClient, dict, dict, str, int) -> list
-    """Ingresses data into Azure Batch
+        batch_client, compute_client, network_client, config, rls=None,
+        kind=None, current_dedicated=None):
+    # type: (batch.BatchServiceClient,
+    #        azure.mgmt.compute.ComputeManagementClient, dict, dict, str,
+    #        int) -> list
+    """Ingresses data into Azure
     :param batch_client: The batch client to use.
     :type batch_client: `batchserviceclient.BatchServiceClient`
+    :param azure.mgmt.compute.ComputeManagementClient compute_client:
+        compute client
+    :param azure.mgmt.network.NetworkManagementClient network_client:
+        network client
     :param dict config: configuration dict
     :param dict rls: remote login settings
-    :param str kind: 'all', 'shared', or 'storage'
+    :param str kind: 'all', 'shared', 'storage', or 'remotefs'
     :param int current_dedicated: current dedicated
     :rtype: list
     :return: list of storage threads
@@ -821,33 +831,15 @@ def ingress_data(
                 'a shared data volume as the ingress destination '
                 'instead.')
         if dest.shared_data_volume is not None or direct_single_node:
-            if rls is None:
-                logger.warning(
-                    'skipping data ingress from {} to {} for pool with no '
-                    'remote login settings or non-existent pool'.format(
-                        source.path, dest.shared_data_volume))
-                continue
             if kind == 'storage':
                 logger.warning(
                     'skipping data ingress from {} to {} for pool as ingress '
                     'to shared file system not specified'.format(
                         source.path, dest.shared_data_volume))
                 continue
-            if pool.ssh.username is None:
-                raise RuntimeError(
-                    'cannot ingress data to shared data volume without a '
-                    'valid SSH user')
-            if dest.data_transfer.ssh_private_key is None:
-                # use default name for private key
-                ssh_private_key = pathlib.Path(crypto.get_ssh_key_prefix())
-            else:
-                ssh_private_key = dest.data_transfer.ssh_private_key
-            if not ssh_private_key.exists():
-                raise RuntimeError(
-                    'ssh private key does not exist at: {}'.format(
-                        ssh_private_key))
-            logger.debug('using ssh_private_key from: {}'.format(
-                ssh_private_key))
+            # get rfs settings
+            rfs = settings.remotefs_settings(config)
+            dst_rfs = False
             # set base dst path
             dst = '{}/batch/tasks/'.format(
                 settings.temp_disk_mountpoint(config))
@@ -858,13 +850,80 @@ def ingress_data(
                 if sdvkey == dest.shared_data_volume:
                     if settings.is_shared_data_volume_gluster_on_compute(
                             sdv, sdvkey):
+                        if kind == 'remotefs':
+                            continue
                         dst = '{}shared/{}/'.format(
-                            dst, settings.get_gluster_volume())
+                            dst, settings.get_gluster_on_compute_volume())
+                    elif settings.is_shared_data_volume_storage_cluster(
+                            sdv, sdvkey):
+                        if kind != 'remotefs':
+                            continue
+                        dst = rfs.storage_cluster.file_server.mountpoint
+                        # add trailing directory separator if needed
+                        if dst[-1] != '/':
+                            dst = dst + '/'
+                        dst_rfs = True
                     else:
                         raise RuntimeError(
                             'data ingress to {} not supported'.format(
                                 sdvkey))
                     break
+            # skip entries that are a mismatch if remotefs transfer
+            # is selected
+            if kind == 'remotefs':
+                if not dst_rfs:
+                    continue
+            else:
+                if dst_rfs:
+                    continue
+            # set ssh info
+            ssh_private_key = None
+            # use default name for private key if not specified
+            if dst_rfs:
+                username = rfs.storage_cluster.ssh.username
+                if dest.data_transfer.ssh_private_key is None:
+                    ssh_private_key = pathlib.Path(
+                        crypto.get_remotefs_ssh_key_prefix())
+                # retrieve public ips from all vms in named storage cluster
+                rls = {}
+                for i in range(rfs.storage_cluster.vm_count):
+                    vm_name = '{}-vm{}'.format(
+                        rfs.storage_cluster.hostname_prefix, i)
+                    vm = compute_client.virtual_machines.get(
+                        resource_group_name=rfs.storage_cluster.resource_group,
+                        vm_name=vm_name,
+                    )
+                    _, pip = resource.get_nic_and_pip_from_virtual_machine(
+                        network_client, rfs.storage_cluster.resource_group, vm)
+                    # create compute node rls settings with sc vm ip/port
+                    rls[vm_name] = \
+                        batchmodels.ComputeNodeGetRemoteLoginSettingsResult(
+                            remote_login_ip_address=pip.ip_address,
+                            remote_login_port=22,
+                        )
+            else:
+                username = pool.ssh.username
+                if dest.data_transfer.ssh_private_key is None:
+                    ssh_private_key = pathlib.Path(
+                        crypto.get_ssh_key_prefix())
+            if rls is None:
+                logger.warning(
+                    'skipping data ingress from {} to {} for pool with no '
+                    'remote login settings or non-existent pool'.format(
+                        source.path, dest.shared_data_volume))
+                continue
+            if username is None:
+                raise RuntimeError(
+                    'cannot ingress data to shared data volume without a '
+                    'valid SSH user')
+            if ssh_private_key is None:
+                ssh_private_key = dest.data_transfer.ssh_private_key
+            if ssh_private_key is None or not ssh_private_key.exists():
+                raise RuntimeError(
+                    'ssh private key is invalid or does not exist: {}'.format(
+                        ssh_private_key))
+            logger.debug('using ssh_private_key from: {}'.format(
+                ssh_private_key))
             if (dest.data_transfer.method == 'scp' or
                     dest.data_transfer.method == 'rsync+ssh'):
                 # split/source include/exclude will force multinode
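
In the remote-fs branch added above, ingress_data no longer relies on remote login settings obtained from the Batch pool; it fabricates an equivalent rls mapping from the storage cluster VMs. A self-contained sketch of the shape of that mapping (a namedtuple stands in for batchmodels.ComputeNodeGetRemoteLoginSettingsResult, and the hostname prefix and IPs are made-up values):

import collections

# stand-in for batchmodels.ComputeNodeGetRemoteLoginSettingsResult
RemoteLoginSettings = collections.namedtuple(
    'RemoteLoginSettings', ['remote_login_ip_address', 'remote_login_port'])

hostname_prefix = 'mycluster'          # assumed storage cluster hostname prefix
public_ips = ['10.0.0.4', '10.0.0.5']  # stand-ins for the queried public IPs

# one entry per storage cluster VM, keyed '<hostname_prefix>-vm<i>', port 22
rls = {}
for i, ip in enumerate(public_ips):
    vm_name = '{}-vm{}'.format(hostname_prefix, i)
    rls[vm_name] = RemoteLoginSettings(
        remote_login_ip_address=ip, remote_login_port=22)

print(rls['mycluster-vm0'].remote_login_ip_address)  # 10.0.0.4
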
@@ -873,17 +932,17 @@ def ingress_data(
                     source.include is not None or
                     source.exclude is not None):
                 _multinode_transfer(
-                    'multinode_' + dest.data_transfer.method, dest, source,
-                    dst, pool.ssh.username, ssh_private_key, rls, 1)
+                    'multinode_' + dest.data_transfer.method, dest,
+                    source, dst, username, ssh_private_key, rls, 1)
             else:
                 _singlenode_transfer(
-                    dest, source.path, dst, pool.ssh.username,
-                    ssh_private_key, rls)
+                    dest, source.path, dst, username, ssh_private_key,
+                    rls)
         elif (dest.data_transfer.method == 'multinode_scp' or
                 dest.data_transfer.method == 'multinode_rsync+ssh'):
             _multinode_transfer(
                 dest.data_transfer.method, dest, source, dst,
-                pool.ssh.username, ssh_private_key, rls,
+                username, ssh_private_key, rls,
                 dest.data_transfer.max_parallel_transfers_per_node)
         else:
             raise RuntimeError(
@@ -579,7 +579,7 @@ def _create_storage_cluster_mount_args(
     try:
         volname = sc.file_server.server_options['glusterfs']['volume_name']
     except KeyError:
-        volname = remotefs._GLUSTER_DEFAULT_VOLNAME
+        volname = settings.get_gluster_default_volume_name()
     # construct mount string for fstab, srcpath is the gluster volume
     fstab_mount = (
         '{remoteip}:/{srcpath} $AZ_BATCH_NODE_SHARED_DIR/{scid} '
@@ -1858,7 +1858,7 @@ def action_pool_ssh(batch_client, config, cardinal, nodeid):
         raise ValueError('invalid cardinal option value')
     pool = settings.pool_settings(config)
     ssh_priv_key = pathlib.Path(
-        pool.ssh.generated_file_export_path, crypto._SSH_KEY_PREFIX)
+        pool.ssh.generated_file_export_path, crypto.get_ssh_key_prefix())
     if not ssh_priv_key.exists():
         raise RuntimeError('SSH private key file not found at: {}'.format(
             ssh_priv_key))
@@ -2135,33 +2135,50 @@ def action_data_getfilenode(batch_client, config, all, nodeid):
     batch.get_file_via_node(batch_client, config, nodeid)


-def action_data_ingress(batch_client, config):
-    # type: (batchsc.BatchServiceClient, dict) -> None
+def action_data_ingress(
+        batch_client, compute_client, network_client, config, to_fs):
+    # type: (batchsc.BatchServiceClient,
+    #        azure.mgmt.compute.ComputeManagementClient,
+    #        azure.mgmt.network.NetworkManagementClient, dict, bool) -> None
     """Action: Data Ingress
     :param azure.batch.batch_service_client.BatchServiceClient batch_client:
         batch client
+    :param azure.mgmt.compute.ComputeManagementClient compute_client:
+        compute client
+    :param azure.mgmt.network.NetworkManagementClient network_client:
+        network client
     :param dict config: configuration dict
+    :param bool to_fs: ingress to remote filesystem
     """
     pool_cd = None
-    try:
-        # get pool current dedicated
-        pool = batch_client.pool.get(settings.pool_id(config))
-        pool_cd = pool.current_dedicated
-        del pool
-        # ensure there are remote login settings
-        rls = batch.get_remote_login_settings(
-            batch_client, config, nodes=None)
-        # ensure nodes are at least idle/running for shared ingress
-        kind = 'all'
-        if not batch.check_pool_nodes_runnable(
-                batch_client, config):
-            kind = 'storage'
-    except batchmodels.BatchErrorException as ex:
-        if 'The specified pool does not exist' in ex.message.value:
-            rls = None
-            kind = 'storage'
-        else:
-            raise
+    if not to_fs:
+        try:
+            # get pool current dedicated
+            pool = batch_client.pool.get(settings.pool_id(config))
+            pool_cd = pool.current_dedicated
+            del pool
+            # ensure there are remote login settings
+            rls = batch.get_remote_login_settings(
+                batch_client, config, nodes=None)
+            # ensure nodes are at least idle/running for shared ingress
+            kind = 'all'
+            if not batch.check_pool_nodes_runnable(
+                    batch_client, config):
+                kind = 'storage'
+        except batchmodels.BatchErrorException as ex:
+            if 'The specified pool does not exist' in ex.message.value:
+                rls = None
+                kind = 'storage'
+            else:
+                raise
+    else:
+        rls = None
+        kind = 'remotefs'
+        if compute_client is None or network_client is None:
+            raise RuntimeError(
+                'required ARM clients are invalid, please provide management '
+                'AAD credentials')
     storage_threads = data.ingress_data(
-        batch_client, config, rls=rls, kind=kind, current_dedicated=pool_cd)
+        batch_client, compute_client, network_client, config, rls=rls,
+        kind=kind, current_dedicated=pool_cd)
     data.wait_for_storage_threads(storage_threads)
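
The reworked action_data_ingress above selects the ingress kind up front: without --to-fs it queries the Batch pool for remote login settings, while with --to-fs it skips the pool entirely and requires the ARM management clients. A condensed, self-contained sketch of that selection (select_ingress_kind is a hypothetical helper, not a function in the codebase, and client objects are reduced to placeholders):

def select_ingress_kind(to_fs, pool_nodes_runnable, compute_client, network_client):
    # condensed view of the branching in action_data_ingress; the real code
    # also reads pool.current_dedicated and handles BatchErrorException
    if not to_fs:
        # pool-based ingress: shared-volume ingress only if nodes are runnable
        return 'all' if pool_nodes_runnable else 'storage'
    if compute_client is None or network_client is None:
        raise RuntimeError(
            'required ARM clients are invalid, please provide management '
            'AAD credentials')
    return 'remotefs'


print(select_ingress_kind(False, True, None, None))         # all
print(select_ingress_kind(False, False, None, None))        # storage
print(select_ingress_kind(True, True, object(), object()))  # remotefs
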
@@ -53,9 +53,6 @@ from . import util
 # create logger
 logger = logging.getLogger(__name__)
 util.setup_logger(logger)
-# global defines
-_SSH_KEY_PREFIX = 'id_rsa_shipyard_remotefs'
-_GLUSTER_DEFAULT_VOLNAME = 'gv0'


 def _create_managed_disk(compute_client, rfs, disk_name):
@@ -526,7 +523,7 @@ def _create_virtual_machine_extension(
     try:
         server_options.append(so[st]['volume_name'])
     except KeyError:
-        server_options.append(_GLUSTER_DEFAULT_VOLNAME)
+        server_options.append(settings.get_gluster_default_volume_name())
     try:
         server_options.append(so[st]['volume_type'])
     except KeyError:
@@ -755,7 +752,7 @@ def create_storage_cluster(
     if util.is_none_or_empty(rfs.storage_cluster.ssh.ssh_public_key):
         _, ssh_pub_key = crypto.generate_ssh_keypair(
             rfs.storage_cluster.ssh.generated_file_export_path,
-            _SSH_KEY_PREFIX)
+            crypto.get_remotefs_ssh_key_prefix())
     else:
         ssh_pub_key = rfs.storage_cluster.ssh.ssh_public_key
     with open(ssh_pub_key, 'rb') as fd:
@@ -1716,7 +1713,8 @@ def _get_ssh_info(
         network_client, rfs.storage_cluster.resource_group, vm)
     # connect to vm
     ssh_priv_key = pathlib.Path(
-        rfs.storage_cluster.ssh.generated_file_export_path, _SSH_KEY_PREFIX)
+        rfs.storage_cluster.ssh.generated_file_export_path,
+        crypto.get_remotefs_ssh_key_prefix())
     if not ssh_priv_key.exists():
         raise RuntimeError('SSH private key file not found at: {}'.format(
             ssh_priv_key))
@@ -40,7 +40,8 @@ except ImportError:
 from . import util

 # global defines
-_GLUSTER_VOLUME = '.gluster/gv0'
+_GLUSTER_DEFAULT_VOLNAME = 'gv0'
+_GLUSTER_ON_COMPUTE_VOLUME = '.gluster/{}'.format(_GLUSTER_DEFAULT_VOLNAME)
 _GPU_COMPUTE_INSTANCES = frozenset((
     'standard_nc6', 'standard_nc12', 'standard_nc24', 'standard_nc24r',
 ))
@@ -248,13 +249,22 @@ def _kv_read(conf, key, default=None):
     return ret


-def get_gluster_volume():
+def get_gluster_default_volume_name():
     # type: (None) -> str
-    """Get gluster volume mount suffix
+    """Get gluster default volume name
     :rtype: str
-    :return: gluster volume mount
+    :return: gluster default volume name
     """
-    return _GLUSTER_VOLUME
+    return _GLUSTER_DEFAULT_VOLNAME
+
+
+def get_gluster_on_compute_volume():
+    # type: (None) -> str
+    """Get gluster on compute volume mount suffix
+    :rtype: str
+    :return: gluster on compute volume mount
+    """
+    return _GLUSTER_ON_COMPUTE_VOLUME


 def can_tune_tcp(vm_size):
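
settings.py now distinguishes the default gluster volume name from the on-compute mount suffix derived from it. A short, self-contained sketch of how the two accessors above are consumed by data.py and task_settings (the shared directory value is a stand-in for $AZ_BATCH_NODE_SHARED_DIR and the container path is made up):

_GLUSTER_DEFAULT_VOLNAME = 'gv0'
_GLUSTER_ON_COMPUTE_VOLUME = '.gluster/{}'.format(_GLUSTER_DEFAULT_VOLNAME)


def get_gluster_default_volume_name():
    # used when no glusterfs volume_name is present in server_options
    return _GLUSTER_DEFAULT_VOLNAME


def get_gluster_on_compute_volume():
    # used to build host-side mount paths for gluster on compute
    return _GLUSTER_ON_COMPUTE_VOLUME


shared_dir = '/mnt/batch/tasks/shared'  # stand-in for $AZ_BATCH_NODE_SHARED_DIR
gluster_host = '{}/{}'.format(shared_dir, get_gluster_on_compute_volume())
container_path = '/data'                # made-up container mount path
run_opt = '-v {}:{}'.format(gluster_host, container_path)
print(run_opt)  # -v /mnt/batch/tasks/shared/.gluster/gv0:/data
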
@@ -1306,7 +1316,7 @@ def files_destination_settings(fdict):
     try:
         ssh_private_key = pathlib.Path(
             conf['data_transfer']['ssh_private_key'])
-    except KeyError:
+    except (KeyError, TypeError):
         ssh_private_key = None
     try:
         container = conf['data_transfer']['container']
@@ -2101,7 +2111,7 @@ def task_settings(cloud_pool, config, poolconf, conf, missing_images):
             if is_shared_data_volume_gluster_on_compute(sdv, sdvkey):
                 run_opts.append('-v {}/{}:{}'.format(
                     '$AZ_BATCH_NODE_SHARED_DIR',
-                    get_gluster_volume(),
+                    get_gluster_on_compute_volume(),
                     shared_data_volume_container_path(sdv, sdvkey)))
             else:
                 run_opts.append('-v {}:{}'.format(
@@ -197,6 +197,8 @@ The `data` command has the following sub-commands:
   * `--filespec <nodeid>,<filename>` can be given to download one
     specific file from compute node
 * `ingress` will ingress data as specified in configuration files
+  * `--to-fs` transfers data as specified in configuration files to
+    a remote file system instead of Azure Storage
 * `listfiles` will list files for all tasks in jobs
   * `--jobid` force scope to just this job id
   * `--taskid` force scope to just this task id
@@ -1382,15 +1382,19 @@ def data_getfilenode(ctx, all, filespec):


 @data.command('ingress')
+@click.option(
+    '--to-fs', is_flag=True, help='Ingress data to a remote filesystem')
 @common_options
 @batch_options
 @keyvault_options
 @aad_options
 @pass_cli_context
-def data_ingress(ctx):
+def data_ingress(ctx, to_fs):
     """Ingress data into Azure"""
     ctx.initialize_for_batch()
-    convoy.fleet.action_data_ingress(ctx.batch_client, ctx.config)
+    convoy.fleet.action_data_ingress(
+        ctx.batch_client, ctx.compute_client, ctx.network_client, ctx.config,
+        to_fs)


 if __name__ == '__main__':
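
The CLI change above only adds a --to-fs flag and threads the extra clients through to convoy.fleet.action_data_ingress. A minimal, runnable click sketch of the same flag wiring (command and option names copied from the diff; the body echoes instead of calling the real action so it works standalone):

import click


@click.command('ingress')
@click.option(
    '--to-fs', is_flag=True, help='Ingress data to a remote filesystem')
def data_ingress(to_fs):
    """Ingress data into Azure"""
    # the real command calls convoy.fleet.action_data_ingress with the batch,
    # compute and network clients from the CLI context
    click.echo('ingress to remote filesystem: {}'.format(to_fs))


if __name__ == '__main__':
    data_ingress()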