Flesh out permissions.py and simplify page logic. (#1237)

* Flesh out permissions.py and simplify page logic.

* Addressed review comments
This commit is contained in:
Jason Robbins 2021-03-24 10:11:13 -07:00 коммит произвёл GitHub
Родитель 9d8d441b94
Коммит 52e38880b3
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
10 изменённых файлов: 216 добавлений и 154 удалений

Просмотреть файл

@ -27,6 +27,7 @@ import flask.views
from google.appengine.api import users
from google.appengine.ext import db
from framework import permissions
from framework import ramcache
import settings
import models
@ -53,13 +54,13 @@ class BaseHandler(flask.views.MethodView):
"""Support webapp2-style, e.g., return self.redirect(url)."""
return flask.redirect(url)
class APIHandler(BaseHandler):
def get_current_user(self):
# TODO(jrobbins): oauth support
return users.get_current_user()
class APIHandler(BaseHandler):
def get_headers(self):
"""Add CORS and Chrome Frame to all responses."""
headers = {
@ -169,14 +170,15 @@ class FlaskHandler(BaseHandler):
'TEMPLATE_CACHE_TIME': settings.TEMPLATE_CACHE_TIME
}
user = users.get_current_user()
user = self.get_current_user()
if user:
user_pref = models.UserPref.get_signed_in_user_pref()
common_data['login'] = (
'Sign out', users.create_logout_url(dest_url=current_path))
common_data['user'] = {
'can_edit': self.user_can_edit(user),
'is_admin': users.is_current_user_admin(),
'can_create_feature': permissions.can_create_feature(user),
'can_edit': permissions.can_edit_any_feature(user),
'is_admin': permissions.can_admin_site(user),
'email': user.email(),
'dismissed_cues': json.dumps(user_pref.dismissed_cues),
}
@ -222,26 +224,6 @@ class FlaskHandler(BaseHandler):
# handler_data is a string or redirect response object.
return handler_data, headers
def user_can_edit(self, user):
if not user:
return False
can_edit = False
if users.is_current_user_admin():
can_edit = True
elif user.email().endswith(('@chromium.org', '@google.com')):
can_edit = True
else:
# TODO(ericbidelman): cache user lookup.
query = models.AppUser.all(keys_only=True).filter('email =', user.email())
found_user = query.get()
if found_user is not None:
can_edit = True
return can_edit
@property
def form(self):
"""Property for POST values dict."""

Просмотреть файл

@ -278,41 +278,6 @@ class FlaskHandlerTests(unittest.TestCase):
self.assertIn('some/other/path', actual_response.headers['location'])
self.assertIn('Access-Control-Allow-Origin', actual_headers)
def test_user_can_edit__anon(self):
"""Anon visitors cannot edit features."""
actual = self.handler.user_can_edit(None)
self.assertFalse(actual)
def test_user_can_edit__normal(self):
"""Non-registered signed in users cannot edit features."""
u = users.User(email='user@example.com')
actual = self.handler.user_can_edit(u)
self.assertFalse(actual)
def test_user_can_edit__registered(self):
"""Users who have been registed by admins may edit features."""
u = users.User(email='registered@example.com')
actual = self.handler.user_can_edit(u)
self.assertTrue(actual)
def test_user_can_edit__preferred_domains(self):
"""Users signed in with certain email addresses may edit."""
u = users.User(email='user@chromium.org')
actual = self.handler.user_can_edit(u)
self.assertTrue(actual)
u = users.User(email='user@google.com')
actual = self.handler.user_can_edit(u)
self.assertTrue(actual)
u = users.User(email='user@this-is-not-google.com')
actual = self.handler.user_can_edit(u)
self.assertFalse(actual)
u = users.User(email='user@this-is-not.google.com')
actual = self.handler.user_can_edit(u)
self.assertFalse(actual)
def test_require_task_header__while_testing(self):
"""During unit testing of task handlers, we allow it."""
with test_app.test_request_context('/test'):

Просмотреть файл

@ -21,32 +21,104 @@ import flask
from google.appengine.api import users
import models
def can_admin_site():
"""Return true if the user is a site admin."""
def can_admin_site(unused_user):
"""Return True if the current user is allowed to administer the site."""
# TODO(jrobbins): replace this with user.is_admin.
return users.is_current_user_admin()
def can_view_feature(unused_user, unused_feature):
"""Return True if the user is allowed to view the given feature."""
# Note, for now there are no private features, only unlisted ones.
return True
def can_create_feature(user):
"""Return True if the user is allowed to create features."""
if not user:
return False
if can_admin_site(user):
return True
# TODO(jrobbins): generalize this.
if user.email().endswith(('@chromium.org', '@google.com')):
return True
query = models.AppUser.all(keys_only=True).filter('email =', user.email())
found_user = query.get()
if found_user is not None:
return True
return False
def can_edit_any_feature(user):
"""Return True if the user is allowed to edit features."""
return can_create_feature(user)
def can_edit_feature(user, feature):
"""Return True if the user is allowed to edit the given feature."""
# TODO(jrobbins): make this per-feature
if not can_view_feature(user, feature):
return False
return can_edit_any_feature(user)
def _reject_or_proceed(
handler_obj, handler_method, handler_args, handler_kwargs,
perm_function):
"""Redirect, abort(403), or call handler_method."""
user = handler_obj.get_current_user()
req = handler_obj.request
# Give the user a chance to sign in
if not user and req.method == 'GET':
return handler_obj.redirect(users.create_login_url(req.full_path))
if not perm_function(user):
handler_obj.abort(403)
else:
return handler_method(handler_obj, *handler_args, **handler_kwargs)
def require_admin_site(handler):
"""Handler decorator to require the user have edit permission."""
"""Handler decorator to require the user can admin the site."""
def check_login(self, *args, **kwargs):
if not can_admin_site():
flask.abort(403)
return
return _reject_or_proceed(
self, handler, args, kwargs, can_admin_site)
return handler(self, *args, **kwargs) # Call the handler method
return check_login
def require_edit_permission(handler):
"""Handler decorator to require the user have edit permission."""
def require_view_feature(handler):
"""Handler decorator to require the user can view the current feature."""
# TODO(jrobbins): make this per-feature
def check_login(self, *args, **kwargs):
user = users.get_current_user()
if not user:
return self.redirect(users.create_login_url(self.request.uri))
elif not self.user_can_edit(user):
flask.abort(403)
return
return _reject_or_proceed(
self, handler, args, kwargs, can_view_feature)
return check_login
def require_create_feature(handler):
"""Handler decorator to require the user can create a feature."""
def check_login(self, *args, **kwargs):
return _reject_or_proceed(
self, handler, args, kwargs, can_create_feature)
return check_login
def require_edit_feature(handler):
"""Handler decorator to require the user can edit the current feature."""
# TODO(jrobbins): make this per-feature
def check_login(self, *args, **kwargs):
return _reject_or_proceed(
self, handler, args, kwargs, can_edit_any_feature)
return handler(self, *args, **kwargs) # Call the handler method
return check_login

Просмотреть файл

@ -21,10 +21,13 @@ import testing_config # Must be imported before the module under test.
import mock
import werkzeug.exceptions # Flask HTTP stuff.
from google.appengine.api import users
from framework import basehandlers
from framework import permissions
class MockHandler(object):
class MockHandler(basehandlers.BaseHandler):
def __init__(self):
self.called_with = None
@ -32,24 +35,74 @@ class MockHandler(object):
@permissions.require_admin_site
def do_get(self, *args):
self.called_with = args
return {'message': 'did get'}
@permissions.require_admin_site
def do_post(self, *args):
self.called_with = args
return {'message': 'did post'}
class CanAdminSiteTests(unittest.TestCase):
test_app = basehandlers.FlaskApplication(
[('/path', MockHandler),
],
debug=True)
def test_can_admin_site__normal_user(self):
"""A normal user is not allowed to administer the site."""
class PermissionFunctionTests(unittest.TestCase):
def check_function_results(
self, func, additional_args,
normal='missing', special='missing', admin='missing', anon='missing'):
"""Test func under four conditions and check expected results."""
# Test normal users
testing_config.sign_in('user@example.com', 123)
self.assertFalse(permissions.can_admin_site())
user = users.get_current_user()
self.assertEqual(normal, func(user, *additional_args))
def test_can_admin_site__admin_user(self):
"""An admin user is allowed to administer the site."""
# Test special users
# TODO(jrobbins): generalize this.
testing_config.sign_in('user@google.com', 123)
user = users.get_current_user()
self.assertEqual(special, func(user, *additional_args))
testing_config.sign_in('user@chromium.org', 123)
user = users.get_current_user()
self.assertEqual(special, func(user, *additional_args))
# Test admin users
testing_config.sign_in('user@example.com', 123, is_admin=True)
self.assertTrue(permissions.can_admin_site())
user = users.get_current_user()
self.assertEqual(admin, func(user, *additional_args))
def test_can_admin_site__anon(self):
"""An anon visitor is not allowed to administer the site."""
# Test anonymous visitors
testing_config.sign_out()
self.assertFalse(permissions.can_admin_site())
user = users.get_current_user()
self.assertEqual(anon, func(user, *additional_args))
def test_can_admin_site(self):
self.check_function_results(
permissions.can_admin_site, tuple(),
normal=False, special=False, admin=True, anon=False)
def test_can_view_feature(self):
self.check_function_results(
permissions.can_view_feature, (None,),
normal=True, special=True, admin=True, anon=True)
def test_can_create_feature(self):
self.check_function_results(
permissions.can_create_feature, tuple(),
normal=False, special=True, admin=True, anon=False)
def test_can_edit_any_feature(self):
self.check_function_results(
permissions.can_edit_any_feature, tuple(),
normal=False, special=True, admin=True, anon=False)
def test_can_edit_feature(self):
self.check_function_results(
permissions.can_edit_feature, (None,),
normal=False, special=True, admin=True, anon=False)
class RequireAdminSiteTests(unittest.TestCase):
@ -58,21 +111,44 @@ class RequireAdminSiteTests(unittest.TestCase):
"""Wrapped method rejects call from normal user."""
handler = MockHandler()
testing_config.sign_in('user@example.com', 123)
with self.assertRaises(werkzeug.exceptions.Forbidden):
handler.do_get()
with test_app.test_request_context('/path', method='POST'):
with self.assertRaises(werkzeug.exceptions.Forbidden):
handler.do_post()
self.assertEqual(handler.called_with, None)
def test_require_admin_site__normal_user(self):
"""Wrapped method rejects call from normal user."""
def test_require_admin_site__googler(self):
"""Wrapped method rejects call from googler."""
handler = MockHandler()
testing_config.sign_in('user@google.com', 123)
with test_app.test_request_context('/path', method='POST'):
with self.assertRaises(werkzeug.exceptions.Forbidden):
handler.do_post()
self.assertEqual(handler.called_with, None)
def test_require_admin_site__admin(self):
"""Wrapped method accepts call from an admin user."""
handler = MockHandler()
testing_config.sign_in('admin@example.com', 123, is_admin=True)
handler.do_get()
self.assertEqual(handler.called_with, tuple())
with test_app.test_request_context('/path'):
actual_response = handler.do_get(123, 234)
self.assertEqual(handler.called_with, (123, 234))
self.assertEqual({'message': 'did get'}, actual_response)
with test_app.test_request_context('/path', method='POST'):
actual_response = handler.do_post(345, 456)
self.assertEqual(handler.called_with, (345, 456))
self.assertEqual({'message': 'did post'}, actual_response)
def test_require_admin_site__anon(self):
"""Wrapped method rejects call from anon."""
"""Wrapped method rejects call from anon, but offers sign-in."""
handler = MockHandler()
testing_config.sign_out()
with self.assertRaises(werkzeug.exceptions.Forbidden):
handler.do_get()
with test_app.test_request_context('/path'):
actual_response = handler.do_get(123, 234)
self.assertEqual(handler.called_with, None)
self.assertEqual(302, actual_response.status_code)
with test_app.test_request_context('/path', method='POST'):
with self.assertRaises(werkzeug.exceptions.Forbidden):
handler.do_post(345, 456)
self.assertEqual(handler.called_with, None)

Просмотреть файл

@ -54,7 +54,7 @@ class PopulateSubscribersHandler(basehandlers.FlaskHandler):
user.put()
f.close()
@permissions.require_edit_permission
@permissions.require_admin_site
def get_template_data(self):
if settings.PROD:
return 'Handler not allowed in production.'
@ -88,7 +88,7 @@ class BlinkHandler(basehandlers.FlaskHandler):
return True
@permissions.require_edit_permission
@permissions.require_admin_site
def get_template_data(self):
components = models.BlinkComponent.all().order('name').fetch(None)
subscribers = models.FeatureOwner.all().order('name').fetch(None)
@ -110,7 +110,7 @@ class BlinkHandler(basehandlers.FlaskHandler):
return template_data
# Remove user from component subscribers.
@permissions.require_edit_permission
@permissions.require_admin_site
def put(self):
params = self.request.get_json(force=True)
self.__update_subscribers_list(False, user_id=params.get('userId'),
@ -119,7 +119,7 @@ class BlinkHandler(basehandlers.FlaskHandler):
return {'done': True}
# Add user to component subscribers.
@permissions.require_edit_permission
@permissions.require_admin_site
def process_post_data(self):
params = self.request.get_json(force=True)
@ -133,7 +133,7 @@ class SubscribersHandler(basehandlers.FlaskHandler):
TEMPLATE_PATH = 'admin/subscribers.html'
@permissions.require_edit_permission
@permissions.require_admin_site
def get_template_data(self):
users = models.FeatureOwner.all().order('name').fetch(None)
feature_list = models.Feature.get_chronological()

Просмотреть файл

@ -21,6 +21,7 @@ import logging
import settings
from framework import basehandlers
from framework import permissions
from framework import utils
import models
from framework import ramcache
@ -36,7 +37,7 @@ class FeaturesJsonHandler(basehandlers.FlaskHandler):
def get_template_data(self, version=2):
user = users.get_current_user()
feature_list = models.Feature.get_chronological(
version=version, show_unlisted=self.user_can_edit(user))
version=version, show_unlisted=permissions.can_view_feature(user, None))
return feature_list

Просмотреть файл

@ -31,6 +31,7 @@ from google.appengine.api import users
from google.appengine.ext import db
from framework import basehandlers
from framework import permissions
from framework import utils
from pages import guideforms
import models
@ -111,13 +112,9 @@ class FeatureNew(basehandlers.FlaskHandler):
TEMPLATE_PATH = 'guide/new.html'
@permissions.require_edit_feature
def get_template_data(self):
user = users.get_current_user()
if user is None:
return self.redirect(users.create_login_url(self.request.path))
if not self.user_can_edit(user):
self.abort(403)
user = self.get_current_user()
new_feature_form = guideforms.NewFeatureForm(
initial={'owner': user.email()})
@ -126,11 +123,8 @@ class FeatureNew(basehandlers.FlaskHandler):
}
return template_data
@permissions.require_edit_feature
def process_post_data(self):
user = users.get_current_user()
if user is None or (user and not self.user_can_edit(user)):
self.abort(403)
owners = self.split_emails('owner')
blink_components = (
@ -174,15 +168,8 @@ class ProcessOverview(basehandlers.FlaskHandler):
progress_so_far[progress_item] = str(detected)
return progress_so_far
@permissions.require_edit_feature
def get_template_data(self, feature_id):
user = users.get_current_user()
if user is None:
# Redirect to public URL for unauthenticated users.
return self.redirect(utils.format_feature_url(feature_id))
if not self.user_can_edit(user):
self.abort(403)
f = models.Feature.get_by_id(long(feature_id))
if f is None:
self.abort(404)
@ -246,15 +233,8 @@ class FeatureEditStage(basehandlers.FlaskHandler):
return f, feature_process
@permissions.require_edit_feature
def get_template_data(self, feature_id, stage_id):
user = users.get_current_user()
if user is None:
# Redirect to public URL for unauthenticated users.
return self.redirect(utils.format_feature_url(feature_id))
if not self.user_can_edit(user):
self.abort(403)
f, feature_process = self.get_feature_and_process(feature_id)
stage_name = ''
@ -296,11 +276,8 @@ class FeatureEditStage(basehandlers.FlaskHandler):
})
return template_data
@permissions.require_edit_feature
def process_post_data(self, feature_id, stage_id=0):
user = users.get_current_user()
if user is None or (user and not self.user_can_edit(user)):
self.abort(403)
if feature_id:
feature = models.Feature.get_by_id(feature_id)
if feature is None:
@ -533,15 +510,8 @@ class FeatureEditAllFields(FeatureEditStage):
TEMPLATE_PATH = 'guide/editall.html'
@permissions.require_edit_feature
def get_template_data(self, feature_id):
user = users.get_current_user()
if user is None:
# Redirect to public URL for unauthenticated users.
return self.redirect(utils.format_feature_url(feature_id))
if not self.user_can_edit(user):
self.abort(403)
f, feature_process = self.get_feature_and_process(feature_id)
feature_edit_dict = f.format_for_edit()

