* add basic unit tests for new auth settings

* add new settings to restler settings and restler.py

* add support for module and location authentication to restler

* Remove OneOf from settings parsing

* Remove OneOf from unit tests

* Inclusion checks rather than exception handling for loading configuration

* Add authentication test files folder and sample token

* Move unit_test_server_auth.py to authentication_test_files

* Update checkers log with new auth module path

* Add e2e tests for token location auth

* Implementation for token location auth

* Add e2e test for module authentication

* PR feedback, remove extra space

* Add e2e tests for token refresh cmd

* Update description for authentication settings

* Update failure message in cmd unit test

* Update comment in test cmd auth

* Exception handling for new auth mechanisms in request_utilities

* Revert changes to client cert path and client key path for now

* Initial module logging implementation

* Remove old comment

* Add OneOf validation to auth, split auth validation to function

* Add basic string matching for auth validation exception unit tests

* Add function descriptions and use import_utilities

* Update description for validate_auth_tokens

* Update retry handler with copyright and docstrings

* Add space between operator for readability

* Whitespace formatting

* Update unit tests to use "token_refresh_cmd" instead of "cmd"

* Split inclusion checks over multiple lines

* Rename token_module_method to token_module_function, remove extra space

* Make data an optional parameter to token function

* Rename token_module_method to token_module_function

* Update method to function in settings file

* Add certificate to authentication settings

* Simplify token interval and cmd parsing

* Add documentation for new auth features

* Add enum for token auth methoeds

* Add InvalidTokenAuthMethodException

* Add back existing documentation for client certificates

* Set custom_value_generators to blank

* Add license to unit_test_server_auth_module.py

* Split token formatting across multiple lines

* Make data and log required arguments to token function

* Make data and log required arguments to token function

* Use run_abc_smoke_test in auth unit tests

* Add exception, enums to retry handler. Add unit tests

* Remove colon from comment in execute_token_refresh

* Split module unit tests into data and no data

* Update Authentication.md for readability

* Reduce nesting for auth documentation

* Update formatting for for clarity

* Remove unnecessary file path, update commas in json

* Formatting, add refernece to SettingsFile.md

* Use load_module in import_attrs to load module from absolute path

* Remove unnecessary load_module call

* Revert newline addition
This commit is contained in:
ajpasupu 2022-12-13 13:59:57 -08:00 коммит произвёл GitHub
Родитель ff10314ae8
Коммит 7fac290c32
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
32 изменённых файлов: 792 добавлений и 94 удалений

Просмотреть файл

