Upgrade from GAE taskqueue to Google Cloud Tasks (#1141)

* Upgrade from GAE taskqueue to Google Cloud Tasks.

* added unit tests

* Add link to Monorail file that I referenced.

* Add grpcio to travis build

* Try installing grpcio locally first

* Try installing grpcio via sudo

* Try a smaller set of deps for travis.

* Debug travis config

* Avoided missing imports while running unit tests on travis

* Removed travis debugging lines
This commit is contained in:
Jason Robbins 2021-01-15 15:50:36 -08:00 коммит произвёл GitHub
Родитель beca20042d
Коммит c610e44e99
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
19 изменённых файлов: 352 добавлений и 23 удалений

Просмотреть файл

@ -27,7 +27,7 @@ before_install:
install:
- npm install -g gulp
- npm install
- npm run deps
- npm run travis-deps
script:
- npm run lint
- npm run test

Просмотреть файл

@ -51,7 +51,7 @@ handlers:
- url: /tasks/.*
script: notifier.app
login: admin # Prevents raw access to this handler. Tasks runs as admin.
# Header checks prevent raw access to this handler. Tasks have headers.
- url: /_ah/bounce
script: notifier.app
@ -127,3 +127,7 @@ includes:
inbound_services:
- mail
- mail_bounce
libraries:
- name: grpcio
version: 1.0.0

Просмотреть файл

@ -1,10 +1,20 @@
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
import os
import sys
# name of the django settings module
os.environ['DJANGO_SETTINGS_MODULE'] = 'settings'
lib_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'lib')
from google.appengine.ext import vendor
vendor.add('lib') # add third party libs to "lib" folder.
vendor.add(lib_path) # add third party libs to "lib" folder.
# Add libraries to pkg_resources working set to find the distribution.
import pkg_resources
pkg_resources.working_set.add_entry(lib_path)
import six
reload(six)

112
cloud_tasks_helpers.py Normal file
Просмотреть файл

@ -0,0 +1,112 @@
from __future__ import division
from __future__ import print_function
# -*- coding: utf-8 -*-
# Copyright 2020 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This code is based on a file from Monorail:
# https://chromium.googlesource.com/infra/infra/+/master/appengine/monorail/framework/cloud_tasks_helpers.py
import logging
import json
from google.appengine.api import urlfetch
import settings
if not settings.UNIT_TEST_MODE:
import grpc # See requirements.dev.txt.
from google.api_core import retry
from google.cloud import tasks
_client = None
# Default exponential backoff retry config for enqueueing, not to be confused
# with retry config for dispatching, which exists per queue.
_DEFAULT_RETRY = None
if not settings.UNIT_TEST_MODE:
_DEFAULT_RETRY = retry.Retry(
initial=.1, maximum=1.6, multiplier=2, deadline=10)
class LocalCloudTasksClient(object):
"""We have no GCT server running locally, so hit the target synchronously."""
def queue_path(self, project, location, queue):
"""Return a fully-qualified queue string."""
# This is value is not actually used, but it might be good for debugging.
return "projects/{project}/locations/{location}/queues/{queue}".format(
project=project, location=location, queue=queue)
def create_task(self, unused_parent, task, **kwargs):
"""Immediately hit the target URL."""
uri = task.get('app_engine_http_request').get('relative_uri')
target_url = 'http://localhost:8080' + uri
body = task.get('app_engine_http_request').get('body')
logging.info('Making request to %r', target_url)
handler_response = urlfetch.fetch(
target_url, payload=body, method=urlfetch.POST,
follow_redirects=False,
# This header can only be set on internal requests, not by users.
headers={'X-AppEngine-QueueName': 'default'})
logging.info('Task handler status: %d', handler_response.status_code)
logging.info('Task handler text: %r', handler_response.content)
def _get_client():
"""Returns a cloud tasks client."""
global _client
if not _client:
if settings.DEV_MODE:
_client = LocalCloudTasksClient()
else:
_client = tasks.CloudTasksClient()
return _client
def _make_task(handler_path, task_params):
body_json = json.dumps(task_params)
return {
'app_engine_http_request': {
'relative_uri': handler_path,
'body': body_json,
}
}
def enqueue_task(handler_path, task_params, queue='default', **kwargs):
"""Enqueue a JSON task item for Google Cloud Tasks.
Args:
handler_path: Rooted path of the task handler.
task_params: Task parameters dict.
queue: A string indicating name of the queue to add task to.
kwargs: Additional arguments to pass to cloud task client's create_task
Returns:
Successfully created Task object.
"""
task = _make_task(handler_path, task_params)
client = _get_client()
parent = client.queue_path(
settings.APP_ID, settings.CLOUD_TASKS_REGION, queue)
target = task.get('app_engine_http_request').get('relative_uri')
logging.info('Enqueueing %s task to %s', target, parent)
kwargs.setdefault('retry', _DEFAULT_RETRY)
return client.create_task(parent, task, **kwargs)

