* Use new session contents

* Added unit tests
This commit is contained in:
Jason Robbins 2022-03-10 12:22:19 -08:00 коммит произвёл GitHub
Родитель cc182d544e
Коммит 1871cdee22
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 252 добавлений и 24 удалений

Просмотреть файл

@ -13,19 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from google.oauth2 import id_token
from google.auth.transport import requests
from flask import session
from framework import basehandlers
from framework import xsrf
import settings
class LoginAPI(basehandlers.APIHandler):
"""Create a session using the id_token generated by Google Sign-In."""
@ -34,12 +32,14 @@ class LoginAPI(basehandlers.APIHandler):
message = "Unable to Authenticate"
try:
unused_idinfo = id_token.verify_oauth2_token(
idinfo = id_token.verify_oauth2_token(
token, requests.Request(),
settings.GOOGLE_SIGN_IN_CLIENT_ID)
# userid = idinfo['sub']
# email = idinfo['email']
session["id_token"] = token
user_info = {
'email': idinfo['email'],
}
signature = xsrf.generate_token(str(user_info))
session['signed_user_info'] = user_info, signature
message = "Done"
# print(idinfo['email'], file=sys.stderr)
except ValueError:

70
api/login_api_test.py Normal file
Просмотреть файл

@ -0,0 +1,70 @@
# Copyright 2022 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testing_config # Must be imported before the module under test.
import flask
from flask import session
from unittest import mock
import werkzeug.exceptions # Flask HTTP stuff.
from api import login_api
from framework import xsrf
test_app = flask.Flask(__name__)
test_app.secret_key = 'testing secret'
class LoginAPITest(testing_config.CustomTestCase):
def setUp(self):
self.handler = login_api.LoginAPI()
self.request_path = '/api/v0/login'
def test_post__missing_id_token(self):
"""We reject login requests that don't have any id_token."""
params = {}
with test_app.test_request_context(self.request_path, json=params):
session.clear()
session['something else'] = 'some other aspect of the session'
with self.assertRaises(werkzeug.exceptions.BadRequest):
self.handler.do_post()
self.assertEqual(1, len(session))
def test_post__invalid_id_token(self):
"""We reject login requests that have an invalid id_token."""
params = {'id_token': 'fake bad token'}
with test_app.test_request_context(self.request_path, json=params):
session['something else'] = 'some other aspect of the session'
actual_response = self.handler.do_post()
self.assertEqual({'message': 'Invalid token'}, actual_response)
self.assertEqual(1, len(session))
@mock.patch('google.oauth2.id_token.verify_oauth2_token')
def test_post__normal(self, mock_verify):
"""We log in the user if they provide a good id_token."""
mock_verify.return_value = {'email': 'user@example.com'}
params = {'id_token': 'fake bad token'}
with test_app.test_request_context(self.request_path, json=params):
session.clear()
session['something else'] = 'some other aspect of the session'
actual_response = self.handler.do_post()
self.assertEqual({'message': 'Done'}, actual_response)
self.assertEqual(2, len(session))
user_info, signature = session['signed_user_info']
self.assertEqual({'email': 'user@example.com'}, user_info)
xsrf.validate_token(
signature,
str(user_info),
timeout=xsrf.REFRESH_TOKEN_TIMEOUT_SEC)

42
api/logout_api_test.py Normal file
Просмотреть файл

@ -0,0 +1,42 @@
# Copyright 2022 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testing_config # Must be imported before the module under test.
import flask
from flask import session
from unittest import mock
import werkzeug.exceptions # Flask HTTP stuff.
from api import logout_api
test_app = flask.Flask(__name__)
test_app.secret_key = 'testing secret'
class LogoutAPITest(testing_config.CustomTestCase):
def setUp(self):
self.handler = logout_api.LogoutAPI()
self.request_path = '/api/v0/logout'
def test_post__normal(self):
"""We log out the user whenever they request that."""
params = {}
with test_app.test_request_context(self.request_path, json=params):
session['signed_user_info'] = {'email': 'x'}, 'fake signature'
session['something else'] = 'some other aspect of the session'
actual_response = self.handler.do_post()
self.assertEqual({'message': 'Done'}, actual_response)
self.assertEqual(0, len(session))

Просмотреть файл

@ -37,7 +37,6 @@ from internals import models
from django.template.loader import render_to_string
import django
from google.oauth2 import id_token
from google.auth.transport import requests
from flask import session
import sys

Просмотреть файл

@ -1,7 +1,11 @@
import logging
import os
from flask import session
from google.oauth2 import id_token
from google.auth.transport import requests
from framework import xsrf
import settings
@ -185,28 +189,41 @@ class User(object):
def get_current_user():
if settings.UNIT_TEST_MODE:
user_via_env = None
if os.environ['USER_EMAIL']!= '':
current_user = User(
email=os.environ['USER_EMAIL'],
_user_id=os.environ['USER_ID'])
else:
current_user = None
return current_user
user_via_env = User(email=os.environ['USER_EMAIL'])
return user_via_env
token = session.get('id_token')
current_user = None
if token:
# TODO(jrobbins): Remove this code path after 30 days.
jwt = session.get('id_token')
if jwt:
try:
idinfo = id_token.verify_oauth2_token(
token, requests.Request(), settings.GOOGLE_SIGN_IN_CLIENT_ID)
current_user = User(email=idinfo['email'], _user_id=idinfo['sub'])
jwt, requests.Request(), settings.GOOGLE_SIGN_IN_CLIENT_ID)
user_via_jwt = User(email=idinfo['email'])
return user_via_jwt
except ValueError:
# Remove the id_token from session if it is invalid or expired
session.clear()
current_user = None
# If anything is not right, give the user a fresh session.
session.clear()
pass
return current_user
user_info, signature = session.get('signed_user_info', (None, None))
if user_info:
try:
xsrf.validate_token(
signature,
str(user_info),
timeout=xsrf.REFRESH_TOKEN_TIMEOUT_SEC)
user_via_signed_user_info = User(email=user_info['email'])
return user_via_signed_user_info
except xsrf.TokenIncorrect:
# If anything is not right, give the user a fresh session.
session.clear()
pass
return None # User is not signed in.
def is_current_user_admin():

