`azdev scan/mask`: Add `--include-pattern` and `--exclude-pattern` (#465)

* add scan/mask command

* code implementation

* refine code and add tests

* pylint

* flake8

* address comments

* Add --include-pattern and --exclude-pattern

* tox

* Apply suggestions from code review

Co-authored-by: ZelinWang <zelinwang@microsoft.com>

---------

Co-authored-by: ZelinWang <zelinwang@microsoft.com>
This commit is contained in:
Yishi Wang 2024-08-29 14:58:13 +08:00 коммит произвёл GitHub
Родитель 8738442fd2
Коммит 917ca544f1
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
6 изменённых файлов: 108 добавлений и 39 удалений

Просмотреть файл

@ -2,6 +2,10 @@
Release History
===============
0.1.75
++++++
* `azdev scan/mask`: Add `--include-pattern` and `--exclude-pattern` to support filtering files within directory
0.1.74
++++++
* `azdev scan/mask`: New commands for scanning and masking secrets for files or string

Просмотреть файл

@ -4,4 +4,4 @@
# license information.
# -----------------------------------------------------------------------------
__VERSION__ = '0.1.74'
__VERSION__ = '0.1.75'

Просмотреть файл

@ -193,6 +193,9 @@ helps['scan'] = """
- name: Recursively scan secrets for a directory and save results to specific file
text: |
azdev scan --directory-path /path/to/my/folder --recursive --scan-result-path /path/to/scan_result.json
- name: Scan secrets for all json files and yaml files within a directory
text: |
azdev scan --directory-path /path/to/my/folder --include-pattern *.yaml *.json
"""
helps['mask'] = """

Просмотреть файл

@ -14,7 +14,7 @@ from microsoft_security_utilities_secret_masker import (load_regex_patterns_from
logger = get_logger(__name__)
def _validate_data_path(file_path=None, directory_path=None, data=None):
def _validate_data_path(file_path=None, directory_path=None, include_pattern=None, exclude_pattern=None, data=None):
if file_path and directory_path:
raise ValueError('Can not specify file path and directory path at the same time')
if file_path and data:
@ -28,6 +28,51 @@ def _validate_data_path(file_path=None, directory_path=None, data=None):
raise ValueError(f'invalid directory path:{directory_path}')
if file_path and not os.path.isfile(file_path):
raise ValueError(f'invalid file path:{file_path}')
if not directory_path and include_pattern:
raise ValueError('--include-pattern need to be used together with --directory-path')
if not directory_path and exclude_pattern:
raise ValueError('--exclude-pattern need to be used together with --directory-path')
if include_pattern and exclude_pattern:
raise ValueError('--include-pattern and --exclude-pattern are mutually exclusive')
def _is_file_name_in_patterns(filename, patterns):
if not filename or not patterns:
return None
import fnmatch
for pattern in patterns:
if fnmatch.fnmatch(filename, pattern):
return True
return False
def _check_file_include_and_exclude_pattern(filename, include_pattern=None, exclude_pattern=None):
file_satisfied = True
if include_pattern and not _is_file_name_in_patterns(filename, include_pattern):
file_satisfied = False
if exclude_pattern and _is_file_name_in_patterns(filename, exclude_pattern):
file_satisfied = False
return file_satisfied
def _get_files_from_directory(directory_path, recursive=None, include_pattern=None, exclude_pattern=None):
target_files = []
if recursive:
for root, _, files in os.walk(directory_path):
for file in files:
if _check_file_include_and_exclude_pattern(file,
include_pattern=include_pattern,
exclude_pattern=exclude_pattern):
target_files.append(os.path.join(root, file))
else:
for file in os.listdir(directory_path):
if _check_file_include_and_exclude_pattern(file,
include_pattern=include_pattern,
exclude_pattern=exclude_pattern):
file = os.path.join(directory_path, file)
if os.path.isfile(file):
target_files.append(file)
return target_files
def _load_built_in_regex_patterns():
@ -88,21 +133,17 @@ def _scan_secrets_for_string(data, custom_pattern=None):
return secrets
def scan_secrets(file_path=None, directory_path=None, recursive=False, data=None,
def scan_secrets(file_path=None, directory_path=None, recursive=False,
include_pattern=None, exclude_pattern=None, data=None,
save_scan_result=None, scan_result_path=None, custom_pattern=None):
_validate_data_path(file_path=file_path, directory_path=directory_path, data=data)
_validate_data_path(file_path=file_path, directory_path=directory_path,
include_pattern=include_pattern, exclude_pattern=exclude_pattern, data=data)
target_files = []
scan_results = {}
if directory_path:
directory_path = os.path.abspath(directory_path)
if recursive:
for root, _, files in os.walk(directory_path):
target_files.extend(os.path.join(root, file) for file in files)
else:
for file in os.listdir(directory_path):
file = os.path.join(directory_path, file)
if os.path.isfile(file):
target_files.append(file)
target_files = _get_files_from_directory(directory_path, recursive=recursive,
include_pattern=include_pattern, exclude_pattern=exclude_pattern)
if file_path:
file_path = os.path.abspath(file_path)
target_files.append(file_path)
@ -114,7 +155,7 @@ def scan_secrets(file_path=None, directory_path=None, recursive=False, data=None
elif target_files:
for target_file in target_files:
logger.debug('start scanning secrets for %s', target_file)
with open(target_file) as f:
with open(target_file, encoding='utf8') as f:
data = f.read()
if not data:
continue
@ -140,41 +181,37 @@ def scan_secrets(file_path=None, directory_path=None, recursive=False, data=None
file_folder = os.path.join(get_azdev_config_dir(), 'scan_results')
if not os.path.exists(file_folder):
os.mkdir(file_folder, 0o755)
file_name = file_path or directory_path or datetime.now().strftime('%Y%m%d%H%M%S')
result_file_name = 'scan_result_' + file_name.replace('.', '_') + '.json'
result_file_name = 'scan_result_' + datetime.now().strftime('%Y%m%d%H%M%S') + '.json'
scan_result_path = os.path.join(file_folder, result_file_name)
with open(scan_result_path, 'w') as f:
with open(scan_result_path, 'w', encoding='utf8') as f:
json.dump(scan_results, f)
logger.debug('store scanning results in %s', scan_result_path)
return {'secrets_detected': True, 'scan_result_path': os.path.abspath(scan_result_path)}
def _get_scan_results_from_saved_file(saved_scan_result_path,
file_path=None, directory_path=None, recursive=False, data=None):
file_path=None, directory_path=None, recursive=False,
include_pattern=None, exclude_pattern=None, data=None):
scan_results = {}
if not os.path.isfile(saved_scan_result_path):
raise ValueError(f'invalid saved scan result path:{saved_scan_result_path}')
with open(saved_scan_result_path) as f:
with open(saved_scan_result_path, encoding='utf8') as f:
saved_scan_results = json.load(f)
# filter saved scan results to keep those related with specified file(s)
_validate_data_path(file_path=file_path, directory_path=directory_path, data=data)
_validate_data_path(file_path=file_path, directory_path=directory_path,
include_pattern=include_pattern, exclude_pattern=exclude_pattern, data=data)
if file_path:
file_path = os.path.abspath(file_path)
if file_path in saved_scan_results:
scan_results[file_path] = saved_scan_results[file_path]
elif directory_path:
if recursive:
for root, _, files in os.walk(directory_path):
for file in files:
file_full = os.path.join(root, file)
if file_full in saved_scan_results:
scan_results[file_full] = saved_scan_results[file_full]
else:
for file in os.listdir(directory_path):
file_full = os.path.join(directory_path, file)
if file_full in saved_scan_results:
scan_results[file_full] = saved_scan_results[file_full]
directory_path = os.path.abspath(directory_path)
target_files = _get_files_from_directory(directory_path, recursive=recursive,
include_pattern=include_pattern, exclude_pattern=exclude_pattern)
for target_file in target_files:
if target_file in saved_scan_results:
scan_results[target_file] = saved_scan_results[target_file]
else:
scan_results['raw_data'] = saved_scan_results['raw_data']
@ -193,19 +230,26 @@ def _mask_secret_for_string(data, secret, redaction_type=None):
return data
def mask_secrets(file_path=None, directory_path=None, recursive=False, data=None,
def mask_secrets(file_path=None, directory_path=None, recursive=False,
include_pattern=None, exclude_pattern=None, data=None,
save_scan_result=None, scan_result_path=None, custom_pattern=None,
saved_scan_result_path=None, redaction_type='FIXED_VALUE', yes=None):
scan_results = {}
if saved_scan_result_path:
scan_results = _get_scan_results_from_saved_file(saved_scan_result_path, file_path=file_path,
directory_path=directory_path, recursive=recursive, data=data)
scan_results = _get_scan_results_from_saved_file(saved_scan_result_path,
file_path=file_path,
directory_path=directory_path,
recursive=recursive,
include_pattern=include_pattern,
exclude_pattern=exclude_pattern,
data=data)
else:
scan_response = scan_secrets(file_path=file_path, directory_path=directory_path, recursive=recursive, data=data,
scan_response = scan_secrets(file_path=file_path, directory_path=directory_path, recursive=recursive,
include_pattern=include_pattern, exclude_pattern=exclude_pattern, data=data,
save_scan_result=save_scan_result, scan_result_path=scan_result_path,
custom_pattern=custom_pattern)
if save_scan_result and scan_response['scan_result_path']:
with open(scan_response['scan_result_path']) as f:
with open(scan_response['scan_result_path'], encoding='utf8') as f:
scan_results = json.load(f)
elif not save_scan_result:
scan_results = scan_response['scan_results']
@ -235,13 +279,13 @@ def mask_secrets(file_path=None, directory_path=None, recursive=False, data=None
return mask_result
for scan_file_path, secrets in scan_results.items():
with open(scan_file_path, 'r') as f:
with open(scan_file_path, 'r', encoding='utf8') as f:
content = f.read()
if not content:
continue
for secret in secrets:
content = _mask_secret_for_string(content, secret, redaction_type)
with open(scan_file_path, 'w') as f:
with open(scan_file_path, 'w', encoding='utf8') as f:
f.write(content)
mask_result['mask'] = True
return mask_result

Просмотреть файл

@ -136,6 +136,16 @@ class TestScanAndMaskSecrets(unittest.TestCase):
self.assertEqual(len(result['scan_results'][info_json_file]), 1)
self.assertEqual(result['scan_results'][info_json_file][0]['secret_name'], 'EmailAddress')
result = scan_secrets(directory_path=file_folder, recursive=True, include_pattern=['*.json'], custom_pattern=json.dumps(custom_pattern))
self.assertTrue(result['secrets_detected'])
self.assertNotIn(email_string_file, result['scan_results'])
self.assertIn(info_json_file, result['scan_results'])
result = scan_secrets(directory_path=file_folder, recursive=True, exclude_pattern=['*.json'], custom_pattern=json.dumps(custom_pattern))
self.assertTrue(result['secrets_detected'])
self.assertIn(email_string_file, result['scan_results'])
self.assertNotIn(info_json_file, result['scan_results'])
def test_mask(self):
test_data = "This is a test string with email fooabc@gmail.com and sas sv=2022-11-02&sr=c&sig=a9Y5mpQgKUiiPzHFNdDm53Na6UndTrNMCsRZd6b2oV4%3D"
result = mask_secrets(data=test_data, yes=True)

Просмотреть файл

@ -109,14 +109,22 @@ def load_arguments(self, _):
help='Path of the folder you want to scan secrets for')
c.argument('recursive', options_list=['--recursive', '-r'],
help='Scan the directory recursively')
c.argument('include_pattern', options_list=['--include-pattern', '--include'], nargs='*',
help="Space separated patterns used for files you want to include within the directory. "
"The supported patterns are '*', '?', '[seq]', and '[!seq]'. "
"For more information, please refer to https://docs.python.org/3/library/fnmatch.html")
c.argument('exclude_pattern', options_list=['--exclude-pattern', '--exclude'], nargs='*',
help="Space separated patterns used for files you want to exclude within the directory. "
"The supported patterns are '*', '?', '[seq]', and '[!seq]'. "
"For more information, please refer to https://docs.python.org/3/library/fnmatch.html")
c.argument('data', help='Raw string you want to scan secrets for')
c.argument('save_scan_result', options_list=['--save-scan-result', '--save'], type=bool,
c.argument('save_scan_result', options_list=['--save-scan-result', '--save'], action='store_true',
help='Whether to save scan result to file or not')
c.argument('scan_result_path', options_list=['--scan-result-path', '--result'],
help='Path for the file you want to save the result in. '
'If specified, --save-scan-result will be True anyway. '
'If not speficied but set --save-scan-result to True, '
'the file will be saved as `scan_result_xxx.json` in your `.azdev` directory ')
'the file will be saved as `scan_result_YYYYmmddHHMMSS.json` in your `.azdev` directory ')
c.argument('custom_pattern',
help='Additional patterns you want to apply or built-in patterns you want to exclude '
'for scanning. Can be json string or path to the json file.')