Просмотреть файл

@ -290,6 +290,14 @@ class FlaskHandler(flask.views.MethodView):
"""Property for POST values dict."""
return flask.request.form
def require_task_header(self):
"""Abort if this is not a Google Cloud Tasks request."""
if settings.UNIT_TEST_MODE:
return
if 'X-AppEngine-QueueName' not in self.request.headers:
logging.info('Lacking X-AppEngine-QueueName header')
self.abort(403)
def split_input(self, field_name, delim='\\r?\\n'):
"""Split the input lines, strip whitespace, and skip blank lines."""
input_text = flask.request.form.get(field_name) or ''

3
dispatch.yaml Normal file
Просмотреть файл

@ -0,0 +1,3 @@
dispatch:
- url: "*/tasks/*"
service: notifier

Просмотреть файл

@ -29,7 +29,6 @@ from django import forms
import ramcache
from google.appengine.api import users
from google.appengine.ext import db
from google.appengine.api import taskqueue
# File imports.
import common

Просмотреть файл

@ -28,9 +28,10 @@ from google.appengine.ext import db
from google.appengine.api import mail
import ramcache
from google.appengine.api import urlfetch
from google.appengine.api import taskqueue
from google.appengine.api import users
import cloud_tasks_helpers
import common
import settings
import util
@ -944,17 +945,14 @@ class Feature(DictModel):
changed_props.append({
'prop_name': prop_name, 'old_val': old_val, 'new_val': new_val})
payload = json.dumps({
params = {
'changes': changed_props,
'is_update': is_update,
'feature': self.format_for_template(version=2)
})
}
# Create task to email subscribers.
queue = taskqueue.Queue()#name='emailer')
task = taskqueue.Task(method="POST", url='/tasks/email-subscribers',
target='notifier', payload=payload)
queue.add(task)
cloud_tasks_helpers.enqueue_task('/tasks/email-subscribers', params)
def put(self, notify=True, **kwargs):

Просмотреть файл

@ -30,12 +30,12 @@ from google.appengine.ext import db
from google.appengine.api import mail
from google.appengine.api import urlfetch
from google.appengine.api import users
from google.appengine.api import taskqueue
from google.appengine.ext.webapp.mail_handlers import BounceNotification
from django.template.loader import render_to_string
from django.utils.html import conditional_escape as escape
import cloud_tasks_helpers
import common
import settings
import models
@ -218,23 +218,25 @@ class FeatureChangeHandler(common.FlaskHandler):
"""This task handles a feature creation or update by making email tasks."""
def process_post_data(self):
self.require_task_header()
json_body = self.request.get_json(force=True)
feature = json_body.get('feature') or None
is_update = json_body.get('is_update') or False
changes = json_body.get('changes') or []
logging.info('Starting to notify subscribers for feature %r', feature)
# Email feature subscribers if the feature exists and there were
# actually changes to it.
feature = models.Feature.get_by_id(feature['id'])
if feature and (is_update and len(changes) or not is_update):
email_tasks = make_email_tasks(
feature, is_update=is_update, changes=changes)
logging.info('Processing %d email tasks', len(email_tasks))
for one_email_dict in email_tasks:
payload = json.dumps(one_email_dict)
task = taskqueue.Task(
method='POST', url='/tasks/outbound-email', payload=payload,
target='notifier')
taskqueue.Queue().add(task)
cloud_tasks_helpers.enqueue_task(
'/tasks/outbound-email', one_email_dict)
return {'message': 'Done'}
@ -243,6 +245,8 @@ class OutboundEmailHandler(common.FlaskHandler):
"""Task to send a notification email to one recipient."""
def process_post_data(self):
self.require_task_header()
json_body = self.request.get_json(force=True)
to = json_body['to']
subject = json_body['subject']

Просмотреть файл