100
framework/users_test.py Normal file
Просмотреть файл

@ -0,0 +1,100 @@
# Copyright 2022 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testing_config # Must be imported before the module under test.
import flask
from flask import session
from unittest import mock
import werkzeug.exceptions # Flask HTTP stuff.
from framework import users
from framework import xsrf
test_app = flask.Flask(__name__)
test_app.secret_key = 'testing secret'
class UsersTest(testing_config.CustomTestCase):
def test_get_current_user__unittest_signed_in(self):
"""For unit tests, we know when the user is signed in."""
testing_config.sign_in('user_111@example.com', 111)
actual_user = users.get_current_user()
self.assertEqual('user_111@example.com', actual_user.email())
def test_get_current_user__unittest_signed_out(self):
"""For unit tests, we know when the user is signed out."""
testing_config.sign_out()
actual_user = users.get_current_user()
self.assertIsNone(actual_user)
@mock.patch('settings.UNIT_TEST_MODE', False)
@mock.patch('google.oauth2.id_token.verify_oauth2_token')
def test_get_current_user__signed_in_jwt(self, mock_verify):
"""A valid jwt means the user is signed in."""
mock_verify.return_value = {'email': 'user_222@example.com'}
with test_app.test_request_context('/any-path'):
session.clear()
session['id_token'] = 'fake good token'
actual_user = users.get_current_user()
self.assertEqual('user_222@example.com', actual_user.email())
@mock.patch('settings.UNIT_TEST_MODE', False)
@mock.patch('google.oauth2.id_token.verify_oauth2_token')
def test_get_current_user__bad_jwt(self, mock_verify):
"""We reject bad JWTs and clear the session."""
mock_verify.side_effect = ValueError()
with test_app.test_request_context('/any-path'):
session.clear()
session['id_token'] = 'fake bad token'
actual_user = users.get_current_user()
self.assertIsNone(actual_user)
self.assertEqual(0, len(session))
@mock.patch('settings.UNIT_TEST_MODE', False)
@mock.patch('framework.xsrf.validate_token')
def test_get_current_user__signed_in_sui(self, mock_validate):
"""A valid signed user_info means the user is signed in."""
with test_app.test_request_context('/any-path'):
session.clear()
session['signed_user_info'] = {'email': 'user_333@example.com'}, 'good'
actual_user = users.get_current_user()
self.assertEqual('user_333@example.com', actual_user.email())
mock_validate.assert_called_once()
@mock.patch('settings.UNIT_TEST_MODE', False)
@mock.patch('framework.xsrf.validate_token')
def test_get_current_user__bad_sui(self, mock_validate):
"""We reject bad signed user_info and clear the session."""
mock_validate.side_effect = xsrf.TokenIncorrect()
with test_app.test_request_context('/any-path'):
session.clear()
session['signed_user_info'] = {'email': 'anything'}, 'bad signature'
actual_user = users.get_current_user()
self.assertIsNone(actual_user)
self.assertEqual(0, len(session))
@mock.patch('settings.UNIT_TEST_MODE', False)
def test_get_current_user__signed_out(self):
"""When there is no jwt or user info in the session, user is signed out."""
with test_app.test_request_context('/any-path'):
session.clear()
actual_user = users.get_current_user()
self.assertIsNone(actual_user)
def test_is_currnet_user_admin(self):
"""We never consider a user an admin based on old GAE auth info."""
actual = users.is_current_user_admin()
self.assertFalse(actual)