Просмотреть файл

@ -61,14 +61,14 @@ class FeatureNewTest(unittest.TestCase):
def test_post__anon(self):
"""Anon cannot create features, gets a 403."""
testing_config.sign_out()
with guide.app.test_request_context('/guide/new'):
with guide.app.test_request_context('/guide/new', method='POST'):
with self.assertRaises(werkzeug.exceptions.Forbidden):
self.handler.process_post_data()
def test_post__non_allowed(self):
"""Non-allowed cannot create features, gets a 403."""
testing_config.sign_in('user1@example.com', 1234567890)
with guide.app.test_request_context('/guide/new'):
with guide.app.test_request_context('/guide/new', method='POST'):
with self.assertRaises(werkzeug.exceptions.Forbidden):
self.handler.post()
@ -80,7 +80,7 @@ class FeatureNewTest(unittest.TestCase):
'category': '1',
'name': 'Feature name',
'summary': 'Feature summary',
}):
}, method='POST'):
actual_response = self.handler.process_post_data()
self.assertEqual('302 FOUND', actual_response.status)
@ -263,7 +263,7 @@ class FeatureEditStageTest(unittest.TestCase):
def test_post__anon(self):
"""Anon cannot edit features, gets a 403."""
testing_config.sign_out()
with guide.app.test_request_context(self.request_path):
with guide.app.test_request_context(self.request_path, method='POST'):
with self.assertRaises(werkzeug.exceptions.Forbidden):
self.handler.process_post_data(
self.feature_1.key().id(), self.stage)
@ -272,7 +272,7 @@ class FeatureEditStageTest(unittest.TestCase):
"""Non-allowed cannot edit features, gets a 403."""
testing_config.sign_in('user1@example.com', 1234567890)
with guide.app.test_request_context(self.request_path):
with self.assertRaises(werkzeug.exceptions.Forbidden):
with self.assertRaises(werkzeug.exceptions.Forbidden, method='POST'):
self.handler.process_post_data(
self.feature_1.key().id(), self.stage)