@ -4,10 +4,14 @@ threadsafe: true
service: notifier
handlers:
- url: /.*
- url: /tasks/.*
script: notifier.app
login: admin
# Header checks prevent raw access to this handler. Tasks have headers.
includes:
- skip_files.yaml
- env_vars.yaml
libraries:
- name: grpcio
version: 1.0.0

Просмотреть файл

@ -8,6 +8,8 @@
},
"scripts": {
"deps": "pip install -t lib -r requirements.txt --upgrade",
"travis-deps": "pip install -t lib -r requirements.travis.txt --upgrade",
"dev-deps": "pip install -r requirements.dev.txt --upgrade",
"test": "python -m unittest discover -p *_test.py -s tests -b",
"coverage": "python -m coverage erase && python -m coverage run -m unittest discover -p *_test.py -s tests -b && python -m coverage html",
"lint": "gulp lint-fix && lit-analyzer \"static/elements/chromedash-!(featurelist)*.js\"",

5
requirements.dev.txt Normal file
Просмотреть файл

@ -0,0 +1,5 @@
# This must be installed on your local venv or system, not in lib.
# Via the command:
# python -m pip install --no-deps -r requirements.dev.txt
grpcio==1.31.0

8
requirements.travis.txt Normal file
Просмотреть файл

@ -0,0 +1,8 @@
Django==1.11.29
mock==3.0.5
funcsigs
coverage
Flask==1.1.2
# Google cloud tasks is not used on travis because one of its dependencies
# will not compiled. The unit tests use a fake object as the GCT client.

Просмотреть файл

@ -1,4 +1,27 @@
Django==1.11.29
mock==3.0.5
funcsigs
coverage
Flask==1.1.2
google-cloud-tasks==1.5.0
# Required by google-cloud-tasks
googleapis-common-protos==1.52.0
enum34==1.1.10
grpc-google-iam-v1==0.12.3
google-api-core==1.22.0
pytz==2020.1
google-auth==1.20.1
setuptools==44.1.1
requests==2.24.0
six==1.15.0
urllib3==1.25.10
certifi==2020.6.20
chardet==3.0.4
idna==2.10
pyasn1-modules==0.2.8
cachetools==3.1.1
rsa==4.5
# See also: requirements.dev.txt

Просмотреть файл

@ -32,4 +32,4 @@ gcloud app deploy \
--project $appName \
--version $deployVersion \
--no-promote \
$BASEDIR/../app.yaml $BASEDIR/../notifier.yaml
$BASEDIR/../app.yaml $BASEDIR/../notifier.yaml $BASEDIR/../dispatch.yaml

Просмотреть файл

@ -35,10 +35,12 @@ PROD = False
DEBUG = True
SEND_EMAIL = False # Just log email
DEV_MODE = os.environ['SERVER_SOFTWARE'].startswith('Development')
UNIT_TEST_MODE = os.environ['SERVER_SOFTWARE'].startswith('test')
APP_ID = app_identity.get_application_id()
SITE_URL = 'http://%s.appspot.com/' % APP_ID
CLOUD_TASKS_REGION = 'us-central1'
if APP_ID == 'testbed-test':
APP_TITLE = 'Local testing'

Просмотреть файл

@ -0,0 +1,104 @@
from __future__ import division
from __future__ import print_function
# Copyright 2020 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License")
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import unittest
import testing_config # Must be imported before the module under test.
from google.appengine.api import urlfetch
import cloud_tasks_helpers
# Note that testing_config sets cloud_tasks_helpers._client to a fake.
class LocalCloudTasksClientTest(unittest.TestCase):
def setUp(self):
self.client = cloud_tasks_helpers.LocalCloudTasksClient()
def test_queue_path(self):
"""We get back a string like the kind that GCT uses."""
actual = self.client.queue_path('P', 'L', 'Q')
self.assertEqual(
'projects/P/locations/L/queues/Q',
actual)
@mock.patch('google.appengine.api.urlfetch.fetch')
def test_create_task(self, mock_fetch):
"""The local stub makes a synchronous HTTP request to the task handler."""
parent = 'parent'
task = cloud_tasks_helpers._make_task('/handler', {'a': 1})
mock_fetch.return_value = testing_config.Blank(
status_code=200, content='content')
actual = self.client.create_task(parent, task)
self.assertIsNone(actual)
mock_fetch.assert_called_once_with(
'http://localhost:8080/handler',
payload='{"a": 1}', method=urlfetch.POST,
follow_redirects=False,
headers={'X-AppEngine-QueueName': 'default'})
class CloudTasksHelpersTest(unittest.TestCase):
def test_get_client__unit_tests(self):
"""During unit testing, we are using a fake object."""
actual = cloud_tasks_helpers._get_client()
self.assertEqual(
testing_config.FakeCloudTasksClient,
type(actual))
@mock.patch('settings.DEV_MODE', True)
def test_get_client__dev_mode(self):
"""When running locally, we make a LocalCloudTasksClient."""
orig_client = cloud_tasks_helpers._client
try:
cloud_tasks_helpers._client = None
actual = cloud_tasks_helpers._get_client()
self.assertEqual(
cloud_tasks_helpers.LocalCloudTasksClient,
type(actual))
finally:
cloud_tasks_helpers._client = orig_client
def test_make_task(self):
"""We can make a task info dict in the expected format."""
handler_path = '/handler'
task_params = {'a': 1}
actual = cloud_tasks_helpers._make_task(handler_path, task_params)
self.assertEqual(
{ 'app_engine_http_request': {
'relative_uri': '/handler',
'body': '{"a": 1}',
}
},
actual)
def test_enqueue_task(self):
"""We can call the GCT client to enqueue a task."""
handler_path = '/handler'
task_params = {'a': 1}
actual = cloud_tasks_helpers.enqueue_task(handler_path, task_params)
self.assertEqual('fake task', actual)
self.assertEqual('/handler', cloud_tasks_helpers._client.uri)
self.assertEqual('{"a": 1}', cloud_tasks_helpers._client.body)

Просмотреть файл

@ -378,3 +378,22 @@ class FlaskHandlerTests(unittest.TestCase):
u = users.User(email='user@this-is-not.google.com')
actual = self.handler.user_can_edit(u)
self.assertFalse(actual)
def test_require_task_header__while_testing(self):
"""During unit testing of task handlers, we allow it."""
with test_app.test_request_context('/test'):
self.handler.require_task_header()
@mock.patch('settings.UNIT_TEST_MODE', False)
def test_require_task_header__normal(self):
"""If the incoming request is from GCT, we allow it."""
headers = {'X-AppEngine-QueueName': 'default'}
with test_app.test_request_context('/test', headers=headers):
self.handler.require_task_header()
@mock.patch('settings.UNIT_TEST_MODE', False)
def test_require_task_header__missing(self):
"""If the incoming request is not from GCT, abort."""
with test_app.test_request_context('/test'):
with self.assertRaises(werkzeug.exceptions.Forbidden):
self.handler.require_task_header()

Просмотреть файл

@ -15,6 +15,7 @@ from __future__ import print_function
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import sys
import unittest
@ -35,7 +36,8 @@ import dev_appserver
dev_appserver.fix_sys_path()
lib_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'lib')
sys.path.insert(0, lib_path)
from google.appengine.ext import vendor
vendor.add(lib_path) # add third party libs to "lib" folder.
from google.appengine.ext import ndb
from google.appengine.ext import testbed
@ -62,7 +64,6 @@ def setUpOurTestbed():
ourTestbed.init_mail_stub()
ourTestbed.init_modules_stub()
ourTestbed.init_search_stub()
ourTestbed.init_taskqueue_stub()
ourTestbed.init_urlfetch_stub()
ourTestbed.init_user_stub()
ourTestbed.init_xmpp_stub()
@ -73,6 +74,29 @@ def setUpOurTestbed():
setUpOurTestbed()
import cloud_tasks_helpers
class FakeCloudTasksClient(object):
"""We have no GCT server for unit tests, so just log."""
def queue_path(self, project, location, queue):
"""Return a fully-qualified queue string."""
# This is value is not actually used, but it might be good for debugging.
return "projects/{project}/locations/{location}/queues/{queue}".format(
project=project, location=location, queue=queue)
def create_task(self, unused_parent, task, **kwargs):
"""Just log that the task would have been created URL."""
self.uri = task.get('app_engine_http_request').get('relative_uri')
self.body = task.get('app_engine_http_request').get('body')
logging.info('Task uri: %r', self.uri)
logging.info('Task body: %r', self.body)
return 'fake task'
cloud_tasks_helpers._client = FakeCloudTasksClient()
class Blank(object):
"""Simple class that assigns all named args to attributes.
Tip: supply a lambda to define a method.