@ -4,10 +4,40 @@ RESTler supports token-based and certificate based authentication.
**Token based authentication**
The user must provide a separate program to generate tokens, which implements the authentication method required by the API. This will be invoked in a separate process by RESTler to obtain and regularly refresh tokens. When invoked, this program must print metadata about the tokens on the first line, followed by each token and the required token header on a separate line for each application. For example:
The user has three options for providing token based authentication; Module, Location, and CMD. For details on the format of these options, please see SettingsFile.md.
**Module**
The user must provide the path to a python module (.py) that implements a function that returns a token, and the name of the function (default: `acquire_token`). RESTler will import the module and call the function to obtain tokens.
Additionally, a user can opt to add data (e.g. with additional authentication-related parameters specific to the service under test) to pass to this function. The function signature must be as follows:
```python
def acquire_token(data, log):
## Return token
```
Where
- `data` is a dictionary containing the json payload specified in the corresponding engine setting (see SettingsFile.md)
- `log` is a function that may be used to write logs to a network auth text file that will be saved in the RESTler results directory next to the network logs.
**Location**
The user must provide the full path to a text file containing a token. RESTler will read this text file to obtain tokens.
**Command**
The user must provide a separate program to generate tokens, which implements the authentication method required by the API. This will be invoked in a separate process by RESTler to obtain tokens.
`>my_gettoken.exe <args to my_gettoken>`
**Token Formatting**
All token-based authentication mechanisms require tokens to be specified as follows - metadata about the tokens on the first line, followed by each token and the required token header on a separate line for each application. For example:
```
{u'app1': {<any additional metadata you'd like to print. currently only used for troubleshooting. >}, u'app2':{}}
ApiTokenTag: 9A
@ -30,6 +60,11 @@ RESTler will obtain new tokens by invoking the token generation script with the
Note: in the above example, there are two different applications. This is only required by the 'namespace' checker (off by default). This checker is used to detect unauthorized access by one user/app to the data of another user/app. To have this checker work as intended to find bugs, you should specify two users that do not have access to each other's private resources (for example, two different accounts with private data to each).
**Token Refresh Interval**
All token-based authentication mechanisms require the user to provide "token_refresh_interval" an interval in seconds after which RESTler will attempt to refresh the token by executing the specified token authentication mechanism.
**Token values in logs**
RESTler has logic to prevent token values from being written to the network logs. It is recommended to check the RESTler network logs and make sure that the token values are, indeed, successfully omitted from the logs.

Просмотреть файл

@ -37,6 +37,72 @@ Path to your key file in a txt file.
If provided and valid, RESTler will attempt to use it during the SSL handshake.
### authentication: dict (default empty)
Settings for specifying authentication. See Authentication.md for details
#### _token_ dict (default empty): Can optionally provide one of {```location```, ```token_refresh_cmd```, ```module```}
__location__ str (Default None): File path to a text file containing a token
```json
"authentication": {
"token": {
"location": "/path/to/authentication_token.txt",
"token_refresh_interval": 300
}
}
```
__token_refresh_cmd__ str (Default None): The command to execute in order to refresh the authentication token
```json
"authentication": {
"token": {
"token_refresh_cmd": "python unit_test_server_auth.py",
"token_refresh_interval": 300
}
}
```
__module__ dict (Default None): Dictionary containing settings for RESTler to invoke user-specified module to refresh the authentication token
```json
"authentication": {
"token": {
"module": {
"file": "/path/to/unit_test_server_auth_module.py",
"function": "acquire_token_data",
"data": {
"client_id": "client_id"
}
},
"token_refresh_interval": 300
}
}
```
```file``` str (default None): File path to python file containing function that returns a token
```function``` str (default "acquire_token"): Name of function in file that returns a token. The function must accept two parameters "data", a Dictionary containing the json payload specified under data, and "log" a method that will write any logs to a network auth text file
```data``` dict (Default None): Optional data payload to provide to function. If data is included, RESTler will attempt to call function with data as an argument
__token_refresh_interval__ int (default None): Required parameter if using token authentication. The interval between periodic refreshes of the authentication token, in seconds
#### _certificate_ dict (Default empty): Can optionally provide certificate for SSL handshake
__client_certificate_path__ str (default None): Path to your X.509 certificate file in PEM format. If provided and valid, RESTler will attempt to use it during the SSL handshake
__client_certificate_key_path__ str (default None): Path to your key file in a txt file. If provided and valid, RESTler will attempt to use it during the SSL handshake
```json
"authentication": {
"certificate": {
"client_certificate_path": "/path/to/file.pem",
"client_certificate_key_path": "/path/to/file.key"
}
}
```
### custom_bug_codes: list(str)
List of status codes that will be flagged as bugs.

Просмотреть файл

@ -10,16 +10,21 @@ import ast
import uuid
import types
import threading
import os
from engine.errors import ResponseParsingException
from engine.errors import TransportLayerException
from restler_settings import Settings
from restler_settings import TokenAuthMethod
import engine.primitives as primitives
import engine.dependencies as dependencies
from engine.transport_layer.response import HttpResponse
from engine.transport_layer.response import RESTLER_BUG_CODES
from engine.transport_layer.messaging import UTF8
from engine.transport_layer.messaging import HttpSock
from engine.core.retry_handler import RetryHandler
from utils import import_utilities
last_refresh = 0
NO_TOKEN_SPECIFIED = 'NO-TOKEN-SPECIFIED\r\n'
@ -34,10 +39,14 @@ threadLocal = threading.local()
class EmptyTokenException(Exception):
pass
class InvalidTokenAuthMethodException(Exception):
pass
def get_latest_token_value():
global latest_token_value
return latest_token_value
def str_to_hex_def(val_str):
""" Creates a hex definition from a specified string
@ -50,6 +59,103 @@ def str_to_hex_def(val_str):
"""
return hashlib.sha1(val_str.encode(UTF8)).hexdigest()
def execute_token_refresh(token_dict):
""" Executes token refresh based on parameters in token_dict.
@param token_dict: Dictionary containing data required to fetch token
@type: token_dict: Dict
@return: None. Updates global latest_token_value and latest_shadow_token_value
@type: None
"""
global latest_token_value, latest_shadow_token_value
ERROR_VAL_STR = 'ERROR\r\n'
result = None
token_auth_method = token_dict["token_auth_method"]
retry_handler = RetryHandler()
while retry_handler.can_retry():
try:
if token_auth_method == TokenAuthMethod.LOCATION:
result = execute_location_token_refresh(
token_dict["token_location"])
elif token_auth_method == TokenAuthMethod.CMD:
result = execute_token_refresh_cmd(
token_dict["token_refresh_cmd"])
elif token_auth_method == TokenAuthMethod.MODULE:
result = execute_token_refresh_module(
token_dict["token_module_file"],
token_dict["token_module_function"],
token_dict["token_module_data"])
_, latest_token_value, latest_shadow_token_value = parse_authentication_tokens(
result)
break
except EmptyTokenException:
error_str = "Error: Authentication token was empty."
print(error_str)
_RAW_LOGGING(error_str)
sys.exit(-1)
except InvalidTokenAuthMethodException as exc:
error_str = f"Error: Invalid token authentication mechanism. \n Failed with {exc}"
print(error_str)
_RAW_LOGGING(error_str)
sys.exit(-1)
except Exception as error:
error_str = f"Authentication failed when refreshing token:\n\nUsing Token authentication method: \n{token_auth_method} \n with error {error}"
print(f'\n{error_str}')
latest_token_value = ERROR_VAL_STR
latest_shadow_token_value = ERROR_VAL_STR
_RAW_LOGGING(error_str)
retry_handler.wait_for_next_retry()
def execute_location_token_refresh(location):
""" Executes token refresh by attempting to read a token from a file path.
@param location: File path to a text file containing token
@type: location: string (filepath)
@return: token
@type: Str:
"""
try:
with open(location,"r") as f:
token_result = f.read()
return token_result
except FileNotFoundError:
error_str = f"Could not find token file at {location}. Please ensure that you've passed a valid path"
_RAW_LOGGING(error_str)
raise InvalidTokenAuthMethodException(error_str)
def execute_token_refresh_module(module_path, function, data):
""" Executes token refresh by attempting to execute a user provided auth module
@param: module_path: Path to auth module
@type: module_path: Str (filepath)
@param: function: function to call in auth module to retrieve a token
@type: function: Str
@param: data: Data to pass to authentication module
@type: data: Dict
@return: token
@type: string:
"""
module_name = os.path.basename(module_path)
try:
token_refresh_function = import_utilities.import_attr(module_path, function)
token_result = token_refresh_function(data, _AUTH_LOGGING)
return token_result
except FileNotFoundError:
error_str = f"Could not find token module file at {module_path}/{module_name}. Please ensure that you've passed a valid path"
_RAW_LOGGING(error_str)
raise InvalidTokenAuthMethodException(error_str)
except AttributeError:
error_str = f"Could not execute token refresh function {function} in module {module_path}. Please ensure that you've passed a valid function"
_RAW_LOGGING(error_str)
raise InvalidTokenAuthMethodException(error_str)
def execute_token_refresh_cmd(cmd):
""" Forks a subprocess to execute @param cmd to refresh token.
@ -62,43 +168,12 @@ def execute_token_refresh_cmd(cmd):
"""
global latest_token_value, latest_shadow_token_value
_RAW_LOGGING(f"Will refresh token: {cmd}")
MAX_RETRIES = 5
RETRY_SLEEP_TIME_SEC = 2
ERROR_VAL_STR = 'ERROR\r\n'
retry_count = 0
while retry_count < MAX_RETRIES:
try:
if sys.platform.startswith('win'):
cmd_result = subprocess.getoutput(str(cmd).split(' '))
else:
cmd_result = subprocess.getoutput([cmd])
_RAW_LOGGING(f"New value: {cmd_result}")
_, latest_token_value, latest_shadow_token_value = parse_authentication_tokens(cmd_result)
_RAW_LOGGING(f"Successfully obtained the latest token")
break
except subprocess.CalledProcessError:
error_str = f"Authentication failed when refreshing token:\n\nCommand that failed: \n{cmd}"
print(f'\n{error_str}')
latest_token_value = ERROR_VAL_STR
latest_shadow_token_value = ERROR_VAL_STR
_RAW_LOGGING(error_str)
retry_count = retry_count + 1
time.sleep(RETRY_SLEEP_TIME_SEC)
except EmptyTokenException:
error_str = "Error: Authentication token was empty."
print(error_str)
_RAW_LOGGING(error_str)
sys.exit(-1)
except Exception as error:
error_str = f"Exception refreshing token with cmd {cmd}. Error: {error}"
print(error_str)
_RAW_LOGGING(error_str)
sys.exit(-1)
if sys.platform.startswith('win'):
cmd_result = subprocess.getoutput(str(cmd).split(' '))
else:
_RAW_LOGGING(f"\nMaximum number of retries ({MAX_RETRIES}) exceeded. Exiting program.")
sys.exit(-1)
cmd_result = subprocess.getoutput([cmd])
return cmd_result
def parse_authentication_tokens(cmd_result):
""" Parses the output @param cmd_result from token scripts to refresh tokens.
@ -256,11 +331,10 @@ def resolve_dynamic_primitives(values, candidate_values_pool):
)
if not isinstance(token_dict, dict):
raise Exception("Refreshable token was not specified as a setting, but a request was expecting it.")
if token_dict:
if "token_auth_method" in token_dict and token_dict["token_auth_method"]:
token_refresh_interval = token_dict['token_refresh_interval']
token_refresh_cmd = token_dict['token_refresh_cmd']
if int(time.time()) - last_refresh > token_refresh_interval:
execute_token_refresh_cmd(token_refresh_cmd)
execute_token_refresh(token_dict)
last_refresh = int(time.time())
#print("-{}-\n-{}-".format(repr(latest_token_value),
# repr(latest_shadow_token_value)))
@ -420,3 +494,7 @@ def _RAW_LOGGING(log_str):
"""
from utils.logger import raw_network_logging as RAW_LOGGING
RAW_LOGGING(log_str)
def _AUTH_LOGGING(log_str):
from utils.logger import auth_logging as AUTH_LOGGING
AUTH_LOGGING(log_str)

Просмотреть файл

@ -0,0 +1,54 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import time
from enum import Enum
class RetryStrategy(Enum):
""" Enum of retry strategies """
LINEAR = 0
EXPONENTIAL = 1
class RetryLimitExceeded(Exception):
pass
class RetryHandler:
""" Utilities for handling retries """
def __init__(self, strategy=RetryStrategy.LINEAR, max_retries=5, delay=5, max_delay=60):
if isinstance(strategy, RetryStrategy):
self.strategy = strategy
else:
raise ValueError(f"Unknown retry strategy: {strategy}")
self.max_retries = max_retries
self.delay = delay
self.max_delay = max_delay
self.__num_retries = 0
def can_retry(self):
""" Determine if a retry should be executed
@return: Whether or not a retry should be executed
@rtype : Bool
"""
if self.__num_retries < self.max_retries:
return True
else:
return False
def wait_for_next_retry(self):
""" Sleep until next retry should be attempted
@return: None
@rtype : None
"""
if not self.can_retry():
raise RetryLimitExceeded("Retry limit exceeded")
if self.strategy == RetryStrategy.LINEAR:
time.sleep(self.delay)
elif self.strategy == RetryStrategy.EXPONENTIAL:
sleep = min(self.max_delay, self.delay * 2 ** self.__num_retries)
time.sleep(sleep)
else:
raise ValueError(f"Unknown retry strategy: {self.strategy}")
self.__num_retries += 1

Просмотреть файл

@ -36,6 +36,7 @@ from engine.errors import NoTokenSpecifiedException
from engine.primitives import InvalidDictPrimitiveException
from engine.primitives import UnsupportedPrimitiveException
from restler_settings import Settings
from restler_settings import TokenAuthMethod
MANAGER_HANDLE = None
@ -415,23 +416,30 @@ if __name__ == '__main__':
print_to_console=True)
sys.exit(-1)
if settings.token_refresh_cmd:
req_collection.candidate_values_pool.set_candidate_values(
{
'restler_refreshable_authentication_token':
{
'token_refresh_cmd': settings.token_refresh_cmd,
'token_refresh_interval': settings.token_refresh_interval
}
}
)
else:
req_collection.candidate_values_pool.set_candidate_values(
{
'restler_refreshable_authentication_token':
{
}
}
token_auth_method = settings.token_authentication_method
restler_refreshable_authentication_token = {
"token_auth_method": token_auth_method,
"token_refresh_interval": settings.token_refresh_interval,
}
if token_auth_method == TokenAuthMethod.CMD:
restler_refreshable_authentication_token.update({
"token_refresh_cmd": settings.token_refresh_cmd,
})
elif token_auth_method == TokenAuthMethod.MODULE:
restler_refreshable_authentication_token.update({
"token_module_file": settings.token_module_file,
"token_module_function": settings.token_module_function,
"token_module_data": settings.token_module_data,
})
elif token_auth_method == TokenAuthMethod.LOCATION:
restler_refreshable_authentication_token.update({
"token_location": settings.token_location,
})
req_collection.candidate_values_pool.set_candidate_values(
{
'restler_refreshable_authentication_token': restler_refreshable_authentication_token
}
)
# Initialize the fuzzing monitor

Просмотреть файл

@ -3,10 +3,17 @@
""" Holds user-defined settings data """
from __future__ import print_function
from enum import Enum
import json
import sys
import re
class TokenAuthMethod(Enum):
""" Enum of token auth methods """
LOCATION = 0
CMD = 1
MODULE = 2
class NewSingletonError(Exception):
pass
@ -485,6 +492,8 @@ class RestlerSettings(object):
self._token_refresh_cmd = SettingsArg('token_refresh_cmd', str, None, user_args)
## Interval to periodically refresh the authentication token (seconds)
self._token_refresh_interval = SettingsArg('token_refresh_interval', int, None, user_args)
## Set the authentication options
self._authentication_settings = SettingsArg('authentication', dict, {}, user_args)
## Restler's version
self._version = SettingsArg('set_version', str, DEFAULT_VERSION, user_args)
## If set, poll for async resource creation before continuing
@ -512,10 +521,16 @@ class RestlerSettings(object):
@property
def client_certificate_path(self):
if 'certificate' in self._authentication_settings.val:
if 'client_certificate_path' in self._authentication_settings.val['certificate']:
return self._authentication_settings.val['certificate']['client_certificate_path']
return self._client_certificate_path.val
@property
def client_certificate_key_path(self):
if 'certificate' in self._authentication_settings.val:
if 'client_certificate_key_path' in self._authentication_settings.val['certificate']:
return self._authentication_settings.val['certificate']['client_certificate_key_path']
return self._client_certificate_key_path.val
@property
@ -704,12 +719,52 @@ class RestlerSettings(object):
@property
def token_refresh_cmd(self):
if 'token' in self._authentication_settings.val:
if 'token_refresh_cmd' in self._authentication_settings.val['token']:
return self._authentication_settings.val['token']['token_refresh_cmd']
return self._token_refresh_cmd.val
@property
def token_refresh_interval(self):
if 'token' in self._authentication_settings.val:
if 'token_refresh_interval' in self._authentication_settings.val['token']:
return self._authentication_settings.val['token']['token_refresh_interval']
return self._token_refresh_interval.val
@property
def token_location(self):
if 'token' in self._authentication_settings.val:
if 'location' in self._authentication_settings.val['token']:
return self._authentication_settings.val['token']['location']
else:
return None
@property
def token_module_file(self):
if 'token' in self._authentication_settings.val:
if 'module' in self._authentication_settings.val['token']:
if 'file' in self._authentication_settings.val['token']['module']:
return self._authentication_settings.val['token']['module']['file']
return None
@property
def token_module_function(self):
if 'token' in self._authentication_settings.val:
if 'module' in self._authentication_settings.val['token']:
if 'function' in self._authentication_settings.val['token']['module']:
return self._authentication_settings.val['token']['module']['function']
else:
return 'acquire_token'
return None
@property
def token_module_data(self):
if 'token' in self._authentication_settings.val:
if 'module' in self._authentication_settings.val['token']:
if 'data' in self._authentication_settings.val['token']['module']:
return self._authentication_settings.val['token']['module']['data']
return None
@property
def version(self):
return self._version.val
@ -747,6 +802,16 @@ class RestlerSettings(object):
else:
return include_req() and not exclude_req()
@property
def token_authentication_method(self):
if self.token_module_file:
return TokenAuthMethod.MODULE
elif self.token_refresh_cmd:
return TokenAuthMethod.CMD
elif self.token_location:
return TokenAuthMethod.LOCATION
else:
return None
def get_cached_prefix_request_settings(self, endpoint, method):
def get_settings():
if 'create_prefix_once' in self._seq_rendering_settings.val:
@ -906,15 +971,28 @@ class RestlerSettings(object):
Raises OptionValidationError if any validation fails.
"""
if self.fuzzing_mode == 'random-walk' and self.max_sequence_length != 100:
raise OptionValidationError("Should not provide maximum sequence length"
" for random walk method")
if self.token_refresh_interval and not self.token_refresh_cmd:
raise OptionValidationError("Must specify command to refresh token")
if self.token_refresh_cmd and not self.token_refresh_interval:
raise OptionValidationError("Must specify refresh period in seconds")
if self.request_throttle_ms and self.fuzzing_jobs != 1:
raise OptionValidationError("Request throttling not available for multiple fuzzing jobs")
if self.custom_bug_codes and self.custom_non_bug_codes:
raise OptionValidationError("Both custom_bug_codes and custom_non_bug_codes lists were specified. "
"Specifying both lists is not allowed.")
def validate_auth_options():
if self.token_refresh_interval and not self.token_authentication_method:
raise OptionValidationError("Must specify token refresh method")
if self.token_authentication_method and not self.token_refresh_interval:
raise OptionValidationError("Must specify refresh period in seconds")
if self.token_authentication_method == 'module' and not self.token_module_file:
raise OptionValidationError("Must specify token module file")
token_auth_options = [self.token_module_file, self.token_refresh_cmd, self.token_location]
user_provided_token_auth_options = [option for option in token_auth_options if option is not None]
if len(user_provided_token_auth_options) > 1:
raise OptionValidationError(f"Must specify only one token authentication mechanism - received {user_provided_token_auth_options}")
validate_auth_options()

Просмотреть файл

@ -180,6 +180,22 @@ class FuzzingLogParser(LogParser):
return True
def validate_auth_tokens(self, tokens):
""" Validate that every token request header is in the set of valid tokens
@param other: Set of valid tokens
@type other: Set
@return: True if all tokens in the request sequence are in the set of valid tokens
@rtype : Bool
"""
for seq in self._seq_list:
for request in seq.requests:
if not request.authorization_token in tokens:
return False
return True
def _parse(self, max_seq):
""" Parses the fuzzing log to populate the seq list

Просмотреть файл

@ -7,7 +7,7 @@ from test_servers.parsed_requests import *
import traceback
VALID_UNIT_TEST_TOKEN = 'valid_unit_test_token'
VALID_UNIT_TEST_TOKENS = {'valid_unit_test_token', 'valid_location_unit_test_token', 'valid_module_unit_test_token'}
class UnitTestServer(TestServerBase):
PRINT_DEBUG = False
@ -42,7 +42,7 @@ class UnitTestServer(TestServerBase):
"""
if NAMESPACE_RULE_RESOURCE not in dyn_objects:
if auth_token is not None and auth_token == VALID_UNIT_TEST_TOKEN:
if auth_token is not None and auth_token in VALID_UNIT_TEST_TOKENS:
return True
return False
return True

Просмотреть файл

@ -0,0 +1,3 @@
{'user1':{}, 'user2':{}}
Authorization: valid_location_unit_test_token
Authorization: shadow_unit_test_token

Просмотреть файл

@ -0,0 +1,8 @@
{
"authentication": {
"token": {
"token_refresh_cmd": "python unit_test_server_auth.py",
"token_refresh_interval": 600
}
}
}

Просмотреть файл

@ -0,0 +1,8 @@
{
"authentication": {
"token": {
"location": "authentication_token.txt",
"token_refresh_interval": 600
}
}
}

Просмотреть файл

@ -0,0 +1,14 @@
{
"authentication": {
"token": {
"module": {
"file": "unit_test_server_auth_module.py",
"function": "acquire_token_data",
"data": {
"client_id": "client_id"
}
},
"token_refresh_interval": 600
}
}
}

Просмотреть файл

@ -0,0 +1,13 @@
{
"authentication": {
"token": {
"module": {
"file": "unit_test_server_auth_module.py",
"function": "acquire_token_no_data",
"data": {
}
},
"token_refresh_interval": 600
}
}
}

Просмотреть файл

@ -1,5 +1,6 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
print("{'user1':{}, 'user2':{}}")
print("Authorization: valid_unit_test_token")
print("Authorization: shadow_unit_test_token")

Просмотреть файл

@ -0,0 +1,20 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
def acquire_token_no_data(data, log):
log("Returning a valid token")
token_lines = [
"{'user1':{}, 'user2':{}}",
"Authorization: valid_module_unit_test_token",
"Authorization: shadow_unit_test_token"
]
return "\n".join(token_lines)
def acquire_token_data(data, log):
log(data)
token_lines = [
"{'user1':{}, 'user2':{}}",
"Authorization: valid_module_unit_test_token",
"Authorization: shadow_unit_test_token"
]
return "\n".join(token_lines)

Просмотреть файл

@ -1,4 +1,4 @@
2021-07-01 17:55:24.466: Will refresh token: python D:\git\restler-fuzzer\restler\unit_tests\log_baseline_test_files\unit_test_server_auth.py
2021-07-01 17:55:24.466: Will refresh token: python D:\git\restler-fuzzer\restler\unit_tests\authentication_test_files\unit_test_server_auth.py
2021-07-01 17:55:24.554: New value: {'user1':{}, 'user2':{}}
Authorization: valid_unit_test_token
Authorization: shadow_unit_test_token

Просмотреть файл

@ -1,4 +1,4 @@
2021-06-11 14:09:11.972: Will refresh token: python D:\git\restler-fuzzer\restler\unit_tests\log_baseline_test_files\unit_test_server_auth.py
2021-06-11 14:09:11.972: Will refresh token: python D:\git\restler-fuzzer\restler\unit_tests\authentication_test_files\unit_test_server_auth.py
2021-06-11 14:09:12.038: New value: {'user1':{}, 'user2':{}}
Authorization: valid_unit_test_token
Authorization: shadow_unit_test_token

Просмотреть файл

@ -1,4 +1,4 @@
2021-06-01 18:17:04.893: Will refresh token: python D:\git\restler-fuzzer\restler\unit_tests\log_baseline_test_files\unit_test_server_auth.py
2021-06-01 18:17:04.893: Will refresh token: python D:\git\restler-fuzzer\restler\unit_tests\authentication_test_files\unit_test_server_auth.py
2021-06-01 18:17:04.988: New value: {'user1':{}, 'user2':{}}
Authorization: valid_unit_test_token
Authorization: shadow_unit_test_token

Просмотреть файл

@ -1,4 +1,4 @@
2022-02-15 00:06:07.651: Will refresh token: python D:\git\restler-fuzzer\restler\unit_tests\log_baseline_test_files\unit_test_server_auth.py
2022-02-15 00:06:07.651: Will refresh token: python D:\git\restler-fuzzer\restler\unit_tests\authentication_test_files\unit_test_server_auth.py
2022-02-15 00:06:07.715: New value: {'user1':{}, 'user2':{}}
Authorization: valid_unit_test_token
Authorization: shadow_unit_test_token

Просмотреть файл

@ -1,4 +1,4 @@
2020-12-16 15:03:33.401: Will refresh token: python C:\RESTlerRepo\restler-fuzzer\restler\unit_tests\log_baseline_test_files\unit_test_server_auth.py
2020-12-16 15:03:33.401: Will refresh token: python C:\RESTlerRepo\restler-fuzzer\restler\unit_tests\authentication_test_files\unit_test_server_auth.py
2020-12-16 15:03:33.471: New value: {'user1':{}, 'user2':{}}
Authorization: valid_unit_test_token
Authorization: shadow_unit_test_token
@ -1238,7 +1238,7 @@ Generation-2: Rendering Sequence-1
2020-12-16 15:03:44.111: NameSpaceRuleChecker
Re-rendering start of original sequence
2020-12-16 15:03:44.122: Re-rendering start of original sequence
2020-12-16 15:03:44.131: Will refresh token: python C:\RESTlerRepo\restler-fuzzer\restler\unit_tests\log_baseline_test_files\unit_test_server_auth.py
2020-12-16 15:03:44.131: Will refresh token: python C:\RESTlerRepo\restler-fuzzer\restler\unit_tests\authentication_test_files\unit_test_server_auth.py
2020-12-16 15:03:44.200: New value: {'user1':{}, 'user2':{}}
Authorization: valid_unit_test_token
Authorization: shadow_unit_test_token
@ -2371,7 +2371,7 @@ Generation-3: Rendering Sequence-1
+ restler_refreshable_authentication_token: ['token_refresh_cmd', 'token_refresh_interval']
- restler_static_string: '\r\n'
2020-12-16 15:03:55.230: Will refresh token: python C:\RESTlerRepo\restler-fuzzer\restler\unit_tests\log_baseline_test_files\unit_test_server_auth.py
2020-12-16 15:03:55.230: Will refresh token: python C:\RESTlerRepo\restler-fuzzer\restler\unit_tests\authentication_test_files\unit_test_server_auth.py
2020-12-16 15:03:55.310: New value: {'user1':{}, 'user2':{}}
Authorization: valid_unit_test_token
Authorization: shadow_unit_test_token

Различия файлов скрыты, потому что одна или несколько строк слишком длинны

Просмотреть файл

@ -1,4 +1,4 @@
2022-08-16 16:52:04.408: Will refresh token: python d:\git\restler-fuzzer\restler\unit_tests\log_baseline_test_files\unit_test_server_auth.py
2022-08-16 16:52:04.408: Will refresh token: python d:\git\restler-fuzzer\restler\unit_tests\authentication_test_files\unit_test_server_auth.py
2022-08-16 16:52:04.496: New value: {'user1':{}, 'user2':{}}
Authorization: valid_unit_test_token
Authorization: shadow_unit_test_token

Просмотреть файл

@ -3054,7 +3054,7 @@
2022-08-16 16:46:55.998: Received: "HTTP/1.1 404 Not Found\r\nRestler Test\r\n\r\n{'Resource': ResourceDoesNotExist()}"
2022-08-16 16:46:56.001: Will refresh token: python d:\git\restler-fuzzer\restler\unit_tests\log_baseline_test_files\unit_test_server_auth.py
2022-08-16 16:46:56.001: Will refresh token: python d:\git\restler-fuzzer\restler\unit_tests\authentication_test_files\unit_test_server_auth.py
2022-08-16 16:46:56.094: New value: {'user1':{}, 'user2':{}}
Authorization: valid_unit_test_token
Authorization: shadow_unit_test_token

Просмотреть файл

@ -1,4 +1,4 @@
2022-08-16 16:46:34.370: Will refresh token: python d:\git\restler-fuzzer\restler\unit_tests\log_baseline_test_files\unit_test_server_auth.py
2022-08-16 16:46:34.370: Will refresh token: python d:\git\restler-fuzzer\restler\unit_tests\authentication_test_files\unit_test_server_auth.py
2022-08-16 16:46:34.440: New value: {'user1':{}, 'user2':{}}
Authorization: valid_unit_test_token
Authorization: shadow_unit_test_token
@ -7095,7 +7095,7 @@ Generation-2: Rendering Sequence-2
+ restler_refreshable_authentication_token: [token_refresh_cmd, token_refresh_interval, ...]
- restler_static_string: '\r\n'
2022-08-16 16:46:45.087: Will refresh token: python d:\git\restler-fuzzer\restler\unit_tests\log_baseline_test_files\unit_test_server_auth.py
2022-08-16 16:46:45.087: Will refresh token: python d:\git\restler-fuzzer\restler\unit_tests\authentication_test_files\unit_test_server_auth.py
2022-08-16 16:46:45.159: New value: {'user1':{}, 'user2':{}}
Authorization: valid_unit_test_token
Authorization: shadow_unit_test_token

Просмотреть файл

@ -1,4 +1,4 @@
2020-12-24 16:29:56.111: Will refresh token: python restler\unit_tests\log_baseline_test_files\unit_test_server_auth.py
2020-12-24 16:29:56.111: Will refresh token: python restler\unit_tests\authentication_test_files\unit_test_server_auth.py
2020-12-24 16:29:56.229: New value: {'user1':{}, 'user2':{}}
Authorization: valid_unit_test_token
Authorization: shadow_unit_test_token

Просмотреть файл

@ -1 +1 @@
{"custom_value_generators": "D:\\git\\restler-fuzzer\\restler\\unit_tests\\log_baseline_test_files\\custom_value_gen.py", "max_combinations": 6, "test_combinations_settings": {"max_schema_combinations": 1}}
{"custom_value_generators": "", "max_combinations": 6, "test_combinations_settings": {"max_schema_combinations": 1}}

Просмотреть файл

@ -1,4 +1,4 @@
2022-10-03 18:12:52.831: Will refresh token: python d:\git\restler-fuzzer\restler\unit_tests\log_baseline_test_files\unit_test_server_auth.py
2022-10-03 18:12:52.831: Will refresh token: python d:\git\restler-fuzzer\restler\unit_tests\authentication_test_files\unit_test_server_auth.py
2022-10-03 18:12:52.931: New value: {'user1':{}, 'user2':{}}
Authorization: valid_unit_test_token
Authorization: shadow_unit_test_token

Просмотреть файл

@ -18,23 +18,37 @@ import subprocess
import json
import utils.logger as logger
from collections import namedtuple
from pathlib import Path
from test_servers.log_parser import *
Test_File_Directory = os.path.join(
os.path.dirname(__file__), 'log_baseline_test_files'
)
Authentication_Test_File_Directory = os.path.join(
os.path.dirname(__file__), 'authentication_test_files'
)
Restler_Path = os.path.join(os.path.dirname(__file__), '..', 'restler.py')
Common_Settings = [
Common_Settings_No_Auth = [
"python", "-B", Restler_Path, "--use_test_socket",
'--custom_mutations', f'{os.path.join(Test_File_Directory, "test_dict.json")}',
"--garbage_collection_interval", "30", "--host", "unittest",
"--token_refresh_cmd", f'python {os.path.join(Test_File_Directory, "unit_test_server_auth.py")}',
"--token_refresh_interval", "10"
]
Common_Settings = Common_Settings_No_Auth + [
"--token_refresh_cmd", f'python {os.path.join(Authentication_Test_File_Directory, "unit_test_server_auth.py")}',
"--token_refresh_interval", "10"
]
## TODO: Share constants with unit_test_server?
LOCATION_AUTHORIZATION_TOKEN = 'valid_location_unit_test_token'
MODULE_AUTHORIZATION_TOKEN = 'valid_module_unit_test_token'
CMD_AUTHORIZATION_TOKEN = 'valid_unit_test_token'
class FunctionalityTests(unittest.TestCase):
def get_experiments_dir(self):
""" Returns the most recent experiments directory that contains the restler logs
@ -73,18 +87,21 @@ class FunctionalityTests(unittest.TestCase):
self.fail(f"Restler returned non-zero exit code: {result.returncode} {result.stdout}")
def run_abc_smoke_test(self, test_file_dir, grammar_file_name, fuzzing_mode, settings_file=None, dictionary_file_name=None,
failure_expected=False):
failure_expected=False, common_settings=Common_Settings):
grammar_file_path = os.path.join(test_file_dir, grammar_file_name)
if dictionary_file_name is None:
dictionary_file_name = "abc_dict.json"
dict_file_path = os.path.join(test_file_dir, dictionary_file_name)
args = Common_Settings + [
args = common_settings + [
'--fuzzing_mode', f"{fuzzing_mode}",
'--restler_grammar', f'{grammar_file_path}',
'--custom_mutations', f'{dict_file_path}'
]
if settings_file:
settings_file_path = os.path.join(test_file_dir, settings_file)
if Path(settings_file).exists():
settings_file_path = settings_file
else:
settings_file_path = os.path.join(test_file_dir, settings_file)
args = args + ['--settings', f'{settings_file_path}']
self.run_restler_engine(args, failure_expected=failure_expected)
@ -95,6 +112,180 @@ class FunctionalityTests(unittest.TestCase):
print(f"tearDown function failed: {err!s}.\n"
"Experiments directory was not deleted.")
def test_location_auth_test(self):
""" This test is equivalent to test_abc_minimal_smoke_test except we use the token location authentication mechanism
and validate that RESTler uses the LOCATION_AUTHORIZATION_TOKEN
"""
settings_file_path = os.path.join(Authentication_Test_File_Directory, "token_location_authentication_settings.json")
## Create a new, temporary settings file with reference to full path to token location
new_settings_file_path = os.path.join(Authentication_Test_File_Directory, "tmp_token_location_authentication_settings.json")
try:
with open(settings_file_path, 'r') as file:
settings = json.loads(file.read())
settings["authentication"]["token"]["location"] = os.path.join(Authentication_Test_File_Directory, settings["authentication"]["token"]["location"])
json_settings = json.dumps(settings)
with open(new_settings_file_path, "w") as outfile:
outfile.write(json_settings)
self.run_abc_smoke_test(Test_File_Directory, "abc_test_grammar.py", "directed-smoke-test", settings_file=new_settings_file_path, common_settings=Common_Settings_No_Auth)
finally:
## Clean up temporary settings file
if os.path.exists(new_settings_file_path):
os.remove(new_settings_file_path)
experiments_dir = self.get_experiments_dir()
## Make sure all requests were successfully rendered. This is because the comparisons below do not
## take status codes into account
## Make sure the right number of requests was sent.
testing_summary_file_path = os.path.join(experiments_dir, "logs", "testing_summary.json")
try:
with open(testing_summary_file_path, 'r') as file:
testing_summary = json.loads(file.read())
total_requests_sent = testing_summary["total_requests_sent"]["main_driver"]
num_fully_valid = testing_summary["num_fully_valid"]
self.assertEqual(num_fully_valid, 5)
self.assertLessEqual(total_requests_sent, 14)
test_parser = FuzzingLogParser(self.get_network_log_path(experiments_dir, logger.LOG_TYPE_TESTING))
## Validate that LOCATION_AUTHORIZATION_TOKEN is used in request headers
self.assertTrue(test_parser.validate_auth_tokens(LOCATION_AUTHORIZATION_TOKEN))
except TestFailedException:
self.fail("Smoke test with token location auth failed")
def test_module_no_data_auth(self):
""" This test is equivalent to test_abc_minimal_smoke_test except we use the token module authentication mechanism
and validate that RESTler uses the MODULE_AUTHORIZATION_TOKEN
"""
settings_file_path = os.path.join(Authentication_Test_File_Directory, "token_module_authentication_settings.json")
## Create a new, temporary settings file with reference to full path to token location
new_settings_file_path = os.path.join(Authentication_Test_File_Directory, "tmp_token_module_authentication_settings.json")
try:
with open(settings_file_path, 'r') as file:
settings = json.loads(file.read())
settings["authentication"]["token"]["module"]["file"] = os.path.join(Authentication_Test_File_Directory, settings["authentication"]["token"]["module"]["file"])
json_settings = json.dumps(settings)
with open(new_settings_file_path, "w") as outfile:
outfile.write(json_settings)
self.run_abc_smoke_test(Test_File_Directory, "abc_test_grammar.py", "directed-smoke-test", settings_file=new_settings_file_path, common_settings=Common_Settings_No_Auth)
finally:
## Clean up temporary settings file
if os.path.exists(new_settings_file_path):
os.remove(new_settings_file_path)
experiments_dir = self.get_experiments_dir()
## Make sure all requests were successfully rendered. This is because the comparisons below do not
## take status codes into account
## Make sure the right number of requests was sent.
testing_summary_file_path = os.path.join(experiments_dir, "logs", "testing_summary.json")
try:
with open(testing_summary_file_path, 'r') as file:
testing_summary = json.loads(file.read())
total_requests_sent = testing_summary["total_requests_sent"]["main_driver"]
num_fully_valid = testing_summary["num_fully_valid"]
self.assertEqual(num_fully_valid, 5)
self.assertLessEqual(total_requests_sent, 14)
test_parser = FuzzingLogParser(self.get_network_log_path(experiments_dir, logger.LOG_TYPE_TESTING))
## Validate that MODULE_AUTHORIZATION_TOKEN is used in request headers
self.assertTrue(test_parser.validate_auth_tokens(MODULE_AUTHORIZATION_TOKEN))
except TestFailedException:
self.fail("Smoke test with token module auth failed")
def test_module_with_data_auth(self):
""" This test is equivalent to test_abc_minimal_smoke_test except we use the token module authentication mechanism
and validate that RESTler uses the MODULE_AUTHORIZATION_TOKEN
"""
settings_file_path = os.path.join(Authentication_Test_File_Directory, "token_module_authentication_data_settings.json")
## Create a new, temporary settings file with reference to full path to token location
new_settings_file_path = os.path.join(Authentication_Test_File_Directory, "tmp_token_module_authentication_data_settings.json")
try:
with open(settings_file_path, 'r') as file:
settings = json.loads(file.read())
settings["authentication"]["token"]["module"]["file"] = os.path.join(Authentication_Test_File_Directory, settings["authentication"]["token"]["module"]["file"])
data = str(settings["authentication"]["token"]["module"]["data"])
json_settings = json.dumps(settings)
with open(new_settings_file_path, "w") as outfile:
outfile.write(json_settings)
self.run_abc_smoke_test(Test_File_Directory, "abc_test_grammar.py", "directed-smoke-test", settings_file=new_settings_file_path, common_settings=Common_Settings_No_Auth)
finally:
## Clean up temporary settings file
if os.path.exists(new_settings_file_path):
os.remove(new_settings_file_path)
experiments_dir = self.get_experiments_dir()
## Make sure all requests were successfully rendered. This is because the comparisons below do not
## take status codes into account
## Make sure the right number of requests was sent.
testing_summary_file_path = os.path.join(experiments_dir, "logs", "testing_summary.json")
try:
with open(testing_summary_file_path, 'r') as file:
testing_summary = json.loads(file.read())
total_requests_sent = testing_summary["total_requests_sent"]["main_driver"]
num_fully_valid = testing_summary["num_fully_valid"]
self.assertEqual(num_fully_valid, 5)
self.assertLessEqual(total_requests_sent, 14)
test_parser = FuzzingLogParser(self.get_network_log_path(experiments_dir, logger.LOG_TYPE_TESTING))
## Validate that MODULE_AUTHORIZATION_TOKEN is used in request headers
self.assertTrue(test_parser.validate_auth_tokens(MODULE_AUTHORIZATION_TOKEN))
## Validate that data is logged in auth log
with open(self.get_network_log_path(experiments_dir, logger.LOG_TYPE_AUTH), "r") as auth_log:
self.assertTrue(data in auth_log.read())
except TestFailedException:
self.fail("Smoke test with token module auth failed")
def test_cmd_auth(self):
""" This test is equivalent to test_abc_minimal_smoke_test except we use the token cmd authentication mechanism
and validate that RESTler uses the CMD_AUTHORIZATION_TOKEN
"""
settings_file_path = os.path.join(Authentication_Test_File_Directory, "token_cmd_authentication_settings.json")
## Create a new, temporary settings file with reference to full path to token location
new_settings_file_path = os.path.join(Authentication_Test_File_Directory, "tmp_token_cmd_authentication_settings.json")
try:
with open(settings_file_path, 'r') as file:
settings = json.loads(file.read())
settings["authentication"]["token"]["token_refresh_cmd"] = f'python {os.path.join(Authentication_Test_File_Directory, "unit_test_server_auth.py")}'
json_settings = json.dumps(settings)
with open(new_settings_file_path, "w") as outfile:
outfile.write(json_settings)
self.run_abc_smoke_test(Test_File_Directory, "abc_test_grammar.py", "directed-smoke-test", settings_file=new_settings_file_path, common_settings=Common_Settings_No_Auth)
finally:
## Clean up temporary settings file
if os.path.exists(new_settings_file_path):
os.remove(new_settings_file_path)
experiments_dir = self.get_experiments_dir()
## Make sure all requests were successfully rendered. This is because the comparisons below do not
## take status codes into account
## Make sure the right number of requests was sent.
testing_summary_file_path = os.path.join(experiments_dir, "logs", "testing_summary.json")
try:
with open(testing_summary_file_path, 'r') as file:
testing_summary = json.loads(file.read())
total_requests_sent = testing_summary["total_requests_sent"]["main_driver"]
num_fully_valid = testing_summary["num_fully_valid"]
self.assertEqual(num_fully_valid, 5)
self.assertLessEqual(total_requests_sent, 14)
test_parser = FuzzingLogParser(self.get_network_log_path(experiments_dir, logger.LOG_TYPE_TESTING))
## Validate that CMD_AUTHORIZATION_TOKEN is used in request headers
self.assertTrue(test_parser.validate_auth_tokens(CMD_AUTHORIZATION_TOKEN))
except TestFailedException:
self.fail("Smoke test with token cmd auth failed")
def test_abc_invalid_b_smoke_test(self):
self.run_abc_smoke_test(Test_File_Directory, "abc_test_grammar_invalid_b.py", "directed-smoke-test", settings_file="test_one_schema_settings.json")
experiments_dir = self.get_experiments_dir()

Просмотреть файл

@ -316,15 +316,63 @@ class RestlerSettingsTest(unittest.TestCase):
'target_ip': '192.168.0.1',
'token_refresh_cmd': 'some command'}
settings = RestlerSettings(user_args)
with self.assertRaisesRegexp(OptionValidationError, "Must specify refresh period"):
settings.validate_options()
def test_refresh_module_no_interval(self):
user_args = {'target_port': 500,
'target_ip': '192.168.0.1',
'authentication': {
'token':
{
'module': {
'file': 'some_module.py',
'data': {}
}
}
}}
settings = RestlerSettings(user_args)
with self.assertRaises(OptionValidationError):
settings.validate_options()
def test_refresh_interval_no_cmd(self):
def test_multiple_auth_options(self):
user_args = {'target_port': 500,
'target_ip': '192.168.0.1',
'authentication': {
'token':
{
'module': {
'file': 'some_module.py',
'data': {}
},
'token_refresh_cmd': 'some command',
'location': "//some_location",
'token_refresh_interval': 10
},
}}
settings = RestlerSettings(user_args)
with self.assertRaisesRegexp(OptionValidationError, "Must specify only one token authentication mechanism"):
settings.validate_options()
def test_refresh_location_no_interval(self):
user_args = {'target_port': 500,
'target_ip': '192.168.0.1',
'authentication': {
'token':
{
'location': "//some_location"
}
}}
settings = RestlerSettings(user_args)
with self.assertRaisesRegexp(OptionValidationError, "Must specify refresh period"):
settings.validate_options()
def test_refresh_interval_no_method(self):
user_args = {'target_port': 500,
'target_ip': '192.168.0.1',
'token_refresh_interval': 30}
settings = RestlerSettings(user_args)
with self.assertRaises(OptionValidationError):
with self.assertRaisesRegexp(OptionValidationError, "Must specify token refresh method"):
settings.validate_options()
def test_throttling_multiple_fuzzing_jobs(self):

Просмотреть файл

@ -0,0 +1,46 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import unittest
import time
from engine.core.retry_handler import RetryHandler, RetryStrategy, RetryLimitExceeded
class RetryHandlerTest(unittest.TestCase):
def test_invalid_strategy(self):
""" Test that an invalid strategy raises an exception """
with self.assertRaises(ValueError):
RetryHandler(strategy="invalid")
def test_linear_retries(self):
""" Test that linear retries work as expected
Should take about 10 seconds (5 retries * 2 second delay)
RetryLimitExceeded should be raised on the 6th retry"""
retry_handler = RetryHandler(RetryStrategy.LINEAR, max_retries=5, delay=2)
start = time.time()
for i in range(5):
self.assertTrue(retry_handler.can_retry())
retry_handler.wait_for_next_retry()
self.assertFalse(retry_handler.can_retry())
with self.assertRaises(RetryLimitExceeded):
retry_handler.wait_for_next_retry()
end = time.time()
## Allow for a 1 second delta - this should take about 9 seconds
self.assertAlmostEqual(end - start, 10, delta=1)
def test_exponential_retries(self):
""" Test that exponential retries work as expected
Should take about 34 seconds (2 + 4 + 8 + 10 + 10)
RetryLimitExceeded should be raised on the 6th retry"""
retry_handler = RetryHandler(RetryStrategy.EXPONENTIAL, max_retries=5, delay=2, max_delay=10)
start = time.time()
for i in range(5):
self.assertTrue(retry_handler.can_retry())
retry_handler.wait_for_next_retry()
self.assertFalse(retry_handler.can_retry())
with self.assertRaises(RetryLimitExceeded):
retry_handler.wait_for_next_retry()
end = time.time()
## Allow for a 1 second delta - this should take about 34 seconds
self.assertAlmostEqual(end - start, 34, delta=1)

Просмотреть файл

@ -18,6 +18,7 @@ def load_module(name, module_file_path):
spec = importlib.util.spec_from_file_location(name, module_file_path)
module_to_load = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module_to_load)
return module_to_load
def import_attrs(module_file_path, attr_names):
file_name = os.path.basename(module_file_path)
@ -25,7 +26,7 @@ def import_attrs(module_file_path, attr_names):
# Import the object
sys.path.append(os.path.dirname(module_file_path))
imported_module = importlib.import_module(module_name)
imported_module = load_module(module_name, module_file_path)
imported_attrs = []
for attr_name in attr_names:
imported_attr = getattr(imported_module, attr_name, None)

Просмотреть файл

@ -65,6 +65,9 @@ LOG_TYPE_TESTING = 'testing'
LOG_TYPE_GC = 'gc'
LOG_TYPE_PREPROCESSING = 'preprocessing'
LOG_TYPE_REPLAY = 'replay'
LOG_TYPE_AUTH = 'auth'
Network_Auth_Log = None
class NetworkLog(object):
""" Implements logic for creating, chunking, and writing to network logs """
@ -442,6 +445,13 @@ def raw_network_logging(data):
network_log = Network_Logs[thread_id]
network_log.write(f"{formatting.timestamp()}: {data}")
def auth_logging(data):
global Network_Auth_Log
if Network_Auth_Log is None:
thread_id = threading.current_thread().ident
Network_Auth_Log = NetworkLog(LOG_TYPE_AUTH, thread_id)
Network_Auth_Log.write(f"{formatting.timestamp()}: {data}")
def custom_network_logging(sequence, candidate_values_pool, **kwargs):
""" Helper to log (in a more civilized manner) the template of the request
which will be subsequently rendered with the respective feasible value