From 1bc8e60fe2a3cd85bf40995db7953a98ef46daf0 Mon Sep 17 00:00:00 2001
From: Fred Park
Date: Wed, 12 Oct 2016 09:23:07 -0700
Subject: [PATCH] Add include/exclude filter support for source path

---
 config_templates/config.json            |  6 ++-
 convoy/data.py                          | 63 +++++++++++++++++++++----
 docs/10-batch-shipyard-configuration.md | 25 ++++++++--
 3 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/config_templates/config.json b/config_templates/config.json
index a0a69b6..895c14f 100644
--- a/config_templates/config.json
+++ b/config_templates/config.json
@@ -33,7 +33,11 @@
         ],
         "files": [
             {
-                "source": "/some/local/path/dir",
+                "source": {
+                    "path": "/some/local/path/dir",
+                    "include": ["*.dat"],
+                    "exclude": ["*.bak"]
+                },
                 "destination": {
                     "shared_data_volume": "glustervol",
                     "data_transfer": {
diff --git a/convoy/data.py b/convoy/data.py
index 1a92e34..5140858 100644
--- a/convoy/data.py
+++ b/convoy/data.py
@@ -25,6 +25,7 @@
 # stdlib imports
 from __future__ import division, print_function, unicode_literals
 import datetime
+import fnmatch
 import logging
 try:
     import pathlib
@@ -45,10 +46,13 @@ convoy.util.setup_logger(logger)
 
 
 def _singlenode_transfer(
-        method, src, dst, username, ssh_private_key, rls, eo, reo):
+        method, src, src_incl, src_excl, dst, username, ssh_private_key, rls,
+        eo, reo):
     # type: (str, str, str, pathlib.Path, dict, str, str) -> None
     """Transfer data to a single node
     :param str src: source path
+    :param list src_incl: source include
+    :param list src_excl: source exclude
     :param str dst: destination path
     :param str username: username
     :param pathlib.Path: ssh private key
@@ -56,6 +60,12 @@
     :param str eo: ssh extra options
     :param str reo: rsync extra options
     """
+    # source include/exclude specified will force multinode transfer with mpt=1
+    if src_incl is not None or src_excl is not None:
+        _multinode_transfer(
+            'multinode_' + method, src, src_incl, src_excl, dst, username,
+            ssh_private_key, rls, eo, reo, 1)
+        return
     recursive = '-r' if pathlib.Path(src).is_dir() else ''
     _rls = next(iter(rls.values()))
     ip = _rls.remote_login_ip_address
@@ -91,11 +101,14 @@
 
 
 def _multinode_transfer(
-        method, src, dst, username, ssh_private_key, rls, eo, reo, mpt):
+        method, src, src_incl, src_excl, dst, username, ssh_private_key, rls,
+        eo, reo, mpt):
     # type: (str, str, str, str, pathlib.Path, dict, str, str, int) -> None
     """Transfer data to multiple destination nodes simultaneously
     :param str method: transfer method
     :param str src: source path
+    :param list src_incl: source include
+    :param list src_excl: source exclude
     :param str dst: destination path
     :param str username: username
     :param pathlib.Path: ssh private key
@@ -106,8 +119,14 @@
     """
     psrc = pathlib.Path(src)
     if not psrc.is_dir():
-        _singlenode_transfer(
-            method, src, dst, username, ssh_private_key, rls, eo, reo)
+        if src_incl is None and src_excl is None:
+            _singlenode_transfer(
+                method.split('_')[-1], src, src_incl, src_excl, dst, username,
+                ssh_private_key, rls, eo, reo)
+        else:
+            logger.error(
+                'cannot specify include and exclude filters for a source '
+                'path which is not a directory')
         return
     buckets = {}
     files = {}
@@ -127,6 +146,18 @@
         if sparent != '.':
             dirs.add(sparent)
         if entry.is_file():
+            # check filters
+            if src_excl is not None:
+                inc = not any(
+                    [fnmatch.fnmatch(entry.path, x) for x in src_excl])
+            else:
+                inc = True
+            if src_incl is not None:
+                inc = any([fnmatch.fnmatch(entry.path, x) for x in src_incl])
+            if not inc:
+                logger.debug('skipping file {} due to filters'.format(
+                    entry.path))
+                continue
             dstpath = '{}{}/{}'.format(dst, psrc.name, rel)
             # get key of min bucket values
             key = min(buckets, key=buckets.get)
@@ -134,6 +165,9 @@
             files[key].append((entry.path, dstpath))
             total_files += 1
     total_size = sum(buckets.values())
+    if total_files == 0:
+        logger.error('no files to ingress')
+        return
     # create remote directories via ssh
     logger.debug('creating remote directories: {}'.format(dirs))
     dirs = ['mkdir -p {}/{}'.format(psrc.name, x) for x in list(dirs)]
@@ -268,7 +302,19 @@
     if rls is None:
         rls = convoy.batch.get_remote_login_settings(batch_client, config)
     for fdict in files:
-        src = fdict['source']
+        src = fdict['source']['path']
+        try:
+            src_incl = fdict['source']['include']
+            if src_incl is not None and len(src_incl) == 0:
+                src_incl = None
+        except KeyError:
+            src_incl = None
+        try:
+            src_excl = fdict['source']['exclude']
+            if src_excl is not None and len(src_excl) == 0:
+                src_excl = None
+        except KeyError:
+            src_excl = None
         try:
             shared = fdict['destination']['shared_data_volume']
         except KeyError:
@@ -348,11 +394,12 @@
                 'data ingress to {} not supported'.format(driver))
         if method == 'scp' or method == 'rsync+ssh':
             _singlenode_transfer(
-                method, src, dst, username, ssh_private_key, rls, eo, reo)
+                method, src, src_incl, src_excl, dst, username,
+                ssh_private_key, rls, eo, reo)
         elif method == 'multinode_scp' or method == 'multinode_rsync+ssh':
            _multinode_transfer(
-                method, src, dst, username, ssh_private_key, rls, eo,
-                reo, mpt)
+                method, src, src_incl, src_excl, dst, username,
+                ssh_private_key, rls, eo, reo, mpt)
         else:
             raise RuntimeError(
                 'unknown transfer method: {}'.format(method))
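The include/exclude check added to _multinode_transfer above can be summarized by the
following standalone sketch (not part of the patch): exclude patterns are evaluated
first, and an include list, when present, overrides that result. The helper name
should_ingress and the sample file names are hypothetical.

import fnmatch


def should_ingress(path, include=None, exclude=None):
    """Return True if path passes the fnmatch include/exclude filters."""
    if exclude is not None:
        # excluded unless no exclude pattern matches
        inc = not any(fnmatch.fnmatch(path, x) for x in exclude)
    else:
        inc = True
    if include is not None:
        # include takes precedence: it overwrites the exclude decision
        inc = any(fnmatch.fnmatch(path, x) for x in include)
    return inc


if __name__ == '__main__':
    # with include=['*.dat'] and exclude=['*.bak'], only *.dat files pass
    for name in ('run0.dat', 'run0.bak', 'notes.txt'):
        print(name, should_ingress(name, include=['*.dat'], exclude=['*.bak']))
    # run0.dat True / run0.bak False / notes.txt False

Once an include list is given, the exclude list no longer affects the outcome,
which matches the documentation change below stating that include filters take
precedence over exclude filters.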
diff --git a/docs/10-batch-shipyard-configuration.md b/docs/10-batch-shipyard-configuration.md
index 62b7b83..adf8946 100644
--- a/docs/10-batch-shipyard-configuration.md
+++ b/docs/10-batch-shipyard-configuration.md
@@ -93,7 +93,11 @@ The global config schema is as follows:
         ],
         "files": [
             {
-                "source": "/some/local/path/dir",
+                "source": {
+                    "path": "/some/local/path/dir",
+                    "include": ["*.dat"],
+                    "exclude": ["*.bak"]
+                },
                 "destination": {
                     "shared_data_volume": "glustervol",
                     "data_transfer": {
@@ -209,9 +213,22 @@ from a location accessible by the local machine (i.e., machine invoking
 in the pool). `files` is a json list of objects, which allows for multiple
 sources to destinations to be ingressed during the same invocation. Each
 object within the `files` list contains the following members:
-* (required) `source` property is a local path. A single file or a directory
-can be specified. No globbing/wildcards are currently supported.
-* (required) `destination` property containing the following members:
+* (required) `source` property contains the following members:
+  * (required) `path` is a local path. A single file or a directory
+  can be specified. The filters below cannot be specified for a single
+  file transfer.
+  * (optional) `include` is an array of
+  [Unix shell-style wildcard filters](https://docs.python.org/3.5/library/fnmatch.html)
+  where only files matching a filter are included in the data transfer.
+  Filters specified in `include` have precedence over `exclude` described
+  next. In this example, all files ending in `.dat` are ingressed.
+  * (optional) `exclude` is an array of
+  [Unix shell-style wildcard filters](https://docs.python.org/3.5/library/fnmatch.html)
+  where files matching a filter are excluded from the data transfer. Filters
+  specified in `include` have precedence over filters specified in
+  `exclude`. In this example, all files ending in `.bak` are skipped for
+  ingress.
+* (required) `destination` property contains the following members:
   * (required) `shared_data_volume` is a GlusterFS volume name. Please see
   below in the `shared_data_volumes` for information on how to set up a
   GlusterFS share.
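As a companion to the documentation above, the following standalone sketch (not
part of the patch) shows how a `files` entry with the new source object form is
consumed. The config dict literal is an assumed example mirroring the schema
above, and dict.get with an "or None" fallback stands in for the try/except
KeyError and empty-list normalization that the patch adds to ingress_data.

# assumed example config fragment mirroring the schema above (hypothetical values)
fdict = {
    'source': {
        'path': '/some/local/path/dir',
        'include': ['*.dat'],
        'exclude': [],
    },
    'destination': {
        'shared_data_volume': 'glustervol',
    },
}

src = fdict['source']['path']
# compact equivalent of the patch's try/except KeyError and len() == 0 checks:
# a missing key or an empty list both collapse to None
src_incl = fdict['source'].get('include') or None
src_excl = fdict['source'].get('exclude') or None
print(src, src_incl, src_excl)
# /some/local/path/dir ['*.dat'] None

Normalizing to None rather than an empty list keeps the transfer functions on
the unfiltered code path, so omitting a filter behaves the same as specifying
an empty array.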