output_data to storage blob support for native

Parent: 0a9689f5a8
Commit: cccc5fa3b8
@@ -3510,6 +3510,7 @@ def add_jobs(
                 ' ' + task.command) if task.command else '')
             ]
             taskenv = None
+            output_files = None
             # get docker login if missing images
             if not native and len(missing_images) > 0 and allow_run_on_missing:
                 taskenv, logincmd = generate_docker_login_settings(config)
@@ -3525,14 +3526,12 @@ def add_jobs(
                     'native container pools')
                 task_commands.insert(0, addlcmds)
             # digest any output data
-            addlcmds = data.process_output_data(
-                config, bxfile, _task)
+            addlcmds = data.process_output_data(config, bxfile, _task)
             if addlcmds is not None:
                 if native:
-                    raise RuntimeError(
-                        'output_data at task-level is not supported on '
-                        'native container pools')
-                task_commands.append(addlcmds)
+                    output_files = addlcmds
+                else:
+                    task_commands.append(addlcmds)
             del addlcmds
             # set environment variables for native
             if native:
@@ -3558,12 +3557,6 @@ def add_jobs(
                 )
                 del gpu_env
                 del env_vars
-            # set task constraints
-            task_constraints = batchmodels.TaskConstraints(
-                retention_time=task.retention_time,
-                max_task_retry_count=task.max_task_retries,
-                max_wall_clock_time=task.max_wall_time,
-            )
             # create task
             batchtask = batchmodels.TaskAddParameter(
                 id=task.id,
@@ -3571,8 +3564,13 @@ def add_jobs(
                 user_identity=_RUN_ELEVATED,
                 resource_files=[],
                 multi_instance_settings=mis,
-                constraints=task_constraints,
+                constraints=batchmodels.TaskConstraints(
+                    retention_time=task.retention_time,
+                    max_task_retry_count=task.max_task_retries,
+                    max_wall_clock_time=task.max_wall_time,
+                ),
                 environment_settings=taskenv,
+                output_files=output_files,
             )
             if native:
                 batchtask.container_settings = \
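
Note: on native container pools, egress is no longer a shell command appended
to the task invocation; the OutputFile list produced by process_output_data is
handed to the Batch service via TaskAddParameter.output_files. Below is a
minimal standalone sketch of that native path, assuming a version of the
azure-batch Python SDK that exposes the OutputFile models used in this diff;
the account URL, SAS, and IDs are hypothetical, not taken from this commit.

    import azure.batch.models as batchmodels

    # hypothetical blob container URL with an appended SAS token
    container_url = 'https://myacct.blob.core.windows.net/mycontainer?sv=...'

    task = batchmodels.TaskAddParameter(
        id='mytask',
        command_line='/bin/sh -c "mywork > wd/output/result.txt"',
        output_files=[
            batchmodels.OutputFile(
                # upload everything under wd/output recursively
                file_pattern='wd/output/**/*',
                destination=batchmodels.OutputFileDestination(
                    container=batchmodels.OutputFileBlobContainerDestination(
                        path='',
                        container_url=container_url,
                    ),
                ),
                upload_options=batchmodels.OutputFileUploadOptions(
                    # only upload when the task exits successfully
                    upload_condition=(
                        batchmodels.OutputFileUploadCondition.task_success),
                ),
            ),
        ],
    )
    # batch_client.task.add(job_id='myjob', task=task)
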
@@ -220,18 +220,19 @@ def process_input_data(config, bxfile, spec, on_task=False):
     return None


-def _process_storage_output_data(config, output_data):
-    # type: (dict, dict, bool) -> str
+def _process_storage_output_data(config, native, output_data):
+    # type: (dict, bool, dict) -> str
     """Process output data to egress to Azure storage
     :param dict config: configuration dict
+    :param bool native: is native container pool
     :param dict output_data: config spec with output_data
     :rtype: list
-    :return: args to pass to blobxfer script
+    :return: OutputFiles or args to pass to blobxfer script
     """
-    # get gluster host/container paths
+    # get gluster host/container paths and encryption settings
     gluster_host, gluster_container = _get_gluster_paths(config)
-    # parse storage output data blocks
     encrypt = settings.batch_shipyard_encryption_enabled(config)
+    # parse storage output data blocks
     args = []
     for xfer in output_data:
         storage_settings = settings.credentials_storage(
@@ -245,8 +246,14 @@ def _process_storage_output_data(config, output_data):
                 'cannot specify both container and file_share at the '
                 'same time')
         eo = settings.data_blobxfer_extra_options(xfer)
+        if native and util.is_not_empty(eo):
+            raise ValueError(
+                'native Docker pool does not support blobxfer_extra_options')
         # configure for file share
         if fshare is not None:
+            if native:
+                raise ValueError(
+                    'native Docker pool does not support fileshares')
             if '--fileshare' not in eo:
                 eo = '--fileshare {}'.format(eo)
             # create saskey for file share with rwdl perm
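
Note: the two new guards reject output_data settings that the Batch service's
blob egress cannot express: arbitrary blobxfer flags and Azure file share
destinations. A sketch of parsed azure_storage entries that would now raise
ValueError on a native pool; the field names follow the output_data config
schema as I read it, and all values (including the blobxfer flag shown) are
purely illustrative.

    # hypothetical parsed output_data fragment
    output_data = {
        'azure_storage': [
            {
                'storage_account_settings': 'mystorage',
                'container': 'output',
                'source': '$AZ_BATCH_TASK_DIR/wd/output',
                # non-empty extra options trip the first guard on native
                'blobxfer_extra_options': '--no-computefilemd5',
            },
            {
                'storage_account_settings': 'mystorage',
                # file_share destinations trip the second guard on native
                'file_share': 'myshare',
                'source': '$AZ_BATCH_TASK_DIR/wd/output',
            },
        ],
    }
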
@@ -265,16 +272,41 @@ def _process_storage_output_data(config, output_data):
             if (util.is_not_empty(gluster_container) and
                     src.startswith(gluster_container)):
                 src = src.replace(gluster_container, gluster_host, 1)
-        # construct argument
-        # kind:encrypted:<sa:ep:saskey:container>:include:eo:src
-        creds = crypto.encrypt_string(
-            encrypt,
-            '{}:{}:{}:{}'.format(
-                storage_settings.account, storage_settings.endpoint,
-                saskey, container),
-            config)
-        args.append('"{}:e:{}:{}:{}:{}:{}"'.format(
-            _BLOBXFER_VERSION, encrypt, creds, include, eo, src))
+        if native:
+            if util.is_none_or_empty(include):
+                include = '**/*'
+            if not src.endswith('/'):
+                fp = '/'.join((src, include))
+            else:
+                fp = ''.join((src, include))
+            of = batchmodels.OutputFile(
+                file_pattern=fp,
+                destination=batchmodels.OutputFileDestination(
+                    container=batchmodels.OutputFileBlobContainerDestination(
+                        path='',
+                        container_url='{}?{}'.format(
+                            storage.generate_blob_container_uri(
+                                storage_settings, container),
+                            saskey)
+                    )
+                ),
+                upload_options=batchmodels.OutputFileUploadOptions(
+                    upload_condition=batchmodels.
+                    OutputFileUploadCondition.task_success
+                ),
+            )
+            args.append(of)
+        else:
+            # construct argument
+            # kind:encrypted:<sa:ep:saskey:container>:include:eo:src
+            creds = crypto.encrypt_string(
+                encrypt,
+                '{}:{}:{}:{}'.format(
+                    storage_settings.account, storage_settings.endpoint,
+                    saskey, container),
+                config)
+            args.append('"{}:e:{}:{}:{}:{}:{}"'.format(
+                _BLOBXFER_VERSION, encrypt, creds, include, eo, src))
     return args

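
Note: on the native branch, the blobxfer-style include filter is folded into a
single Batch OutputFile file_pattern rooted at src. A quick trace of the
composition logic above, with hypothetical paths:

    src, include = 'wd/output', None   # no include filter specified
    if include is None or len(include) == 0:   # util.is_none_or_empty
        include = '**/*'               # default: match everything recursively
    if not src.endswith('/'):
        fp = '/'.join((src, include))  # 'wd/output' -> 'wd/output/**/*'
    else:
        fp = ''.join((src, include))   # 'wd/output/' -> 'wd/output/**/*'
    assert fp == 'wd/output/**/*'
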
|
@@ -284,24 +316,31 @@ def process_output_data(config, bxfile, spec):
     :param dict config: configuration dict
     :param tuple bxfile: blobxfer script
     :param dict spec: config spec with output_data
-    :rtype: str
-    :return: additional command
+    :rtype: str or list
+    :return: additional commands or list of OutputFiles
     """
+    native = settings.is_native_docker_pool(config)
     ret = []
     output_data = settings.output_data(spec)
     if util.is_not_empty(output_data):
         for key in output_data:
             if key == 'azure_storage':
                 args = _process_storage_output_data(
-                    config, output_data[key])
-                ret.append(
-                    ('set -f; $AZ_BATCH_NODE_STARTUP_DIR/wd/{} {}; '
-                     'set +f').format(bxfile[0], ' '.join(args)))
+                    config, native, output_data[key])
+                if native:
+                    ret.extend(args)
+                else:
+                    ret.append(
+                        ('set -f; $AZ_BATCH_NODE_STARTUP_DIR/wd/{} {}; '
+                         'set +f').format(bxfile[0], ' '.join(args)))
             else:
                 raise ValueError(
                     'unknown output_data method: {}'.format(key))
     if len(ret) > 0:
-        return ';'.join(ret)
+        if native:
+            return ret
+        else:
+            return ';'.join(ret)
     else:
         return None

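
Note: process_output_data now returns two shapes, so callers must branch on
pool type, mirroring the add_jobs change in the first file of this diff.
Condensed sketch of the consuming side:

    addlcmds = data.process_output_data(config, bxfile, _task)
    if addlcmds is not None:
        if native:
            # list of batchmodels.OutputFile, attached to the task via
            # TaskAddParameter.output_files
            output_files = addlcmds
        else:
            # single shell command string, appended to the task command
            task_commands.append(addlcmds)
    del addlcmds
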
|
@@ -152,6 +152,22 @@ def get_storageaccount_endpoint():
     return _STORAGEACCOUNTEP


+def generate_blob_container_uri(storage_settings, container):
+    # type: (StorageCredentialsSettings, str) -> str
+    """Create a uri to a blob container
+    :param StorageCredentialsSettings storage_settings: storage settings
+    :param str container: container
+    :rtype: str
+    :return: blob container uri
+    """
+    blob_client = azureblob.BlockBlobService(
+        account_name=storage_settings.account,
+        account_key=storage_settings.account_key,
+        endpoint_suffix=storage_settings.endpoint)
+    return '{}://{}/{}'.format(
+        blob_client.protocol, blob_client.primary_endpoint, container)
+
+
 def create_blob_container_saskey(
         storage_settings, container, kind, create_container=False):
     # type: (StorageCredentialsSettings, str, str, bool) -> str
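
Note: BlockBlobService derives the protocol and primary endpoint from the
account settings, so this helper returns a bare container URL and callers
append the SAS themselves. A worked example, with a hypothetical account name
and SAS token:

    # storage_settings.account = 'mystorage', endpoint = 'core.windows.net'
    uri = generate_blob_container_uri(storage_settings, 'mycontainer')
    # uri == 'https://mystorage.blob.core.windows.net/mycontainer'

    # _process_storage_output_data then appends the container SAS:
    container_url = '{}?{}'.format(uri, saskey)
    # 'https://mystorage.blob.core.windows.net/mycontainer?se=...&sig=...'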
|