Run mypy in Github continuous integration checks (#2324)

* run mypy in CI

* more type hint fixing

* Update requirements.txt

* google.cloud import workaround

https://github.com/python/mypy/issues/10360

* Revert "google.cloud import workaround"

This reverts commit e03dbc7bbeb67acc81f57b9c5d905b9141952dbc.

* mypy google.cloud workaround

https://github.com/python/mypy/issues/12985

* fix regular tests

* remove --install-types flag

* last mypy fix

* type fixes for address_reasons

* move email lists creation

* update for newly merged changes

* Fix regular tests

* Changes suggested by @jrobbins

* Catch invalid requests

* small type hint

* Change methods to properly override

* revert to previous implementation

* revert test

* add back original validate request type method

* remove unused import and rearrange methods

* remove comment string casting; add test back

* add ndb ignore

* Update test_html_rendering.html

* mypy fix

* remove merge addition
This commit is contained in:
Daniel Smith 2022-10-24 06:40:21 -07:00 коммит произвёл GitHub
Родитель 9a35f31c6e
Коммит 49fb21595d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
34 изменённых файлов: 150 добавлений и 118 удалений

3
.github/workflows/ci.yml поставляемый
Просмотреть файл

@ -41,6 +41,9 @@ jobs:
- name: lint
run: npm run lint
- name: mypy
run: mypy --ignore-missing-imports --exclude cs-env/ --exclude appengine_config.py .
- name: test
run: npm test

Просмотреть файл

@ -15,7 +15,7 @@
import logging
from google.cloud import ndb
from google.cloud import ndb # type: ignore
from framework import basehandlers
from framework import permissions

Просмотреть файл

@ -14,7 +14,7 @@
# limitations under the License.
import logging
from typing import Any, Optional
from typing import Any
from framework import basehandlers
from framework import permissions
@ -24,7 +24,6 @@ from internals import notifier
def comment_to_json_dict(comment: Activity) -> dict[str, Any]:
return {
'comment_id': comment.key.id(),
'feature_id': comment.feature_id,
@ -56,8 +55,8 @@ class CommentsAPI(basehandlers.APIHandler):
is_admin = permissions.can_admin_site(user)
# Filter deleted comments the user can't see.
comments = filter(
lambda c: self._should_show_comment(c, user.email(), is_admin), comments)
comments = list(filter(
lambda c: self._should_show_comment(c, user.email(), is_admin), comments))
dicts = [comment_to_json_dict(c) for c in comments]
return {'comments': dicts}
@ -110,7 +109,7 @@ class CommentsAPI(basehandlers.APIHandler):
def do_patch(self, **kwargs) -> dict[str, str]:
comment_id = self.get_param('commentId', required=True)
comment: Optional[Activity] = Activity.get_by_id(comment_id)
comment: Activity = Activity.get_by_id(comment_id)
user = self.get_current_user(required=True)
if not permissions.can_admin_site(user) and (

Просмотреть файл

@ -27,7 +27,7 @@ from internals import search
class FeaturesAPI(basehandlers.APIHandler):
"""Features are the the main records that we track."""
def get_one_feature(self, feature_id: int) -> dict:
def get_one_feature(self, feature_id: int) -> dict[str, Any]:
features = core_models.Feature.get_by_ids([feature_id])
if not features:
self.abort(404, msg='Feature %r not found' % feature_id)
@ -64,7 +64,7 @@ class FeaturesAPI(basehandlers.APIHandler):
'features': features_on_page,
}
def do_get(self, **kwargs) -> dict:
def do_get(self, **kwargs) -> dict[str, Any]:
"""Handle GET requests for a single feature or a search."""
feature_id = kwargs.get('feature_id', None)
if feature_id:

Просмотреть файл

@ -18,13 +18,14 @@ import json
import logging
import os
import re
from typing import Optional
import flask
import flask.views
import werkzeug.exceptions
import google.appengine.api
from google.cloud import ndb
from google.cloud import ndb # type: ignore
import settings
from framework import csp
@ -268,8 +269,8 @@ class APIHandler(BaseHandler):
class FlaskHandler(BaseHandler):
TEMPLATE_PATH = None # Subclasses should define this.
HTTP_CACHE_TYPE = None # Subclasses can use 'public' or 'private'
TEMPLATE_PATH: Optional[str] = None # Subclasses should define this.
HTTP_CACHE_TYPE: Optional[str] = None # Subclasses can use 'public' or 'private'
JSONIFY = False # Set to True for JSON feeds.
IS_INTERNAL_HANDLER = False # Subclasses can skip XSRF check.
@ -362,7 +363,6 @@ class FlaskHandler(BaseHandler):
location = self.request.url.replace('www.', '', 1)
logging.info('Striping www and redirecting to %r', location)
return self.redirect(location)
handler_data = self.get_template_data(*args, **kwargs)
users.refresh_user_session()

Просмотреть файл

@ -528,19 +528,19 @@ class FlaskHandlerTests(testing_config.CustomTestCase):
actual)
def test_get_template_data__missing(self):
"""Every subsclass should overide get_template_data()."""
"""Every subclass should overide get_template_data()."""
self.handler = basehandlers.FlaskHandler()
with self.assertRaises(NotImplementedError):
self.handler.get_template_data()
def test_get_template_path__missing(self):
"""Subsclasses that don't define TEMPLATE_PATH trigger error."""
"""Subclasses that don't define TEMPLATE_PATH trigger error."""
self.handler = basehandlers.FlaskHandler()
with self.assertRaises(ValueError):
self.handler.get_template_path({})
def test_get_template_path__specified_in_class(self):
"""Subsclasses can define TEMPLATE_PATH."""
"""Subclasses can define TEMPLATE_PATH."""
actual = self.handler.get_template_path({})
self.assertEqual('test_template.html', actual)
@ -551,7 +551,7 @@ class FlaskHandlerTests(testing_config.CustomTestCase):
self.assertEqual('special.html', actual)
def test_process_post_data__missing(self):
"""Subsclasses that don't override process_post_data() give a 405."""
"""Subclasses that don't override process_post_data() give a 405."""
self.handler = basehandlers.FlaskHandler()
with self.assertRaises(werkzeug.exceptions.MethodNotAllowed):
self.handler.process_post_data()

Просмотреть файл

@ -26,7 +26,7 @@ import settings
if not settings.UNIT_TEST_MODE:
import grpc # See requirements.dev.txt.
from google.api_core import retry
from google.cloud import tasks
from google.cloud import tasks # type: ignore

Просмотреть файл

@ -17,7 +17,6 @@ import base64
import copy
import logging
import os
import six
import flask
@ -101,7 +100,7 @@ def get_csp_header_key():
def build_policy(policy):
"""Builds the CSP policy string from the internal representation."""
csp_directives = [
k + ' ' + ' '.join(v) for k, v in six.iteritems(policy) if v is not None
k + ' ' + ' '.join(v) for k, v in policy.items() if v is not None
]
if REPORT_URI:
csp_directives.append('report-uri %s' % REPORT_URI)

Просмотреть файл

@ -52,11 +52,11 @@ class CspTest(unittest.TestCase):
"""We can get the regular strict policy."""
policy = csp.get_default_policy(nonce='12345')
self.assertCountEqual(list(csp.DEFAULT_POLICY.keys()), list(policy.keys()))
self.assertIn('strict-dynamic', policy['script-src'])
self.assertIn('\'strict-dynamic\'', policy['script-src'])
self.assertIn("'nonce-12345'", policy['script-src'])
@mock.patch('framework.csp.USE_NONCE_ONLY_POLICY', True)
def test_get_default_policy__strict(self):
def test_get_default_policy__strict_two(self):
"""We can get the even stricter nonce-only policy."""
policy = csp.get_default_policy(nonce='12345')
self.assertCountEqual(list(csp.NONCE_ONLY_POLICY.keys()), list(policy.keys()))
@ -71,7 +71,7 @@ class CspTest(unittest.TestCase):
csp.get_csp_header_key())
@mock.patch('framework.csp.REPORT_ONLY', True)
def test_get_csp_header_key__enforced(self):
def test_get_csp_header_key__enforced_two(self):
"""We can get the header used when only reporting violations."""
self.assertEqual(
csp.HEADER_KEY_REPORT_ONLY,

Просмотреть файл

@ -20,7 +20,7 @@ import random
import string
import time
from google.cloud import ndb
from google.cloud import ndb # type: ignore
# For random key generation

Просмотреть файл

@ -35,7 +35,7 @@ API_OWNERS_URL = (
'main/third_party/blink/API_OWNERS?format=TEXT')
ApprovalFieldDef = collections.namedtuple(
'ApprovalField',
'ApprovalFieldDef',
'name, description, field_id, rule, approvers')
# Note: This can be requested manually through the UI, but it is not
@ -193,7 +193,8 @@ def set_vote(
set_on=now, set_by=set_by_email)
new_vote.put()
update_gate_approval_state(gate)
if gate:
update_gate_approval_state(gate)
def get_gate_by_type(feature_id: int, gate_type: int):
"""Return a single gate based on the feature and gate type."""

Просмотреть файл

@ -14,6 +14,7 @@
import collections
from typing import Optional
WEBCOMPONENTS = 1
@ -149,35 +150,35 @@ GATE_EXTEND_ORIGIN_TRIAL = 3
GATE_SHIP = 4
# Prototype stage types for every feature type.
STAGE_TYPES_PROTOTYPE = {
STAGE_TYPES_PROTOTYPE: dict[int, Optional[int]] = {
FEATURE_TYPE_INCUBATE_ID: STAGE_BLINK_PROTOTYPE,
FEATURE_TYPE_EXISTING_ID: STAGE_FAST_PROTOTYPE,
FEATURE_TYPE_CODE_CHANGE_ID: None,
FEATURE_TYPE_DEPRECATION_ID: None
}
# Dev trial stage types for every feature type.
STAGE_TYPES_DEV_TRIAL = {
STAGE_TYPES_DEV_TRIAL: dict[int, Optional[int]] = {
FEATURE_TYPE_INCUBATE_ID: STAGE_BLINK_DEV_TRIAL,
FEATURE_TYPE_EXISTING_ID: STAGE_FAST_DEV_TRIAL,
FEATURE_TYPE_CODE_CHANGE_ID: STAGE_PSA_DEV_TRIAL,
FEATURE_TYPE_DEPRECATION_ID: STAGE_DEP_DEV_TRIAL
}
# Origin trial stage types for every feature type.
STAGE_TYPES_ORIGIN_TRIAL = {
STAGE_TYPES_ORIGIN_TRIAL: dict[int, Optional[int]] = {
FEATURE_TYPE_INCUBATE_ID: STAGE_BLINK_ORIGIN_TRIAL,
FEATURE_TYPE_EXISTING_ID: STAGE_FAST_ORIGIN_TRIAL,
FEATURE_TYPE_CODE_CHANGE_ID: None,
FEATURE_TYPE_DEPRECATION_ID: STAGE_DEP_DEPRECATION_TRIAL
}
# Origin trial extension stage types for every feature type.
STAGE_TYPES_EXTEND_ORIGIN_TRIAL = {
STAGE_TYPES_EXTEND_ORIGIN_TRIAL: dict[int, Optional[int]] = {
FEATURE_TYPE_INCUBATE_ID: STAGE_BLINK_EXTEND_ORIGIN_TRIAL,
FEATURE_TYPE_EXISTING_ID: STAGE_FAST_EXTEND_ORIGIN_TRIAL,
FEATURE_TYPE_CODE_CHANGE_ID: None,
FEATURE_TYPE_DEPRECATION_ID: STAGE_DEP_EXTEND_DEPRECATION_TRIAL
}
# Shipping stage types for every feature type.
STAGE_TYPES_SHIPPING = {
STAGE_TYPES_SHIPPING: dict[int, Optional[int]] = {
FEATURE_TYPE_INCUBATE_ID: STAGE_BLINK_SHIPPING,
FEATURE_TYPE_EXISTING_ID: STAGE_FAST_SHIPPING,
FEATURE_TYPE_CODE_CHANGE_ID: STAGE_PSA_SHIPPING,
@ -185,7 +186,7 @@ STAGE_TYPES_SHIPPING = {
}
# Mapping of original field names to the new stage types the fields live on.
STAGE_TYPES_BY_FIELD_MAPPING = {
STAGE_TYPES_BY_FIELD_MAPPING: dict[str, dict[int, Optional[int]]] = {
'finch_url': STAGE_TYPES_SHIPPING,
'experiment_goals': STAGE_TYPES_ORIGIN_TRIAL,
'experiment_risks': STAGE_TYPES_ORIGIN_TRIAL,

Просмотреть файл

@ -22,7 +22,7 @@ import logging
import re
from typing import Any, Optional
from google.cloud import ndb
from google.cloud import ndb # type: ignore
from framework import rediscache
from framework import users
@ -205,7 +205,7 @@ class Feature(DictModel):
self.migrate_views()
logging.info('In format_for_template for %s',
repr(self)[:settings.MAX_LOG_LINE])
d = self.to_dict()
d: dict[str, Any] = self.to_dict()
is_released = self.impl_status_chrome in RELEASE_IMPL_STATES
d['is_released'] = is_released
@ -494,7 +494,8 @@ class Feature(DictModel):
@classmethod
def get_by_ids(
self, feature_ids: list[int], update_cache: bool=False) -> list[dict]:
self, feature_ids: list[int],
update_cache: bool=False) -> list[dict[str, Any]]:
"""Return a list of JSON dicts for the specified features.
Because the cache may rarely have stale data, this should only be
@ -514,7 +515,7 @@ class Feature(DictModel):
result_dict[feature_id] = feature
for future in futures:
unformatted_feature = future.get_result()
unformatted_feature: Optional[Feature] = future.get_result()
if unformatted_feature and not unformatted_feature.deleted:
feature = unformatted_feature.format_for_template()
feature['updated_display'] = (
@ -525,7 +526,7 @@ class Feature(DictModel):
result_dict[unformatted_feature.key.integer_id()] = feature
result_list = [
result_dict.get(feature_id) for feature_id in feature_ids
result_dict[feature_id] for feature_id in feature_ids
if feature_id in result_dict]
return result_list
@ -632,7 +633,7 @@ class Feature(DictModel):
@classmethod
def get_in_milestone(self, milestone: int,
show_unlisted: bool=False) -> dict[int, list[dict[str, Any]]]:
show_unlisted: bool=False) -> dict[str, list[dict[str, Any]]]:
"""Return {reason: [feaure_dict]} with all the reasons a feature can
be part of a milestone.
@ -648,7 +649,7 @@ class Feature(DictModel):
if cached_features_by_type:
features_by_type = cached_features_by_type
else:
all_features: dict[int, list[Feature]] = {}
all_features: dict[str, list[Feature]] = {}
all_features[IMPLEMENTATION_STATUS[ENABLED_BY_DEFAULT]] = []
all_features[IMPLEMENTATION_STATUS[DEPRECATED]] = []
all_features[IMPLEMENTATION_STATUS[REMOVED]] = []
@ -782,10 +783,11 @@ class Feature(DictModel):
def crbug_number(self) -> Optional[str]:
if not self.bug_url:
return
return None
m = re.search(r'[\/|?id=]([0-9]+)$', self.bug_url)
if m:
return m.group(1)
return None
def new_crbug_url(self) -> str:
url = 'https://bugs.chromium.org/p/chromium/issues/entry'
@ -1132,7 +1134,7 @@ class FeatureEntry(ndb.Model): # Copy from Feature
@classmethod
def get_feature_entry(self, feature_id: int, update_cache: bool=False
) -> FeatureEntry:
) -> Optional[FeatureEntry]:
KEY = feature_cache_key(FeatureEntry.DEFAULT_CACHE_KEY, feature_id)
feature = rediscache.get(KEY)
@ -1174,7 +1176,7 @@ class FeatureEntry(ndb.Model): # Copy from Feature
procesing a POST to edit data. For editing use case, load the
data from NDB directly.
"""
result_dict = {}
result_dict: dict[int, int] = {}
futures = []
for fe_id in entry_ids:
@ -1192,9 +1194,8 @@ class FeatureEntry(ndb.Model): # Copy from Feature
rediscache.set(store_key, entry)
result_dict[entry.key.integer_id()] = entry
result_list = [
result_dict.get(fe_id) for fe_id in entry_ids
if fe_id in result_dict]
result_list = [result_dict[fe_id] for fe_id in entry_ids
if fe_id in result_dict]
return result_list
# Note: get_in_milestone will be in a new file legacy_queries.py.

Просмотреть файл

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Any
from googleapiclient.discovery import build
from googleapiclient.discovery_cache.base import Cache
@ -20,7 +21,7 @@ import settings
from framework import basehandlers
class MemoryCache(Cache):
_CACHE = {}
_CACHE: dict[Any, Any] = {}
def get(self, url):
return MemoryCache._CACHE.get(url)

Просмотреть файл

@ -13,7 +13,7 @@
# limitations under the License.
from google.cloud import ndb
from google.cloud import ndb # type: ignore
# UMA metrics.

Просмотреть файл

@ -19,10 +19,11 @@ from datetime import datetime, timedelta
import collections
import logging
import os
from typing import Optional
import urllib
from framework import permissions
from google.cloud import ndb
from google.cloud import ndb # type: ignore
from flask import escape
from flask import render_template
@ -33,8 +34,9 @@ from framework import users
import settings
from internals import approval_defs
from internals import core_enums
from internals import core_models
from internals import user_models
from internals.core_models import Feature
from internals.user_models import (
AppUser, BlinkComponent, FeatureOwner, UserPref)
def format_email_body(is_update, feature, changes):
@ -78,13 +80,11 @@ def format_email_body(is_update, feature, changes):
return body
def accumulate_reasons(addr_reasons, user_list, reason):
def accumulate_reasons(
addr_reasons: dict[str, list], addr_list: list[str], reason: str) -> None:
"""Add a reason string for each user."""
for user in user_list:
if type(user) == str:
addr_reasons[user].append(reason)
else:
addr_reasons[user.email].append(reason)
for email in addr_list:
addr_reasons[email].append(reason)
def convert_reasons_to_task(
@ -118,12 +118,13 @@ WEBVIEW_RULE_REASON = (
WEBVIEW_RULE_ADDRS = ['webview-leads-external@google.com']
def apply_subscription_rules(feature, changes):
def apply_subscription_rules(
feature: Feature, changes: list) -> dict[str, list[str]]:
"""Return {"reason": [addrs]} for users who set up rules."""
# Note: for now this is hard-coded, but it will eventually be
# configurable through some kind of user preference.
changed_field_names = {c['prop_name'] for c in changes}
results = {}
results: dict[str, list[str]] = {}
# Check if feature has some other milestone set, but not webview.
if (feature.shipped_android_milestone and
@ -135,10 +136,15 @@ def apply_subscription_rules(feature, changes):
return results
def make_email_tasks(feature, is_update=False, changes=[]):
def make_email_tasks(feature: Feature, is_update: bool=False,
changes: Optional[list]=None):
"""Return a list of task dicts to notify users of feature changes."""
feature_watchers = user_models.FeatureOwner.query(
user_models.FeatureOwner.watching_all_features == True).fetch(None)
if changes is None:
changes = []
watchers: list[FeatureOwner] = FeatureOwner.query(
FeatureOwner.watching_all_features == True).fetch(None)
watcher_emails: list[str] = [watcher.email for watcher in watchers]
email_html = format_email_body(is_update, feature, changes)
if is_update:
@ -148,7 +154,7 @@ def make_email_tasks(feature, is_update=False, changes=[]):
subject = 'new feature: %s' % feature.name
triggering_user_email = feature.created_by.email()
addr_reasons = collections.defaultdict(list) # [{email_addr: [reason,...]}]
addr_reasons: dict[str, list[str]] = collections.defaultdict(list)
accumulate_reasons(
addr_reasons, feature.owner,
@ -163,26 +169,27 @@ def make_email_tasks(feature, is_update=False, changes=[]):
'You are CC\'d on this feature'
)
accumulate_reasons(
addr_reasons, feature_watchers,
addr_reasons, watcher_emails,
'You are watching all feature changes')
# There will always be at least one component.
for component_name in feature.blink_components:
component = user_models.BlinkComponent.get_by_name(component_name)
component = BlinkComponent.get_by_name(component_name)
if not component:
logging.warning('Blink component "%s" not found.'
'Not sending email to subscribers' % component_name)
continue
owner_emails: list[str] = [owner.email for owner in component.owners]
subscriber_emails: list[str] = [sub.email for sub in component.subscribers]
accumulate_reasons(
addr_reasons, component.owners,
addr_reasons, owner_emails,
'You are an owner of this feature\'s component')
accumulate_reasons(
addr_reasons, component.subscribers,
addr_reasons, subscriber_emails,
'You subscribe to this feature\'s component')
starrers = FeatureStar.get_feature_starrers(feature.key.integer_id())
accumulate_reasons(addr_reasons, starrers, 'You starred this feature')
starrer_emails: list[str] = [user.email for user in starrers]
accumulate_reasons(addr_reasons, starrer_emails, 'You starred this feature')
rule_results = apply_subscription_rules(feature, changes)
for reason, sub_addrs in rule_results.items():
@ -223,7 +230,7 @@ class FeatureStar(ndb.Model):
return # No need to update anything in datastore
# Load feature directly from NDB so as to never get a stale cached copy.
feature = core_models.Feature.get_by_id(feature_id)
feature = Feature.get_by_id(feature_id)
feature.star_count += 1 if starred else -1
if feature.star_count < 0:
logging.error('count would be < 0: %r', (email, feature_id, starred))
@ -243,16 +250,16 @@ class FeatureStar(ndb.Model):
return sorted(feature_ids, reverse=True)
@classmethod
def get_feature_starrers(self, feature_id):
def get_feature_starrers(self, feature_id: int) -> list[UserPref]:
"""Return list of UserPref objects for starrers that want notifications."""
q = FeatureStar.query()
q = q.filter(FeatureStar.feature_id == feature_id)
q = q.filter(FeatureStar.starred == True)
feature_stars = q.fetch(None)
feature_stars: list[FeatureStar] = q.fetch(None)
logging.info('found %d stars for %r', len(feature_stars), feature_id)
emails = [fs.email for fs in feature_stars]
emails: list[str] = [fs.email for fs in feature_stars]
logging.info('looking up %r', repr(emails)[:settings.MAX_LOG_LINE])
user_prefs = user_models.UserPref.get_prefs_for_emails(emails)
user_prefs = UserPref.get_prefs_for_emails(emails)
user_prefs = [up for up in user_prefs
if up.notify_as_starrer and not up.bounced]
return user_prefs
@ -287,7 +294,7 @@ class NotifyInactiveUsersHandler(basehandlers.FlaskHandler):
if now is None:
now = datetime.now()
q = user_models.AppUser.query()
q = AppUser.query()
users = q.fetch()
inactive_users = []
inactive_cutoff = now - timedelta(days=self.INACTIVE_WARN_DAYS)
@ -341,7 +348,7 @@ class FeatureChangeHandler(basehandlers.FlaskHandler):
# Email feature subscribers if the feature exists and there were
# actually changes to it.
# Load feature directly from NDB so as to never get a stale cached copy.
feature = core_models.Feature.get_by_id(feature['id'])
feature = Feature.get_by_id(feature['id'])
if feature and (is_update and len(changes) or not is_update):
email_tasks = make_email_tasks(
feature, is_update=is_update, changes=changes)

Просмотреть файл

@ -20,7 +20,7 @@ from datetime import datetime
import flask
from unittest import mock
import werkzeug.exceptions # Flask HTTP stuff.
from google.cloud import ndb
from google.cloud import ndb # type: ignore
from framework import users
@ -151,14 +151,15 @@ class EmailFormattingTest(testing_config.CustomTestCase):
self.assertEqual({}, addr_reasons)
# Adding some users builds up a bigger reason dictionary.
notifier.accumulate_reasons(addr_reasons, [self.component_owner_1],
'a reason')
notifier.accumulate_reasons(
addr_reasons, [self.component_owner_1.email], 'a reason')
self.assertEqual(
{'owner_1@example.com': ['a reason']},
addr_reasons)
notifier.accumulate_reasons(
addr_reasons, [self.component_owner_1, self.watcher_1], 'another reason')
addr_reasons, [self.component_owner_1.email, self.watcher_1.email],
'another reason')
self.assertEqual(
{'owner_1@example.com': ['a reason', 'another reason'],
'watcher_1@example.com': ['another reason'],

Просмотреть файл

@ -202,7 +202,7 @@ class ProgressDetectorsTest(testing_config.CustomTestCase):
self.feature_1.i2e_lgtms = ['api_owner@chromium.org']
self.assertTrue(detector(self.feature_1))
def test_one_i2e_lgtm(self):
def test_two_i2e_lgtm(self):
detector = processes.PROGRESS_DETECTORS[
'One LGTM on Request for Deprecation Trial']
self.assertFalse(detector(self.feature_1))

Просмотреть файл

@ -15,6 +15,7 @@
from datetime import datetime, timedelta
import json
import logging
from typing import Optional
import requests
from flask import render_template
@ -67,11 +68,11 @@ def build_email_tasks(
class AbstractReminderHandler(basehandlers.FlaskHandler):
JSONIFY = True
SUBJECT_FORMAT = '%s'
EMAIL_TEMPLATE_PATH = None # Subclasses must override
SUBJECT_FORMAT: Optional[str] = '%s'
EMAIL_TEMPLATE_PATH: Optional[str] = None # Subclasses must override
ANCHOR_CHANNEL = 'current' # the stable channel
FUTURE_MILESTONES_TO_CONSIDER = 0
MILESTONE_FIELDS = None # Subclasses must override
MILESTONE_FIELDS: Optional[tuple] = None # Subclasses must override
def get_template_data(self, **kwargs):
"""Sends notifications to users requesting feature updates for accuracy."""

Просмотреть файл

@ -21,7 +21,7 @@ from unittest import mock
from internals import core_models
from internals import reminders
from google.cloud import ndb
from google.cloud import ndb # type: ignore
test_app = flask.Flask(__name__,
template_folder=settings.get_flask_template_path())

Просмотреть файл

@ -19,7 +19,7 @@ from __future__ import annotations
import datetime
import logging
from typing import Optional
from google.cloud import ndb
from google.cloud import ndb # type: ignore
class Approval(ndb.Model):

Просмотреть файл

@ -13,7 +13,7 @@
# limitations under the License.
import logging
from google.cloud import ndb
from google.cloud import ndb # type: ignore
from framework.basehandlers import FlaskHandler
from internals import approval_defs

Просмотреть файл

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from google.cloud import ndb
from google.cloud import ndb # type: ignore
import testing_config # Must be imported before the module under test.
from datetime import datetime

Просмотреть файл

@ -12,9 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
# Import needed to reference a class within its own class method.
# https://stackoverflow.com/a/33533514
from __future__ import annotations
from google.cloud import ndb
import logging
from typing import Optional
from google.cloud import ndb # type: ignore
from framework import rediscache
from framework import users
@ -66,16 +71,16 @@ class UserPref(ndb.Model):
user_pref.put()
@classmethod
def get_prefs_for_emails(cls, emails):
def get_prefs_for_emails(cls, emails: list[str]) -> list[UserPref]:
"""Return a list of UserPrefs for each of the given emails."""
result = []
result: list[UserPref] = []
CHUNK_SIZE = 25 # Query 25 at a time because IN operator is limited to 30.
chunks = [emails[i : i + CHUNK_SIZE]
for i in range(0, len(emails), CHUNK_SIZE)]
for chunk_emails in chunks:
q = UserPref.query()
q = q.filter(UserPref.email.IN(chunk_emails))
chunk_prefs = q.fetch(None)
chunk_prefs: list[UserPref] = q.fetch(None)
result.extend(chunk_prefs)
found_set = set(up.email for up in chunk_prefs)
@ -113,7 +118,7 @@ class AppUser(ndb.Model):
rediscache.delete(cache_key)
@classmethod
def get_app_user(cls, email: str) -> users.User:
def get_app_user(cls, email: str) -> Optional[AppUser]:
"""Return the AppUser for the specified user, or None."""
cache_key = 'user|%s' % email
cached_app_user = rediscache.get(cache_key)
@ -122,11 +127,11 @@ class AppUser(ndb.Model):
query = cls.query()
query = query.filter(cls.email == email)
found_app_user_or_none = query.get()
if found_app_user_or_none is None:
found_app_user: Optional[AppUser] = query.get()
if found_app_user is None:
return None
rediscache.set(cache_key, found_app_user_or_none)
return found_app_user_or_none
rediscache.set(cache_key, found_app_user)
return found_app_user
def list_with_component(l, component):
@ -238,7 +243,7 @@ class BlinkComponent(ndb.Model):
c.put()
@classmethod
def get_by_name(self, component_name):
def get_by_name(self, component_name: str) -> Optional[BlinkComponent]:
"""Fetch blink component with given name."""
q = self.query()
q = q.filter(self.name == component_name)

13
main.py
Просмотреть файл

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from api import accounts_api
from api import approvals_api
from api import blink_components_api
@ -70,7 +71,7 @@ if not settings.UNIT_TEST_MODE and not settings.DEV_MODE:
# or not.
metrics_chart_routes = [
metrics_chart_routes: list[tuple] = [
('/data/timeline/cssanimated', metricsdata.AnimatedTimelineHandler),
('/data/timeline/csspopularity', metricsdata.PopularityTimelineHandler),
('/data/timeline/featurepopularity',
@ -83,7 +84,7 @@ metrics_chart_routes = [
# TODO(jrobbins): Advance this to v1 once we have it fleshed out
API_BASE = '/api/v0'
api_routes = [
api_routes: list[tuple] = [
(API_BASE + '/features', features_api.FeaturesAPI),
(API_BASE + '/features/<int:feature_id>', features_api.FeaturesAPI),
(API_BASE + '/features/<int:feature_id>/approvals',
@ -145,7 +146,7 @@ spa_pages = [
('/settings', {'require_signin': True}),
]
spa_page_routes = []
spa_page_routes: list[tuple] = []
for route in spa_pages:
page_defaults = {}
if isinstance(route, tuple):
@ -153,7 +154,7 @@ for route in spa_pages:
page_defaults.update(additional_defaults)
spa_page_routes.append((route, basehandlers.SPAHandler, page_defaults))
spa_page_post_routes = [
spa_page_post_routes: list[Any] = [
('/guide/new', guide.FeatureCreateHandler),
('/guide/edit/<int:feature_id>', guide.FeatureEditHandler),
('/guide/stage/<int:feature_id>/<int:stage_id>', guide.FeatureEditHandler),
@ -161,7 +162,7 @@ spa_page_post_routes = [
('/guide/verify_accuracy/<int:feature_id>', guide.FeatureEditHandler),
]
mpa_page_routes = [
mpa_page_routes: list[tuple] = [
('/admin/subscribers', blink_handler.SubscribersHandler),
('/admin/blink', blink_handler.BlinkHandler),
('/admin/users/new', users.UserListHandler),
@ -185,7 +186,7 @@ mpa_page_routes = [
('/omaha_data', metrics.OmahaDataHandler),
]
internals_routes = [
internals_routes: list[tuple] = [
('/cron/metrics', fetchmetrics.YesterdayHandler),
('/cron/histograms', fetchmetrics.HistogramsHandler),
('/cron/update_blink_components', fetchmetrics.BlinkComponentHandler),

Просмотреть файл

@ -21,7 +21,7 @@ import werkzeug
import html5lib
import settings
from google.cloud import ndb
from google.cloud import ndb # type: ignore
from pages import blink_handler
from internals import user_models

Просмотреть файл

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from framework.basehandlers import FlaskHandler
import testing_config # Must be imported first
import os
@ -34,8 +36,8 @@ TESTDATA = testing_config.Testdata(__file__)
class TestWithFeature(testing_config.CustomTestCase):
REQUEST_PATH_FORMAT = 'subclasses fill this in'
HANDLER_CLASS = 'subclasses fill this in'
REQUEST_PATH_FORMAT: Optional[str] = None
HANDLER_CLASS: Optional[object] = None
def setUp(self):
self.app_user = user_models.AppUser(email='registered@example.com')
@ -51,10 +53,10 @@ class TestWithFeature(testing_config.CustomTestCase):
self.feature_1.put()
self.feature_id = self.feature_1.key.integer_id()
self.request_path = self.REQUEST_PATH_FORMAT % {
'feature_id': self.feature_id,
}
self.handler = self.HANDLER_CLASS()
self.request_path = (self.REQUEST_PATH_FORMAT %
{'feature_id': self.feature_id} if self.REQUEST_PATH_FORMAT else '')
if self.HANDLER_CLASS:
self.handler = self.HANDLER_CLASS()
def tearDown(self):
self.feature_1.key.delete()
@ -119,6 +121,7 @@ class FeatureListHandlerTest(TestWithFeature):
class FeatureListTemplateTest(TestWithFeature):
REQUEST_PATH_FORMAT = None
HANDLER_CLASS = featurelist.FeatureListHandler
def setUp(self):

Просмотреть файл

@ -16,7 +16,7 @@
from datetime import datetime
import logging
from typing import Any
from google.cloud import ndb
from google.cloud import ndb # type: ignore
# Appengine imports.
from framework import rediscache

Просмотреть файл

@ -22,7 +22,7 @@ import werkzeug
import html5lib
import settings
from google.cloud import ndb
from google.cloud import ndb # type: ignore
from pages import intentpreview
from internals import core_enums
from internals import core_models

Просмотреть файл

@ -77,7 +77,7 @@ limitations under the License.
</head>
<body class="loading" data-path="/subclasses fill this in?">
<body class="loading" data-path="/?">
<div id="app-content-container">
<div>
@ -85,7 +85,7 @@ limitations under the License.
<div class="toolbar-content">
<chromedash-header
appTitle="Local testing"
currentPage="/subclasses fill this in?"
currentPage="/?"
googleSignInClientId="914217904764-enfcea61q4hqe7ak8kkuteglrbhk8el1.apps.googleusercontent.com">
</chromedash-header>
</div>

Просмотреть файл

@ -18,7 +18,7 @@ import flask
import html5lib
import settings
from google.cloud import ndb
from google.cloud import ndb # type: ignore
from unittest import mock
from internals import user_models

Просмотреть файл

@ -5,3 +5,10 @@
coverage==5.5
gunicorn==20.1.0
# Mypy and dependencies
mypy==0.982
types-protobuf
types-redis
types-requests
types-setuptools

Просмотреть файл

@ -1,5 +1,6 @@
import logging
import os
from typing import Any, Optional
ROOT_DIR = os.path.abspath(os.path.dirname(__file__))
@ -11,7 +12,8 @@ def get_flask_template_path() -> str:
# By default, send all email to an archive for debugging.
# For the live cr-status server, this setting is None.
SEND_ALL_EMAIL_TO = 'cr-status-staging-emails+%(user)s+%(domain)s@google.com'
SEND_ALL_EMAIL_TO: Optional[str] = (
'cr-status-staging-emails+%(user)s+%(domain)s@google.com')
BOUNCE_ESCALATION_ADDR = 'cr-status-bounces@google.com'

Просмотреть файл

@ -16,7 +16,7 @@ import logging
import os
import unittest
from google.cloud import ndb
from google.cloud import ndb # type: ignore
from pathlib import Path
os.environ['SERVER_SOFTWARE'] = 'test ' + os.environ.get('SERVER_SOFTWARE', '')