This commit is contained in:
Bastien Abadie 2018-05-21 16:24:35 +02:00
Родитель ca64d9844e
Коммит 1132096e7e
5 изменённых файлов: 195 добавлений и 6 удалений

Просмотреть файл

@ -22,6 +22,11 @@ from werkzeug.exceptions import abort
from auth import HawkAuth, AuthException
import sys # REQUIRED FOR DYNAMIC DEBUG
settings = {}
logger = logging.getLogger('esFrontLine')
logger.setLevel(logging.DEBUG)
app = Flask(__name__)
auth = HawkAuth()
@ -211,8 +216,6 @@ class WSGICopyBody(object):
app.wsgi_app = WSGICopyBody(app.wsgi_app)
logger = None
settings = {}
def main():
@ -237,8 +240,6 @@ def main():
settings["args"] = args
settings["whitelist"] = listwrap(settings.get("whitelist", None))
globals()["logger"] = logging.getLogger('esFrontLine')
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
for d in listwrap(settings["debug"]["log"]):

Просмотреть файл

@ -37,7 +37,7 @@ class HawkAuth(object):
assert isinstance(resources, list), '"resources" must be JSON list'
assert len(resources) > 0, '"resources" cannot be empty'
assert isinstance(hawk, dict), '"hawk" must be a JSON dictionary'
assert hawk.keys() == ['algorithm', 'id', 'key'], \
assert {'algorithm', 'id', 'key'}.symmetric_difference(hawk.keys()) == set(), \
'"hawk" can only contains algorithm, id, key.'
self.users[user['hawk']['id']] = user

Просмотреть файл

@ -13,7 +13,7 @@ class HawkConnection(Urllib3HttpConnection):
# Save credentials
assert isinstance(hawk_credentials, dict), 'hawk_credentials should be a dict'
assert hawk_credentials.keys() == ['algorithm', 'id', 'key'], \
assert {'algorithm', 'id', 'key'}.symmetric_difference(hawk_credentials.keys()) == set(), \
'hawk_credentials can only contains algorithm, id, key.'
self._hawk_credentials = hawk_credentials

2
requirements.txt Normal file
Просмотреть файл

@ -0,0 +1,2 @@
elasticsearch==6.2.0
responses==0.9.0

186
tests/test_auth.py Normal file
Просмотреть файл

@ -0,0 +1,186 @@
import unittest
import logging
import responses
import re
from esFrontLine import app as frontline
from esFrontLine.auth import HawkAuth
from mohawk import Sender
VALID_USER = {
"id": "babadie@mozilla.com",
"key": "dummySecret",
"algorithm": "sha256"
}
INVALID_USER = {
"id": "babadie@mozilla.com",
"key": "INVALID_SECRET",
"algorithm": "sha256"
}
def mock_hawk(method, url='/', body='', user=VALID_USER):
'''
Helper to create an HAWK header towards mock server
'''
full_url = 'http://localhost' + url
sender = Sender(user, full_url, method, body, content_type='application/json')
return {
'Authorization': sender.request_header,
'Content-Type': 'application/json',
}
class TestAuthentication(unittest.TestCase):
'''
Test the authentication process through Hawk
'''
def setUp(self):
logging.basicConfig()
# Setup test settings
frontline.settings = {
'whitelist': ['test-index', 'protected-data', ],
'elasticsearch': [{
"host":"http://unittest",
"port":9200
}],
}
frontline.auth.load_users([{
"hawk": VALID_USER,
"resources": ['test-index', ],
}])
# Setup flask app Client
frontline.app.testing = True
self.client = frontline.app.test_client()
# Setup responses
mock_es = re.compile('http://unittest:9200/.*')
responses.add(
responses.GET,
mock_es,
body='{}',
content_type='application/json',
)
responses.add(
responses.HEAD,
mock_es,
body='',
content_type='application/json',
)
@responses.activate
def test_head(self):
'''
HEAD request should always work when authenticated
'''
# No auth
r = self.client.head('/')
self.assertEqual(r.status_code, 403)
# Invalid auth
r = self.client.head('/', headers={'Authorization': 'Invalid token'})
self.assertEqual(r.status_code, 403)
# Valid auth
valid_hawk = mock_hawk('HEAD')
r = self.client.head('/', headers=valid_hawk)
self.assertEqual(r.status_code, 200)
# Replay not allowed
r = self.client.head('/', headers=valid_hawk)
self.assertEqual(r.status_code, 403)
# Invalid secret
invalid_hawk = mock_hawk('HEAD', user=INVALID_USER)
r = self.client.head('/', headers=invalid_hawk)
self.assertEqual(r.status_code, 403)
@responses.activate
def test_get(self):
'''
Test some GET requests with authentication
'''
search_url = '/test-index/_search'
# No auth, no query
r = self.client.get(search_url )
self.assertEqual(r.status_code, 403)
# Valid auth but missing query
valid_hawk = mock_hawk('GET', search_url, '{}')
r = self.client.get(search_url, data='{}', headers=valid_hawk)
self.assertEqual(r.status_code, 400)
# Valid auth + query
valid_hawk = mock_hawk('GET', search_url, '{"query":{}}')
r = self.client.get(search_url, data='{"query":{}}', headers=valid_hawk)
self.assertEqual(r.status_code, 200)
@responses.activate
def test_user_syntax(self):
'''
Test user syntax from settings
'''
auth = HawkAuth()
self.assertEqual(len(auth.users), 0)
# Missing hawk
with self.assertRaises(Exception) as e:
auth.load_users([{
'user': 'test',
}])
self.assertEqual(e.exception.message, 'Error on user #1: Missing "hawk" setting in user config.')
self.assertEqual(len(auth.users), 0)
# Missing resources
with self.assertRaises(Exception) as e:
auth.load_users([{
'hawk': 'test',
}])
self.assertEqual(e.exception.message, 'Error on user #1: Missing "resources" setting in user config.')
self.assertEqual(len(auth.users), 0)
# Resources not a list
with self.assertRaises(Exception) as e:
auth.load_users([{
'hawk': 'test',
'resources': 'test',
}])
self.assertEqual(e.exception.message, 'Error on user #1: "resources" must be JSON list')
self.assertEqual(len(auth.users), 0)
# hawk not a dict
with self.assertRaises(Exception) as e:
auth.load_users([{
'hawk': 'test',
'resources': ['test'],
}])
self.assertEqual(e.exception.message, 'Error on user #1: "hawk" must be a JSON dictionary')
self.assertEqual(len(auth.users), 0)
# Invalid hawk
with self.assertRaises(Exception) as e:
auth.load_users([{
'hawk': {},
'resources': ['test'],
}])
self.assertEqual(e.exception.message, 'Error on user #1: "hawk" can only contains algorithm, id, key.')
self.assertEqual(len(auth.users), 0)
# Valid hawk
auth.load_users([{
'hawk': {
'algorithm': 'sha1',
'id': 'testId',
'key': 'testKey',
},
'resources': ['test'],
}])
self.assertEqual(len(auth.users), 1)
if __name__ == '__main__':
unittest.main()