This commit is contained in:
Yvan Boily 2012-02-09 11:47:13 -08:00
Родитель 03cb29b1a5
Коммит 86668d98b3
8 изменённых файлов: 580 добавлений и 248 удалений

Просмотреть файл

@ -1,7 +1,6 @@
from urlparse import urlparse
from urlparse import urlparse, urljoin
import requests
from scanner import ActiveTest, PassiveTest, Scanner, get_url
from scanner import ActiveTest, PassiveTest, HtmlTest, Scanner
class HttpOnlyAttributePresent(PassiveTest):
description = "Inspect the Set-Cookie: header and determine if the HttpOnly attribute is present."
@ -12,11 +11,11 @@ class HttpOnlyAttributePresent(PassiveTest):
if "httponly" in response.headers[cookieheader].lower():
result = self.result("Pass", "HttpOnly is set", response.headers[cookieheader])
else:
result = self.result("Fail", "HttpOnly is not set", response.headers[cookieheader])
result = self.result("Fail", "HttpOnly is not set", response.headers[cookieheader])
else:
result = self.result("Skip", "No cookie is set by this response.", None)
return result
class SecureAttributePresent(PassiveTest):
description = "Inspect the Set-Cookie: header and determine if the Secure attribute is present."
def analyze(self, response):
@ -24,20 +23,20 @@ class SecureAttributePresent(PassiveTest):
cookieheader = "Set-Cookie"
has_cookie = cookieheader in response.headers
if has_cookie:
if "httponly" in response.headers[cookieheader].lower():
if "secure" in response.headers[cookieheader].lower():
if url.scheme == "https":
result = self.result("Pass", "HttpOnly is set", response.headers[cookieheader])
result = self.result("Pass", "Secure cookie attribute is set", response.headers[cookieheader])
else:
result = self.result("Fail", "HttpOnly should only be set for cookies sent over SSL.", response.headers[cookieheader])
result = self.result("Fail", "Secure cookie attribute should only be set for cookies sent over SSL.", response.headers[cookieheader])
else:
if url.scheme == "https":
result = self.result("Fail", "HttpOnly is not set", response.headers[cookieheader])
result = self.result("Fail", "Secure cookie attribute is not set", response.headers[cookieheader])
else:
result = self.result("Pass", "The secure attribute is not set (expected for HTTP)", response.headers[cookieheader])
result = self.result("Pass", "The secure attribute is not set (expected for HTTP)", response.headers[cookieheader])
else:
result = self.result("Skip", "No cookie is set by this response.", None)
return result
class StrictTransportSecurityPresent(PassiveTest):
secure_only = True
@ -66,7 +65,8 @@ class Http200Check(ActiveTest):
run_passives = True
description = "Make a GET request to the specified URL, reporting success only on a 200 response without following redirects"
def do_test(self, url):
response = get_url(url, False)
sess = self.sessions[self.url]
response = sess.get(url, allow_redirects=False)
if response.status_code == 200:
result = self.result("Pass", "The request returned an HTTP 200 response.", None)
else:
@ -77,57 +77,211 @@ class WebTouch(ActiveTest):
run_passives = True
description = "Make a GET request to the specified URL, and check for a 200 response after resolving redirects."
def do_test(self, url):
response = requests.get(url)
sess = self.sessions[self.url]
response = sess.get(url)
if response.status_code == 200:
result = self.result("Pass", "The request returned an HTTP 200 response.", None)
else:
result = self.result("Fail", "The response code was %s" % response.status_code, None)
return (result, response)
class StsUpgradeCheck(ActiveTest):
class StsPresentCheck(ActiveTest):
insecure_only = False
run_passives = True
description = "Inspect the second response in the Strict-Transport-Security redirect process according to http://tools.ietf.org/html/draft-hodges-strict-transport-sec"
events = {}
def do_test(self, url):
stsheader = "Strict-Transport-Security"
#XXX hack: we should take response isntead
url = url.replace('http:', 'https:')
#XXX end of hack
sess = self.sessions[self.url]
response = sess.get(url, allow_redirects=False)
if stsheader in response.headers:
result = self.result('Pass', 'Subsequential HTTPS Response for STS contained corresponding STS header', None)
else:
result = self.result('Fail', 'Subsequential HTTPS Response did not contain STS header', None)
return (result, response)
class StsRedirectCheck(ActiveTest):
insecure_only = True
run_passives = False
description = "Inspect the Strict-Transport-Security redirect process according to http://tools.ietf.org/html/draft-hodges-strict-transport-sec"
run_passives = True
description = "Inspect the first response in the Strict-Transport-Security redirect process according to http://tools.ietf.org/html/draft-hodges-strict-transport-sec"
events = { "Pass": StsPresentCheck,
"Error": None,
"Fail": None }
def do_test(self, url):
stsheader = "Strict-Transport-Security"
u = urlparse(url)
if u.scheme == "http":
correct_header = False
sess = self.sessions[self.url]
response = sess.get(url, allow_redirects=False)
invalid_header = stsheader in response.headers
is_redirect = response.status_code == 301
bad_redirect = False
response1 = get_url(url, False)
invalid_header = stsheader in response1.headers
is_redirect = response1.status_code == 301
if is_redirect == True:
redirect = response1.headers["location"]
r = urlparse(redirect)
if r.scheme == "https":
response2 = get_url(redirect, False)
correct_header = stsheader in response2.headers
redirect = response.headers['location']
r = urlparse(redirect) #XXX do we need to check for same-domain? see sts draft!
if r.scheme != 'https':
pass
else:
bad_redirect = True
success = invalid_header == False and is_redirect == True and correct_header == True
if success == True:
#continue w/ Pass to see if next location contains stsheader?
next_test = (invalid_header == False) and (is_redirect == True) and (bad_redirect == False)
if next_test == True:
message = "The STS upgrade occurs properly (no STS header on HTTP, a 301 redirect, and an STS header in the subsequent request."
else:
message = "%s%s%s%s" % (
message = "%s%s%s" % (
"The initial HTTP response included an STS header (RFC violation)." if invalid_header else "",
"" if is_redirect else "The initial HTTP response should be a 301 redirect (RFC violation see ).",
"" if correct_header else "The followup to the 301 redirect must include the STS header.",
"The 301 location must use the https scheme." if bad_redirect else ""
)
result = self.result("Pass" if success else "Fail", message, None)
return (result, response1)
result = self.result('Pass' if next_test else 'Fail', message, None)
return (result, response)
else:
#XXX maybe just /change/ the scheme to enforce checking?
result = self.result('Skip', 'Not checking for STS-Upgrade on already-secure connection', None)
return result, None
class CSPPolicyCheck(ActiveTest):
insecure_only = False
run_passives = True
description = 'checks if the policy is present'
def do_test(self, url, pred):
cspheader = "X-Content-Security-Policy"
csproheader = 'X-Content-Security-Policy-Report-Only'
response = pred['response']
if cspheader in response.headers or csproheader in response.headers:
if cspheader in response.headers:
h = response.headers[cspheader]
elif csproheader in response.headers:
h = response.headers[csproheader]
harr = h.split(' ')
if harr[0].lower() == 'policy-uri':
url = urljoin(response.url, harr[1]) # join previous URL with the one coming from the header
sess = self.sessions[self.url]
resp = sess.get(url) # allow_redirects=False?
if resp.status_code == 200:
result = self.result('Pass', 'Policy file present', resp)
else:
result = self.result('Fail', 'Policy file not found', resp)
return result, resp
class CSPHeaderCheck(ActiveTest):
# please revise after another readthrough of https://wiki.mozilla.org/Security/CSP/Specification#Sample_Policy_Definitions necessary
insecure_only = False
run_passives = True
description = "Checks if the CSP Header is present and links to a policy. If it does, we will forward to another test to check if it present"
events = {'Pass': CSPPolicyCheck}
def do_test(self, url):
cspheader = "X-Content-Security-Policy"
csproheader = 'X-Content-Security-Policy-Report-Only'
#x-content-security-policy-report-only: policy-uri /services/csp/policy?build=1919
sess = self.sessions[self.url]
response = sess.get(url, allow_redirects=False)
if cspheader in response.headers or csproheader in response.headers:
if cspheader in response.headers:
h = response.headers[cspheader]
elif csproheader in response.headers:
h = response.headers[csproheader]
harr = h.split(' ')
if harr[0].lower() == 'policy-uri':
result = self.result('Pass', 'CSP Header present and points to a policy file', None)
#XXX is this line correct here??
else:
result = self.result('Fail', 'No %s or %s in headers' % (cspheader, csproheader), None)
return (result, response)
class HttpsLoginForm(HtmlTest):
description = "Check that html forms with password-type inputs point to https"
def analyze_html(self, response, soup):
url = urlparse(response.url)
forms = soup.findAll('form')
# look only at those form elements that have password type input elements as children
forms = filter(lambda x: x.findChildren("input", type="password") ,forms)
if len(forms) == 0:
result = self.result("Skip", "There are no login forms on this page", None)
return result
failforms = []
for form in forms:
if url.scheme == "https":
if form['action'].startswith('http:'):
failforms.append(form)
else:
if not form['action'].startswith('https'):
failforms.append(form)
if len(failforms) == 0:
result = self.result("Pass", "All login forms point to secure resources", forms)
else:
result = self.result("Fail", "There are login forms pointing to insecure locations", failforms)
return result
class HttpsResourceOnHttpsLink(HtmlTest):
# also called 'mixed content'
description = "Check if all external resources are pointing to https links, when on https page"
secure_only = True
def analyze_html(self, response, soup):
''' there is a list on stackoverflow[1] which claims to contain
all possible attributes hat may carry a URL. is
there a way to confirm this list is exhaustive?
I have removed attributes which are just links/pointers,
we only want those attributes to resources, the browser
downloads automatically
[1] http://stackoverflow.com/questions/2725156/complete-list-of-html-tag-attributes-which-have-a-url-value/2725168#2725168
'''
attrlist = ['codebase', 'background', 'src', 'usemap', 'data', 'icon', 'manifest', 'poster', 'archive']
failtags = []
for tag in soup.findAll(True):
for attr in attrlist:
if tag.has_key(attr):
val = tag[attr]
if val.startswith('http:'):
failtags.append(tag)
if len(failtags) == 0:
result = self.result("Pass", "All external resources are https", None)
else:
result = self.result("Fail", "There are links to insecure locations", failtags)
return result
class InlineJS(HtmlTest):
description = "complain about inline JS to improve migration to CSP"
def analyze_html(self, response, soup):
url = urlparse(response.url)
scripts = soup.findAll('script')
if len(scripts) == 0:
result = self.result ("Skip", "There are no script tags.", None)
return result
inlinescripts = filter(lambda x: len(x.text) > 0, scripts)
if len(inlinescripts) == 0:
result = self.result("Pass", "No inline JavaScript found", None)
else:
result = self.result("Fail", "Inline JavaScript found", inlinescripts)
return result
def configure(scanner):
if isinstance(scanner, Scanner) == False:
raise Exception("Cannot configure a non-scanner object!")
scanner.register_check(Http200Check())
scanner.register_check(WebTouch())
scanner.register_check(StrictTransportSecurityPresent())
scanner.register_check(XFrameOptionsPresent())
scanner.register_check(StsUpgradeCheck())
scanner.register_check(HttpOnlyAttributePresent())
scanner.register_check(SecureAttributePresent())
scanner.register_check(Http200Check)
scanner.register_check(WebTouch)
scanner.register_check(StrictTransportSecurityPresent)
scanner.register_check(XFrameOptionsPresent)
scanner.register_check(StsRedirectCheck)
scanner.register_check(HttpOnlyAttributePresent)
scanner.register_check(SecureAttributePresent)
scanner.register_check(HttpsLoginForm)
scanner.register_check(HttpsResourceOnHttpsLink)
scanner.register_check(InlineJS)
scanner.register_check(CSPHeaderCheck)

Просмотреть файл

@ -12,6 +12,7 @@ def main():
parser = argparse.ArgumentParser("Runs a set of tests against the set of provided URLs")
parser.add_argument("-u", "--url", action="append", dest="targets", help="Add a target to test")
parser.add_argument("-f", "--target-file", action="append", dest="target_files", help="File with URLs to test")
parser.add_argument("-S", "--new-sessions", action="store_true", default=False, dest="new_sessions", help="Create new Session for each test")
parser.add_argument("-m", "--module", action="append", default = [], dest="modules", help="Load an extension module")
parser.add_argument("-D", "--disable-core", action="store_true", default = False, dest="disable_core", help="Disable corechecks")
@ -22,19 +23,28 @@ def main():
parser.add_argument("-c", "--check", action="append", dest="opts", help="Set a parameter for a check (check:opt=value)" )
parser.add_argument("-e", "--exclude", action="append", dest="exclusions", help="Prevent a check from being run/processed")
parser.add_argument("--save", action="store", dest="dump_path", help="Write out a configuration file based on parameters (won't run scan)")
args = parser.parse_args()
scanner = Scanner()
scanner.force_passives = args.force_passives
scanner.resolve_target = args.resolve_target
scanner.output = args.output
# Configure new-ssion item
if args.new_sessions:
Scanner.logger.info('Enforcing new sessions for each test')
# for each test, not for
ActiveTest.new_session = True
# Start building target list.
if args.targets != None:
for target in args.targets:
scanner.register_target(target)
# Add targets from files to the list.
if args.target_files != None:
for targets in args.target_files:
@ -46,13 +56,13 @@ def main():
scanner.register_target(t)
except:
Scanner.logger.error("Unable to process the target list in: %s", targets)
# Load built-in modules if required.
if args.disable_core == False:
corechecks.configure(scanner)
# Configure modules.
# TODO: change the module loading to scan the list of classes in a module and automagically
# TODO: change the module loading to scan the list of classes in a module and automagically
# detect any tests defined.
if args.modules != None:
for module in args.modules:
@ -63,7 +73,7 @@ def main():
except Exception, e:
Scanner.logger.fatal("Unable to load the requested module [%s]: %s", module, e)
quit()
# Set up the reporter (allow it to load from modules that are configured)
try:
reporter = args.report.split('.')
@ -77,12 +87,12 @@ def main():
except Exception, e:
Scanner.logger.fatal("Unable to use the reporter class [%s]: %s", args.report, e)
quit()
# Disable excluded checks.
if args.exclusions != None:
for exclude in args.exclusions:
scanner.disable_check(exclude)
# Configure checks
if args.opts != None:
for opt in args.opts:
@ -92,13 +102,13 @@ def main():
scanner.configure_check(check, key, value)
except Exception, e:
Scanner.logger.fatal("Invalid check option: %s (%s)", opt, e)
if args.dump_path != None:
scanner.save_configuration(args.dump_path)
return
scanner.run_scan()
if __name__ == "__main__":
main()

Просмотреть файл

@ -4,57 +4,57 @@ class Reporter():
reporters = {}
def start_report(self):
return None
def start_targets(self):
return None
def write_target(self, target):
return None
def start_actives(self):
return None
def write_active(self, test):
return None
def start_passives(self):
return None
def write_passive(self, target):
return None
def end_passives(self):
return None
def end_actives(self):
return None
def end_targets(self):
return None
def end_report(self):
return "This reporter is unimplemented!"
class DetailReporter(Reporter):
# TODO Implement detailed reporter
def end_report(self):
return "This reporter should emit an XML report that includes all of the the details for each test, including captured data"
class AntXmlReporter(Reporter):
def __init__(self):
self.report = ""
self.errtypes = { 'Error' : "error", 'Fail' : "failure", 'Skip' : "skipped"}
def start_report(self):
self.report = '<?xml version="1.0" encoding="utf-8"?>\n'
return None
def start_targets(self):
self.report += "<testsuites>\n"
return None
def write_target(self, target):
self.states = {}
self.states["Skip"] = 0
@ -65,25 +65,24 @@ class AntXmlReporter(Reporter):
self.current_target = target
self.lines = ""
return None
def start_actives(self):
return None
def write_active(self, test, result):
self.states[result["state"]] += 1
self.checks += 1
print test
module, check = ("%s" % test ).split('.')[-2:]
self.lines += '\t\t<testcase classname="%s" name="%s" time="%s"' % (module, check, result["duration"])
if result["state"] == "Pass":
self.lines += " />\n"
else:
self.lines += '>\n\t\t\t<{errtype}>{message}</{errtype}>\n\t\t</testcase>\n'.format(errtype=self.errtypes[result["state"]], message=result["message"])
else:
self.lines += '>\n\t\t\t<{errtype}>{message}</{errtype}>\n\t\t</testcase>\n'.format(errtype=self.errtypes[result["state"]], message=result["message"])
return None
def start_passives(self):
return None
def write_passive(self, test, result):
self.states[result["state"]] += 1
self.checks += 1
@ -91,25 +90,25 @@ class AntXmlReporter(Reporter):
self.lines += '\t\t<testcase classname="%s" name="%s" time="%s"' % (module, check, result["duration"])
if result["state"] == "Pass":
self.lines += " />\n"
else:
else:
self.lines += '>\n\t\t\t<{errtype}>{message}</{errtype}>\n\t\t</testcase>\n'.format(errtype=self.errtypes[result["state"]], message=result["message"])
return None
def end_passives(self):
return None
def end_actives(self):
self.report+= '\t<testsuite name="{target}" errors="{errors}" failures="{failures}" skips="{skips}" tests="{checks}" time="{duration}">\n{lines}\t</testsuite>\n'.format(
target = self.current_target, errors=self.states["Error"], failures = self.states["Fail"],
skips = self.states["Skip"], checks = self.checks, duration=100, lines=self.lines)
return None
def end_targets(self):
self.report += "</testsuites>\n"
return None
def end_report(self):
return self.report
Reporter.reporters['xml'] = AntXmlReporter()

Просмотреть файл

@ -1,78 +1,129 @@
from datetime import datetime
from reporter import Reporter
from urlparse import urlparse
from BeautifulSoup import BeautifulSoup
import ConfigParser
import logging
import requests
import socket
import traceback
from inspect import getargspec
import subprocess
import json
def clean_headers(self, response_headers):
headers = {}
for head in response_headers:
lst = head.strip(" \r\n").split(":")
headers[lst[0]] = lst[1].strip()
return headers
def get_url(url, status = True):
r = requests.get(url, allow_redirects = False)
if status:
r.raise_for_status()
return r
def exec_helper(cmd, args=None):
'''Use this function to call helpers, not to perform checks!
The example use-case would a Testcase that required to get an
authentication tokenout of a mailbox to complete a login procedure.
The email-fetching would be done as a helper
'''
if not args:
params = cmd
else:
params = [cmd] + args # becomes [cmd, arg1, arg2]
try:
output = subprocess.Popen(params, stdout=subprocess.PIPE).communicate()[0] # use Popen instead of subprocess.get_output for Python2.6 compatibility
try:
res = json.loads(output)
except ValueError:
return {"result":"Fail", "message":"Invalid JSON data", "data":""}
if 'result' in res and 'message' in res and 'data' in res:
return res
else:
return {"result":"Fail", "message":"Incomplete JSON data. Your helper should return a dict with the keys result, message and data.", "data":""}
except subprocess.CalledProcessError as e: # raised when? for Popen?
return {"result":"Fail", "message":"The helper script returned with a non-zero returnvalue", "data": e.output}
class PassiveTest():
secure_only = False
insecure_only = False
def analyze(self, response, results):
return None
def result(self, state, message, data):
return {'state' : state, 'message' : message, 'data' : data }
class ActiveTest():
class ActiveTest():
new_session = False # enable (e.g. from cli) to enforce new session generation
secure_only = False
insecure_only = False
run_passives = True
description = "The base class for an Active Test."
sessions = {}
def __init__(self):
if hasattr(self, "setup"):
self.setup()
def execute(self, url):
try:
result = self.do_test(url)
#def get_url(self, url, status = True):
# try:
# sess = self.sessions[self.url]
# except KeyError:
# sess = requests.session()
# #print "Issue request towards %s using %s" % (url, sess.cookies)
# r = sess.get(url, allow_redirects = False)
# print url, r.status_code, status
# if status:
# r.raise_for_status()
# return r
def execute(self, url, predecessor=None):
self.url = url
if self.url not in self.sessions or self.new_session:
self.sessions[url] = requests.session() # Create per-target session
try:
if "pred" in getargspec(self.do_test).args:
resulttuple = self.do_test(url, predecessor)
else:
resulttuple = self.do_test(url)
except Exception, e:
tb = traceback.format_exc()
result = (ActiveTest().result("Error", e, tb), None)
return result
resulttuple = (ActiveTest().result("Error", e, tb), None)
return resulttuple
def result(self, state, message, data):
return { 'state' : state, 'message' : message, 'data' : data, 'passive' : {}}
class HtmlTest(PassiveTest):
description = 'allow easy analysis of html source code'
def analyze(self, response):
if 'text/html' in response.headers['content-type']:
soup = BeautifulSoup(response.content)
return self.analyze_html(response, soup)
else:
result = self.result("Skip", "Content-type is not html "+ response.headers['content-type'], None)
return result
def analyze_html(self, response, soup):
""" implement this method in subclass"""
pass
class Scanner():
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s')
logger = logging.getLogger("Garmr-Scanner")
logger.setLevel(logging.DEBUG)
def __init__(self):
self.resolve_target = True
self.force_passives = False
self._passive_tests_ = {}
self._active_tests_ = {}
self._disabled_tests_ = []
self._passive_tests_ = []
self._active_tests_ = []
self._finished_active_tests_ = []
self._targets_ = []
self._protos_ = ["http", "https"]
Scanner.logger.debug("Scanner initialized.")
self.reporter = Reporter()
self.modules = []
def do_passive_scan(self, passive, is_ssl, response):
if passive.secure_only and not is_ssl:
Scanner.logger.debug("\t\t[%s] Skip Test invalid for http scheme" % passive.__class__)
def do_passive_scan(self, passiveclass, is_ssl, response):
if passiveclass.secure_only and not is_ssl:
Scanner.logger.debug("\t\t[%s] Skip Test invalid for http scheme" % passiveclass)
passive_result = PassiveTest().result("Skip", "This check is only applicable to SSL requests.", None)
start = datetime.now()
passive_result['start'] = start
@ -80,65 +131,91 @@ class Scanner():
passive_result["duration"] = 0
else:
start = datetime.now()
passive = passiveclass()
passive_result = passive.analyze(response)
end = datetime.now()
td = end - start
passive_result['start'] = start
passive_result['end'] = end
passive_result['duration'] = float((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)) / 10**6
Scanner.logger.info("\t\t[%s] %s %s" % (passive.__class__, passive_result['state'], passive_result['message']))
Scanner.logger.info("\t\t[%s] %s %s" % (passiveclass, passive_result['state'], passive_result['message']))
return passive_result
def do_active_scan(self, test, is_ssl, target):
if (test.secure_only and not is_ssl):
Scanner.logger.info("\t[Skip] [%s] (reason: secure_only)" % test.__class__)
def do_active_scan(self, testclass, is_ssl, target):
''' instantiate the class and run it against the specified target, if applicable '''
if (testclass.secure_only and not is_ssl):
Scanner.logger.info("\t[Skip] [%s] (reason: secure_only)" % testclass)
result = ActiveTest().result("Skip", "This check is only applicable to SSL requests", None)
result['start'] = datetime.now()
result['end'] = result['start']
result['duration'] = 0
return result
elif (test.insecure_only and is_ssl):
Scanner.logger.info("\t[Skip] [%s] (reason: insecure_only)" % test.__class__)
elif (testclass.insecure_only and is_ssl):
Scanner.logger.info("\t[Skip] [%s] (reason: insecure_only)" % testclass)
result = ActiveTest().result("Skip", "This check is only applicable to SSL requests", None)
result['start'] = datetime.now()
result['end'] = result['start']
result['duration'] = 0
return result
elif str(testclass).split('.')[-1] in self._disabled_tests_:
Scanner.logger.info("\t[Skip] [%s] (reason: disabled)" % testclass)
result = ActiveTest().result("Skip", "This check was marked as disabled.", None)
result['start'] = datetime.now()
result['end'] = result['start']
result['duration'] = 0
return result
start = datetime.now()
result, response = test.execute(target)
test = testclass() # from now on we have an instance of the class
if "pred" in getargspec(test.do_test).args:
# Check if class accepts this parameter. avoids rewriting.
predecessor_results = self.results[self._finished_active_tests_[-1]]
result, response = test.execute(target, predecessor=predecessor_results)
else:
result, response = test.execute(target)
end = datetime.now()
td = end - start
result['response'] = response
result['start'] = start
result['end'] = end
result['duration'] = float((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)) / 10**6
Scanner.logger.info("\t[%s] %s %s" % (test.__class__, result['state'], result['message']))
self.reporter.write_active(test.__class__, result)
Scanner.logger.info("\t[%s] %s %s" % (testclass, result['state'], result['message']))
self.reporter.write_active(testclass, result)
if (result['state'] == "Error"):
Scanner.logger.error(result['data'])
if response != None and test.run_passives:
result['passive'] = {}
self.reporter.start_passives()
for passive_key in self._passive_tests_.keys():
passive = self._passive_tests_[passive_key]["test"]
result["passive"][passive.__class__] = self.do_passive_scan(passive, is_ssl, response)
self.reporter.write_passive(passive.__class__, result["passive"][passive.__class__])
for passive_testclass in self._passive_tests_:
result["passive"][passive_testclass] = self.do_passive_scan(passive_testclass, is_ssl, response)
self.reporter.write_passive(passive_testclass, result["passive"][passive_testclass])
self.reporter.end_passives()
return result
def scan_target(self, target):
''' iterate over registered tests and deligate for scan '''
self.reporter.write_target(target)
Scanner.logger.info("[%s] scanning:" % target)
url = urlparse(target)
is_ssl = url.scheme == "https"
results = {}
self.results = {}
self.reporter.start_actives()
for key in self._active_tests_.keys():
test = self._active_tests_[key]["test"]
results[test.__class__] = self.do_active_scan(test, is_ssl, target)
self.active_tests_stack = self._active_tests_
while len(self.active_tests_stack) > 0:
testclass = self.active_tests_stack[0]
self.active_tests_stack = self.active_tests_stack[1:]
self.results[testclass] = self.do_active_scan(testclass, is_ssl, target)
if hasattr(testclass, 'events'): #TODO enforce every test to have event dict present?
events_lower = dict([(k.lower(),v) for k,v in testclass.events.items()])
if self.results[testclass]['state'].lower() in events_lower and events_lower[self.results[testclass]['state'].lower()] != None:
nexttest = events_lower[self.results[testclass]['state'].lower()]
Scanner.logger.info("\t[%s] Instantiated because %s declares it as its successor (the event was '%s')" % (nexttest, testclass, self.results[testclass]['state']))
self.active_tests_stack.append(nexttest) # we have to hand over the response!!1, # important: we hand over an instance, not the class
self._finished_active_tests_.append(testclass)
self.reporter.end_actives()
return results
return self.results
def run_scan(self):
''' iterate over target and deligate to list of tests '''
results = {}
self.reporter.start_report()
self.reporter.start_targets()
@ -155,12 +232,13 @@ class Scanner():
file.write(self.reporter.end_report())
file.close()
def register_target(self, url):
''' add target to the scanning engine '''
u = urlparse(url)
valid = u.netloc != "" and u.scheme in self._protos_
reason = "%s%s" % ("[bad netloc]" if u.netloc == "" else "", "" if u.scheme in self._protos_ else "[bad scheme]")
# todo - support ipv6 urls
host = u.netloc.split(':')[0]
if (self.resolve_target):
@ -176,12 +254,14 @@ class Scanner():
Scanner.logger.debug("[target]: %s" % url)
return
Scanner.logger.error("%s is not a valid target (reason: %s)" % (url, reason))
def configure_check(self, check_name, key, value):
if self._active_tests_.has_key(check_name):
check = self._active_tests_[check_name]["test"]
elif self._passive_tests_.has_key(check_name):
check = self._passive_tests_[check_name]["test"]
if check_name in map(lambda x: str(x), self._active_tests_):
index = map(lambda x: str(x), self._active_tests_).index(check_name)
check = self._active_tests_[index]
if check_name in map(lambda x: str(x), self._passive_tests_):
index = map(lambda x: str(x), self._active_tests_).index(check_name)
check = self._active_tests_[index]
else:
raise Exception("The requested check is not available (%s)" % check_name)
if hasattr(check, "config") == False:
@ -190,34 +270,36 @@ class Scanner():
raise Exception("%s is not a valid configuration for %s", key, check_name)
check.config[key] = value
Scanner.logger.debug("\t%s.%s=%s" % (check_name, key, value))
def disable_check(self, check_name):
if self._active_tests_.has_key(check_name):
self._active_tests_[check_name]["enabled"] = False
elif self._passive_tests_.has_key(check_name):
self._passive_tests_[check_name]["enabled"] = False
''' add a previously added test to a blacklist of test that are to be skipped '''
if check_name in map(lambda x: str(x).split('.')[-1], self._active_tests_) or check_name in map(lambda x: str(x).split('.')[-1], self._passive_tests_):
self._disabled_tests_.append(check_name)
Scanner.logger.debug("\t%s disabled.", check_name)
else:
raise Exception("The requested check is not available (%s)" % check_name)
Scanner.logger.debug("\t%s disabled.", check_name)
print "The requested check is not available (%s)" % check_name
print "The list of available checks is %s for actives and %s for passives" % (map(lambda x: str(x).split('.')[-1], self._active_tests_), map(lambda x: str(x).split('.')[-1], self._passive_tests_))
Scanner.logger.debug("\t%s NOT disabled, because it could not be found.", check_name)
def register_check(self, test):
module = test.__class__.__module__
''' add a test to the scanner '''
module = test.__module__
if module not in self.modules:
self.modules.append(module)
key = "%s" % test.__class__
if isinstance(test, ActiveTest):
self._active_tests_[key]= { "test" : test , "enabled" : True}
Scanner.logger.debug("Added %s to active tests." % test.__class__)
if hasattr(test, "execute"):
self._active_tests_.append( test)
Scanner.logger.debug("Added %s to active tests." % test)
return len(self._active_tests_)
if isinstance(test, PassiveTest):
self._passive_tests_[key]= { "test" : test, "enabled" : True}
Scanner.logger.debug("Added %s to passive tests." % test.__class__)
if hasattr(test, "analyze"):
self._passive_tests_.append( test)
Scanner.logger.debug("Added %s to passive tests." % test)
return len(self._passive_tests_)
raise Exception('test is not a valid test type')
def save_configuration(self, path):
pass #XXX defunct
# write out a configuration file.
config = ConfigParser.RawConfigParser()
config.add_section("Garmr")
@ -226,29 +308,36 @@ class Scanner():
config.set("Garmr", "reporter", self.reporter.__class__)
config.set("Garmr", "output", self.output)
config.set("Garmr", "dns", self.resolve_target)
if len(self._targets_) > 0:
config.add_section("Targets")
i = 0
for target in self._targets_:
for i,target in enumerate(self._targets_):
config.set("Targets", "%s"%i, target)
for check in self._active_tests_.keys():
config.add_section(check)
config.set(check, "enabled", self._active_tests_[check]["enabled"])
if hasattr(self._active_tests_[check]["test"], "config"):
for key in self._active_tests_[check]["test"].config.keys():
config.set(check, key, self._active_tests_[check]["test"].config[key])
for check in self._passive_tests_.keys():
config.add_section(check)
config.set(check, "enabled", self._passive_tests_[check]["enabled"])
if hasattr(self._passive_tests_[check]["test"], "config"):
for key in self._passive_tests_[check]["test"].config.keys():
config.set(check, key, self._passive_tests_[check]["test"].config[key])
for index, check in enumerate(self._active_tests_):
check = str(check)
config.add_section(check)
if check not in self._disabled_tests_:
config.set(check, "enabled", True)
else:
config.set(check, "enabled", False)
if hasattr(self._active_tests_[index], "config"):
for key in self._active_tests_[index].config.keys():
config.set(check, key, self._active_tests_[index].config[key])
for index, check in enumerate(self._passive_tests_):
check = str(check)
config.add_section(str(check))
if check not in self._disabled_tests_:
config.set(check, "enabled", True)
else:
config.set(check, "enabled", False)
if hasattr(self._passive_tests_[index], "config"):
for key in self._passive_tests_[index].config.keys():
config.set(check, key, self._passive_tests_[index].config[key])
with open(path, 'w') as configfile:
config.write(configfile)

105
README.md
Просмотреть файл

@ -3,67 +3,64 @@
Garmr is a tool to inspect the responses from websites for basic security requirements.
Garmr includes a set of core test cases implemented in corechecks that are derived from
the Secure Coding Guidelines that can be found at [https://wiki.mozilla.org/WebAppSec/Secure_Coding_Guidelines]
the [Mozilla Secure Coding Guidelines](https://wiki.mozilla.org/WebAppSec/Secure_Coding_Guidelines)
## Installation
This version of Garmr requires Requests > 0.6.1
This version of Garmr requires Requests > 0.8.3
git clone https://github.com/ygjb/Garmr.git
cd Garmr
sudo python setup.py install
garmr -u http://my.target.app
git clone https://github.com/freddyb/Garmr.git
cd Garmr
sudo python setup.py install
garmr -u http://my.target.app
## Usage
usage: Runs a set of tests against the set of provided URLs
[-h] [-u TARGETS] [-f TARGET_FILES] [-m MODULES] [-D] [-p] [-d]
usage: Runs a set of tests against the set of provided URLs
[-h] [-u TARGETS] [-f TARGET_FILES] [-S] [-m MODULES] [-D] [-p] [-d]
[-r REPORT] [-o OUTPUT] [-c OPTS] [-e EXCLUSIONS] [--save DUMP_PATH]
optional arguments:
-h, --help show this help message and exit
-u TARGETS, --url TARGETS
Add a target to test
-f TARGET_FILES, --target-file TARGET_FILES
File with URLs to test
-m MODULES, --module MODULES
Load an extension module
-D, --disable-core Disable corechecks
-p, --force-passive Force passives to be run for each active test
-d, --dns Skip DNS resolution when registering a target
-r REPORT, --report REPORT
Load a reporter e.g. -r reporter.AntXmlReporter
-o OUTPUT, --output OUTPUT
Default output is garmr-results.xml
-c OPTS, --check OPTS
Set a parameter for a check (check:opt=value)
-e EXCLUSIONS, --exclude EXCLUSIONS
Prevent a check from being run/processed
--save DUMP_PATH Write out a configuration file based on parameters
(won't run scan)
A TARGET is an http or https scheme url to execute tests against.
e.g. garmr -u http://localhost
A MODULE is the name of a module; resolving this path needs to be improved
e.g. garmr -m djangochecks
An OPTS field contains the path and name of the option to set
e.g. garmr -m webchecks -c webchecks.RobotsTest:save_contents=True
A REPORT is the namespace qualified name of a reporter object or a valid alias (xml is the only current valid alias, and the default)
e.g. garmr -r xml
An EXCLUSION prevents a check from being executed
e.g. garmr -e Garmr.corechecks.WebTouch
Disable core checks will prevent all of the checks in corechecks from being loaded; this is useful to limit the scope of testing.
optional arguments:
-h, --help show this help message and exit
-u TARGETS, --url TARGETS
Add a target to test
-f TARGET_FILES, --target-file TARGET_FILES
File with URLs to test
-S, --new-sessions Create new Session for each test
-m MODULES, --module MODULES
Load an extension module
-D, --disable-core Disable corechecks
-p, --force-passive Force passives to be run for each active test
-d, --dns Skip DNS resolution when registering a target
-r REPORT, --report REPORT
Load a reporter e.g. -r reporter.AntXmlReporter
-o OUTPUT, --output OUTPUT
Default output is garmr-results.xml
-c OPTS, --check OPTS
Set a parameter for a check (check:opt=value)
-e EXCLUSIONS, --exclude EXCLUSIONS
Prevent a check from being run/processed
--save DUMP_PATH Write out a configuration file based on parameters
(won't run scan)
A TARGET is an http or https scheme url to execute tests against.
e.g. garmr -u http://localhost
A MODULE is the name of a module; resolving this path needs to be improved
e.g. garmr -m djangochecks (Experimental)
An OPTS field contains the path and name of the option to set
e.g. garmr -m webchecks -c webchecks.RobotsTest:save_contents=True
A REPORT is the namespace qualified name of a reporter object or a valid alias (xml is the only current valid alias, and the default)
e.g. garmr -r xml
An EXCLUSION prevents a check from being executed
e.g. garmr -e WebTouch
Disable core checks will prevent all of the checks in corechecks from being loaded; this is useful to limit the scope of testing.
## Tasks
* less noisy CLI
* proxy support (already supported in requests)
* sessions (controlled; sequence for active tests, with a cookie jar that is propagated through the session)
* detailed reporting, including the ability to record all HTTP requests and responses generated
* the ability to filter which passive checks are run by check name or by check type (i.e. cookies, headers, content-type, etc)
* support for additional protocols (websockets, spdy)
* Implement instances of each test case for each target scanned to allow them to retain state as a set of tests progresses.
See [Issues on Github](https://github.com/freddyb/garmr/issues)

58
authchecks.py Normal file
Просмотреть файл

@ -0,0 +1,58 @@
from urlparse import urlparse
import requests
from Garmr.scanner import ActiveTest, PassiveTest, Scanner
class SessionTest(ActiveTest):
pass
class CaptchaTest(ActiveTest):
pass
class LoginTest(ActiveTest):
'''04:43:14 PM) Yvan Boily: so here is an example; this provides a basic configurable authentication check; the username and password fields are configurable, as are the username and password.
the post data is assembled using a built in format that will work with many authentication forms, and the test assumes (naively) that a 200 response is a successful login
(04:43:27 PM) Yvan Boily: (the description needs to be updated :P)
(04:44:22 PM) Yvan Boily: the do_test method on ActiveTest could be simply extended to accept a second and third paramter:
(04:44:43 PM) Yvan Boily: def do_test(self, url, state, preserve):
(04:45:25 PM) Yvan Boily: the state object would be passed in, and the preserve parameter indicates that the test should not modify the state object if it is set to true
(04:45:30 PM) freddy: we could just change it to do_test(self, url, *args) and be more precise in the subclass
(04:46:14 PM) Yvan Boily: it is possible to do that, but I am not a fan of that style. I don't have a better argument than that, so if you want to go that route, feel free :D'''
run_passives = True
description = "check if login works"
config = {
"uid_field" : "username",
"pwd_field" : "password",
"username" : "admin",
"password" : "admin",
"format" : "%s=%s&%s=%s"
}
# eventing needs to be implemented
events = { "Pass": SessionTest,
"Error": CaptchaTest,
"Fail": CaptchaTest }
def do_test(self, url):
u = urlparse(url)
post_data = config['format'] % (config["uid_field"] , config["username"], config["pwd_field"], config["password"])
response = requests.post(url, post_data)
if "Login successful" in response.content:
# scrape response for indicators of a successful login
result = self.result("Pass", "Authentication was successful", None)
else:
result = self.result("Fail", "Authentication failed", None)
return (result, response)
def configure(scanner):
#if isinstance(scanner, Scanner) == False:
# raise Exception("Cannot configure a non-scanner object!")
scanner.register_check(LoginTest())

Просмотреть файл

@ -1,16 +1,17 @@
from urlparse import urlparse
import requests
from Garmr.scanner import ActiveTest, PassiveTest, Scanner, get_url
from Garmr.scanner import ActiveTest, PassiveTest, Scanner, HtmlTest
class AdminAvailable(ActiveTest):
run_passives = True
config = {"path" : "admin"}
def do_test(self, url):
u = urlparse(url)
adminurl="%s://%s/%s" % (u.scheme, u.netloc, self.config["path"])
response = requests.get(adminurl)
sess = self.sessions[self.url]
response = sess.get(adminurl)
if response.status_code == 200:
result = self.result("Pass", "Django admin page is present at %s." % adminurl, response.content)
else:
@ -18,8 +19,31 @@ class AdminAvailable(ActiveTest):
return (result, response);
class ProvokeError404(ActiveTest):
run_passives = True # we need IsDebugModeReallyEnabled
def do_test(self, url):
sess = self.sessions(url)
url += '76976cd1a3cbadaf77533a' #random garbage
response = sess.get(url)
result = self.result('Skip', 'This test cannot Pass or Fail, because it relies on the subsequent passive IsDebugModeReallyEnabled test', response)
return result, response
class IsDebugModeReallyEnabled(HtmlTest):
description = ''
secure_only = False
def analyze_html(self, response, soup):
# we dont really analye the soup, but that's ok;p
error_str = "You're seeing this error because you have" #from django source django/views/debug.py - maybe subject to change
if error_str in response.content:
result = self.result('Fail', 'Typical string of echnical 404/500 error page found', None)
else:
result = self.result('Pass', 'Debug strings not found', response)
return result
def configure(scanner):
if isinstance(scanner, Scanner) == False:
raise Exception("Cannot configure a non-scanner object!")
raise Exception("Cannot configure a non-scanner object!")
scanner.register_check(AdminAvailable())

Просмотреть файл

@ -1,6 +1,6 @@
from urlparse import urlparse
import requests
from Garmr.scanner import ActiveTest, PassiveTest, Scanner, get_url
from Garmr.scanner import ActiveTest, PassiveTest, Scanner
class RobotsTest(ActiveTest):
@ -10,9 +10,10 @@ class RobotsTest(ActiveTest):
def do_test(self, url):
u = urlparse(url)
roboturl="%s://%s/robots.txt" % (u.scheme, u.netloc)
response = requests.get(roboturl)
sess = self.sessions[self.url]
response = sess.get(roboturl)
if response.status_code == 200:
result = self.result("Pass", "A robots.txt file is present on the server",
result = self.result("Pass", "A robots.txt file is present on the server",
response.content if self.config["save_contents"].lower() == "true" else None)
else:
result = self.result("Fail", "No robots.txt file was found.", None)