This commit is contained in:
Rasmus Selsmark 2018-05-27 19:36:40 +03:00
Коммит 951bf1fccc
38 изменённых файлов: 4585 добавлений и 0 удалений

2
.dockerignore Executable file
Просмотреть файл

@ -0,0 +1,2 @@
util/radamsa/
results/

7
.gitignore поставляемый Executable file
Просмотреть файл

@ -0,0 +1,7 @@
.DS_Store
*.log
__pycache__/
.idea/
results/
.env
query.txt

3
.gitmodules поставляемый Executable file
Просмотреть файл

@ -0,0 +1,3 @@
[submodule "util/radamsa"]
path = util/radamsa
url = https://github.com/aoh/radamsa.git

29
Dockerfile Executable file
Просмотреть файл

@ -0,0 +1,29 @@
FROM ubuntu:16.04
RUN apt-get update && \
apt-get -y install \
python3 \
python3-pip \
git \
build-essential \
wget \
locales && rm -rf /var/lib/apt/lists/*
RUN sed -i -e 's/# en_US.UTF-8 UTF-8/en_US.UTF-8 UTF-8/' /etc/locale.gen && \
dpkg-reconfigure --frontend=noninteractive locales && \
update-locale LANG=en_US.UTF-8
ENV LANG en_US.UTF-8
RUN mkdir -p /hotfuzz/
WORKDIR /hotfuzz
ADD . /hotfuzz/
RUN git submodule update --init && make -s -C util/radamsa
RUN pip3 install -q -r requirements.txt
RUN mkdir results
RUN mkdir /toolkit

15
LICENSE.md Normal file
Просмотреть файл

@ -0,0 +1,15 @@
Copyright (C) 2018 Unity Technologies ApS
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL UNITY TECHNOLOGIES APS OR ANY OF ITS AFFILIATES (“UNITY”) BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Except as contained in this notice, the “Unity” name/mark shall not be used in advertising or otherwise to promote the sale, use or other dealings in this Software without prior written authorization from Unity.
### Third Party Notices
External frameworks/libraries used:
- Radamsa: https://github.com/aoh/radamsa

13
Makefile Executable file
Просмотреть файл

@ -0,0 +1,13 @@
.PHONY: test
setup:
scripts/setup.sh
test:
python3 -m unittest fuzz.test.test_fuzzer && python3 -m unittest test.test_cli
docker-build:
scripts/build.sh
mock-server:
python3 -m fuzz.test.mockserver

254
README.md Executable file
Просмотреть файл

@ -0,0 +1,254 @@
# HotFuzz
"Fuzzing or fuzz testing is an automated software testing technique that involves providing invalid, unexpected, or random data as inputs to a computer program." [Fuzzing - OWASP](https://www.owasp.org/index.php/Fuzzing)
HotFuzz is a fuzz testing utility that generates random data and sends it to a service's endpoint over HTTP or HTTPS based on a given fuzzing model written in JSON. HotFuzz is useful for security testing of a public/private service's endpoint to discover vulnerabilities in data validation, service implementation or performance among others.
In HotFuzz, we define a service as a program hosted in a given domain and listening for HTTP or HTTPS requests, where an endpoint is understood as one of the resources made available through that service. These endpoints represent the main service entry point where HotFuzz can be used to verify their resilience against unexpected data and attacks.
# Setting up and fuzzing with HotFuzz
There are two ways you can setup and run HotFuzz: natively or in Docker. Running HotFuzz natively is the most straightforward option to have full control of the fuzzer settings. On the other hand, running HotFuzz in Docker isolates the fuzzer execution allowing you to run fuzz tests more automatically and with less modifications of your working environment.
# Setup
These are the software requirements to be met:
- [Python => 3.5.x](https://www.python.org/downloads/release/python-350)
- pip3
- wget
- [Docker => 17.x.x](https://www.docker.com/community-edition#/download)
Next, this is how you run the setup process:
```
$ cd hot-fuzz/
$ sudo make setup
(Docker users)
$ sudo make docker-build
```
### Dependencies
Running HotFuzz natively requires either to modify your current Python environment or to use a [Python virtual environment](https://docs.python.org/3/tutorial/venv.html) to isolate your working environment.
To install the HotFuzz dependencies natively:
```
$ pip3 install -r requirements.txt
```
To use instead a Python virtual environment:
```
$ source run_venv.sh
```
### Docker Configuration
If you are using Docker for Mac, you will need to enable file sharing so that the fuzzer's docker container can save files to your local file system. Do this by opening the docker app, then go to `Preferences -> File Sharing` and add the full path of the directory `hot-fuzz`, e.g. :
```
/Users/(user)/projects/hot-fuzz
```
### Validation
Once the setup is done and all dependencies satisfied it's time to run the HotFuzz's test suite. The next commands show you how to run it natively and in Docker.
To run the test suite natively:
```
$ make test
python3 -m unittest test.test_fuzzer
Starting mock server at 127.0.0.1:8080
Running tests...
----------------------------------------------------------------------
chance (test.test_fuzzer.MutatorTests) ... OK (0.005s)
roll_dice (test.test_fuzzer.MutatorTests) ... OK (0.019s)
juggle_type (test.test_fuzzer.MutatorTests) ... OK (0.024s)
mutate_radamsa_state_change (test.test_fuzzer.MutatorTests) ... OK (0.694s)
mutate_radamsa_state_static (test.test_fuzzer.MutatorTests) ... OK (1.042s)
(...)
get_states_from_file (test.test_fuzzer.FuzzerTests) ... OK (0.002s)
----------------------------------------------------------------------
Ran 43 tests in 6.063s
OK
Generating XML reports...
```
The number of tests may differ but a successful run will print `OK` at the end and generate both, `*.log` and `*.xml` files with more details under the `results` directory.
```
$ ls results/
20170828154439.log TEST-test.test_fuzzer.FuzzerTests-20170828154436.xml
```
Once your HotFuzz setup passes the test suite is time to fuzz something!
## Fuzzing
To start fuzzing with HotFuzz we provide the testing server `mockserver.py`. The objective is to fuzz an endpoint of this testing service as hosted at the `example` domain. To complete this task we will modify an existing fuzzing model, run the fuzzer and analyze the results.
Next, start the testing server to get ready to fuzz it:
```
$ make mock-server
python3 -m fuzz.test.mockserver
Starting mock server at 127.0.0.1:8080
```
### Models
HotFuzz requires a fuzzing model to know where and how to fuzz a specific service's endpoint.
For instance, the example model file [tutorial.json](fuzz/models/tutorial.json) defines the required details to fuzz the `/watch` endpoint hosted at the `example` domain as follows:
```
{
"domains": {
"example": {
"host": "localhost",
"port": 8080,
"protocol": "http"
}
},
"endpoints": [
{
"uri": "/watch",
"comment": "watch video",
"methods": ["GET"],
"input": {
"query": {
"v": "9bZkp7q19f0",
"t": "1m05s"
}
}
}
]
}
```
This model instructs HotFuzz to send requests to the service listening for `http` connections at the host `localhost` port `8080`. These requests will be targeted to the `/watch` endpoint using the `GET` method and an input query consisting of two parameters `v` and `t` with the initial values `9bZkp7q19f0` and `1m05s` respectively.
### Fuzz it!
Run the fuzzer client to send three (`-i=3`) requests using the `tutorial.json` model file (`--model-path fuzz/models/tutorial.json`) against the `example` domain (`--domain example`) with full debug log (`--loglevel 0`) for further analysis:
```
$ ./cli.py -i=3 --model-path fuzz/models/tutorial.json --domain example --loglevel 0
```
Running the fuzzer successfully will generate no feedback output and leave the results under the `results` directory. Here we can have a more detailed look of how HotFuzz has sent the requests and how certain data fields were modified to fuzz the target endpoint.
In the output below you can see how the original values of the fields `v` and `t` have been modified. Sometimes these values remain the same, sometimes these have small variations and in other cases these have been completely replaced with "known-to-be-dangerous" values:
```
$ cat results/20170907164501_all_uris_all_methods.log
(...)
2017-09-07 16:45:01,476 DEBUG: http://localhost:8080 "GET /watch?v[]=9bZkp7q19f0&t=0m00m00s HTTP/1.1" 200 None
(...)
2017-09-07 16:45:01,522 DEBUG: http://localhost:8080 "GET /watch?v=340282366920938463463374607431768211457bZkp7q19f0&t=%3Cimg%20%5Cx12src%3Dx%20onerror%3D%22javascript%3Aalert%281%29%22%3E HTTP/1.1" 200 None
(...)
2017-09-07 16:45:01,538 DEBUG: http://localhost:8080 "GET /watch?v=9bZkp7q19fp7q19fp7qbZkp7q19bZkp7q19bZkp7q255f429bZkp7q197&t=1m05s HTTP/1.1" 200 None
```
HotFuzz will also log information about the response received by the service and more details about the request sent:
```
$ cat results/20170907164501_all_uris_all_methods.log
(...)
2017-09-07 16:45:01,513 ERROR: {"method": "GET", "headers": {"X-Hot-Fuzz-State": "0"}, "url": "http://localhost:8080/watch?v[]=9bZkp7q19f0&t=0m00m00s", "body": null, "size": 359, "response": "{\"success\": false, \"reason\": \"Not found\"}\n", "reason": "OK", "httpcode": 200, "time": 0.049}
(...)
```
In the above output, the field `response` stores the data received by the service when sending a request which details are summarized by the `methods`, `headers`, `url`, `body` and `size` fields.
### Custom mutation
In Fuzzing, mutation is commonly understood as the variations applied to the input data required to fuzz a given program. HotFuzz has a defined strategy to decide how to mutate input values, but it also offers to user a level of control over it. This control is provided by what we call the mutation placeholders which have the form `{name}` and are part of the fuzzing model.
Coming back to fuzzing the `example` domain, we can now make use of mutation placeholders to control what gets modified or mutated. Taking the original [tutorial.json](fuzz/models/tutorial.json) model we add the next modification to the `t` data field as follows:
```
"input": {
"query": {
"v": "9bZkp7q19f0",
"t": "1m{mutate_here}05s"
}
}
```
The above modification to the model will instruct HotFuzz to only mutate the `t` data field where the mutation placeholder `{mutate_here}` is located and let the rest of the data field untouched.
### Fuzz it, again!
Run the fuzzer again and see the differences with the new model:
```
$ ./cli.py -i=3 --model-path fuzz/models/tutorial.json --domain example --loglevel 0
```
In the results below you can verify how the `t` data field has been mutated differently this time by leaving the data chunks `1m` and `05s` intact:
```
$ cat results/20170907182405_all_uris_all_methods.log
(...)
2017-09-07 18:24:05,402 DEBUG: http://localhost:8080 "GET /watch?v[]=9bZkp7q19f0&t=1m%20%20%20%C2%9F%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%C2%80a%C2%8Aa05s HTTP/1.1" 200 None
(...)
2017-09-07 18:24:05,430 DEBUG: http://localhost:8080 "GET /watch?v=340282366920938463463374607431768211457bZkp7q19f0&t=1mo%CC%82%C2%8F%C2%BF3%E2%81%844a05s HTTP/1.1" 200 None
```
### Constants
To add further customization, HotFuzz allows to define data values in your model which may change or remain across different fuzzing runs, we call them constants. These can be detailed in either, a file like [constants.json](fuzz/test/constants.json) or in the command line.
First, update the fuzzing model `tutorial.json` to include two new constants name as `{endpoint}` and `{time}`:
```
"endpoints": [
{
"uri": "/{endpoint}",
"comment": "watch video",
"methods": ["GET"],
"input": {
"query": {
"v": "9bZkp7q19f0",
"t": "{time}"
}
}
}
]
```
Next, define the value of the new constant `{endpoint}` in the `constants.json` file as follows:
```
{
"{endpoint}": "watch"
}
```
Then, use the command line parameters `--constants` and `--constants-file` to define the value of the `{time}` constant, and to include the `constants.json` file respectively:
```
(...) --constants '{"{time}": "1m05s"}' --constants-file fuzz/test/constants.json (...)
```
### Fuzz it, once more
Run the fuzzer with the new command line and see how the constants get replaced in the results:
```
$ ./cli.py -i=3 --model-path fuzz/models/tutorial.json --domain example --constants '{"{time}": "1m05s"}' --constants-file fuzz/test/constants.json --loglevel 0
$ cat results/20171204173210_all_uris_all_methods.log
(...)
2017-12-04 17:32:10,403 DEBUG: http://localhost:8080 "GET /watch?v=340282366920938463463374607431768211457bZkp7q19f0&t=%3Cimg%20%5Cx12src%3Dx%20onerror%3D%22javascript%3Aalert%281%29%22%3E HTTP/1.1" 200 None
(...)
2017-12-04 17:32:10,425 DEBUG: http://localhost:8080 "GET /watch?v=9bZkp7q19fp7q19fp7qbZkp7q19bZkp7q19bZkp7q255f429bZkp7q197&t=1m05s HTTP/1.1" 200 None
```

0
__init__.py Executable file
Просмотреть файл

131
cli.py Executable file
Просмотреть файл

@ -0,0 +1,131 @@
#!/usr/bin/env python3
import os
import sys
import json
import argparse
from fuzz import request
from fuzz.fuzzer import Fuzzer
from fuzz.config.config import Config
class Client:
def __init__(self):
self.config = Config()
self.parser = argparse.ArgumentParser(description="Hot Fuzz: A fuzzing utility that sends HTTP requests of mutated json data models")
self.parser.add_argument("-m", "--model-path", metavar="path", type=str, nargs="?",
help="The path of the data model file relative to this directory (required)",
required=True)
self.parser.add_argument("-d", "--domain", metavar="domain", type=str, nargs="?",
help="The domain name in the data model that describes transport protocol, hostname, etc. (required)",
required=True)
self.parser.add_argument("-i", metavar="n", type=int, nargs="?", dest="iterations",
help="Number of iterations per endpoint (defaults to infinite)")
self.parser.add_argument("-t", "--timeout", metavar="s", type=float, nargs="?",
help="The default maximum time (seconds) to wait for a response per request if not defined in "
"the data model, defaults to " + str(request.DEFAULT_TIMEOUT), default=request.DEFAULT_TIMEOUT)
self.parser.add_argument("-s", metavar="n", type=int, nargs="?", dest="state",
help="Fuzzer initial state. Used to resume fuzzing sessions or a replay specific case", default=0)
self.parser.add_argument("-g", "--gtimeout", action="store_true",
help="Global timeout. If set, all timeout values in the data model will be overridden")
self.parser.add_argument("-c", "--constants", metavar="obj", type=str, nargs="?",
help="A JSON string where keys are strings to replace and values are the replacement (optional)")
self.parser.add_argument("-C", "--constants-file", metavar="path", type=str, nargs="?",
help="Relative path to a json file containing placeholder keys and constant values (optional). "
"If the --constants argument is also used, they will be combined with input from the "
"constants file. In this case, matching constants will be overwritten by those supplied "
"with the --constants argument.")
self.parser.add_argument("-u", "--uri", metavar="URI", type=str, nargs="?",
help="A specific endpoint that the fuzzer will target (defaults to all in the data model)")
self.parser.add_argument("--method", metavar=("list"), type=str, nargs="+", default=None,
help="An whitespace-separated list of request methods (see RFC7231 section 4.3). If empty, all "
"methods in the data model are used for the specified uri.")
self.parser.add_argument("-l", "--loglevel", metavar=("0,1,2,3"), type=int, nargs="?",
choices=self.config.logging_levels.keys(),
help="The log verbosity level: warning=3, info=2, debug=1, trace=0")
self.parser.add_argument("--statefile", metavar="path", type=str, nargs="?",
help="A relative file path that contains a list of states. See test/example_states.txt for details.")
self.parser.add_argument("--printcurl", action='store_true', help="The request to print a curl query command only.")
self.constants = {}
self.parsed_args = None
self.fuzzer_results = []
self.model_file_path = ""
self.states = []
def _validate_printcurl_args(self):
if self.parsed_args.printcurl:
if not self.parsed_args.uri:
print("-u argument (uri) is required")
sys.exit(1)
if not self.parsed_args.method:
print("--method argument is required")
sys.exit(1)
def _set_logging_level(self):
if self.parsed_args.loglevel is not None:
self.config.root_logger.setLevel(self.config.logging_levels[self.parsed_args.loglevel])
else:
self.config.root_logger.setLevel(self.config.logging_levels[3])
@staticmethod
def _get_cmd_string():
cmd = ""
for token in sys.argv:
try:
if isinstance(json.loads(token), (dict, list)):
cmd += " '{0}'".format(token)
else:
cmd += " " + token
except (json.decoder.JSONDecodeError, TypeError):
cmd += " " + token
return cmd
def _set_constants(self):
cli_constants = json.loads(self.parsed_args.constants) if self.parsed_args.constants else {}
if self.parsed_args.constants_file:
with open(self.parsed_args.constants_file, 'r') as file:
constants_from_file = json.loads(file.read())
self.constants = {**constants_from_file, **cli_constants} if constants_from_file else cli_constants
else:
self.constants = cli_constants
def parse_cli_args(self):
self.parsed_args = self.parser.parse_args()
self._validate_printcurl_args()
self._set_logging_level()
self._set_constants()
self.states = Fuzzer.get_states_from_file(self.parsed_args.statefile) if self.parsed_args.statefile else []
self.model_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), self.parsed_args.model_path)
def run_fuzzer(self):
fuzzer = Fuzzer(self.parsed_args.model_path,
self.parsed_args.domain,
self.parsed_args.gtimeout,
self.parsed_args.state,
self.parsed_args.timeout,
self.constants,
self.parsed_args.uri,
self.parsed_args.method,
self.config)
self.config.root_logger.log(self.config.note_log_level, self._get_cmd_string())
if self.parsed_args.printcurl:
print(" ---> Printing curl:\n" + fuzzer.get_curl_query_string())
else:
if self.states:
self.fuzzer_results = fuzzer.fuzz_requests_by_state_list(self.states)
fuzzer.log_last_state_used(fuzzer.state)
else:
self.fuzzer_results = fuzzer.fuzz_requests_by_incremental_state(self.parsed_args.iterations)
if __name__ == "__main__":
client = Client()
client.parse_cli_args()
client.run_fuzzer()

0
fuzz/__init__.py Executable file
Просмотреть файл

0
fuzz/config/__init__.py Executable file
Просмотреть файл

35
fuzz/config/config.ini Executable file
Просмотреть файл

@ -0,0 +1,35 @@
[DEFAULT]
placeholder_pattern = {[^}{]*}
max_iterrations_in_memory = 100
timeout = 3.0
request_failure_payload_threshold = 15000
drop_header_chance = 0.2
model_reload_interval_seconds = 180
fuzz_db = fuzz/config/fuzzlist.txt
radamsa_bin = util/radamsa/bin/radamsa
default_expectations = fuzz/models/expectations.json
results_dir = results
curl_config = %(results_dir)s/curl-config.txt
maximumRequestSegmentSizeInBytes = 81730
maximumUrlSizeInBytes = 14000
slack_client_token =
slack_channel =
slack_errors_per_hour = 7
slack_status_update_interval_seconds = 60
[test]
example_json_file = fuzz/test/example.json
example_json_expectation_file = fuzz/test/example_expectations.json
example_states_file = fuzz/test/example_states.txt
cli_coverage_file = results/cli.coverage
fuzzer_coverage_file = results/fuzzer.coverage
coverage_xml_file = results/coverage.xml
[docker]
home = /hotfuzz/

51
fuzz/config/config.py Executable file
Просмотреть файл

@ -0,0 +1,51 @@
import os
from pathlib import Path
import locale
import logging
import configparser
class Config:
# pylint: disable=too-many-instance-attributes, too-few-public-methods
def __init__(self):
self._pwd = str(Path(os.path.dirname(os.path.abspath(__file__))).parents[1])
self.parser = configparser.ConfigParser()
self.parser.read(os.path.join(self._pwd, "fuzz", "config", "config.ini"))
locale.setlocale(locale.LC_ALL, "en_US.UTF-8")
self.log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
self.trace_log_level = 9
logging.addLevelName(self.trace_log_level, "TRACE")
self.note_log_level = 51
logging.addLevelName(self.note_log_level, "NOTE")
self.logging_levels = {0: self.trace_log_level,
1: logging.DEBUG,
2: logging.INFO,
3: logging.WARNING}
self.root_logger = logging.getLogger()
self.root_logger.propagate = False
self.fuzz_db_array = open(os.path.join(self._pwd, self.parser.get("DEFAULT", "fuzz_db")), "r").read().splitlines()
self.results_dir = os.path.join(self._pwd, self.parser.get("DEFAULT", "results_dir"))
self.default_placeholder_pattern = self.parser.get("DEFAULT", "placeholder_pattern")
self.max_iterations_in_memory = self.parser.getint("DEFAULT", "max_iterrations_in_memory")
self.model_reload_interval_seconds = self.parser.getint("DEFAULT", "model_reload_interval_seconds")
self.curl_data_file_path = self.parser.get("DEFAULT", "curl_config")
self.expectations_path = os.path.join(self._pwd, self.parser.get("DEFAULT", "default_expectations"))
self.maximum_url_size_in_bytes = self.parser.getint("DEFAULT", "maximumUrlSizeInBytes")
self.drop_header_chance = self.parser.getfloat("DEFAULT", "drop_header_chance")
self.slack_client_token = self.parser.get("DEFAULT", "slack_client_token")
self.slack_channel = self.parser.get("DEFAULT", "slack_channel")
self.slack_errors_per_hour = self.parser.getint("DEFAULT", "slack_errors_per_hour")
self.slack_status_update_interval_seconds = self.parser.getint("DEFAULT", "slack_status_update_interval_seconds")
self.example_json_file = self.parser.get("test", "example_json_file")
self.example_expectations_file = self.parser.get("test", "example_json_expectation_file")
self.example_states_file = self.parser.get("test", "example_states_file")
self.cli_coverage_file = self.parser.get("test", "cli_coverage_file")
self.fuzzer_coverage_file = self.parser.get("test", "fuzzer_coverage_file")
self.coverage_xml_file = self.parser.get("test", "coverage_xml_file")

1004
fuzz/config/fuzzlist.txt Executable file

Разница между файлами не показана из-за своего большого размера Загрузить разницу

438
fuzz/fuzzer.py Executable file
Просмотреть файл

@ -0,0 +1,438 @@
import sys
import signal
import json
import logging
import itertools
import traceback
import os
import re
import copy
from collections import OrderedDict
from time import strftime
from time import time
from time import localtime
import slackclient
from fuzz import mutator
from fuzz import request
from fuzz.config.config import Config
class Fuzzer: # pylint: disable=too-many-instance-attributes
def log_last_state_used(self, state):
self.config.root_logger.log(self.config.note_log_level, "Last state used: {0}".format(state))
def _send_slack_message(self, message):
return self.slacker.api_call("chat.postMessage", channel=self.config.slack_channel, text=message)
def exit_handler(self, signum, frame):
self.config.root_logger.log(self.config.note_log_level, "Exited with signal: {0}".format(signum))
if signum != 0:
self.config.root_logger.log(logging.ERROR, traceback.extract_stack(frame))
self.log_last_state_used(self.state)
self._send_slack_message("fuzzer stopped")
sys.exit(signum)
def _connect_slack_client(self):
self.slacker = slackclient.SlackClient(self.config.slack_client_token)
resp = self._send_slack_message("fuzzer started with log " + self.log_file_name)
if not resp["ok"]:
self.config.root_logger.log(logging.ERROR,
"failed to connect slack client to channel " + self.config.slack_channel +
" with token " + self.config.slack_client_token)
def __init__(self, schema_file_path, domain, global_timeout=False, state=0, timeout=None, constants=None,
uri=None, methods=None, config_obj=None):
"""
:param schema_file_path: string file handle for the schema
:param domain: domain name
:param global_timeout: if set to true, the timeout value will override all timeouts defined in the data model
:param state: the starting state number
:param timeout: amount of seconds (float) that the request api will wait for the first byte of a response
:param constants: dict of constants used for injecting into model placeholders
:param uri: string of a specific uri to choose from the model
:param methods: list of http methods to select from when iterating over each endpoint
"""
# pylint: disable=too-many-arguments
# pylint: disable=too-many-statements
self.constants = constants
self.schema_file_path = schema_file_path
self.domain = domain
self.timeout = timeout if timeout is not None and timeout > 0 else None
self.global_timeout = global_timeout
self.state = state
self.starting_state = state
self.model_obj = self.load_model()
self.uri = uri if uri else None
self.config = config_obj if config_obj else Config()
self.model_reload_rate = self.config.model_reload_interval_seconds
self.time_since_last_model_check = 0.0
if not os.path.exists(self.config.results_dir):
os.makedirs(self.config.results_dir)
if methods is None:
self.methods = request.METHODS
elif isinstance(methods, list):
self.methods = []
for m in methods:
if m not in request.METHODS:
raise RuntimeError("method {0} is not a valid HTTP method".format(str(m)))
self.methods.append(m.upper())
elif isinstance(methods, str) and methods in request.METHODS:
self.methods = [methods]
else:
raise RuntimeError("method {0} is not a valid HTTP method".format(str(methods)))
name = "-" + os.path.splitext(os.path.basename(self.schema_file_path))[0]
name += re.sub("[{}]", "", self.uri).replace("/", "-") if self.uri else "_all_uris"
name += "_all_methods" if self.methods == request.METHODS else "_" + "_".join(self.methods)
self.log_file_name = os.path.join(self.config.results_dir, "{0}{1}.log".format(strftime("%Y%m%d%H%M%S"), name))
file_handler = logging.FileHandler(self.log_file_name)
file_handler.setFormatter(self.config.log_formatter)
self.config.root_logger.addHandler(file_handler)
try:
with open(self.config.expectations_path, 'r') as file:
self.default_expectations = json.loads(file.read(), object_pairs_hook=OrderedDict)
except FileNotFoundError:
self.config.root_logger.error("Expectation file " + self.config.expectations_path +
" was unable to open. Default expectations were not set.")
self.default_expectations = {}
self.mutator = mutator.Mutator(self.config.fuzz_db_array, state)
signal.signal(signal.SIGABRT, self.exit_handler)
signal.signal(signal.SIGFPE, self.exit_handler)
signal.signal(signal.SIGILL, self.exit_handler)
signal.signal(signal.SIGINT, self.exit_handler)
signal.signal(signal.SIGSEGV, self.exit_handler)
signal.signal(signal.SIGTERM, self.exit_handler)
self.slacker = None
self._connect_slack_client()
self.slack_errors = 0
self.last_hour = localtime().tm_hour
self.last_slack_status_update = time()
@staticmethod
def evaluate_expectations(expectations_obj, result):
"""
Determine if the data contained in result meets the requirements provided in expectations_obj.
:param expectations_obj: A list of code to be evaluated to determine if result is acceptable
:param result: A result object provided by send_payload
:return: boolean: True if the result meets the expectation
"""
# pylint: disable=unused-argument, exec-used
# result and expectation are actually used in the exec call
expectation = False # pylint: disable=unused-variable
vlocals = locals()
if expectations_obj:
for k in expectations_obj.keys():
for e in expectations_obj[k]:
exec(e, globals(), vlocals)
return vlocals["expectation"]
@staticmethod
def inject_constants(model_obj, constants):
"""
Replace placeholders in the model with values in constants.
:param model_obj: data model subset
:param constants: dictionary of placeholders (keys) and replacements (values) for placeholders
:return: updated model_obj
"""
if not constants:
return model_obj
json_str = json.dumps(model_obj)
for k in constants.keys():
if constants[k] is True:
constants[k] = "true"
elif constants[k] is False:
constants[k] = "false"
json_str = json_str.replace(k, str(constants[k]))
return json.loads(json_str, object_pairs_hook=OrderedDict)
def mutate_payload(self, endpoint_obj):
"""
Mutate the payload
:param endpoint_obj: an entry in the endpoints list of the data model
:return: mutated payload dictionary
"""
payload = OrderedDict()
payload["uri"] = self.mutator.mutate(endpoint_obj["uri"], True, self.config.default_placeholder_pattern)
if endpoint_obj["input"].get("body"):
payload["body"] = self.mutator.mutate(endpoint_obj["input"]["body"],
pattern=self.config.default_placeholder_pattern)
else:
payload["body"] = None
if endpoint_obj["input"].get("query"):
payload["query"] = self.mutator.mutate(endpoint_obj["input"]["query"],
pattern=self.config.default_placeholder_pattern)
else:
payload["query"] = None
payload["headers"] = self.mutate_headers(endpoint_obj.get("headers", {}),
self.config.default_placeholder_pattern)
payload["headers"]["X-Hot-Fuzz-State"] = str(self.state)
return payload
def send_payload(self, payload, method, timeout, delay=0):
"""
Send the payload
:param payload: a mutated payload
:param method: request method
:param timeout: amount of seconds (float) that the request api will wait for the first byte of a response
:param delay: delay in seconds before the payload is sent
:return: request result dictionary
"""
return request.send_request(self.model_obj["domains"][self.domain], payload["uri"],
method, timeout, delay, payload["headers"],
payload["body"], payload["query"])
def load_model(self):
"""
Load the data model from the schema, then inject constants into the model.
:return: data model with injected constants
"""
with open(self.schema_file_path, 'r') as model_file:
return json.loads(model_file.read(), object_pairs_hook=OrderedDict)
def get_curl_query_string(self):
"""
Construct a mutated request payload and print it as a curl command with a curl config file.
:return: A curl command line string
"""
if not self.uri:
raise RuntimeError("uri must be a non-empty string")
method = self.methods[0]
endpoints = Fuzzer.get_endpoints(self.model_obj["endpoints"], self.uri, [method])
if not endpoints:
raise RuntimeError("failed to locate uri '{0}' with method '{1}' in model".format(self.uri, method))
endpoint_obj = self.inject_constants(endpoints[0], self.constants)
payload = self.mutate_payload(endpoint_obj)
return request.construct_curl_query(self.config.curl_data_file_path, self.model_obj["domains"][self.domain],
payload["uri"], method, payload["headers"],
payload["body"], payload["query"])
def change_state(self, new_state):
self.state = new_state
self.mutator.change_state(self.state)
@staticmethod
def get_endpoints(endpoints_list, uri=None, methods=None):
"""
Get all endpoint definitions for a uri
:param endpoints_list: the endpoints value of a model
:param uri: endpoint uri
:param methods: list of http request methods
:return:
"""
if not uri:
return endpoints_list
endpoints = []
for _, endpoint in enumerate(endpoints_list):
if uri == endpoint.get("uri", "") and\
(methods is None or set(endpoint.get("methods", request.METHODS)).intersection(methods) != set()):
endpoints.append(endpoint)
return endpoints
def get_expectations(self, endpoint_obj):
"""
Get the most granular expectations available for the endpoint.
:param endpoint_obj: an entry in the endpoints list of the data model
:return: a list of code used in evaluate_expectations()
"""
expectations = OrderedDict({})
if endpoint_obj.get("expectations", False):
expectations["local"] = endpoint_obj["expectations"]
elif self.model_obj.get("expectations", False):
expectations["global"] = self.model_obj["expectations"]
else:
expectations = self.default_expectations
return expectations
def iterate_endpoints(self):
"""
Send a newly mutated payload for each uri/method permutation. Logs information for each request.
:return: dict containing number of iterations (values) for each uri (keys)
"""
results = []
for endpoint_obj in Fuzzer.get_endpoints(self.model_obj["endpoints"], self.uri):
my_timeout = self.timeout
if not self.global_timeout:
my_timeout = endpoint_obj.get("timeout", my_timeout)
requests_per_second = endpoint_obj.get("requestsPerSecond", self.model_obj.get("requestsPerSecond"))
request_delay = request.get_request_delay(requests_per_second)
my_methods = list(set(endpoint_obj.get("methods", self.methods)).intersection(self.methods))
my_methods = [my_methods] if isinstance(my_methods, str) else my_methods
for method in my_methods:
if method not in endpoint_obj.get("methods", request.METHODS):
break
injected_endpoint_obj = Fuzzer.inject_constants(endpoint_obj, self.constants)
mutated_payload = self.mutate_payload(injected_endpoint_obj)
result = self.send_payload(mutated_payload, method, my_timeout, request_delay)
results.append(result)
summary = "state={0} method={1} uri={2} code={3} reason='{4}'"\
.format(result["headers"]["X-Hot-Fuzz-State"], method, endpoint_obj["uri"],
result.get("httpcode"), result.get("reason"))
expectations_obj = self.get_expectations(endpoint_obj)
if Fuzzer.evaluate_expectations(expectations_obj, result) is False:
self.config.root_logger.warning(summary)
self.config.root_logger.debug(request.dump_result(result))
# reset the counted slack errors every hour
if self.last_hour != localtime().tm_hour:
self.slack_errors = 0
self.last_hour = localtime().tm_hour
# print the error to slack if it does not exceed the throttle
if self.slack_errors < self.config.slack_errors_per_hour:
self._send_slack_message(summary)
self.slack_errors += 1
else:
self.config.root_logger.info(summary)
self.config.root_logger.log(self.config.trace_log_level, request.dump_result(result))
if time() - self.last_slack_status_update > self.config.slack_status_update_interval_seconds:
self._send_slack_message("current state is " + str(self.state))
self.last_slack_status_update = time()
self.config.root_logger.log(self.config.trace_log_level, "payload: " + json.dumps(mutated_payload))
if my_timeout is not None:
self.config.root_logger.log(self.config.trace_log_level,
"timeout={0}s delay={1}s".format(my_timeout, request_delay))
else:
self.config.root_logger.log(self.config.trace_log_level,
"delay={0}s".format(request_delay))
return results
def _check_for_model_update(self):
"""
If the check interval is reached, check for changes in the current model loaded in memory with a new instance
loaded from the same schema on the disk. If a change is found, reset the fuzzer state to its starting state,
update the loaded model, log the event, then reset the check interval.
:return:
"""
if self.model_reload_rate > self.time_since_last_model_check:
return
model = self.load_model()
if model != self.model_obj:
self.model_obj = model
self.config.root_logger.log(self.config.note_log_level, "at state " + str(self.state) +
" a new data model instance was loaded after detecting a change in " +
self.schema_file_path)
self.config.root_logger.log(self.config.note_log_level, "state has been reset to " + str(self.starting_state))
self.change_state(self.starting_state)
self.time_since_last_model_check = 0.0
def fuzz_requests_by_incremental_state(self, n_times=None):
"""
Send a request n_times for each uri/method permutation.
:param n_times: number of requests, this method will run indefinitely if n_times is None
:return: dict containing the first number of iterations (values) for each uri (keys)
"""
maxval = self.config.max_iterations_in_memory
results = []
r = itertools.count()
if n_times and n_times > 0:
r = range(n_times)
for _ in r:
self._check_for_model_update()
start = time()
my_results = self.iterate_endpoints()
if len(results) < maxval and n_times:
results.extend(my_results)
if len(results) > maxval:
results = results[:maxval]
self.change_state(self.state + 1)
self.time_since_last_model_check += time() - start
return results
def fuzz_requests_by_state_list(self, states):
"""
Functionally similar to fuzz_requests_by_incremental_state but instead applies for a list of states.
:param states: list of state numbers
:return: dict containing the first number of iterations (values) for each uri (keys)
"""
maxval = self.config.max_iterations_in_memory
results = []
for state in states:
self.change_state(state)
my_results = self.iterate_endpoints()
if len(results) < maxval:
results.extend(my_results)
if len(results) > maxval:
results = results[:maxval]
return results
@staticmethod
def get_states_from_file(file_handle):
"""
Get a list of fuzzer states from a text file.
:param file_handle: relative path to the state file
:return: list of states read from the file
"""
states = []
with open(file_handle, 'r') as state_file:
for state in state_file.read().split("\n"):
if state != "":
states.append(int(state))
return states
def mutate_headers(self, headers, pattern=None):
"""
Mutate or drop HTTP headers
:param headers: headers dictionary
:param pattern: a string regex
:return: mutated headers
"""
if headers is None:
return headers
mutated_headers = copy.deepcopy(headers)
headers_to_pop = []
for (key, value) in mutated_headers.items():
if pattern and re.search(pattern, value):
if self.mutator.chance(self.config.drop_header_chance):
headers_to_pop.append(key)
else:
mutated_headers[key] = self.mutator.safe_decode(self.mutator.mutate_regex(value, pattern).encode())
if headers_to_pop:
for header in headers_to_pop:
mutated_headers.pop(header, None)
return mutated_headers

55
fuzz/models/README.md Executable file
Просмотреть файл

@ -0,0 +1,55 @@
## Models
The fuzzer requires a JSON model giving it a description of what and how to attack. A model template is available at `template.json` and a full example is given in `example.json`.
### Structure
Particular model fields would require an elaboration (optional fields are contained in parentheses):
- `domains`: a dictionary of top-level domains, each domain key is the domain identifier, has the following fields:
- `host`: hostname of the target service
- `port`: tcp/ip port number of the target service, if set to `null` it will default to 80
- `protocol`: the transport protocol, can be either `http` or `https`
- `(expectations)`: global expectation (see *Expectations* section)
- `endpoints`: list of dictionaries describing each service endpoint, each dictionary has the following fields:
- `uri`: route path of the service (e.g. `http://github.com[/uri]`), has special mutation behavior (see *Mutation Behavior* section)
- `(timeout)`: the maximum time (seconds) between the request content sent and the response content to be received
- `(headers)`: dictionary of request headers, has special mutation behavior (see *Mutation Behavior* section)
- `(methods)`: list of http request methods, if this field is skipped, the fuzzer will use a pre-defined list of methods
- `(comment)`: this is a cosmetic field which is only for user readability
- `input`: the request's payload, which is either body, query, or both, has the following fields:
- `body`: represents data sent in the request body, this will typically require a JSON content-type header to function correctly
- `query`: dictionary of url query parameters
- `(expectations)`: local expectation (see *Expectations* section)
- `(requestsPerSecond)`: if set, the fuzzer will send this many requests in one second. Any value that is 0 or less means there is no delay before sending a request. This value can be defined globally in the model and locally in an endpoint. If both are defined, the local value will override the global one.
### Field Mutation
HotFuzz modification of data fields or mutation can be controlled by the user with mutation placeholders. A mutation placeholder has the form `{name}` and can be included into any data field description in a fuzzing model like `"t": "1m{mutator1}05s"` where `mutator1` acts as a name label.
The mutation placeholders in a fuzzing model are interpreted as follows:
- If the data field includes a mutation placeholder, only the placeholder location is modified.
- E.g. `"t": "1m{mutator1}05s"` would be mutated into `"t": "1m%20%20%20%C2%9F%2005s"`, `"t": "1mAAAAAAAA05s"`, ... etc.
- If the data field does not include any mutation placeholder, the whole data field is modified.
   - E.g. `"t": "1m05s"` would be mutated into `"t": "0m00m00s"`, `"t": "%3Cimg%20%5Cx12src%3Dx%20onerror%3D%22javascript%3Aalert%281%29%22%3E"`, ... etc.
- If the data field is an URI or header field, its data will not be modified unless there is a mutation placeholder.
   - E.g. `"uri": "/watch"` will remain intact, while `"uri": "/wa{mutate}tch"` would be mutated into `"uri": "/wa98s8d9fh!tch"`, `"uri": "/wa%20%20%20%C2%9F%20tch"`, ... etc.
## Expectations
Expectations are a set of user-defined rules that determine whether a request response is interpreted as good or bad. The fuzzer parses them in JSON format as a dictionary of Python code that is executed at runtime.
### Syntax and Semantics
1. Must conform to JSON format
1. Each expectation consists of a key for the name and an array of strings
1. The expectation strings must conform to Python 3.x syntax
1. Expectations can be defined in an endpoint object (local), the top-level of a data model (global), or in a separate JSON file which consists only of expectation definitions (default).
1. Local and global expectations can only have one key and must have "expectations" as the key name. Default expectations can have any string value as a key name and any number of keys.
1. Global expectations override defaults, while local expectations override all others.
1. Expectation definitions must assign a boolean value to the 'expectation' variable at least once. Otherwise, the evaluation will always be false.
1. Expectation definitions have access to the 'result' object which can be used for evaluating values.
## Making changes
When the fuzzer is run without a state file, it will periodically check for changes to the data model it loaded when starting. If it finds a change you made in your schema when comparing it to what is loaded in memory, it will reset its state to the starting state and apply the changes to the loaded model. This enables the fuzzer to run indefinitely as a service without requiring downtime to apply new changes to a data model. The `model_reload_interval_seconds` variable in fuzz/config/config.ini is the frequency, in seconds, in which the fuzzer will check for changes to the schema.

6
fuzz/models/expectations.json Executable file
Просмотреть файл

@ -0,0 +1,6 @@
{
"default": [
"code = int(result.get('httpcode', 0))",
"expectation = (code >= 400 and code < 500) or ('error' in result.get('response', '').lower() and code < 400)"
]
}

35
fuzz/models/template.json Executable file
Просмотреть файл

@ -0,0 +1,35 @@
{
"domains": {
"domainname": {
"host": "",
"port": null,
"protocol": "http"
}
},
"expectations": [
""
],
"endpoints": [
{
"uri": "",
"comment": "",
"methods": ["GET", "POST", "PUT", "PATCH", "DELETE"],
"headers": {
"Content-Type": ""
},
"expectations": [
""
],
"timeout": 10.0,
"requestsPerSecond": 2.5,
"input": {
"body": {
"foo": ""
},
"query": {
"bar": ""
}
}
}
]
}

22
fuzz/models/tutorial.json Executable file
Просмотреть файл

@ -0,0 +1,22 @@
{
"domains": {
"example": {
"host": "localhost",
"port": 8080,
"protocol": "http"
}
},
"endpoints": [
{
"uri": "/watch",
"comment": "watch video",
"methods": ["GET"],
"input": {
"query": {
"v": "9bZkp7q19f0",
"t": "1m05s"
}
}
}
]
}

162
fuzz/mutator.py Executable file
Просмотреть файл

@ -0,0 +1,162 @@
import os
import random
import copy
import re
from pathlib import Path
import configparser
from subprocess import Popen, PIPE, STDOUT
_pwd = os.path.dirname(os.path.abspath(__file__))
PROJECT_DIR = str(Path(_pwd).parents[0])
CONFIG = configparser.ConfigParser()
CONFIG.read(os.path.join(_pwd, "config", "config.ini"))
_radamsa_bin_env = os.environ.get("RADAMSA_BIN")
_radamsa_bin_config = os.path.join(PROJECT_DIR, CONFIG.get("DEFAULT", "radamsa_bin"))
RADAMSA_BIN = _radamsa_bin_env if _radamsa_bin_env is not None else _radamsa_bin_config
class Mutator:
def __init__(self, fuzzdb_array, state=0, byte_encoding="unicode_escape"):
self.own_rand = random.Random()
self.change_state(state)
self.fuzzdb_array = fuzzdb_array
self.byte_encoding = byte_encoding
def change_state(self, new_state):
self.state = new_state
self.own_rand.seed(self.state)
def chance(self, probability):
"""Returns True x% of the time"""
self.change_state(self.state)
return self.own_rand.random() < probability
def roll_dice(self, minimum, maximum):
self.change_state(self.state)
return self.own_rand.randint(minimum, maximum)
def safe_decode(self, input_bytes):
"""
Attempt to decode the input using byte_encoding. Return the value as a string if not possible.
"""
try:
output = input_bytes.decode(self.byte_encoding)
except (UnicodeDecodeError, OverflowError):
output = str(input_bytes) # Leave it as it is
return output
def mutate_radamsa(self, value):
"""
Mutate the value and encode the mutator output using byte_encoding.
:param value: seed value for the mutator
:param byte_encoding: name of the byte encoding method defined in the python encodings library
:return:
"""
value = str(value)
if self.state == -1:
radamsa_process = Popen([RADAMSA_BIN], stdout=PIPE, stdin=PIPE, stderr=STDOUT)
else:
radamsa_process = Popen([RADAMSA_BIN, "-s", str(self.state)], stdout=PIPE, stdin=PIPE, stderr=STDOUT)
radamsa_output = radamsa_process.communicate(input=value.encode(self.byte_encoding))[0]
return self.safe_decode(radamsa_output)
def juggle_type(self, value): # pylint: disable=too-many-return-statements, inconsistent-return-statements
roll = self.roll_dice(1, 6)
if roll == 1: # String
return str(value)
if roll == 2: # Boolean
return self.chance(0.5)
if roll == 3: # Number
try:
return int(value)
except ValueError:
if self.chance(0.5):
return 1
return 0
if roll == 4: # Array
return [value]
if roll == 5: # Object
return {str(value): value}
if roll == 6: # NoneType / null
return None
def pick_from_fuzzdb(self):
roll = self.roll_dice(0, len(self.fuzzdb_array) - 1)
return self.fuzzdb_array[roll]
def mutate_val(self, value):
roll = self.roll_dice(1, 3)
if roll == 1:
mutated_val = self.mutate_radamsa(value)
elif roll == 2:
mutated_val = self.juggle_type(value)
elif roll == 3:
mutated_val = self.pick_from_fuzzdb()
return mutated_val
@staticmethod
def list_obj_iterable(obj):
if isinstance(obj, dict):
return obj
return range(len(obj))
def mutate_regex(self, string, pattern):
"""
Discards tokens matching the pattern and replaces them with mutations seeded by the preceding string value
This works as long as the tokens in string are not sequential
"""
tokens = re.split(pattern, string)
mutated = ""
for index, token in enumerate(tokens):
mutated += token
if index < len(tokens) - 1:
mutated += str(self.mutate_val(token))
return mutated
def walk_and_mutate(self, obj, strict, pattern):
for key in self.list_obj_iterable(obj):
if isinstance(obj[key], (dict, list)): # Not a single val, dig deeper
self.walk_and_mutate(obj[key], strict, pattern)
elif isinstance(obj[key], str) and pattern and re.search(pattern, obj[key]):
obj[key] = self.mutate_regex(obj[key], pattern)
elif not strict:
obj[key] = self.mutate_val(obj[key])
def mutate(self, obj, strict=False, pattern=None):
"""
Main entry point
:obj: Data structure to mutate, can be any type
:strict: If true, values that are of type string will only be mutated where a substring matches the pattern
:pattern: A string regex
"""
if not obj:
return obj
elif isinstance(obj, str):
if pattern and re.search(pattern, obj):
obj = self.mutate_regex(obj, pattern)
elif not strict:
obj = self.mutate_val(obj)
return obj
else:
obj_to_mutate = copy.deepcopy(obj)
self.walk_and_mutate(obj_to_mutate, strict, pattern)
return obj_to_mutate

277
fuzz/request.py Executable file
Просмотреть файл

@ -0,0 +1,277 @@
import urllib
import time
import json
import os
import configparser
import unicodedata
import operator
import re
from collections import OrderedDict
from numbers import Number
import requests
CONFIG = configparser.ConfigParser()
CONFIG.read(os.path.join(os.path.dirname(os.path.abspath(__file__)), "config", "config.ini"))
DEFAULT_TIMEOUT = CONFIG.getfloat("DEFAULT", "timeout")
METHODS = ["GET", "POST", "PUT", "PATCH", "DELETE"]
def get_header_size_in_bytes(header_obj):
"""
Calculate the header object size based on UTF-8 character encoding.
:param header_obj: dictionary of headers
:return: number of bytes of header_obj size
"""
header_bytes = 0
if header_obj is not None:
separator = ": "
crlf = "\r\n"
for key in header_obj.keys():
header_bytes += len(str(key)) + len(separator) + len(str(header_obj[key])) + len(crlf)
header_bytes += len(crlf)
return header_bytes
def send_request(domain_obj, uri, method, timeout=DEFAULT_TIMEOUT, delay=0, headers_obj=None, body_obj=None, query_obj=None):
"""
Send a request over http/s in query-string or json-body format.
:param domain_obj: domain dict
:param uri: request uri
:param method: A string representation of an http request method (RFC7231,4.3)
:param timeout: amount of seconds that the request api will wait for the first byte of a response
:param delay: delay in seconds before the request is sent
:param headers_obj: request headers dict
:param body_obj: request body dict
:param query_obj: request query parameters dict
:return: result object
"""
# pylint: disable=too-many-arguments
result = OrderedDict()
result["method"] = method
result["headers"] = headers_obj
result["body"] = body_obj
if delay > 0:
time.sleep(delay)
result["delay"] = delay
now = time.time()
try:
url, headers_obj = sanitize(domain_obj, uri, query_obj, headers_obj)
result["url"] = url
body_str = json.dumps(result["body"])
result["size"] = len(result["url"]) + \
len(body_str) + \
get_header_size_in_bytes(result["headers"])
r = requests.request(method, url, headers=headers_obj, timeout=timeout, data=body_str)
result["result"] = r # to be cleaned up in another code change
result["response"] = r.text
result["reason"] = r.reason
result["httpcode"] = r.status_code
except (OSError, ValueError, requests.exceptions.Timeout) as e:
result["reason"] = str(type(e)) + ": " + str(e.args)
finally:
result["time"] = round(time.time() - now, 3)
return result
def dump_result(result):
"""
Dump the result object created by send_request to a string-ified json. The requests.Reponse object is stripped
because it cannot be serialized.
"""
swap = result
swap.pop("result", None)
return json.dumps(swap)
def get_url_encoded_text(text):
return urllib.parse.quote(text, safe="/*-._")
def get_encoded_url(domain_obj, uri, query_obj=None):
"""
:param domain_obj: endpoint domain dict
:param uri: request uri
:param query_obj: request query parameters dict
:return: the query segment of a url (e.g. ?foo=123&bar=false)
"""
url = domain_obj["protocol"] + "://" + get_url_encoded_text(domain_obj["host"])
if isinstance(domain_obj.get("port"), int):
url += ":" + str(domain_obj["port"])
url += get_url_encoded_text(uri)
if query_obj:
param_string = "?"
for (key, value) in query_obj.items():
if value is not None:
if isinstance(value, list):
for n in value:
token = str(n).lower() if isinstance(n, bool) else str(n)
param_string = param_string + key + "[]=" + get_url_encoded_text(token) + "&"
else:
token = str(value).lower() if isinstance(value, bool) else str(value)
param_string = param_string + key + "=" + get_url_encoded_text(token) + "&"
url += param_string[:-1] # chop the last '&' off
return url
def construct_curl_query(curl_config_file_path, domain_obj, uri, method, headers_obj=None, body_obj=None, query_obj=None):
"""
Construct a curl query and write the body to JSON file if it presents
:param curl_config_file_path: a path to the TXT file, where the curl arguments should be written to
:param domain_obj: domain dictionary
:param uri: request uri
:param method: A string representation of an http request method (RFC7231,4.3)
:param headers_obj: request headers dictionary
:param body_obj: request body dictionary
:param query_obj: request query parameters dictionary
:return: the curl query
"""
# pylint: disable=too-many-arguments
headers = ""
body = ""
url, headers_obj = sanitize(domain_obj, uri, query_obj, headers_obj)
if body_obj is not None:
body = json.dumps(body_obj)
request = "request = {0}\n".format(method)
if headers_obj is not None:
for (key, value) in headers_obj.items():
headers += "header = \"{0}: {1}\"\n".format(key, value)
if body_obj is not None:
body = json.dumps(body) # serialize it again, so the data has a proper format in the config file
body = "data = {0}\n".format(body)
url = "url = \"{0}\"".format(url)
config_file = open(curl_config_file_path, "w+")
config_file.writelines([request, headers, body, url])
config_file.close()
curl_query = "curl -g -K {0}".format(curl_config_file_path)
return curl_query
def get_request_delay(requests_per_second):
if (requests_per_second is not None) and isinstance(requests_per_second, Number) and (requests_per_second > 0):
one_second = 1
request_delay = one_second / requests_per_second
return request_delay
return 0
def truncate_object(obj, n_bytes, is_header=False):
"""
Reduce the number of character bytes in obj by n_bytes. This is necessary to avoid rejected over-sized requests.
:param obj: A request object such as a body or header
:param n_bytes: The number of character bytes to strip from the object
:param is_header:
:return: Truncated request object
"""
if n_bytes > 0:
critical_headers = ["authorization", "content-type", "x-hot-fuzz-state"]
for key in obj if isinstance(obj, dict) else range(len(obj)):
if isinstance(obj[key], (dict, list)):
truncate_object(obj[key], n_bytes, is_header)
elif isinstance(obj[key], str):
if is_header and str(key).lower() in critical_headers and len(obj[key]) <= n_bytes:
pass
elif n_bytes > len(obj[key]):
n_bytes -= len(obj[key])
obj[key] = ""
else:
obj[key] = obj[key][:len(obj[key]) - n_bytes]
break
return obj
TRUNCATION_RESIZE_FACTOR = 3
MAX_REQUEST_SEGMENT_SIZE = CONFIG.getint("DEFAULT", "maximumRequestSegmentSizeInBytes")
def sanitize_headers(headers_obj, max_n_bytes=MAX_REQUEST_SEGMENT_SIZE):
"""
Remove invalid strings from headers_obj and truncate the header to a size of at most max_n_bytes.
:param headers_obj: headers dict
:param max_n_bytes: byte limit of header size
:return: a modified headers_obj
"""
rNewlines = re.compile(r"^\\+n+$")
invalidCategories = ["C"]
for (key, _) in headers_obj.items():
headers_obj[key] = headers_obj[key].strip()
headers_obj[key] = re.sub(rNewlines, "", headers_obj[key])
headers_obj[key] = "".join(ch for ch in headers_obj[key] if unicodedata.category(ch)[0] not in invalidCategories)
size = get_header_size_in_bytes(headers_obj)
while size > max_n_bytes:
newSize = int(size / TRUNCATION_RESIZE_FACTOR)
headers_obj = truncate_object(headers_obj, newSize, is_header=True)
size = get_header_size_in_bytes(headers_obj)
return headers_obj
def sanitize_url(domain_obj, uri, query_obj=None, max_n_bytes=MAX_REQUEST_SEGMENT_SIZE):
"""
Truncate the url to max_n_bytes length. The url will be valid-enough to put in a request.
:param domain_obj: dict containing the domain defined in the data model
:param uri: request route string
:param query_obj: dict describing the query parameters and values of the request url
:param max_n_bytes: maximum length of the url
:return: a url string
"""
return get_encoded_url(domain_obj, uri, query_obj)[:max_n_bytes]
MAX_URL_SIZE = CONFIG.getint("DEFAULT", "maximumUrlSizeInBytes")
def sanitize(domain_obj, uri, query_obj=None, headers_obj=None):
"""
Prepare the request components to conform to the ssl library's and endpoint's http parser's specifications.
:param domain_obj: dict containing the domain defined in the data model
:param uri: request route string
:param query_obj: dict describing the query parameters and values of the request url
:param max_n_bytes: byte limit of the total request size
:param headers_obj: http headers dict
:return: The return values should be acceptable to send as a request.
"""
if headers_obj is not None:
headers_obj = sanitize_headers(headers_obj)
url = sanitize_url(domain_obj, uri, query_obj, MAX_URL_SIZE)
sizes = {
"headers": get_header_size_in_bytes(headers_obj),
"url": len(url)
}
while sum(sizes.values()) > MAX_REQUEST_SEGMENT_SIZE:
key = sorted(sizes.items(), key=operator.itemgetter(1), reverse=True)[0][0]
if key == "url":
url = sanitize_url(domain_obj, uri, query_obj, int(sizes["url"] / TRUNCATION_RESIZE_FACTOR))
sizes["url"] = len(url)
elif key == "headers":
headers_obj = sanitize_headers(headers_obj, int(sizes["headers"] / TRUNCATION_RESIZE_FACTOR))
sizes["headers"] = get_header_size_in_bytes(headers_obj)
return url, headers_obj

0
fuzz/test/__init__.py Executable file
Просмотреть файл

3
fuzz/test/constants.json Executable file
Просмотреть файл

@ -0,0 +1,3 @@
{
"{placeholder}": 123
}

197
fuzz/test/example.json Executable file
Просмотреть файл

@ -0,0 +1,197 @@
{
"domains": {
"example": {
"host": "localhost",
"port": null,
"protocol": "http"
},
"local": {
"host": "localhost",
"port": 8080,
"protocol": "http"
}
},
"requestsPerSecond": 500,
"endpoints": [
{
"uri": "/json",
"comment": "videos",
"methods": ["POST"],
"headers": {
"Authorization": "Bearer {token}",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"
},
"timeout": 2.5,
"input": {
"body": {
"accessible": false,
"dynamicField": "stuff {placeholder}",
"owner": "user-1",
"parent": {
"child": 23,
"child2": {
"grandchild": "weirdval"
}
},
"array": [
{
"child": 23,
"child2": {
"grandchild": "weirdval"
}
},
{
"child": 23,
"child2": {
"grandchild": "weirdval"
}
},
{
"child": 23,
"child2": {
"grandchild": "weirdval"
}
}
]
}
}
},
{
"uri": "/watch",
"comment": "watch video",
"methods": ["GET"],
"input": {
"query": {
"v": "9bZkp7q19f0",
"t": "{time}"
}
}
},
{
"uri": "/sleepabit",
"comment": "watch video",
"methods": ["GET"],
"input": {
"query": {
"v": "9bZkp7q19f0",
"t": "{time}"
}
},
"expectations": [
"import string",
"expectation = result.get('httpcode', 0) == 200 and string.digits == '0123456789'",
"expectation = expectation and result.get('time', 0) >= 1"
]
},
{
"uri": "/delayabit",
"comment": "delay video",
"methods": ["GET"],
"requestsPerSecond": 2.5,
"input": {}
},
{
"uri": "/query/string",
"comment": "query string endpoint",
"methods": ["GET"],
"input": {
"query": {
"int": 0,
"list": [1, 2, 3],
"string": "a",
"bool": false,
"float": 0.1
}
}
},
{
"uri": "/complex/qstring",
"comment": "query string with complex parameters",
"methods": ["GET"],
"input": {
"query": {
"obj": {"a": {"b": []}},
"list": [[[1], [1]]]
}
}
},
{
"uri": "/multiple",
"comment": "this uri has a GET method",
"methods": ["GET"],
"input": {
"query": {
"stuff": false
}
}
},
{
"uri": "/multiple",
"comment": "also has a POST method",
"methods": ["POST"],
"input": {
"body": {
"stuff": false
}
}
},
{
"uri": "/multiple",
"comment": "supports both PUT and PATCH",
"methods": ["PUT", "PATCH"],
"input": {
"body": {
"stuff": false
}
}
},
{
"uri": "/any/method",
"comment": "all methods are implicitly supported",
"input": {
"body": {
"stuff": false,
"sherb": "asdf"
}
}
},
{
"uri": "/{someId}",
"comment": "pivoted uri",
"methods": ["GET"],
"input": {
"body": {
}
}
},
{
"uri": "/{otherId}",
"comment": "another pivoted uri",
"methods": ["GET"],
"input": {
"body": {
}
}
},
{
"uri": "/poorly/designed/endpoint",
"comment": "supports any method and has both a body and a query string",
"headers": {
"Authorization": "Bearer foobar",
"Accept": "application/json"
},
"input": {
"body": {
"foo": false,
"bar": 0,
"puu": ""
},
"query": {
"wot": "",
"ferr": 5,
"doh": false
}
}
}
]
}

Просмотреть файл

@ -0,0 +1,63 @@
{
"domains": {
"example": {
"host": "localhost",
"port": null,
"protocol": "http"
},
"local": {
"host": "localhost",
"port": 8080,
"protocol": "http"
}
},
"expectations": [
"expectation = True"
],
"endpoints": [
{
"uri": "/json",
"comment": "videos",
"methods": [
"POST"
],
"headers": {
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"
},
"timeout": 2.5,
"input": {
"body": {
"accessible": false,
"dynamicField": "stuff {placeholder}",
"owner": "user-1",
"parent": {
"child": 23,
"child2": {
"grandchild": "weirdval"
}
},
"array": [
{
"child": 23,
"child2": {
"grandchild": "weirdval"
}
},
{
"child": 23,
"child2": {
"grandchild": "weirdval"
}
},
{
"child": 23,
"child2": {
"grandchild": "weirdval"
}
}
]
}
}
}
]
}

7
fuzz/test/example_states.txt Executable file
Просмотреть файл

@ -0,0 +1,7 @@
234
812
1
999909
234
22222893428923498
9

71
fuzz/test/mockserver.py Executable file
Просмотреть файл

@ -0,0 +1,71 @@
from http.server import BaseHTTPRequestHandler, HTTPServer
import time
class httpHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path.startswith('/json'):
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(b'{"success": true, "num": 1}\n')
elif self.path.startswith('/multiple'):
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(b'{"success": true, "num": 2}\n')
elif self.path.startswith('/any/method'):
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(b'{"success": true, "num": 3}\n')
elif self.path.startswith('/watch'):
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(b'{"success": false, "reason": "Video not found"}\n')
elif self.path.startswith('/sleepabit'):
try:
time.sleep(1)
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(b'{"success": true, "num": 3}\n')
except BrokenPipeError:
pass
elif self.path.startswith('/delayabit'):
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(b'{"success": true, "num": 4}\n')
elif self.path.startswith('/fail'):
self.send_response(500)
self.end_headers()
elif self.path.startswith('/die'):
self.send_response(500)
self.end_headers()
self.server.shutdown()
else:
self.send_response(404)
def log_message(self, format, *args): # pylint: disable=redefined-builtin
return
def run_mock_server():
print('Starting mock server at 127.0.0.1:8080')
my_server = HTTPServer(('127.0.0.1', 8080), httpHandler)
my_server.serve_forever()
if __name__ == "__main__":
run_mock_server()

1200
fuzz/test/test_fuzzer.py Executable file

Разница между файлами не показана из-за своего большого размера Загрузить разницу

3
kill_fuzzer_container.sh Executable file
Просмотреть файл

@ -0,0 +1,3 @@
#! /bin/bash
docker kill -s 2 $(docker ps | grep "hot-fuzz" | awk '{print $1}')

164
pylintrc Executable file
Просмотреть файл

@ -0,0 +1,164 @@
[MASTER]
ignore=CVS,.env,.git
jobs=1
suggestion-mode=yes
unsafe-load-any-extension=no
[MESSAGES CONTROL]
disable=invalid-name,
missing-docstring,
line-too-long
enable=c-extension-no-member
[REPORTS]
evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
output-format=text
reports=no
score=yes
[REFACTORING]
max-nested-blocks=5
never-returning-functions=optparse.Values,sys.exit
[VARIABLES]
allow-global-unused-variables=yes
callbacks=cb_,
_cb
dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_
ignored-argument-names=_.*|^ignored_|^unused_
init-import=no
redefining-builtins-modules=six.moves,past.builtins,future.builtins
[LOGGING]
logging-modules=logging
[BASIC]
argument-naming-style=snake_case
attr-naming-style=snake_case
class-attribute-naming-style=any
class-naming-style=PascalCase
const-naming-style=UPPER_CASE
function-naming-style=snake_case
method-naming-style=snake_case
module-naming-style=snake_case
inlinevar-naming-style=any
variable-naming-style=snake_case
include-naming-hint=no
bad-names=foo,
bar,
baz,
toto,
tutu,
tata
docstring-min-length=-1
good-names=i,
j,
k,
ex,
Run,
_
no-docstring-rgx=^_
property-classes=abc.abstractproperty
[FORMAT]
ignore-long-lines=^\s*(# )?<?https?://\S+>?$
indent-after-paren=4
indent-string=' '
max-line-length=200
max-module-lines=2000
no-space-check=trailing-comma,
dict-separator
single-line-class-stmt=no
single-line-if-stmt=no
[MISCELLANEOUS]
notes=FIXME,
XXX,
TODO
[TYPECHECK]
contextmanager-decorators=contextlib.contextmanager
ignore-mixin-members=yes
ignore-on-opaque-inference=yes
ignored-classes=optparse.Values,thread._local,_thread._local
missing-member-hint=yes
missing-member-hint-distance=1
missing-member-max-choices=1
[SIMILARITIES]
ignore-comments=yes
ignore-docstrings=yes
ignore-imports=no
min-similarity-lines=4
[SPELLING]
max-spelling-suggestions=4
spelling-dict=
spelling-ignore-words=
spelling-private-dict-file=
spelling-store-unknown-words=no
[CLASSES]
defining-attr-methods=__init__,
__new__,
setUp
exclude-protected=_asdict,
_fields,
_replace,
_source,
_make
valid-classmethod-first-arg=cls
valid-metaclass-classmethod-first-arg=mcs
[IMPORTS]
allow-wildcard-with-all=no
analyse-fallback-blocks=no
deprecated-modules=optparse,tkinter.tix
known-third-party=enchant
[DESIGN]
max-args=5
max-attributes=7
max-bool-expr=5
max-branches=12
max-locals=15
max-parents=7
max-public-methods=20
max-returns=6
max-statements=50
min-public-methods=2
[EXCEPTIONS]
overgeneral-exceptions=Exception

11
requirements.txt Executable file
Просмотреть файл

@ -0,0 +1,11 @@
pycodestyle>=2.3.1
requests==2.13.0
six==1.10.0
unittest-xml-reporting==2.1.0
xmlrunner==1.7.7
pylint>=1.8.2
pyflakes>=1.6.0
git-pylint-commit-hook>=2.2.2
coverage==4.5.1
slackclient==1.2.1
websocket-client==0.47.0

40
run_fuzzer_container.sh Executable file
Просмотреть файл

@ -0,0 +1,40 @@
#! /bin/bash
cmdname=$(basename $0)
cmddir="$(dirname $0)"
function usage {
cat << EOF
Usage:
Run the fuzzer from a docker container
$cmdname [fuzzer args]
EOF
exit 1
}
if [ $# -eq 0 ]; then
usage
fi
fuzz_lib=$( cd ${LIB_DIR:-"fuzz/"}; pwd)
pushd $cmddir
if [ ! -d results ]; then
mkdir results
fi
identifier=${IMAGE_IDENTIFIER:-`date "+%y-%m-%d.%H%M%S"`}
image=${IMAGE:-"hot-fuzz"}
results="$(pwd)/results/"
fuzzer="$(pwd)/fuzzer.py"
echo "=== Launching fuzzer container"
docker run -e DOCKER=1 -v $results:/hotfuzz/results/ -v $fuzz_lib:/hotfuzz/fuzz/ -v $fuzzer:/hotfuzz/fuzzer.py --rm -t --name=image-${identifier} ${image} python3 fuzzer.py "$@"
success=$?
popd
exit $success

45
run_venv.sh Executable file
Просмотреть файл

@ -0,0 +1,45 @@
#! /bin/bash
PYTHON=${PYTHON:=python3}
PIP=${PIP:=pip3}
REQUIREMENTS=requirements.txt
####################################
# Usage
### source run_venv.sh
# or
### ./run_venv.sh activate
# or
### ./run_venv.sh setup_virtualenv
####################################
function setup_virtualenv {
echo -n "starting up virtual environment ... "
virtualenv -q -p $(which $PYTHON) --no-site-packages --distribute .env
echo "done"
}
# Usage:
## source activate_venv.sh activate
function activate {
source .env/bin/activate
}
function install {
echo -n "installing requirements to virtual environment ... "
if [ -f $REQUIREMENTS ]; then
$PIP install -q -r $REQUIREMENTS
fi
echo "done"
}
# Bash magic to call functions defined here from the CLI e.g.
## ./activate_venv.sh activate
"$@"
if [ -z "$1" ]; then
setup_virtualenv
activate
install
fi

3
scripts/build.sh Executable file
Просмотреть файл

@ -0,0 +1,3 @@
image=${image:-"hot-fuzz"}
docker build -t ${image} .

79
scripts/setup.sh Executable file
Просмотреть файл

@ -0,0 +1,79 @@
#! /bin/bash
cd "$(dirname "$0")"
echo -n "checking dependencies ... "
if ! [ -x "$(command -v python3)" ]; then
echo "python3 is required"
exit 1
fi
if ! [ -x "$(command -v pip3)" ]; then
echo "pip3 is required"
exit 1
fi
echo "done"
pip3 show -q virtualenv
if [ $? -ne 0 ]; then
pip3 install virtualenv
fi
linter="pycodestyle"
options="--ignore=E501,E124 --show-source --exclude=.env"
lintcmd="$linter $options $(cd .. ; pwd)"
linthooktype="pre-commit"
linthook="../.git/hooks/$linthooktype"
# pylint pre-commit hook on git
echo -n "setting up $linthooktype hook for python git-pylint-commit-hook and $linter... "
echo "#!/bin/bash" > $linthook
echo "./run_venv.sh setup_virtualenv" >> $linthook
echo "source .env/bin/activate" >> $linthook
echo "./run_venv.sh install" >> $linthook
echo $lintcmd >> $linthook
echo "git-pylint-commit-hook --limit 10 --pylintrc $(cd .. ; pwd)/pylintrc" >> $linthook
echo "done"
echo -n "setting execute bit for $linthook ..."
chmod +x $linthook
echo "done"
testhooktype="pre-push"
testhook="../.git/hooks/$testhooktype"
testcmd="PYTHONPATH=$(pwd) VERBOSE=1 make test"
echo -n "setting $testhooktype hook for python unit tests ... "
echo "#!/bin/bash" > $testhook
echo "./run_venv.sh setup_virtualenv" >> $testhook
echo "source .env/bin/activate" >> $testhook
echo "./run_venv.sh install" >> $testhook
echo $testcmd >> $testhook
chmod +x $testhook
echo "done"
echo -n "setting up radamsa fuzzer ... "
radamsa_dir=../util/radamsa
if ! [ -d ./$radamsa_dir/.git ]; then
git submodule update --init
fi
# Build radamsa
make -s -C ./$radamsa_dir
if [ $? -ne 0 ]; then
echo "failed to build radamsa"
exit 1
fi
echo "done"

0
test/__init__.py Executable file
Просмотреть файл

160
test/test_cli.py Executable file
Просмотреть файл

@ -0,0 +1,160 @@
import unittest
import sys
import os
import json
import xmlrunner
import coverage
from cli import Client
from fuzz.config.config import Config
test_config = Config()
if os.path.exists(test_config.cli_coverage_file):
os.remove(test_config.cli_coverage_file)
cov = coverage.Coverage(data_file=test_config.cli_coverage_file,
source=["cli",
"test.test_cli",
"fuzz.config.config"])
cov.start()
class CliTests(unittest.TestCase):
def setUp(self):
self.client = Client()
self.argparse_args = ["-m", os.path.join("."), "-d", "test"]
def _validate_printcurl_args(self):
# pylint: disable=protected-access
def evaluate_args(additional_args):
self.client.parsed_args = self.client.parser.parse_args(self.argparse_args + additional_args)
try:
self.client._validate_printcurl_args()
self.fail("should raise SystemExit")
except SystemExit as ex:
self.assertEqual(ex.code, 1)
evaluate_args(["--printcurl"])
evaluate_args(["--printcurl", "--method", "POST"])
evaluate_args(["--printcurl", "--uri", "/test"])
self.client.parsed_args = self.client.parser.parse_args(self.argparse_args + ["--printcurl", "--uri", "/test", "--method", "POST"])
self.client._validate_printcurl_args()
def _set_logging_level(self):
# pylint: disable=protected-access
def evaluate_log_level_arg(level):
self.client.parsed_args = self.client.parser.parse_args(self.argparse_args + ["-l", str(level)])
self.client._set_logging_level()
msg = "should be log level {0} when level was set to {1}".format(self.client.config.logging_levels[level], str(level))
self.assertEqual(self.client.config.root_logger.level, self.client.config.logging_levels[level], msg)
self.client.parsed_args = self.client.parser.parse_args(self.argparse_args + ["--loglevel", str(level)])
self.client._set_logging_level()
msg = "should be log level {0} when level was set to {1}".format(self.client.config.logging_levels[level], str(level))
self.assertEqual(self.client.config.root_logger.level, self.client.config.logging_levels[level], msg)
for i in range(0, len(self.client.config.logging_levels)):
evaluate_log_level_arg(i)
def _get_cmd_string(self):
# pylint: disable=protected-access
args = ["./cli.py"] + self.argparse_args
sys.argv = args
self.assertEqual(self.client._get_cmd_string().strip(" "), " ".join(sys.argv).strip(" "),
"should reproduce command line input as a string")
jsonargs = ["-c", '{"my": "test", "json": "arg"}']
sys.argv = args + jsonargs
actual = self.client._get_cmd_string().strip(" ")
expected = " ".join(args + [jsonargs[0]]) + " '{0}'".format(jsonargs[1])
self.assertEqual(actual, expected, "should reproduce input with json arg surrounded with quotes")
def _set_constants(self):
# pylint: disable=protected-access
jsonfile = self.client.config.example_json_file
jsonfile_args = ["-C", jsonfile]
args = self.argparse_args + jsonfile_args
self.client.parsed_args = self.client.parser.parse_args(args)
self.client._set_constants()
with open(jsonfile, "r") as file:
jsonfile_constants = json.loads(file.read())
self.assertEqual(self.client.constants, jsonfile_constants, "should load constants from " + jsonfile)
jsonstring_args = ["-c", '{"{otherPlaceholder}":5}']
args += jsonstring_args
self.client.parsed_args = self.client.parser.parse_args(args)
self.client._set_constants()
constants = self.client.constants
constants.update(json.loads(jsonstring_args[1]))
self.assertEqual(self.client.constants, constants,
"should combine constants from " + jsonfile + " with args from " + str(jsonstring_args))
jsonstring_args = ["-c", '{"{otherPlaceholder}":5, "{placeholder}":"test"}']
args = self.argparse_args + jsonfile_args + jsonstring_args
self.client.parsed_args = self.client.parser.parse_args(args)
self.client._set_constants()
constants = self.client.constants
constants.update(json.loads(jsonstring_args[1]))
self.assertEqual(self.client.constants, constants,
"should overwrite constants from " + jsonfile + " with args from " + str(jsonstring_args))
def parse_cli_args(self):
model_file = self.client.config.example_json_file
cmdline_args = ["./cli.py", "-d", "test", "-m", model_file]
sys.argv = cmdline_args
self.client.parse_cli_args()
self.assertEqual(self.client.states, [], "should have empty state list since no state file was provided")
with open(self.client.model_file_path, "r"):
pass
state_file = self.client.config.example_states_file
sys.argv = cmdline_args + ["--statefile", state_file]
self.client.parse_cli_args()
expected_states = [234, 812, 1, 999909, 234, 22222893428923498, 9]
self.assertEqual(self.client.states.sort(), expected_states.sort())
def run_fuzzer(self):
model_file = self.client.config.example_json_file
state_file = self.client.config.example_states_file
cmdline_args = ["./cli.py", "-d", "local", "-m", model_file, "-u", "/json", "--method", "POST"]
sys.argv = cmdline_args + ["--statefile", state_file]
self.client.parse_cli_args()
self.client.run_fuzzer()
expected_nstates = 7
self.assertEqual(len(self.client.fuzzer_results),
expected_nstates,
"should execute " + str(expected_nstates) + " iterations (each state in the state file)")
sys.argv = cmdline_args + ["-i", str(expected_nstates)]
self.client.parse_cli_args()
self.client.run_fuzzer()
self.assertEqual(len(self.client.fuzzer_results),
expected_nstates,
"should execute " + str(expected_nstates) + " iterations")
Suite = unittest.TestSuite()
Suite.addTests([CliTests("_validate_printcurl_args"),
CliTests("_set_logging_level"),
CliTests("_get_cmd_string"),
CliTests("_set_constants"),
CliTests("parse_cli_args"),
CliTests("run_fuzzer")])
test_runner = xmlrunner.XMLTestRunner(output="results", verbosity=int(os.environ.get("VERBOSE", 2)))
result = not test_runner.run(Suite).wasSuccessful()
cov.stop()
cov.save()
try:
cov.combine(data_paths=[test_config.cli_coverage_file,
test_config.fuzzer_coverage_file], strict=True)
except coverage.CoverageException:
pass # ignore the exception, but don't combine if not all files exist to prevent xml report failure
cov.xml_report(outfile=test_config.coverage_xml_file)
sys.exit(result)

0
util/__init__.py Executable file
Просмотреть файл