This commit is contained in:
Kyle Lahnakoski 2018-05-23 23:28:19 -04:00
Родитель 36b4656f5a a21d933ea8
Коммит 42f5e63670
3 изменённых файлов: 193 добавлений и 2 удалений

Просмотреть файл

@ -21,6 +21,11 @@ from mo_dots import listwrap
from mo_future import BytesIO
from mo_logs import constants, Log, startup, Except
settings = {}
logger = logging.getLogger('esFrontLine')
logger.setLevel(logging.DEBUG)
app = Flask(__name__)
auth = HawkAuth()
@ -190,7 +195,6 @@ class WSGICopyBody(object):
settings = None
app.wsgi_app = WSGICopyBody(app.wsgi_app)
def main():
global settings

Просмотреть файл

@ -1,4 +1,5 @@
Flask==0.10.1
requests==2.3.0
mohawk==0.3.4
elasticsearch
elasticsearch==6.2.0
responses==0.9.0

186
tests/test_auth.py Normal file
Просмотреть файл

@ -0,0 +1,186 @@
import unittest
import logging
import responses
import re
from esFrontLine import app as frontline
from esFrontLine.auth import HawkAuth
from mohawk import Sender
VALID_USER = {
"id": "babadie@mozilla.com",
"key": "dummySecret",
"algorithm": "sha256"
}
INVALID_USER = {
"id": "babadie@mozilla.com",
"key": "INVALID_SECRET",
"algorithm": "sha256"
}
def mock_hawk(method, url='/', body='', user=VALID_USER):
'''
Helper to create an HAWK header towards mock server
'''
full_url = 'http://localhost' + url
sender = Sender(user, full_url, method, body, content_type='application/json')
return {
'Authorization': sender.request_header,
'Content-Type': 'application/json',
}
class TestAuthentication(unittest.TestCase):
'''
Test the authentication process through Hawk
'''
def setUp(self):
logging.basicConfig()
# Setup test settings
frontline.settings = {
'whitelist': ['test-index', 'protected-data', ],
'elasticsearch': [{
"host":"http://unittest",
"port":9200
}],
}
frontline.auth.load_users([{
"hawk": VALID_USER,
"resources": ['test-index', ],
}])
# Setup flask app Client
frontline.app.testing = True
self.client = frontline.app.test_client()
# Setup responses
mock_es = re.compile('http://unittest:9200/.*')
responses.add(
responses.GET,
mock_es,
body='{}',
content_type='application/json',
)
responses.add(
responses.HEAD,
mock_es,
body='',
content_type='application/json',
)
@responses.activate
def test_head(self):
'''
HEAD request should always work when authenticated
'''
# No auth
r = self.client.head('/')
self.assertEqual(r.status_code, 403)
# Invalid auth
r = self.client.head('/', headers={'Authorization': 'Invalid token'})
self.assertEqual(r.status_code, 403)
# Valid auth
valid_hawk = mock_hawk('HEAD')
r = self.client.head('/', headers=valid_hawk)
self.assertEqual(r.status_code, 200)
# Replay not allowed
r = self.client.head('/', headers=valid_hawk)
self.assertEqual(r.status_code, 403)
# Invalid secret
invalid_hawk = mock_hawk('HEAD', user=INVALID_USER)
r = self.client.head('/', headers=invalid_hawk)
self.assertEqual(r.status_code, 403)
@responses.activate
def test_get(self):
'''
Test some GET requests with authentication
'''
search_url = '/test-index/_search'
# No auth, no query
r = self.client.get(search_url )
self.assertEqual(r.status_code, 403)
# Valid auth but missing query
valid_hawk = mock_hawk('GET', search_url, '{}')
r = self.client.get(search_url, data='{}', headers=valid_hawk)
self.assertEqual(r.status_code, 400)
# Valid auth + query
valid_hawk = mock_hawk('GET', search_url, '{"query":{}}')
r = self.client.get(search_url, data='{"query":{}}', headers=valid_hawk)
self.assertEqual(r.status_code, 200)
@responses.activate
def test_user_syntax(self):
'''
Test user syntax from settings
'''
auth = HawkAuth()
self.assertEqual(len(auth.users), 0)
# Missing hawk
with self.assertRaises(Exception) as e:
auth.load_users([{
'user': 'test',
}])
self.assertEqual(e.exception.message, 'Error on user #1: Missing "hawk" setting in user config.')
self.assertEqual(len(auth.users), 0)
# Missing resources
with self.assertRaises(Exception) as e:
auth.load_users([{
'hawk': 'test',
}])
self.assertEqual(e.exception.message, 'Error on user #1: Missing "resources" setting in user config.')
self.assertEqual(len(auth.users), 0)
# Resources not a list
with self.assertRaises(Exception) as e:
auth.load_users([{
'hawk': 'test',
'resources': 'test',
}])
self.assertEqual(e.exception.message, 'Error on user #1: "resources" must be JSON list')
self.assertEqual(len(auth.users), 0)
# hawk not a dict
with self.assertRaises(Exception) as e:
auth.load_users([{
'hawk': 'test',
'resources': ['test'],
}])
self.assertEqual(e.exception.message, 'Error on user #1: "hawk" must be a JSON dictionary')
self.assertEqual(len(auth.users), 0)
# Invalid hawk
with self.assertRaises(Exception) as e:
auth.load_users([{
'hawk': {},
'resources': ['test'],
}])
self.assertEqual(e.exception.message, 'Error on user #1: "hawk" can only contains algorithm, id, key.')
self.assertEqual(len(auth.users), 0)
# Valid hawk
auth.load_users([{
'hawk': {
'algorithm': 'sha1',
'id': 'testId',
'key': 'testKey',
},
'resources': ['test'],
}])
self.assertEqual(len(auth.users), 1)
if __name__ == '__main__':
unittest.main()