Initial commit of the HTTP Observatory
# Mozilla HTTP Observatory
A set of tools to scan your websites for basic web hygiene. This project is undergoing heavy development and is not yet suitable for production.
## Authors
* April King
## License
* Mozilla Public License Version 2.0
from .database import *
__all__ = [
from contextlib import contextmanager
from json import dumps
import psycopg2
import psycopg2.extras
import psycopg2.pool
import scanner.analyzer
# Create a psycopg2 connection pool
# TODO: pull credentials from environmental variable
pool = psycopg2.pool.SimpleConnectionPool(1, 32, database='http_observatory')
@contextmanager
def get_cursor():
    """
    Yield a DictCursor on a connection borrowed from the module-level pool.

    The transaction is committed when the ``with`` body finishes normally. If the body raises, the transaction is
    rolled back and the exception is re-raised. In both cases the connection is handed back to the pool, so a
    failing query cannot exhaust it.

    :return: context manager yielding a psycopg2 DictCursor
    """
    conn = pool.getconn()

    try:
        yield conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        pool.putconn(conn)
def insert_scan_grade(scan_id, grade):
    """
    Store the final grade for a scan.

    :param scan_id: id of the row in the scans table
    :param grade: dictionary with a 'grade' string and a JSON-serializable 'grade_reasons' mapping
    :return: the updated scans row
    """
    with get_cursor() as cur:
        # RETURNING is required: a plain UPDATE produces no result set for fetchone() to read
        cur.execute("""UPDATE scans
                         SET (grade, grade_reasons) =
                         (%s, %s)
                         WHERE id = %s
                         RETURNING *""",
                    (grade['grade'], dumps(grade['grade_reasons']), scan_id))

        return cur.fetchone()
def insert_scan(site_id) -> psycopg2.extras.DictRow:
    """
    Create a new scan row for a site, in the started state.

    The expected number of tests is taken from ``scanner.analyzer.__all__``.

    :param site_id: id of the row in the sites table
    :return: the newly inserted scans row
    """
    with get_cursor() as cur:
        # RETURNING is required so that fetchone() can hand the new row (including its id) back
        cur.execute("""INSERT INTO scans (site_id, state, start_time, tests_quantity)
                         VALUES (%s, %s, NOW(), %s)
                         RETURNING *""",
                    (site_id, STATE_STARTED, len(scanner.analyzer.__all__)))

        return cur.fetchone()
def insert_test_result(site_id: int, scan_id: int, name: str, output: dict) -> psycopg2.extras.DictRow:
    """
    Record the result of a single test and update the counters of the scan it belongs to.

    :param site_id: id of the row in the sites table
    :param scan_id: id of the row in the scans table
    :param name: name of the test, e.g. 'content-security-policy'
    :param output: the dictionary returned by the analyzer; must contain 'expectation' and 'pass'
    :return: the newly inserted tests row
    """
    # Work on a copy so that the caller's result dictionary is not modified
    output = dict(output)
    expectation = output.pop('expectation')

    with get_cursor() as cur:
        # First, let's get the scan from the scans table
        cur.execute("""SELECT tests_completed, tests_passed, tests_failed, tests_quantity, state FROM scans
                         WHERE id=%s""", (scan_id,))
        row = cur.fetchone()

        # Increment the number of tests completed
        tests_completed = row['tests_completed'] + 1
        finished = tests_completed >= row['tests_quantity']

        # Set the proper state. Completion is checked first so that a scan with a single test still finishes.
        # NOTE(review): STATE_RUNNING and STATE_FINISHED are assumed to be imported next to STATE_STARTED - confirm
        state = row['state']
        if finished:
            state = STATE_FINISHED
        elif state == STATE_STARTED:
            state = STATE_RUNNING

        # Increment the tests passed/failed column; a 'pass' of None counts as neither
        tests_passed = row['tests_passed'] + (1 if output['pass'] is True else 0)
        tests_failed = row['tests_failed'] + (1 if output['pass'] is False else 0)

        # Update the scans table; end_time is NOW() once finished and NULL otherwise, decided by a bound
        # parameter rather than by formatting SQL text
        cur.execute("""UPDATE scans
                         SET (end_time, tests_completed, tests_failed, tests_passed, state) =
                         (CASE WHEN %s THEN NOW() END, %s, %s, %s, %s)
                         WHERE id = %s""",
                    (finished, tests_completed, tests_failed, tests_passed, state, scan_id))

        # Add the test result to the database
        cur.execute("""INSERT INTO tests (site_id, scan_id, name, expectation, output)
                         VALUES (%s, %s, %s, %s, %s)
                         RETURNING *""",
                    (site_id, scan_id, name, expectation, dumps(output, indent=4, sort_keys=True)))

        return cur.fetchone()
# TODO: Only look for successful scans?
def select_scan_recent_scan(site_id: int) -> psycopg2.extras.DictRow:
    """
    Return the most recent scan of a site started within the last day.

    :param site_id: id of the row in the sites table
    :return: the scans row, or an empty dictionary if there is no such scan
    """
    with get_cursor() as cur:
        # The placeholder must not be quoted: psycopg2 does the quoting itself
        cur.execute("""SELECT * FROM scans
                         WHERE start_time >= NOW() - INTERVAL '1 day'
                         AND site_id = %s
                         ORDER BY start_time DESC
                         LIMIT 1""",
                    (site_id,))

        row = cur.fetchone()

    return row if row is not None else {}
def select_site_id(hostname: str) -> int:
    """
    Return the id of the site with the given hostname, creating the site if it does not exist yet.

    :param hostname: domain name of the site
    :return: id of the row in the sites table
    """
    # See if the site exists already
    with get_cursor() as cur:
        cur.execute("""SELECT id FROM sites
                         WHERE domain=(%s)
                         ORDER BY creation_time DESC
                         LIMIT 1""",
                    (hostname,))

        row = cur.fetchone()
        if row is not None:
            return row['id']

    # If not, let's create the site
    with get_cursor() as cur:
        cur.execute("""INSERT INTO sites (domain, creation_time)
                         VALUES (%s, NOW())
                         RETURNING id""", (hostname,))

        return cur.fetchone()['id']
def select_test_results(scan_id: int) -> dict:
    """
    Return every stored test result of a scan, keyed by test name.

    :param scan_id: id of the row in the scans table
    :return: dictionary mapping test name to a dictionary with 'expectation', 'output', 'passed' and 'result';
        empty if the scan has no test results
    """
    tests = {}

    with get_cursor() as cur:
        cur.execute("SELECT * FROM tests WHERE scan_id = %s", (scan_id,))

        # Grab every test and stuff it into the tests dictionary. Iterating directly also covers a scan with
        # exactly one test, which a "rowcount > 1" guard would drop.
        for test in cur:
            # assumes the output column has already been decoded into a dict by the driver - TODO confirm
            output = dict(test['output'])
            passed = output.pop('pass')
            result = output.pop('result')

            tests[test['name']] = {
                'expectation': test['expectation'],
                'output': output,
                'passed': passed,
                'result': result,
            }

    return tests
domain VARCHAR(255) NOT NULL,
creation_time TIMESTAMP NOT NULL,
public_headers JSONB NULL,
private_headers JSONB NULL
site_id INTEGER REFERENCES sites (id),
expectation VARCHAR NOT NULL
site_id INTEGER REFERENCES sites (id),
tests_completed SMALLINT NOT NULL DEFAULT 0,
tests_quantity SMALLINT NOT NULL,
grade VARCHAR(2) NULL,
grade_reasons JSONB NULL
site_id INTEGER REFERENCES sites (id),
scan_id INTEGER REFERENCES scans (id),
expectation VARCHAR NOT NULL,
CREATE INDEX sites_domain_idx ON sites ((lower(domain)));
CREATE INDEX tests_name_idx ON tests (name);
CREATE ROLE httpobsscanner;
GRANT SELECT, INSERT ON sites, expectations, scans, tests TO httpobsscanner;
GRANT UPDATE on sites, expectations, scans TO httpobsscanner;
CREATE ROLE httpobsapi;
GRANT SELECT ON expectations, scans, tests to httpobsapi;
GRANT SELECT (id, domain, public_headers) ON sites TO httpobsapi;
GRANT INSERT, UPDATE ON sites, expectations to httpobsapi;
GRANT INSERT, UPDATE (private_headers) ON sites to httpobsapi;
__all__ = [
# The various statuses
from .content import contribute, subresource_integrity
from .headers import content_security_policy, cookies, strict_transport_security,\
x_content_type_options, x_xss_protection, x_frame_options
from .misc import cross_origin_resource_sharing, redirection, tls_configuration
__all__ = [
from bs4 import BeautifulSoup as bs
from urllib.parse import urlparse
import json
import tld
MOZILLA_DOMAINS = ('mozilla', 'allizom', 'webmaker')


def contribute(reqs: dict, expectation='contribute-json-with-required-keys') -> dict:
    """
    :param reqs: dictionary containing all the request and response objects
    :param expectation: test expectation
        contribute-json-with-required-keys: contribute.json exists, with all the REQUIRED_KEYS [default]
        contribute-json-missing-required-keys: contribute.json exists, but missing some of the REQUIRED_KEYS
        contribute-json-only-required-on-mozilla-properties: contribute.json isn't required,
          since it's not a Mozilla domain
        contribute-json-not-implemented: contribute.json file missing
    :return: dictionary with:
        data: the parsed contribute.json file
        expectation: test expectation
        pass: whether the site's configuration met its expectation (null for non-Mozilla sites)
        result: short string describing the result of the test
    """
    REQUIRED_KEYS = ('name', 'description', 'participate', 'bugs', 'urls')

    output = {
        'data': None,
        'expectation': expectation,
        'pass': False,
        'result': None,
    }
    response = reqs['responses']['auto']
    contribute_json = reqs['resources'].get('/contribute.json')

    # Second-to-last label of the hostname ('mozilla' in www.mozilla.org); empty for single-label hosts such as
    # 'localhost', which would otherwise raise an IndexError
    labels = (urlparse(response.url).hostname or '').split('.')
    second_level = labels[-2] if len(labels) > 1 else ''

    # If there's a contribute.json file
    if contribute_json:
        try:
            output['data'] = json.loads(contribute_json)

            if all(key in output['data'] for key in REQUIRED_KEYS):
                output['result'] = 'contribute-json-with-required-keys'
            else:
                output['result'] = 'contribute-json-missing-required-keys'
        except (json.JSONDecodeError, TypeError):
            output['result'] = 'contribute-json-invalid-json'

    # There's no file, but the site isn't a Mozilla property, so none is required
    elif second_level not in MOZILLA_DOMAINS:
        output['expectation'] = output['result'] = 'contribute-json-only-required-on-mozilla-properties'

    else:
        output['result'] = 'contribute-json-not-implemented'

    # Check to see if the test passed or failed
    if expectation == output['result']:
        output['pass'] = True
    elif output['result'] == 'contribute-json-only-required-on-mozilla-properties':
        output['pass'] = None

    return output
def subresource_integrity(reqs: dict, expectation='sri-implemented-and-external-scripts-loaded-securely') -> dict:
    """
    :param reqs: dictionary containing all the request and response objects
    :param expectation: test expectation
        sri-implemented-and-external-scripts-loaded-securely: integrity attribute exists on all external scripts,
          and scripts loaded [default for HTML]
        sri-not-implemented-and-scripts-loaded-securely: SRI isn't implemented, but scripts are loaded over HTTPS
        sri-not-implemented-and-scripts-loaded-insecurely: SRI isn't implemented, and scripts are downloaded over HTTP
        sri-not-implemented-but-all-scripts-loaded-from-secure-origin: SRI isn't implemented,
          but all scripts come from secure origins
        sri-not-implemented-but-no-scripts-loaded: SRI isn't implemented, because the page doesn't load any scripts
        sri-not-implemented-response-not-html: SRI isn't needed, because the page isn't HTML [default for non-HTML]
    :return: dictionary with:
        data: all external scripts and their integrity / crossorigin attributes
        expectation: test expectation
        pass: whether the site's external scripts met expectations
        result: short string describing the result of the test
    """
    output = {
        'data': {},
        'expectation': expectation,
        'pass': False,
        'result': None,
    }
    response = reqs['responses']['auto']

    # Per-script results, ordered from best to worst.
    # NOTE(review): only the first entry survived in the original list; the others are the result strings
    # passed to only_if_worse() below - confirm the intended order
    goodness = ['sri-implemented-and-external-scripts-loaded-securely',
                'sri-implemented-and-external-scripts-loaded-insecurely',
                'sri-not-implemented-and-scripts-loaded-securely',
                'sri-not-implemented-and-scripts-loaded-insecurely']

    # Return the new result if it's worse than the existing result, otherwise just the current result
    def only_if_worse(result: str) -> str:
        if not output['result']:
            return result
        elif goodness.index(result) > goodness.index(output['result']):
            return result

        return output['result']

    # If the response to get / fails
    if response.status_code != 200:
        output['result'] = 'request-did-not-return-status-code-200'

    # If the content isn't HTML, there's no scripts to load; this is okay
    elif response.headers.get('Content-Type', '').split(';')[0].strip().lower() != 'text/html':
        # The result uses the same string as the expectation, so that the non-HTML case can actually pass
        output['expectation'] = 'sri-not-implemented-response-not-html'
        output['result'] = 'sri-not-implemented-response-not-html'

    else:
        # Try to parse the HTML
        try:
            soup = bs(reqs['resources']['/'], 'html.parser')
        except Exception:
            output['result'] = 'html-not-parsable'
            return output

        page = urlparse(response.url)
        page_tld = tld.get_tld(response.url, fail_silently=True)

        # Track to see if any scripts were on foreign TLDs
        scripts_on_foreign_origin = False

        # Get all the scripts
        scripts = soup.find_all('script')

        for script in scripts:
            if not script.has_attr('src'):
                continue

            # Script tag parameters. Tag.get() reads HTML attributes; getattr() on a Tag searches for a child
            # *tag* of that name and therefore always returned None here.
            src = urlparse(script['src'])
            integrity = script.get('integrity')
            crossorigin = script.get('crossorigin')

            # Check to see if they're on the same TLD
            sametld = page_tld is not None and page_tld == tld.get_tld(script['src'], fail_silently=True)

            # Second-to-last hostname label, without raising on single-label hosts
            labels = (src.hostname or '').split('.')
            second_level = labels[-2] if len(labels) > 1 else ''

            # Same origin, same TLD or a trusted Mozilla subdomain: nothing to record for this script
            if src.netloc == '' or sametld or second_level in MOZILLA_DOMAINS:
                continue

            scripts_on_foreign_origin = True

            # Add it to the scripts data result, since it's not a relative URI or on a Mozilla subdomain
            output['data'][script['src']] = {
                'crossorigin': crossorigin,
                'integrity': integrity
            }

            # See if it's a secure scheme; protocol-relative URLs (//host/x.js) inherit the page's scheme
            securescheme = (src.scheme or page.scheme) == 'https'

            if integrity and not securescheme:
                output['result'] = only_if_worse('sri-implemented-and-external-scripts-loaded-insecurely')
            elif not integrity and securescheme:
                output['result'] = only_if_worse('sri-not-implemented-and-scripts-loaded-securely')
            elif not integrity and not securescheme:
                output['result'] = only_if_worse('sri-not-implemented-and-scripts-loaded-insecurely')

        # If the page doesn't load any scripts
        if not scripts:
            output['result'] = 'sri-not-implemented-but-no-scripts-loaded'

        # If all the scripts are loaded from a secure origin, not triggering a need for SRI
        elif not scripts_on_foreign_origin:
            output['result'] = 'sri-not-implemented-but-all-scripts-loaded-from-secure-origin'

        # If the page loaded from a foreign origin, but everything included SRI
        elif not output['result']:
            output['result'] = 'sri-implemented-and-external-scripts-loaded-securely'

    # Check to see if the test passed or failed
    if expectation == output['result']:
        output['pass'] = True
    # NOTE(review): only the first entry of this tuple survived in the original; the other two are the
    # remaining "SRI isn't needed" results from the docstring - confirm
    elif output['result'] in ('sri-not-implemented-response-not-html',
                              'sri-not-implemented-but-all-scripts-loaded-from-secure-origin',
                              'sri-not-implemented-but-no-scripts-loaded'):
        output['pass'] = True

    return output
@ -0,0 +1,319 @@
from urllib.parse import urlparse
import requests
def content_security_policy(reqs: dict, expectation='csp-implemented-with-unsafe-allowed-in-style-src-only') -> dict:
    """
    :param reqs: dictionary containing all the request and response objects
    :param expectation: test expectation
        csp-implemented-with-no-unsafe: CSP implemented with no unsafe inline keywords
        csp-implemented-with-unsafe-allowed-in-style-src-only: Allow the 'unsafe' keyword in style-src only [default]
        csp-implemented-with-unsafe: CSP implemented with using either unsafe-eval or unsafe-inline
        csp-implemented-with-insecure-scheme: CSP implemented with having sources over http:
        csp-not-implemented: CSP not implemented
    :return: dictionary with:
        data: the raw CSP header
        expectation: test expectation
        pass: whether the site's configuration met its expectation
        result: short string describing the result of the test
    """
    # Possible results for an implemented policy, from best to worst (same order as the docstring above)
    severity = ['csp-implemented-with-no-unsafe',
                'csp-implemented-with-unsafe-allowed-in-style-src-only',
                'csp-implemented-with-unsafe',
                'csp-implemented-with-insecure-scheme']

    output = {
        'data': None,
        'expectation': expectation,
        'pass': False,
        'result': None,
    }
    response = reqs['responses']['auto']

    # Check to see the state of the CSP header
    if 'Content-Security-Policy' in response.headers:
        # Store the CSP policy, if it's implemented
        output['data'] = response.headers['Content-Security-Policy'].strip()

        page_is_https = urlparse(response.url).scheme == 'https'
        directives_seen = 0
        findings = ['csp-implemented-with-no-unsafe']

        # Decompose the CSP into "directive value..." pairs and collect what each one triggers
        for raw_directive in output['data'].split(';'):
            parts = raw_directive.strip().split(None, 1)
            if not parts:
                continue  # empty segment, e.g. after a trailing semicolon

            directives_seen += 1
            directive = parts[0].lower()
            value = parts[1] if len(parts) > 1 else ''

            if 'unsafe-' in value:
                if directive == 'style-src':
                    findings.append('csp-implemented-with-unsafe-allowed-in-style-src-only')
                else:
                    findings.append('csp-implemented-with-unsafe')

            if page_is_https and 'http:' in value:
                findings.append('csp-implemented-with-insecure-scheme')

        if not directives_seen:
            output['result'] = 'csp-header-invalid-header'
            return output

        # Report the worst finding, so the result doesn't depend on the order of the directives
        output['result'] = max(findings, key=severity.index)
    else:
        output['result'] = 'csp-not-implemented'

    # Check to see if the test passed or failed
    if expectation == output['result']:
        output['pass'] = True
    # A policy with no unsafe keywords at all is stricter than "unsafe in style-src only", so it passes too
    elif (expectation == 'csp-implemented-with-unsafe-allowed-in-style-src-only' and
          output['result'] == 'csp-implemented-with-no-unsafe'):
        output['pass'] = True

    return output
def cookies(reqs: dict, expectation='secure-cookies-with-httponly-sessions') -> dict:
    """
    :param reqs: dictionary containing all the request and response objects
    :param expectation: test expectation
        secure-cookies-with-httponly-sessions: All cookies have secure flag set, all session cookies are HttpOnly
        insecure-cookie-set: Allowed to set cookies without the secure flag
        httponly-flag-not-set-on-session-cookies: Allowed to have session cookies without the HttpOnly flag
    :return: dictionary with:
        data: the cookie jar
        expectation: test expectation
        pass: whether the site's configuration met its expectation
        result: short string describing the result of the test
    """
    output = {
        'data': None,
        'expectation': expectation,
        'pass': False,
        'result': None,
    }
    session = reqs['session']  # all requests and their associated cookies

    # If there are no cookies
    if not session.cookies:
        output['result'] = 'no-cookies-found'
    else:
        jar = {}

        for cookie in session.cookies:
            # The httponly functionality is a bit broken: the flag is usually only available as a
            # non-standard attribute. It is worked out locally rather than written back onto the cookie.
            if hasattr(cookie, 'httponly'):
                httponly = cookie.httponly
            else:
                httponly = 'httponly' in [key.lower() for key in getattr(cookie, '_rest', {})]

            # Add it to the jar
            jar[cookie.name] = {i: getattr(cookie, i, None)
                                for i in ['domain', 'expires', 'httponly', 'max-age', 'path', 'port', 'secure']}
            jar[cookie.name]['httponly'] = httponly

            # All cookies must be set with the secure flag, but httponly not being set overrides it
            if not cookie.secure and not output['result']:
                output['result'] = 'insecure-cookie-set'

            # Login and session cookies should be set with HttpOnly; the name is matched case-insensitively so
            # that names such as PHPSESSID are recognised too
            if any(i in cookie.name.lower() for i in ['login', 'sess']) and not httponly:
                output['result'] = 'httponly-flag-not-set-on-session-cookies'

        # Save the cookie jar
        output['data'] = jar

        # Got through the cookie check properly
        if not output['result']:
            output['result'] = 'secure-cookies-with-httponly-sessions'

    # Check to see if the test passed or failed
    if not session.cookies:
        output['pass'] = True
    elif expectation == output['result']:
        output['pass'] = True

    return output
def strict_transport_security(reqs: dict, expectation='hsts-implemented-max-age-at-least-six-months') -> dict:
    """
    :param reqs: dictionary containing all the request and response objects
    :param expectation: test expectation
        hsts-implemented-max-age-at-least-six-months: HSTS implemented with a max age of at least six months
          (15768000) [default]
        hsts-implemented-max-age-less-than-six-months: HSTS implemented with a max age of less than six months
        hsts-not-implemented: HSTS not implemented
    :return: dictionary with:
        data: the raw HSTS header
        expectation: test expectation
        includesubdomains: whether the includeSubDomains directive is set
        max-age: the max-age value, in seconds
        pass: whether the site's configuration met its expectation
        preload: whether the preload flag is set
        result: short string describing the result of the test
    """
    SIX_MONTHS = 15768000

    output = {
        'data': None,
        'expectation': expectation,
        'includesubdomains': None,
        'max-age': None,
        'pass': False,
        'preload': None,
        'result': None,
    }
    response = reqs['responses']['https']

    # Without an HTTPS response there is nothing to inspect; the result stays None
    if response is None:
        return output

    if 'Strict-Transport-Security' in response.headers:
        output['data'] = response.headers['Strict-Transport-Security']

        try:
            for parameter in (i.lower().strip() for i in output['data'].split(';')):
                if parameter.startswith('max-age='):
                    # The value may be written as a quoted string
                    output['max-age'] = int(parameter[8:].strip().strip('"'))
                elif parameter == 'includesubdomains':
                    output['includesubdomains'] = True
                elif parameter == 'preload':
                    output['preload'] = True

            # max-age=0 is a valid (if short) value, so test against None rather than for truthiness
            if output['max-age'] is None:
                output['result'] = 'hsts-invalid-header'
            elif output['max-age'] < SIX_MONTHS:  # must be at least six months
                output['result'] = 'hsts-implemented-max-age-less-than-six-months'
            else:
                output['result'] = 'hsts-implemented-max-age-at-least-six-months'
        except ValueError:
            # max-age wasn't an integer
            output['max-age'] = None
            output['result'] = 'hsts-invalid-header'

        # If they're not included, then they're considered to be unset
        if not output['includesubdomains']:
            output['includesubdomains'] = False
        if not output['preload']:
            output['preload'] = False

    # If HSTS isn't set in the headers
    else:
        output['result'] = 'hsts-not-implemented'

    # Check to see if the test passed or failed
    if expectation == output['result']:
        output['pass'] = True

    return output
def x_content_type_options(reqs: dict, expectation='x-content-type-options-nosniff') -> dict:
    """
    :param reqs: dictionary containing all the request and response objects
    :param expectation: test expectation
        x-content-type-options-nosniff: X-Content-Type-Options set to "nosniff" [default]
        x-content-type-options-not-implemented: X-Content-Type-Options header missing
    :return: dictionary with:
        data: the raw X-Content-Type-Options header
        expectation: test expectation
        pass: whether the site's configuration met its expectation
        result: short string describing the result of the test
    """
    output = {
        'data': None,
        'expectation': expectation,
        'pass': False,
        'result': None,
    }
    response = reqs['responses']['auto']

    if 'X-Content-Type-Options' in response.headers:
        output['data'] = response.headers['X-Content-Type-Options']

        if output['data'].strip().lower() == 'nosniff':
            output['result'] = 'x-content-type-options-nosniff'
        else:
            output['result'] = 'x-content-type-options-invalid-header'
    else:
        output['result'] = 'x-content-type-options-not-implemented'

    # Check to see if the test passed or failed
    if expectation == output['result']:
        output['pass'] = True

    return output
def x_frame_options(reqs: dict, expectation='x-frame-options-sameorigin-or-deny') -> dict:
    """
    :param reqs: dictionary containing all the request and response objects
    :param expectation: test expectation
        x-frame-options-sameorigin-or-deny: X-Frame-Options set to "sameorigin" or "deny" [default]
        x-frame-options-allow-from-origin: X-Frame-Options set to ALLOW-FROM uri
        x-frame-options-not-implemented: X-Frame-Options header missing
    :return: dictionary with:
        data: the raw X-Frame-Options header
        expectation: test expectation
        pass: whether the site's configuration met its expectation
        result: short string describing the result of the test
    """
    output = {
        'data': None,
        'expectation': expectation,
        'pass': False,
        'result': None,
    }
    response = reqs['responses']['auto']

    if 'X-Frame-Options' in response.headers:
        output['data'] = response.headers['X-Frame-Options']
        value = output['data'].strip().lower()

        if value in ('deny', 'sameorigin'):
            output['result'] = 'x-frame-options-sameorigin-or-deny'
        elif 'allow-from ' in value:
            output['result'] = 'x-frame-options-allow-from-origin'
        else:
            output['result'] = 'x-frame-options-invalid-header'
    else:
        output['result'] = 'x-frame-options-not-implemented'

    # Check to see if the test passed or failed
    if expectation == output['result']:
        output['pass'] = True

    return output
def x_xss_protection(reqs: dict, expectation='x-xss-protection-1-mode-block') -> dict:
    """
    :param reqs: dictionary containing all the request and response objects
    :param expectation: test expectation
        x-xss-protection-1-mode-block: X-XSS-Protection set to "1; mode=block" [default]
        x-xss-protection-0: X-XSS-Protection set to "0" (disabled)
        x-xss-protection-not-implemented: X-XSS-Protection header missing
    :return: dictionary with:
        data: the raw X-XSS-Protection header
        expectation: test expectation
        pass: whether the site's configuration met its expectation
        result: short string describing the result of the test
    """
    output = {
        'data': None,
        'expectation': expectation,
        'pass': False,
        'result': None,
    }
    response = reqs['responses']['auto']

    if 'X-XSS-Protection' in response.headers:
        output['data'] = response.headers['X-XSS-Protection']

        # Spaces are insignificant: "1; mode=block" and "1;mode=block" are the same setting
        if output['data'].lower().replace(' ', '').strip() == '1;mode=block':
            output['result'] = 'x-xss-protection-1-mode-block'
        elif output['data'].strip() == '0':
            output['result'] = 'x-xss-protection-0'
        else:
            output['result'] = 'x-xss-protection-invalid-header'
    else:
        output['result'] = 'x-xss-protection-not-implemented'

    # Check to see if the test passed or failed
    if expectation == output['result']:
        output['pass'] = True

    return output
@ -0,0 +1,175 @@
from urllib.parse import urlparse
import requests
def cross_origin_resource_sharing(reqs: dict, expectation='cross-origin-resource-sharing-not-implemented') -> dict:
    """
    :param reqs: dictionary containing all the request and response objects
    :param expectation: test expectation
        cross-origin-resource-sharing-not-implemented: ACAO and the XML files don't exist [default]
        cross-origin-resource-sharing-implemented: One of them does
    :return: dictionary with:
        data: the ACAO header, clientaccesspolicy.xml file, and crossorigin.xml file
        expectation: test expectation
        pass: whether the site's configuration met its expectation
        result: short string describing the result of the test
    """
    output = {
        'data': {
            'acao': None,
            'clientaccesspolicy': None,
            'crossorigin': None
        },
        'expectation': expectation,
        'pass': False,
        'result': None,
    }
    acao = reqs['responses']['auto']
    resources = reqs['resources']

    if 'Access-Control-Allow-Origin' in acao.headers:
        output['data']['acao'] = acao.headers['Access-Control-Allow-Origin']

        # Only a wildcard origin counts as sharing with everybody
        if '*' in output['data']['acao']:
            output['result'] = 'cross-origin-resource-sharing-implemented'

    # TODO: check to see if it's a limited clientaccesspolicy.xml file
    # The resource holds the text of the file (or nothing), not an HTTP status code
    if resources.get('/clientaccesspolicy.xml'):
        output['result'] = 'cross-origin-resource-sharing-implemented'
        output['data']['clientaccesspolicy'] = resources['/clientaccesspolicy.xml']

    # TODO: check to see if it's a limited crossorigin.xml file
    if resources.get('/crossorigin.xml'):
        output['result'] = 'cross-origin-resource-sharing-implemented'
        output['data']['crossorigin'] = resources['/crossorigin.xml']

    # Nothing above flagged open sharing. This includes an ACAO header limited to specific origins, which
    # previously left the result unset.
    # NOTE(review): confirm that a restricted ACAO should count as "not implemented"
    if not output['result']:
        output['result'] = 'cross-origin-resource-sharing-not-implemented'

    # Check to see if the test passed or failed
    if expectation == output['result']:
        output['pass'] = True

    return output
def redirection(reqs: dict, expectation='http-to-https-with-initial-redirect-to-same-host') -> dict:
    """
    :param reqs: dictionary containing all the request and response objects
    :param expectation: test expectation
        http-to-https-with-initial-redirect-to-same-host: Redirects from http to https,
          first redirection stays on host [default]
        no-https-redirect: Site allowed to be served over HTTP
        not-listening-for-http: Site doesn't listen for HTTP requests at all
        off-host-redirection-from-http: Initial HTTP allowed to go from one host to another, still redirects to HTTPS
    :return: dictionary with:
        destination: final location of where GET / over HTTP ends
        expectation: test expectation
        pass: whether the site's configuration met its expectation
        redirects: whether the site does any redirections at all
        result: short string describing the result of the test
        route: the URLs that the requests followed to get to destination
        status_code: HTTP status code for the final redirection (typically 301 or 302)
    """
    response = reqs['responses']['http']

    # The response is compared against None, rather than tested for truthiness, because a response with an
    # error status code evaluates as false even though the site did answer over HTTP
    listening = response is not None

    output = {
        'destination': response.url if listening else None,
        'expectation': expectation,
        'pass': False,
        'redirects': True,
        'result': None,
        'route': [],
        'status_code': response.status_code if listening else None,
    }

    if not listening:
        output['result'] = 'not-listening-for-http'
        output['redirects'] = False
    elif response.history:
        # Each hop is paired with the response that followed it; that response's URL is the already resolved
        # redirect target, so relative Location headers are handled as well
        hops = list(response.history)
        followers = hops[1:] + [response]

        for entry, follower in zip(hops, followers):
            src = urlparse(entry.url)
            dst = urlparse(follower.url)

            # Add the result to the path that requests followed
            output['route'].append(entry.url)

            # Only the first problem is recorded, so a later hop can't hide an earlier one
            if output['result']:
                continue

            # http should never redirect to another http location -- should always go to https first
            if dst.scheme != 'https':
                output['result'] = 'no-https-redirect'

            # If it's an http -> https redirection, make sure it redirects to the same host. If that's not done, then
            # HSTS cannot be properly set on the original host
            elif src.scheme == 'http' and dst.scheme == 'https' and src.netloc != dst.netloc:
                output['result'] = 'off-host-redirection-from-http'

        # Store the final status code for the redirection
        output['status_code'] = response.history[-1].status_code

        if not output['result']:
            output['result'] = 'http-to-https-with-initial-redirect-to-same-host'

    # No redirections took place
    else:
        output['result'] = 'no-https-redirect'
        output['redirects'] = False

    # Append the final location to the path
    if listening:
        output['route'].append(response.url)

    # Check to see if the test passed or failed
    if expectation == output['result'] or output['result'] == 'not-listening-for-http':
        output['pass'] = True

    return output
def tls_configuration(reqs: dict, expectation='intermediate-or-modern-tls-configuration') -> dict:
    """
    :param reqs: dictionary containing all the request and response objects
    :param expectation: test expectation
        intermediate-or-modern-tls-configuration: intermediate or modern TLS configuration [default]
        modern-tls-configuration: modern TLS configuration only
        intermediate-tls-configuration: intermediate TLS configuration only
        old-tls-configuration: old TLS configuration only
        bad-tls-configuration: known bad TLS configuration
    :return: dictionary with:
        expectation: test expectation
        pass: whether the site's configuration met its expectation
        result: short string describing the result of the test
        tls_observatory_scan_id: TLS observatory scan id, for result lookups
    """
    # NOTE(review): SCANNING_HEADER was used but not defined; the value assumes tlsobs prints a line such as
    # "Scanning example.com (id 1234)", which is what the id parsing below expects - confirm
    SCANNING_HEADER = 'Scanning '
    EVALUATION_HEADER = '* Mozilla evaluation: '

    output = {
        'expectation': expectation,
        'pass': False,
        'result': None,
        'tls_observatory_scan_id': None,
    }
    tlsobs = reqs['responses']['tlsobs']

    # Stays None when the scan failed or the output has no evaluation line
    level = None

    if tlsobs is None:
        output['result'] = 'tls-observatory-scan-failed'
    else:
        for line in tlsobs.split('\n'):
            if line.startswith(SCANNING_HEADER):
                # The id is the last word on the line, followed by a closing parenthesis
                try:
                    output['tls_observatory_scan_id'] = int(line.split(' ')[-1].strip().rstrip(')'))
                except ValueError:
                    pass  # unexpected line format; leave the id unset
            elif line.startswith(EVALUATION_HEADER):
                level = line.split(EVALUATION_HEADER)[-1].strip()
                output['result'] = level + '-tls-configuration'  # intermediate-tls-configuration

    # Quick shortcut to see if the test passed or failed: the level has to appear in the expectation
    if level and level in expectation:
        output['pass'] = True

    return output
from .grade import grade
__all__ = ['grade']
import database
grade_order = ['A+', 'A', 'A-', 'B+', 'B', 'B-', 'C+', 'C', 'C-', 'F']

# Kept for backward compatibility; grade() works on a fresh dictionary for every scan instead of this one
output = {
    'grade': 'A+',
    'grade_reasons': {}
}


def __set_grade(grade: str, test: str, reason=None, result=None) -> str:
    """
    Updates the grade, but only if it's worse than the current grade

    :param grade: the new maximum grade
    :param test: name of the test that caps the grade
    :param reason: optional explanation, stored under the test's name
    :param result: grading dictionary to update; defaults to the module-level output
    :return: the current grade after possible updating
    """
    if result is None:
        result = output

    if grade_order.index(grade) > grade_order.index(result['grade']):
        result['grade'] = grade

    if reason:
        reason += ' Grade capped at {grade}.'.format(grade=grade)
        result['grade_reasons'][test] = reason

    return result['grade']


def grade(scan_id: int) -> dict:
    """
    Grade a finished scan from its stored test results and save the grade.

    :param scan_id: id of the row in the scans table
    :return: whatever database.insert_scan_grade() returns for the scan
    """
    # Start from a clean slate for every scan, so a bad grade from an earlier scan can't leak into this one
    result = {
        'grade': 'A+',
        'grade_reasons': {}
    }

    # Get the test results from the database
    test_results = database.select_test_results(scan_id)

    # TODO: this needs a ton of fleshing out

    # Grade the CSP stuff; a missing test is treated like any other non-passing result
    test = 'content-security-policy'
    outcome = test_results.get(test, {}).get('result')

    if outcome == 'csp-implemented-with-no-unsafe':
        pass  # best case, the grade isn't capped
    elif outcome == 'csp-implemented-with-unsafe-allowed-in-style-src-only':
        __set_grade('A', test, 'CSP implemented with unsafe-inline in style-src.', result)
    else:
        __set_grade('B', test, 'CSP not implemented or implemented improperly.', result)

    # Grade the TLS stuff
    test = 'tls-configuration'
    outcome = test_results.get(test, {}).get('result')

    if outcome == 'old-tls-configuration':
        __set_grade('C', test, 'TLS configuration uses the Mozilla old configuration.', result)
    elif outcome == 'bad-tls-configuration':
        __set_grade('F', test, 'TLS configuration doesn\'t match any known good Mozilla configurations.', result)

    return database.insert_scan_grade(scan_id, result)
from .retriever import *
from urllib.parse import urlparse
import requests
import subprocess
# Create a session, returning the session and the HTTP response in a dictionary
def __create_session(url: str) -> dict:
    """
    Create a session and fetch the given URL with it.

    :param url: the URL to retrieve
    :return: dictionary with the 'session' and the 'response'; both are None if the request failed
    """
    s = requests.Session()

    try:
        # Without a timeout an unresponsive host would block the scan forever
        r = s.get(url, timeout=30)
    except requests.exceptions.RequestException:
        return {'session': None, 'response': None}

    # Store the domain and scheme in the session
    s.url = urlparse(r.url)

    return {'session': s, 'response': r}
def __get(session, relative_path='/'):
    """
    Fetch a path from the host that the session was created for.

    :param session: a session carrying the parsed base URL in its url attribute
    :param relative_path: path to fetch, starting with a slash
    :return: the response, or None if the request failed
    """
    try:
        return session.get(session.url.scheme + '://' + session.url.netloc + relative_path, timeout=30)
    except requests.exceptions.RequestException:
        return None
def __get_page_text(response: requests.Response) -> str:
    """
    Return the body of a response, but only if the request succeeded.

    :param response: the response to read; may be None if the request itself failed
    :return: the response text for a status code of 200, otherwise None
    """
    if response is not None and response.status_code == 200:
        return response.text

    return None
def __get_tlsobs_result(hostname: str):
    """
    Run the tlsobs command line client against a host.

    :param hostname: the host to scan
    :return: the decoded output of tlsobs, or None if the scan could not be run
    """
    # The hostname comes from the request URL; never let it be read as a command line option
    if not hostname or hostname.startswith('-'):
        return None

    try:
        return subprocess.check_output(['tlsobs', hostname]).decode('utf-8')
    except (subprocess.CalledProcessError, OSError):
        # tlsobs exited with an error or isn't installed; the TLS analyzer treats None as a failed scan
        return None
def retrieve_all(hostname: str, headers=None) -> dict:
    """
    Retrieve everything that the analyzers need for a host.

    :param hostname: the host to retrieve
    :param headers: HTTP headers for the requests (currently not sent, see the TODO below)
    :return: dictionary with:
        hostname: the hostname that was requested
        resources: mapping of path to the text of that file (None if it couldn't be retrieved)
        responses: the 'http', 'https' and 'auto' responses, and the 'tlsobs' output
        session: the session that belongs to the 'auto' response
    """
    retrievals = {
        'hostname': hostname,
        'resources': {
        },
        'responses': {
            'auto': None,  # whichever of 'http' or 'https' actually works, with 'https' as higher priority
            'http': None,
            'https': None,
            'tlsobs': None
        },
        'session': None,
    }

    # The list of resources to get
    # NOTE(review): the original entries were lost; these are the paths that the analyzers look up - confirm
    resources = (
        '/clientaccesspolicy.xml',
        '/contribute.json',
        '/crossorigin.xml',
    )

    # HTTP headers stuff (avoiding mutable arguments)
    # TODO: pull private / public headers from database
    if not headers:
        headers = {}

    # Create some reusable sessions, one for HTTP and one for HTTPS
    http_session = __create_session('http://' + hostname + '/')
    https_session = __create_session('https://' + hostname + '/')

    # If neither one works, then the site just can't be loaded
    if not http_session['session'] and not https_session['session']:
        return retrievals

    # Store the HTTP only and HTTPS only responses (some things can only be retrieved over one or the other)
    retrievals['responses']['http'] = http_session['response']
    retrievals['responses']['https'] = https_session['response']

    # HTTPS wins if it's available
    preferred = https_session if https_session['session'] else http_session
    retrievals['responses']['auto'] = preferred['response']
    retrievals['session'] = preferred['session']

    # Store the contents of the base page
    retrievals['resources']['/'] = __get_page_text(retrievals['responses']['auto'])

    # Store all the files we retrieve
    for resource in resources:
        resp = __get(retrievals['session'], resource)
        retrievals['resources'][resource] = __get_page_text(resp)

    # Store the TLS Observatory response
    retrievals['responses']['tlsobs'] = __get_tlsobs_result(hostname)

    return retrievals
from database import get_cursor, insert_test_result
from scanner import STATE_FAILED
from scanner.retriever import retrieve_all
from celery import Celery
from os import environ
import scanner.analyzer
import sys
app = Celery('http_observatory_scanner', broker=environ['BROKER_URL'])
# TODO: make this into a Celery task
def scan(hostname: str, site_id: int, scan_id: int):
    """
    Retrieve a site, run every analyzer against it and store the results.

    If the retrieval fails, the scan is marked as failed in the database and no tests are run.

    :param hostname: the host to scan
    :param site_id: id of the row in the sites table
    :param scan_id: id of the row in the scans table
    """
    # Attempt to retrieve all the resources
    try:
        reqs = retrieve_all(hostname)
    # TODO: have more specific error messages
    except Exception as e:
        # If we are unsuccessful, close out the scan in the database.
        # The column is "state", as in every other query against the scans table.
        # NOTE(review): the scans table is assumed to have an "error" column - confirm against the schema
        with get_cursor() as cur:
            cur.execute("""UPDATE scans
                             SET (state, end_time, error) = (%s, NOW(), %s)
                             WHERE id = %s""",
                        (STATE_FAILED, repr(e), scan_id))

        return

    # Get all the tests; __all__ is also what the expected number of tests for a scan is counted from, whereas
    # scanning the module's __dict__ would pick up any other callable in its namespace as well
    tests = [getattr(scanner.analyzer, name) for name in scanner.analyzer.__all__]

    for test in tests:
        # TODO: Get overridden expectation
        test_name = test.__name__.replace('_', '-')
        insert_test_result(site_id, scan_id, test_name, test(reqs))
from .api import *
__all__ = [
from scanner import STATE_FINISHED
from scanner.grader import grade
from scanner.tasks import scan
from flask import Blueprint, abort, jsonify
import database
api = Blueprint('api', __name__)
# TODO Implement GET, which just returns scan status?
# @api.route('/api/v1/scan/<hostname>', methods=['GET'])
# def get_scan_hostname(hostname):
# abort(403)
@api.route('/api/v1/scan/<hostname>', methods=['GET', 'POST'])
def api_post_scan_hostname(hostname: str):
    """
    Return the most recent scan of a host from the last day, or run a new scan if there is none.

    :param hostname: the host to scan
    :return: JSON representation of the scan row
    """
    hostname = hostname.lower()

    # Get the site's id number
    site_id = database.select_site_id(hostname)

    # Next, let's see if there's a recent scan
    recent_scan_row = database.select_scan_recent_scan(site_id)

    # If there was a recent scan, just return it
    if recent_scan_row:
        # Grade it first, if it finished but hasn't been graded yet
        if recent_scan_row['state'] == STATE_FINISHED and recent_scan_row['grade'] is None:
            recent_scan_row = grade(recent_scan_row['id'])

        # TODO: clean this up
        # Rows are turned into plain dictionaries so that they serialize as JSON objects
        return jsonify(dict(recent_scan_row))

    # Otherwise, let's start up a scan
    row = database.insert_scan(site_id)
    scan_id = row['id']

    # Begin the dispatch process
    scan(hostname, site_id, scan_id)

    # And return the scan data, as it was when the scan was created
    # TODO: clean this up
    return jsonify(dict(row))
@api.route('/api/v1/results/<scan_id>', methods=['GET'])
def api_get_test_results(scan_id: int):
    """
    Return all the test results of a scan.

    :param scan_id: id of the scan, taken from the URL
    :return: JSON object mapping test names to their results
    """
    try:
        scan_id = int(scan_id)
    except ValueError:
        # NOTE(review): the original error response was lost; 400 is assumed for a malformed id - confirm
        abort(400)

    # Get all the test results for the given scan id and return it
    return jsonify(database.select_test_results(scan_id))
from flask import Flask
from website.backend import api
app = Flask(__name__)
def main() -> str:
    """
    Return the greeting of the backend service.

    :return: a fixed welcome message
    """
    return 'Welcome to the HTTP Observatory backend service!'
if __name__ == '__main__':
Ссылка в новой задаче