Просмотреть файл

@ -21,6 +21,7 @@ from google.appengine.api import users
import models
import settings
from framework import basehandlers
from framework import permissions
from internals import processes
INTENT_PARAM = 'intent'
@ -33,11 +34,8 @@ class IntentEmailPreviewHandler(basehandlers.FlaskHandler):
TEMPLATE_PATH = 'admin/features/launch.html'
@permissions.require_edit_feature
def get_template_data(self, feature_id=None, stage_id=None):
user = users.get_current_user()
if user is None:
return self.redirect(users.create_login_url(self.request.path))
if not feature_id:
self.abort(404)
f = models.Feature.get_by_id(feature_id)
@ -46,9 +44,6 @@ class IntentEmailPreviewHandler(basehandlers.FlaskHandler):
intent_stage = stage_id if stage_id is not None else f.intent_stage
if not self.user_can_edit(user):
self.abort(403)
page_data = self.get_page_data(feature_id, f, intent_stage)
return page_data

Просмотреть файл

@ -22,6 +22,7 @@ import json
import logging
import os
from framework import permissions
from framework import ramcache
import requests
from google.appengine.api import users
@ -97,7 +98,7 @@ class ScheduleHandler(basehandlers.FlaskHandler):
def get_template_data(self):
user = users.get_current_user()
features = models.Feature.get_chronological(
show_unlisted=self.user_can_edit(user))
show_unlisted=permissions.can_edit_any_feature(user))
template_data = {
'features': json.dumps(features),
'channels': json.dumps(construct_chrome_channels_details(),