Add include/exclude filter support for source path
This commit is contained in:
Родитель
261984020e
Коммит
1bc8e60fe2
|
@ -33,7 +33,11 @@
|
|||
],
|
||||
"files": [
|
||||
{
|
||||
"source": "/some/local/path/dir",
|
||||
"source": {
|
||||
"path": "/some/local/path/dir",
|
||||
"include": ["*.dat"],
|
||||
"exclude": ["*.bak"]
|
||||
},
|
||||
"destination": {
|
||||
"shared_data_volume": "glustervol",
|
||||
"data_transfer": {
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
# stdlib imports
|
||||
from __future__ import division, print_function, unicode_literals
|
||||
import datetime
|
||||
import fnmatch
|
||||
import logging
|
||||
try:
|
||||
import pathlib
|
||||
|
@ -45,10 +46,13 @@ convoy.util.setup_logger(logger)
|
|||
|
||||
|
||||
def _singlenode_transfer(
|
||||
method, src, dst, username, ssh_private_key, rls, eo, reo):
|
||||
method, src, src_incl, src_excl, dst, username, ssh_private_key, rls,
|
||||
eo, reo):
|
||||
# type: (str, str, list, list, str, str, pathlib.Path, dict, str, str) -> None
|
||||
"""Transfer data to a single node
|
||||
:param str src: source path
|
||||
:param list src_incl: source include
|
||||
:param list src_excl: source exclude
|
||||
:param str dst: destination path
|
||||
:param str username: username
|
||||
:param pathlib.Path ssh_private_key: ssh private key
|
||||
|
@ -56,6 +60,12 @@ def _singlenode_transfer(
|
|||
:param str eo: ssh extra options
|
||||
:param str reo: rsync extra options
|
||||
"""
|
||||
# specifying source include/exclude filters forces a multinode transfer with mpt=1
|
||||
if src_incl is not None or src_excl is not None:
|
||||
_multinode_transfer(
|
||||
'multinode_' + method, src, src_incl, src_excl, dst, username,
|
||||
ssh_private_key, rls, eo, reo, 1)
|
||||
return
|
||||
recursive = '-r' if pathlib.Path(src).is_dir() else ''
|
||||
_rls = next(iter(rls.values()))
|
||||
ip = _rls.remote_login_ip_address
|
||||
|
@ -91,11 +101,14 @@ def _singlenode_transfer(
|
|||
|
||||
|
||||
def _multinode_transfer(
|
||||
method, src, dst, username, ssh_private_key, rls, eo, reo, mpt):
|
||||
method, src, src_incl, src_excl, dst, username, ssh_private_key, rls,
|
||||
eo, reo, mpt):
|
||||
# type: (str, str, list, list, str, str, pathlib.Path, dict, str, str, int) -> None
|
||||
"""Transfer data to multiple destination nodes simultaneously
|
||||
:param str method: transfer method
|
||||
:param str src: source path
|
||||
:param list src_incl: source include
|
||||
:param list src_excl: source exclude
|
||||
:param str dst: destination path
|
||||
:param str username: username
|
||||
:param pathlib.Path ssh_private_key: ssh private key
|
||||
|
@ -106,8 +119,14 @@ def _multinode_transfer(
|
|||
"""
|
||||
psrc = pathlib.Path(src)
|
||||
if not psrc.is_dir():
|
||||
if src_incl is None and src_excl is None:
|
||||
_singlenode_transfer(
|
||||
method, src, dst, username, ssh_private_key, rls, eo, reo)
|
||||
method.split('_')[-1], src, src_incl, src_excl, dst, username,
|
||||
ssh_private_key, rls, eo, reo)
|
||||
else:
|
||||
logger.error(
|
||||
'cannot specify include and exclude filters for a source '
|
||||
'path which is not a directory')
|
||||
return
|
||||
buckets = {}
|
||||
files = {}
|
||||
|
@ -127,6 +146,18 @@ def _multinode_transfer(
|
|||
if sparent != '.':
|
||||
dirs.add(sparent)
|
||||
if entry.is_file():
|
||||
# check filters
|
||||
if src_excl is not None:
|
||||
inc = not any(
|
||||
[fnmatch.fnmatch(entry.path, x) for x in src_excl])
|
||||
else:
|
||||
inc = True
|
||||
if src_incl is not None:
|
||||
inc = any([fnmatch.fnmatch(entry.path, x) for x in src_incl])
|
||||
if not inc:
|
||||
logger.debug('skipping file {} due to filters'.format(
|
||||
entry.path))
|
||||
continue
|
||||
dstpath = '{}{}/{}'.format(dst, psrc.name, rel)
|
||||
# get key of min bucket values
|
||||
key = min(buckets, key=buckets.get)
|
||||
|
@ -134,6 +165,9 @@ def _multinode_transfer(
|
|||
files[key].append((entry.path, dstpath))
|
||||
total_files += 1
|
||||
total_size = sum(buckets.values())
|
||||
if total_files == 0:
|
||||
logger.error('no files to ingress')
|
||||
return
|
||||
# create remote directories via ssh
|
||||
logger.debug('creating remote directories: {}'.format(dirs))
|
||||
dirs = ['mkdir -p {}/{}'.format(psrc.name, x) for x in list(dirs)]
|
||||
|
@ -268,7 +302,19 @@ def ingress_data(batch_client, config, rls=None):
|
|||
if rls is None:
|
||||
rls = convoy.batch.get_remote_login_settings(batch_client, config)
|
||||
for fdict in files:
|
||||
src = fdict['source']
|
||||
src = fdict['source']['path']
|
||||
try:
|
||||
src_incl = fdict['source']['include']
|
||||
if src_incl is not None and len(src_incl) == 0:
|
||||
src_incl = None
|
||||
except KeyError:
|
||||
src_incl = None
|
||||
try:
|
||||
src_excl = fdict['source']['exclude']
|
||||
if src_excl is not None and len(src_excl) == 0:
|
||||
src_excl = None
|
||||
except KeyError:
|
||||
src_excl = None
|
||||
try:
|
||||
shared = fdict['destination']['shared_data_volume']
|
||||
except KeyError:
|
||||
|
@ -348,11 +394,12 @@ def ingress_data(batch_client, config, rls=None):
|
|||
'data ingress to {} not supported'.format(driver))
|
||||
if method == 'scp' or method == 'rsync+ssh':
|
||||
_singlenode_transfer(
|
||||
method, src, dst, username, ssh_private_key, rls, eo, reo)
|
||||
method, src, src_incl, src_excl, dst, username,
|
||||
ssh_private_key, rls, eo, reo)
|
||||
elif method == 'multinode_scp' or method == 'multinode_rsync+ssh':
|
||||
_multinode_transfer(
|
||||
method, src, dst, username, ssh_private_key, rls, eo,
|
||||
reo, mpt)
|
||||
method, src, src_incl, src_excl, dst, username,
|
||||
ssh_private_key, rls, eo, reo, mpt)
|
||||
else:
|
||||
raise RuntimeError(
|
||||
'unknown transfer method: {}'.format(method))
|
||||
|
|
|
@ -93,7 +93,11 @@ The global config schema is as follows:
|
|||
],
|
||||
"files": [
|
||||
{
|
||||
"source": "/some/local/path/dir",
|
||||
"source": {
|
||||
"path": "/some/local/path/dir",
|
||||
"include": ["*.dat"],
|
||||
"exclude": ["*.bak"]
|
||||
},
|
||||
"destination": {
|
||||
"shared_data_volume": "glustervol",
|
||||
"data_transfer": {
|
||||
|
@ -209,9 +213,22 @@ from a location accessible by the local machine (i.e., machine invoking
|
|||
in the pool). `files` is a JSON list of objects, which allows for multiple
|
||||
source-to-destination transfers to be ingressed during the same invocation. Each
|
||||
object within the `files` list contains the following members:
|
||||
* (required) `source` property is a local path. A single file or a directory
|
||||
can be specified. No globbing/wildcards are currently supported.
|
||||
* (required) `destination` property containing the following members:
|
||||
* (required) `source` property contains the following members:
|
||||
* (required) `path` is a local path. A single file or a directory
|
||||
can be specified. The filters below cannot be used when `path` refers to
|
||||
a single file.
|
||||
* (optional) `include` is an array of
|
||||
[Unix shell-style wildcard filters](https://docs.python.org/3.5/library/fnmatch.html)
|
||||
where only files matching a filter are included in the data transfer.
|
||||
Filters specified in `include` take precedence over those in `exclude`,
|
||||
described next. In this example, only files ending in `.dat` are ingressed.
|
||||
* (optional) `exclude` is an array of
|
||||
[Unix shell-style wildcard filters](https://docs.python.org/3.5/library/fnmatch.html)
|
||||
where files matching a filter are excluded from the data transfer. Filters
|
||||
specified in `include` have precedence over filters specified in
|
||||
`exclude`. In this example, all files ending in `.bak` are skipped for
|
||||
ingress.
|
||||
* (required) `destination` property contains the following members:
|
||||
* (required) `shared_data_volume` is a GlusterFS volume name. Please see
|
||||
below in the `shared_data_volumes` for information on how to set up a
|
||||
GlusterFS share.
|
||||
|
|
Загрузка…
Ссылка в новой задаче