From 917ca544f1d755690f62df799b6ab0f17b31272a Mon Sep 17 00:00:00 2001 From: Yishi Wang Date: Thu, 29 Aug 2024 14:58:13 +0800 Subject: [PATCH] `azdev scan/mask`: Add `--include-pattern` and `--exclude-pattern` (#465) * add scan/mask command * code implementation * refine code and add tests * pylint * flake8 * address comments * Add --include-pattern and --exclude-pattern * tox * Apply suggestions from code review Co-authored-by: ZelinWang --------- Co-authored-by: ZelinWang --- HISTORY.rst | 4 + azdev/__init__.py | 2 +- azdev/help.py | 3 + azdev/operations/secret.py | 116 +++++++++++++------ azdev/operations/tests/test_scan_and_mask.py | 10 ++ azdev/params.py | 12 +- 6 files changed, 108 insertions(+), 39 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 5459caf..e94d1c2 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -2,6 +2,10 @@ Release History =============== +0.1.75 +++++++ +* `azdev scan/mask`: Add `--include-pattern` and `--exclude-pattern` to support filtering files within directory + 0.1.74 ++++++ * `azdev scan/mask`: New commands for scanning and masking secrets for files or string diff --git a/azdev/__init__.py b/azdev/__init__.py index 898e4e3..f799185 100644 --- a/azdev/__init__.py +++ b/azdev/__init__.py @@ -4,4 +4,4 @@ # license information. # ----------------------------------------------------------------------------- -__VERSION__ = '0.1.74' +__VERSION__ = '0.1.75' diff --git a/azdev/help.py b/azdev/help.py index f755b03..44f37ff 100644 --- a/azdev/help.py +++ b/azdev/help.py @@ -193,6 +193,9 @@ helps['scan'] = """ - name: Recursively scan secrets for a directory and save results to specific file text: | azdev scan --directory-path /path/to/my/folder --recursive --scan-result-path /path/to/scan_result.json + - name: Scan secrets for all json files and yaml files within a directory + text: | + azdev scan --directory-path /path/to/my/folder --include-pattern *.yaml *.json """ helps['mask'] = """ diff --git a/azdev/operations/secret.py b/azdev/operations/secret.py index 77a85cc..03689d8 100644 --- a/azdev/operations/secret.py +++ b/azdev/operations/secret.py @@ -14,7 +14,7 @@ from microsoft_security_utilities_secret_masker import (load_regex_patterns_from logger = get_logger(__name__) -def _validate_data_path(file_path=None, directory_path=None, data=None): +def _validate_data_path(file_path=None, directory_path=None, include_pattern=None, exclude_pattern=None, data=None): if file_path and directory_path: raise ValueError('Can not specify file path and directory path at the same time') if file_path and data: @@ -28,6 +28,51 @@ def _validate_data_path(file_path=None, directory_path=None, data=None): raise ValueError(f'invalid directory path:{directory_path}') if file_path and not os.path.isfile(file_path): raise ValueError(f'invalid file path:{file_path}') + if not directory_path and include_pattern: + raise ValueError('--include-pattern need to be used together with --directory-path') + if not directory_path and exclude_pattern: + raise ValueError('--exclude-pattern need to be used together with --directory-path') + if include_pattern and exclude_pattern: + raise ValueError('--include-pattern and --exclude-pattern are mutually exclusive') + + +def _is_file_name_in_patterns(filename, patterns): + if not filename or not patterns: + return None + import fnmatch + for pattern in patterns: + if fnmatch.fnmatch(filename, pattern): + return True + return False + + +def _check_file_include_and_exclude_pattern(filename, include_pattern=None, exclude_pattern=None): + file_satisfied = True + if include_pattern and not _is_file_name_in_patterns(filename, include_pattern): + file_satisfied = False + if exclude_pattern and _is_file_name_in_patterns(filename, exclude_pattern): + file_satisfied = False + return file_satisfied + + +def _get_files_from_directory(directory_path, recursive=None, include_pattern=None, exclude_pattern=None): + target_files = [] + if recursive: + for root, _, files in os.walk(directory_path): + for file in files: + if _check_file_include_and_exclude_pattern(file, + include_pattern=include_pattern, + exclude_pattern=exclude_pattern): + target_files.append(os.path.join(root, file)) + else: + for file in os.listdir(directory_path): + if _check_file_include_and_exclude_pattern(file, + include_pattern=include_pattern, + exclude_pattern=exclude_pattern): + file = os.path.join(directory_path, file) + if os.path.isfile(file): + target_files.append(file) + return target_files def _load_built_in_regex_patterns(): @@ -88,21 +133,17 @@ def _scan_secrets_for_string(data, custom_pattern=None): return secrets -def scan_secrets(file_path=None, directory_path=None, recursive=False, data=None, +def scan_secrets(file_path=None, directory_path=None, recursive=False, + include_pattern=None, exclude_pattern=None, data=None, save_scan_result=None, scan_result_path=None, custom_pattern=None): - _validate_data_path(file_path=file_path, directory_path=directory_path, data=data) + _validate_data_path(file_path=file_path, directory_path=directory_path, + include_pattern=include_pattern, exclude_pattern=exclude_pattern, data=data) target_files = [] scan_results = {} if directory_path: directory_path = os.path.abspath(directory_path) - if recursive: - for root, _, files in os.walk(directory_path): - target_files.extend(os.path.join(root, file) for file in files) - else: - for file in os.listdir(directory_path): - file = os.path.join(directory_path, file) - if os.path.isfile(file): - target_files.append(file) + target_files = _get_files_from_directory(directory_path, recursive=recursive, + include_pattern=include_pattern, exclude_pattern=exclude_pattern) if file_path: file_path = os.path.abspath(file_path) target_files.append(file_path) @@ -114,7 +155,7 @@ def scan_secrets(file_path=None, directory_path=None, recursive=False, data=None elif target_files: for target_file in target_files: logger.debug('start scanning secrets for %s', target_file) - with open(target_file) as f: + with open(target_file, encoding='utf8') as f: data = f.read() if not data: continue @@ -140,41 +181,37 @@ def scan_secrets(file_path=None, directory_path=None, recursive=False, data=None file_folder = os.path.join(get_azdev_config_dir(), 'scan_results') if not os.path.exists(file_folder): os.mkdir(file_folder, 0o755) - file_name = file_path or directory_path or datetime.now().strftime('%Y%m%d%H%M%S') - result_file_name = 'scan_result_' + file_name.replace('.', '_') + '.json' + result_file_name = 'scan_result_' + datetime.now().strftime('%Y%m%d%H%M%S') + '.json' scan_result_path = os.path.join(file_folder, result_file_name) - with open(scan_result_path, 'w') as f: + with open(scan_result_path, 'w', encoding='utf8') as f: json.dump(scan_results, f) logger.debug('store scanning results in %s', scan_result_path) return {'secrets_detected': True, 'scan_result_path': os.path.abspath(scan_result_path)} def _get_scan_results_from_saved_file(saved_scan_result_path, - file_path=None, directory_path=None, recursive=False, data=None): + file_path=None, directory_path=None, recursive=False, + include_pattern=None, exclude_pattern=None, data=None): scan_results = {} if not os.path.isfile(saved_scan_result_path): raise ValueError(f'invalid saved scan result path:{saved_scan_result_path}') - with open(saved_scan_result_path) as f: + with open(saved_scan_result_path, encoding='utf8') as f: saved_scan_results = json.load(f) # filter saved scan results to keep those related with specified file(s) - _validate_data_path(file_path=file_path, directory_path=directory_path, data=data) + _validate_data_path(file_path=file_path, directory_path=directory_path, + include_pattern=include_pattern, exclude_pattern=exclude_pattern, data=data) if file_path: file_path = os.path.abspath(file_path) if file_path in saved_scan_results: scan_results[file_path] = saved_scan_results[file_path] elif directory_path: - if recursive: - for root, _, files in os.walk(directory_path): - for file in files: - file_full = os.path.join(root, file) - if file_full in saved_scan_results: - scan_results[file_full] = saved_scan_results[file_full] - else: - for file in os.listdir(directory_path): - file_full = os.path.join(directory_path, file) - if file_full in saved_scan_results: - scan_results[file_full] = saved_scan_results[file_full] + directory_path = os.path.abspath(directory_path) + target_files = _get_files_from_directory(directory_path, recursive=recursive, + include_pattern=include_pattern, exclude_pattern=exclude_pattern) + for target_file in target_files: + if target_file in saved_scan_results: + scan_results[target_file] = saved_scan_results[target_file] else: scan_results['raw_data'] = saved_scan_results['raw_data'] @@ -193,19 +230,26 @@ def _mask_secret_for_string(data, secret, redaction_type=None): return data -def mask_secrets(file_path=None, directory_path=None, recursive=False, data=None, +def mask_secrets(file_path=None, directory_path=None, recursive=False, + include_pattern=None, exclude_pattern=None, data=None, save_scan_result=None, scan_result_path=None, custom_pattern=None, saved_scan_result_path=None, redaction_type='FIXED_VALUE', yes=None): scan_results = {} if saved_scan_result_path: - scan_results = _get_scan_results_from_saved_file(saved_scan_result_path, file_path=file_path, - directory_path=directory_path, recursive=recursive, data=data) + scan_results = _get_scan_results_from_saved_file(saved_scan_result_path, + file_path=file_path, + directory_path=directory_path, + recursive=recursive, + include_pattern=include_pattern, + exclude_pattern=exclude_pattern, + data=data) else: - scan_response = scan_secrets(file_path=file_path, directory_path=directory_path, recursive=recursive, data=data, + scan_response = scan_secrets(file_path=file_path, directory_path=directory_path, recursive=recursive, + include_pattern=include_pattern, exclude_pattern=exclude_pattern, data=data, save_scan_result=save_scan_result, scan_result_path=scan_result_path, custom_pattern=custom_pattern) if save_scan_result and scan_response['scan_result_path']: - with open(scan_response['scan_result_path']) as f: + with open(scan_response['scan_result_path'], encoding='utf8') as f: scan_results = json.load(f) elif not save_scan_result: scan_results = scan_response['scan_results'] @@ -235,13 +279,13 @@ def mask_secrets(file_path=None, directory_path=None, recursive=False, data=None return mask_result for scan_file_path, secrets in scan_results.items(): - with open(scan_file_path, 'r') as f: + with open(scan_file_path, 'r', encoding='utf8') as f: content = f.read() if not content: continue for secret in secrets: content = _mask_secret_for_string(content, secret, redaction_type) - with open(scan_file_path, 'w') as f: + with open(scan_file_path, 'w', encoding='utf8') as f: f.write(content) mask_result['mask'] = True return mask_result diff --git a/azdev/operations/tests/test_scan_and_mask.py b/azdev/operations/tests/test_scan_and_mask.py index 18ffa38..4c08572 100644 --- a/azdev/operations/tests/test_scan_and_mask.py +++ b/azdev/operations/tests/test_scan_and_mask.py @@ -136,6 +136,16 @@ class TestScanAndMaskSecrets(unittest.TestCase): self.assertEqual(len(result['scan_results'][info_json_file]), 1) self.assertEqual(result['scan_results'][info_json_file][0]['secret_name'], 'EmailAddress') + result = scan_secrets(directory_path=file_folder, recursive=True, include_pattern=['*.json'], custom_pattern=json.dumps(custom_pattern)) + self.assertTrue(result['secrets_detected']) + self.assertNotIn(email_string_file, result['scan_results']) + self.assertIn(info_json_file, result['scan_results']) + + result = scan_secrets(directory_path=file_folder, recursive=True, exclude_pattern=['*.json'], custom_pattern=json.dumps(custom_pattern)) + self.assertTrue(result['secrets_detected']) + self.assertIn(email_string_file, result['scan_results']) + self.assertNotIn(info_json_file, result['scan_results']) + def test_mask(self): test_data = "This is a test string with email fooabc@gmail.com and sas sv=2022-11-02&sr=c&sig=a9Y5mpQgKUiiPzHFNdDm53Na6UndTrNMCsRZd6b2oV4%3D" result = mask_secrets(data=test_data, yes=True) diff --git a/azdev/params.py b/azdev/params.py index f9af3b6..44cd5ff 100644 --- a/azdev/params.py +++ b/azdev/params.py @@ -109,14 +109,22 @@ def load_arguments(self, _): help='Path of the folder you want to scan secrets for') c.argument('recursive', options_list=['--recursive', '-r'], help='Scan the directory recursively') + c.argument('include_pattern', options_list=['--include-pattern', '--include'], nargs='*', + help="Space separated patterns used for files you want to include within the directory. " + "The supported patterns are '*', '?', '[seq]', and '[!seq]'. " + "For more information, please refer to https://docs.python.org/3/library/fnmatch.html") + c.argument('exclude_pattern', options_list=['--exclude-pattern', '--exclude'], nargs='*', + help="Space separated patterns used for files you want to exclude within the directory. " + "The supported patterns are '*', '?', '[seq]', and '[!seq]'. " + "For more information, please refer to https://docs.python.org/3/library/fnmatch.html") c.argument('data', help='Raw string you want to scan secrets for') - c.argument('save_scan_result', options_list=['--save-scan-result', '--save'], type=bool, + c.argument('save_scan_result', options_list=['--save-scan-result', '--save'], action='store_true', help='Whether to save scan result to file or not') c.argument('scan_result_path', options_list=['--scan-result-path', '--result'], help='Path for the file you want to save the result in. ' 'If specified, --save-scan-result will be True anyway. ' 'If not speficied but set --save-scan-result to True, ' - 'the file will be saved as `scan_result_xxx.json` in your `.azdev` directory ') + 'the file will be saved as `scan_result_YYYYmmddHHMMSS.json` in your `.azdev` directory ') c.argument('custom_pattern', help='Additional patterns you want to apply or built-in patterns you want to exclude ' 'for scanning. Can be json string or path to the json file.')