Move Profile and RegisteredSubdomain

Move Profile and RegisteredSubdomain models, signals, and admin from
emails app to privaterelay app.
This commit is contained in:
John Whitlock 2024-06-07 17:31:51 -05:00
Родитель 82d1c66b80
Коммит dbe2ae826e
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 082C735D154FB750
23 изменённых файлов: 1857 добавлений и 1638 удалений

Просмотреть файл

@ -1,13 +1,6 @@
from django.contrib import admin
from .models import (
DeletedAddress,
DomainAddress,
Profile,
RegisteredSubdomain,
RelayAddress,
Reply,
)
from .models import DeletedAddress, DomainAddress, RelayAddress, Reply
@admin.register(Reply)
@ -17,7 +10,5 @@ class ReplyAdmin(admin.ModelAdmin):
admin.site.register(DeletedAddress)
admin.site.register(Profile)
admin.site.register(RelayAddress)
admin.site.register(DomainAddress)
admin.site.register(RegisteredSubdomain)

Просмотреть файл

@ -74,9 +74,6 @@ class EmailsConfig(AppConfig):
terms.append(word)
return terms
def ready(self):
import emails.signals # noqa: F401 (imported but unused warning)
def emails_config() -> EmailsConfig:
emails_config = apps.get_app_config("emails")

Просмотреть файл

@ -5,14 +5,10 @@ from __future__ import annotations
from django.contrib.auth.models import User
from django.db.models import Q, QuerySet
from privaterelay.cleaner_task import (
CleanerTask,
DataBisectSpec,
DataModelSpec,
)
from privaterelay.cleaner_task import CleanerTask, DataBisectSpec, DataModelSpec
from privaterelay.signals import create_user_profile
from .models import DomainAddress, Profile, RelayAddress
from .signals import create_user_profile
class ServerStorageCleaner(CleanerTask):

Просмотреть файл

@ -1,22 +1,10 @@
"""Exceptions raised by emails app"""
from django.conf import settings
from django.core.exceptions import BadRequest
from api.exceptions import ErrorContextType, RelayAPIException
class CannotMakeSubdomainException(BadRequest):
"""Exception raised by Profile due to error on subdomain creation.
Attributes:
message -- optional explanation of the error
"""
def __init__(self, message: str | None = None) -> None:
self.message = message
class CannotMakeAddressException(RelayAPIException):
"""Base exception for RelayAddress or DomainAddress creation failure."""

Просмотреть файл

@ -2,7 +2,7 @@
from django.db import migrations, models
import emails.validators
from privaterelay.validators import valid_available_subdomain
class Migration(migrations.Migration):
@ -20,7 +20,7 @@ class Migration(migrations.Migration):
max_length=63,
null=True,
unique=True,
validators=[emails.validators.valid_available_subdomain],
validators=[valid_available_subdomain],
),
),
]

Просмотреть файл

@ -0,0 +1,23 @@
# Generated by Django 4.2.13 on 2024-06-07 22:08
# Move Profile and RegisteredSubdomain to privaterelay app
# Other half is
# privaterelay/migrations/0010_move_profile_and_registered_subdomain_models.py
# See https://davit.hashnode.dev/django-move-model
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("emails", "0061_relayaddress_idx_ra_created_by_addon"),
]
state_operations = [
migrations.RemoveField(model_name="profile", name="user"),
migrations.DeleteModel(name="RegisteredSubdomain"),
migrations.DeleteModel(name="Profile"),
]
operations = [
migrations.SeparateDatabaseAndState(state_operations=state_operations),
]

Просмотреть файл

@ -3,10 +3,8 @@ from __future__ import annotations
import logging
import random
import string
import uuid
from collections import namedtuple
from collections.abc import Iterable
from datetime import UTC, datetime, timedelta
from datetime import UTC, datetime
from hashlib import sha256
from typing import Any, Literal, cast
@ -15,23 +13,10 @@ from django.contrib.auth.models import User
from django.core.validators import MinLengthValidator
from django.db import models, transaction
from django.db.models.base import ModelBase
from django.db.models.query import QuerySet
from django.utils.translation.trans_real import (
get_supported_language_variant,
parse_accept_lang_header,
)
from allauth.socialaccount.models import SocialAccount
from privaterelay.plans import get_premium_countries
from privaterelay.utils import (
AcceptLanguageError,
flag_is_active_in_task,
guess_country_from_accept_lang,
)
from privaterelay.models import Profile, RegisteredSubdomain, hash_subdomain
from .exceptions import (
CannotMakeSubdomainException,
DomainAddrDuplicateException,
DomainAddrUnavailableException,
DomainAddrUpdateException,
@ -42,17 +27,12 @@ from .validators import (
check_user_can_make_domain_address,
is_blocklisted,
valid_address,
valid_available_subdomain,
)
if settings.PHONES_ENABLED:
from phones.models import RealPhone, RelayNumber
__all__ = ["Profile", "RegisteredSubdomain", "hash_subdomain"]
logger = logging.getLogger("events")
abuse_logger = logging.getLogger("abusemetrics")
BounceStatus = namedtuple("BounceStatus", "paused type")
DOMAIN_CHOICES = [(1, "RELAY_FIREFOX_DOMAIN"), (2, "MOZMAIL_DOMAIN")]
PREMIUM_DOMAINS = ["mozilla.com", "getpocket.com", "mozillafoundation.org"]
@ -109,518 +89,6 @@ def address_default() -> str:
)
def hash_subdomain(subdomain: str, domain: str = settings.MOZMAIL_DOMAIN) -> str:
return sha256(f"{subdomain}.{domain}".encode()).hexdigest()
class Profile(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE)
api_token = models.UUIDField(default=uuid.uuid4)
num_address_deleted = models.PositiveIntegerField(default=0)
date_subscribed = models.DateTimeField(blank=True, null=True)
date_subscribed_phone = models.DateTimeField(blank=True, null=True)
# TODO MPP-2972: delete date_phone_subscription_checked in favor of
# date_phone_subscription_next_reset
date_phone_subscription_checked = models.DateTimeField(blank=True, null=True)
date_phone_subscription_start = models.DateTimeField(blank=True, null=True)
date_phone_subscription_reset = models.DateTimeField(blank=True, null=True)
date_phone_subscription_end = models.DateTimeField(blank=True, null=True)
address_last_deleted = models.DateTimeField(blank=True, null=True, db_index=True)
last_soft_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
last_hard_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
last_account_flagged = models.DateTimeField(blank=True, null=True, db_index=True)
num_deleted_relay_addresses = models.PositiveIntegerField(default=0)
num_deleted_domain_addresses = models.PositiveIntegerField(default=0)
num_email_forwarded_in_deleted_address = models.PositiveIntegerField(default=0)
num_email_blocked_in_deleted_address = models.PositiveIntegerField(default=0)
num_level_one_trackers_blocked_in_deleted_address = models.PositiveIntegerField(
default=0, null=True
)
num_email_replied_in_deleted_address = models.PositiveIntegerField(default=0)
num_email_spam_in_deleted_address = models.PositiveIntegerField(default=0)
subdomain = models.CharField(
blank=True,
null=True,
unique=True,
max_length=63,
db_index=True,
validators=[valid_available_subdomain],
)
# Whether we store the user's alias labels in the server
server_storage = models.BooleanField(default=True)
# Whether we store the caller/sender log for the user's relay number
store_phone_log = models.BooleanField(default=True)
# TODO: Data migration to set null to false
# TODO: Schema migration to remove null=True
remove_level_one_email_trackers = models.BooleanField(null=True, default=False)
onboarding_state = models.PositiveIntegerField(default=0)
onboarding_free_state = models.PositiveIntegerField(default=0)
auto_block_spam = models.BooleanField(default=False)
forwarded_first_reply = models.BooleanField(default=False)
# Empty string means the profile was created through relying party flow
created_by = models.CharField(blank=True, null=True, max_length=63)
sent_welcome_email = models.BooleanField(default=False)
last_engagement = models.DateTimeField(blank=True, null=True, db_index=True)
def __str__(self):
return f"{self.user} Profile"
def save(
self,
force_insert: bool | tuple[ModelBase, ...] = False,
force_update: bool = False,
using: str | None = None,
update_fields: Iterable[str] | None = None,
) -> None:
# always lower-case the subdomain before saving it
# TODO: change subdomain field as a custom field inheriting from
# CharField to validate constraints on the field update too
if self.subdomain and not self.subdomain.islower():
self.subdomain = self.subdomain.lower()
if update_fields is not None:
update_fields = {"subdomain"}.union(update_fields)
super().save(
force_insert=force_insert,
force_update=force_update,
using=using,
update_fields=update_fields,
)
# any time a profile is saved with server_storage False, delete the
# appropriate server-stored Relay address data.
if not self.server_storage:
relay_addresses = RelayAddress.objects.filter(user=self.user)
relay_addresses.update(description="", generated_for="", used_on="")
domain_addresses = DomainAddress.objects.filter(user=self.user)
domain_addresses.update(description="", used_on="")
if settings.PHONES_ENABLED:
# any time a profile is saved with store_phone_log False, delete the
# appropriate server-stored InboundContact records
from phones.models import InboundContact, RelayNumber
if not self.store_phone_log:
try:
relay_number = RelayNumber.objects.get(user=self.user)
InboundContact.objects.filter(relay_number=relay_number).delete()
except RelayNumber.DoesNotExist:
pass
@property
def language(self):
if self.fxa and self.fxa.extra_data.get("locale"):
for accept_lang, _ in parse_accept_lang_header(
self.fxa.extra_data.get("locale")
):
try:
return get_supported_language_variant(accept_lang)
except LookupError:
continue
return "en"
# This method returns whether the locale associated with the user's Mozilla account
# includes a country code from a Premium country. This is less accurate than using
# get_countries_info_from_request_and_mapping(), which can use a GeoIP lookup, so
# prefer using that if a request context is available. In other contexts, for
# example when sending an email, this method can be useful.
@property
def fxa_locale_in_premium_country(self) -> bool:
if self.fxa and self.fxa.extra_data.get("locale"):
try:
country = guess_country_from_accept_lang(self.fxa.extra_data["locale"])
except AcceptLanguageError:
return False
premium_countries = get_premium_countries()
if country in premium_countries:
return True
return False
@property
def avatar(self) -> str | None:
if fxa := self.fxa:
return str(fxa.extra_data.get("avatar"))
return None
@property
def relay_addresses(self) -> QuerySet[RelayAddress]:
return RelayAddress.objects.filter(user=self.user)
@property
def domain_addresses(self) -> QuerySet[DomainAddress]:
return DomainAddress.objects.filter(user=self.user)
@property
def total_masks(self) -> int:
ra_count: int = self.relay_addresses.count()
da_count: int = self.domain_addresses.count()
return ra_count + da_count
@property
def at_mask_limit(self) -> bool:
if self.has_premium:
return False
ra_count: int = self.relay_addresses.count()
return ra_count >= settings.MAX_NUM_FREE_ALIASES
def check_bounce_pause(self) -> BounceStatus:
if self.last_hard_bounce:
last_hard_bounce_allowed = datetime.now(UTC) - timedelta(
days=settings.HARD_BOUNCE_ALLOWED_DAYS
)
if self.last_hard_bounce > last_hard_bounce_allowed:
return BounceStatus(True, "hard")
self.last_hard_bounce = None
self.save()
if self.last_soft_bounce:
last_soft_bounce_allowed = datetime.now(UTC) - timedelta(
days=settings.SOFT_BOUNCE_ALLOWED_DAYS
)
if self.last_soft_bounce > last_soft_bounce_allowed:
return BounceStatus(True, "soft")
self.last_soft_bounce = None
self.save()
return BounceStatus(False, "")
@property
def bounce_status(self) -> BounceStatus:
return self.check_bounce_pause()
@property
def next_email_try(self) -> datetime:
bounce_pause, bounce_type = self.check_bounce_pause()
if not bounce_pause:
return datetime.now(UTC)
if bounce_type == "soft":
if not self.last_soft_bounce:
raise ValueError("self.last_soft_bounce must be truthy value.")
return self.last_soft_bounce + timedelta(
days=settings.SOFT_BOUNCE_ALLOWED_DAYS
)
if bounce_type != "hard":
raise ValueError("bounce_type must be either 'soft' or 'hard'")
if not self.last_hard_bounce:
raise ValueError("self.last_hard_bounce must be truthy value.")
return self.last_hard_bounce + timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)
@property
def last_bounce_date(self):
if self.last_hard_bounce:
return self.last_hard_bounce
if self.last_soft_bounce:
return self.last_soft_bounce
return None
@property
def at_max_free_aliases(self) -> bool:
relay_addresses_count: int = self.relay_addresses.count()
return relay_addresses_count >= settings.MAX_NUM_FREE_ALIASES
@property
def fxa(self) -> SocialAccount | None:
# Note: we are NOT using .filter() here because it invalidates
# any profile instances that were queried with prefetch_related, which
# we use in at least the profile view to minimize queries
if not hasattr(self.user, "socialaccount_set"):
raise AttributeError("self.user must have socialaccount_set attribute")
for sa in self.user.socialaccount_set.all():
if sa.provider == "fxa":
return sa
return None
@property
def display_name(self) -> str | None:
# if display name is not set on FxA the
# displayName key will not exist on the extra_data
if fxa := self.fxa:
name = fxa.extra_data.get("displayName")
return name if name is None else str(name)
return None
@property
def custom_domain(self) -> str:
if not self.subdomain:
raise ValueError("self.subdomain must be truthy value.")
return f"@{self.subdomain}.{settings.MOZMAIL_DOMAIN}"
@property
def has_premium(self) -> bool:
if not self.user.is_active:
return False
# FIXME: as we don't have all the tiers defined we are over-defining
# this to mark the user as a premium user as well
if not self.fxa:
return False
for premium_domain in PREMIUM_DOMAINS:
if self.user.email.endswith(f"@{premium_domain}"):
return True
user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
for sub in settings.SUBSCRIPTIONS_WITH_UNLIMITED:
if sub in user_subscriptions:
return True
return False
@property
def has_phone(self) -> bool:
if not self.fxa:
return False
if settings.RELAY_CHANNEL != "prod" and not settings.IN_PYTEST:
if not flag_is_active_in_task("phones", self.user):
return False
if flag_is_active_in_task("free_phones", self.user):
return True
user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
for sub in settings.SUBSCRIPTIONS_WITH_PHONE:
if sub in user_subscriptions:
return True
return False
@property
def has_vpn(self) -> bool:
if not self.fxa:
return False
user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
for sub in settings.SUBSCRIPTIONS_WITH_VPN:
if sub in user_subscriptions:
return True
return False
@property
def emails_forwarded(self) -> int:
return (
sum(ra.num_forwarded for ra in self.relay_addresses)
+ sum(da.num_forwarded for da in self.domain_addresses)
+ self.num_email_forwarded_in_deleted_address
)
@property
def emails_blocked(self) -> int:
return (
sum(ra.num_blocked for ra in self.relay_addresses)
+ sum(da.num_blocked for da in self.domain_addresses)
+ self.num_email_blocked_in_deleted_address
)
@property
def emails_replied(self) -> int:
ra_sum = self.relay_addresses.aggregate(models.Sum("num_replied", default=0))
da_sum = self.domain_addresses.aggregate(models.Sum("num_replied", default=0))
return (
int(ra_sum["num_replied__sum"])
+ int(da_sum["num_replied__sum"])
+ self.num_email_replied_in_deleted_address
)
@property
def level_one_trackers_blocked(self) -> int:
return (
sum(ra.num_level_one_trackers_blocked or 0 for ra in self.relay_addresses)
+ sum(
da.num_level_one_trackers_blocked or 0 for da in self.domain_addresses
)
+ (self.num_level_one_trackers_blocked_in_deleted_address or 0)
)
@property
def joined_before_premium_release(self):
date_created = self.user.date_joined
return date_created < datetime.fromisoformat("2021-10-22 17:00:00+00:00")
@property
def date_phone_registered(self) -> datetime | None:
if not settings.PHONES_ENABLED:
return None
try:
real_phone = RealPhone.objects.get(user=self.user)
relay_number = RelayNumber.objects.get(user=self.user)
except RealPhone.DoesNotExist:
return None
except RelayNumber.DoesNotExist:
return real_phone.verified_date
return relay_number.created_at or real_phone.verified_date
def add_subdomain(self, subdomain):
# Handles if the subdomain is "" or None
if not subdomain:
raise CannotMakeSubdomainException(
"error-subdomain-cannot-be-empty-or-null"
)
# subdomain must be all lowercase
subdomain = subdomain.lower()
if not self.has_premium:
raise CannotMakeSubdomainException("error-premium-set-subdomain")
if self.subdomain is not None:
raise CannotMakeSubdomainException("error-premium-cannot-change-subdomain")
self.subdomain = subdomain
# The validator defined in the subdomain field does not get run in full_clean()
# when self.subdomain is "" or None, so we need to run the validator again to
# catch these cases.
valid_available_subdomain(subdomain)
self.full_clean()
self.save()
RegisteredSubdomain.objects.create(subdomain_hash=hash_subdomain(subdomain))
return subdomain
def update_abuse_metric(
self,
address_created: bool = False,
replied: bool = False,
email_forwarded: bool = False,
forwarded_email_size: int = 0,
) -> datetime | None:
if self.user.email in settings.ALLOWED_ACCOUNTS:
return None
with transaction.atomic():
# look for abuse metrics created on the same UTC date, regardless of time.
midnight_utc_today = datetime.combine(
datetime.now(UTC).date(), datetime.min.time()
).astimezone(UTC)
midnight_utc_tomorow = midnight_utc_today + timedelta(days=1)
abuse_metric = (
self.user.abusemetrics_set.select_for_update()
.filter(
first_recorded__gte=midnight_utc_today,
first_recorded__lt=midnight_utc_tomorow,
)
.first()
)
if not abuse_metric:
abuse_metric = AbuseMetrics.objects.create(user=self.user)
AbuseMetrics.objects.filter(
first_recorded__lt=midnight_utc_today
).delete()
# increment the abuse metric
if address_created:
abuse_metric.num_address_created_per_day += 1
if replied:
abuse_metric.num_replies_per_day += 1
if email_forwarded:
abuse_metric.num_email_forwarded_per_day += 1
if forwarded_email_size > 0:
abuse_metric.forwarded_email_size_per_day += forwarded_email_size
abuse_metric.last_recorded = datetime.now(UTC)
abuse_metric.save()
# check user should be flagged for abuse
hit_max_create = False
hit_max_replies = False
hit_max_forwarded = False
hit_max_forwarded_email_size = False
hit_max_create = (
abuse_metric.num_address_created_per_day
>= settings.MAX_ADDRESS_CREATION_PER_DAY
)
hit_max_replies = (
abuse_metric.num_replies_per_day >= settings.MAX_REPLIES_PER_DAY
)
hit_max_forwarded = (
abuse_metric.num_email_forwarded_per_day
>= settings.MAX_FORWARDED_PER_DAY
)
hit_max_forwarded_email_size = (
abuse_metric.forwarded_email_size_per_day
>= settings.MAX_FORWARDED_EMAIL_SIZE_PER_DAY
)
if (
hit_max_create
or hit_max_replies
or hit_max_forwarded
or hit_max_forwarded_email_size
):
self.last_account_flagged = datetime.now(UTC)
self.save()
data = {
"uid": self.fxa.uid if self.fxa else None,
"flagged": self.last_account_flagged.timestamp(),
"replies": abuse_metric.num_replies_per_day,
"addresses": abuse_metric.num_address_created_per_day,
"forwarded": abuse_metric.num_email_forwarded_per_day,
"forwarded_size_in_bytes": (
abuse_metric.forwarded_email_size_per_day
),
}
# log for further secops review
abuse_logger.info("Abuse flagged", extra=data)
return self.last_account_flagged
@property
def is_flagged(self):
if not self.last_account_flagged:
return False
account_premium_feature_resumed = self.last_account_flagged + timedelta(
days=settings.PREMIUM_FEATURE_PAUSED_DAYS
)
if datetime.now(UTC) > account_premium_feature_resumed:
# premium feature has been resumed
return False
# user was flagged and the premium feature pause period is not yet over
return True
@property
def metrics_enabled(self) -> bool:
"""
Does the user allow us to record technical and interaction data?
This is based on the Mozilla accounts opt-out option, added around 2022. A user
can go to their Mozilla account profile settings, Data Collection and Use, and
deselect "Help improve Mozilla Account". This setting defaults to On, and is
sent as "metricsEnabled". Some older Relay accounts do not have
"metricsEnabled", and we default to On.
"""
if self.fxa:
return bool(self.fxa.extra_data.get("metricsEnabled", True))
return True
@property
def plan(self) -> Literal["free", "email", "phone", "bundle"]:
"""The user's Relay plan as a string."""
if self.has_premium:
if self.has_phone:
return "bundle" if self.has_vpn else "phone"
else:
return "email"
else:
return "free"
@property
def plan_term(self) -> Literal[None, "unknown", "1_month", "1_year"]:
"""The user's Relay plan term as a string."""
plan = self.plan
if plan == "free":
return None
if plan == "phone":
start_date = self.date_phone_subscription_start
end_date = self.date_phone_subscription_end
if start_date and end_date:
span = end_date - start_date
return "1_year" if span.days > 32 else "1_month"
return "unknown"
@property
def metrics_premium_status(self) -> str:
plan = self.plan
if plan == "free":
return "free"
return f"{plan}_{self.plan_term}"
class RegisteredSubdomain(models.Model):
subdomain_hash = models.CharField(max_length=64, db_index=True, unique=True)
registered_at = models.DateTimeField(auto_now_add=True)
def __str__(self):
return self.subdomain_hash
class RelayAddress(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE)
address = models.CharField(max_length=64, default=address_default, unique=True)

Просмотреть файл

@ -1,73 +0,0 @@
import logging
from hashlib import sha256
from typing import Any
from django.contrib.auth.models import User
from django.db.models.signals import post_save, pre_save
from django.dispatch import receiver
from rest_framework.authtoken.models import Token
from emails.models import Profile
from emails.utils import incr_if_enabled, set_user_group
info_logger = logging.getLogger("eventsinfo")
@receiver(post_save, sender=User)
def create_user_profile(
sender: type[User], instance: User, created: bool, **kwargs: Any
) -> None:
if created:
set_user_group(instance)
Profile.objects.create(user=instance)
@receiver(pre_save, sender=Profile)
def measure_feature_usage(
sender: type[Profile], instance: Profile, **kwargs: Any
) -> None:
if instance._state.adding:
# if newly created Profile ignore the signal
return
curr_profile = Profile.objects.get(id=instance.id)
# measure tracker removal usage
changed_tracker_removal_setting = (
instance.remove_level_one_email_trackers
!= curr_profile.remove_level_one_email_trackers
)
if changed_tracker_removal_setting:
if instance.remove_level_one_email_trackers:
incr_if_enabled("tracker_removal_enabled")
if not instance.remove_level_one_email_trackers:
incr_if_enabled("tracker_removal_disabled")
if instance.fxa:
# TODO create a utility function or property for hashed fxa uid
hashed_uid = sha256(instance.fxa.uid.encode("utf-8")).hexdigest()
else:
hashed_uid = "_no_fxa_"
info_logger.info(
"tracker_removal_feature",
extra={
"enabled": instance.remove_level_one_email_trackers,
"hashed_uid": hashed_uid,
},
)
@receiver(post_save, sender=Profile)
def copy_auth_token(
sender: type[Profile],
instance: Profile | None = None,
created: bool = False,
**kwargs: Any,
) -> None:
if created and instance is not None:
# baker triggers created during tests
# so first check the user doesn't already have a Token
try:
Token.objects.get(user=instance.user)
return
except Token.DoesNotExist:
Token.objects.create(user=instance.user, key=instance.api_token)

Просмотреть файл

@ -1,9 +1,7 @@
import random
from datetime import UTC, datetime, timedelta
from datetime import UTC, datetime
from hashlib import sha256
from unittest import skip
from unittest.mock import Mock, patch
from uuid import uuid4
from django.conf import settings
from django.contrib.auth.models import User
@ -18,23 +16,18 @@ from privaterelay.tests.utils import (
make_free_test_user,
make_premium_test_user,
make_storageless_test_user,
phone_subscription,
premium_subscription,
vpn_subscription,
)
from ..apps import BadWords
from ..exceptions import (
CannotMakeAddressException,
CannotMakeSubdomainException,
DomainAddrDuplicateException,
DomainAddrUnavailableException,
)
from ..models import (
AbuseMetrics,
DeletedAddress,
DomainAddress,
Profile,
RelayAddress,
address_hash,
get_domain_numerical,
@ -42,7 +35,7 @@ from ..models import (
from ..utils import get_domains_from_settings
if settings.PHONES_ENABLED:
from phones.models import RealPhone, RelayNumber
pass
class AddressHashTest(TestCase):
@ -351,792 +344,6 @@ class RelayAddressTest(TestCase):
assert relay_address.metrics_id == f"R{relay_address.id}"
class ProfileTestCase(TestCase):
"""Base class for Profile tests."""
def setUp(self) -> None:
user = baker.make(User)
self.profile = user.profile
assert self.profile.server_storage is True
def get_or_create_social_account(self) -> SocialAccount:
"""Get the test user's social account, creating if needed."""
social_account, _ = SocialAccount.objects.get_or_create(
user=self.profile.user,
provider="fxa",
defaults={
"uid": str(uuid4()),
"extra_data": {"avatar": "image.png", "subscriptions": []},
},
)
return social_account
def upgrade_to_premium(self) -> None:
"""Add an unlimited emails subscription to the user."""
social_account = self.get_or_create_social_account()
social_account.extra_data["subscriptions"].append(premium_subscription())
social_account.save()
def upgrade_to_phone(self) -> None:
"""Add a phone plan to the user."""
social_account = self.get_or_create_social_account()
social_account.extra_data["subscriptions"].append(phone_subscription())
if not self.profile.has_premium:
social_account.extra_data["subscriptions"].append(premium_subscription())
social_account.save()
def upgrade_to_vpn_bundle(self) -> None:
"""Add a phone plan to the user."""
social_account = self.get_or_create_social_account()
social_account.extra_data["subscriptions"].append(vpn_subscription())
if not self.profile.has_premium:
social_account.extra_data["subscriptions"].append(premium_subscription())
if not self.profile.has_phone:
social_account.extra_data["subscriptions"].append(phone_subscription())
social_account.save()
class ProfileBounceTestCase(ProfileTestCase):
"""Base class for Profile tests that check for bounces."""
def set_hard_bounce(self) -> datetime:
"""
Set a hard bounce pause for the profile, return the bounce time.
This happens when the user's email server reports a hard bounce, such as
saying the email does not exist.
"""
self.profile.last_hard_bounce = datetime.now(UTC) - timedelta(
days=settings.HARD_BOUNCE_ALLOWED_DAYS - 1
)
self.profile.save()
return self.profile.last_hard_bounce
def set_soft_bounce(self) -> datetime:
"""
Set a soft bounce for the profile, return the bounce time.
This happens when the user's email server reports a soft bounce, such as
saying the user's mailbox is full.
"""
self.profile.last_soft_bounce = datetime.now(UTC) - timedelta(
days=settings.SOFT_BOUNCE_ALLOWED_DAYS - 1
)
self.profile.save()
return self.profile.last_soft_bounce
class ProfileCheckBouncePause(ProfileBounceTestCase):
"""Tests for Profile.check_bounce_pause()"""
def test_no_bounces(self) -> None:
bounce_paused, bounce_type = self.profile.check_bounce_pause()
assert bounce_paused is False
assert bounce_type == ""
def test_hard_bounce_pending(self) -> None:
self.set_hard_bounce()
bounce_paused, bounce_type = self.profile.check_bounce_pause()
assert bounce_paused is True
assert bounce_type == "hard"
def test_soft_bounce_pending(self) -> None:
self.set_soft_bounce()
bounce_paused, bounce_type = self.profile.check_bounce_pause()
assert bounce_paused is True
assert bounce_type == "soft"
def test_hard_and_soft_bounce_pending_shows_hard(self) -> None:
self.set_hard_bounce()
self.set_soft_bounce()
bounce_paused, bounce_type = self.profile.check_bounce_pause()
assert bounce_paused is True
assert bounce_type == "hard"
def test_hard_bounce_over_resets_timer(self) -> None:
self.profile.last_hard_bounce = datetime.now(UTC) - timedelta(
days=settings.HARD_BOUNCE_ALLOWED_DAYS + 1
)
self.profile.save()
assert self.profile.last_hard_bounce is not None
bounce_paused, bounce_type = self.profile.check_bounce_pause()
assert bounce_paused is False
assert bounce_type == ""
assert self.profile.last_hard_bounce is None
def test_soft_bounce_over_resets_timer(self) -> None:
self.profile.last_soft_bounce = datetime.now(UTC) - timedelta(
days=settings.SOFT_BOUNCE_ALLOWED_DAYS + 1
)
self.profile.save()
assert self.profile.last_soft_bounce is not None
bounce_paused, bounce_type = self.profile.check_bounce_pause()
assert bounce_paused is False
assert bounce_type == ""
assert self.profile.last_soft_bounce is None
class ProfileNextEmailTryDateTest(ProfileBounceTestCase):
"""Tests for Profile.next_email_try"""
def test_no_bounces_returns_today(self) -> None:
assert self.profile.next_email_try.date() == datetime.now(UTC).date()
def test_hard_bounce_returns_proper_datemath(self) -> None:
last_hard_bounce = self.set_hard_bounce()
expected_next_try_date = last_hard_bounce + timedelta(
days=settings.HARD_BOUNCE_ALLOWED_DAYS
)
assert self.profile.next_email_try.date() == expected_next_try_date.date()
def test_soft_bounce_returns_proper_datemath(self) -> None:
last_soft_bounce = self.set_soft_bounce()
expected_next_try_date = last_soft_bounce + timedelta(
days=settings.SOFT_BOUNCE_ALLOWED_DAYS
)
assert self.profile.next_email_try.date() == expected_next_try_date.date()
def test_hard_and_soft_bounce_returns_hard_datemath(self) -> None:
last_soft_bounce = self.set_soft_bounce()
last_hard_bounce = self.set_hard_bounce()
assert last_soft_bounce != last_hard_bounce
expected_next_try_date = last_hard_bounce + timedelta(
days=settings.HARD_BOUNCE_ALLOWED_DAYS
)
assert self.profile.next_email_try.date() == expected_next_try_date.date()
class ProfileLastBounceDateTest(ProfileBounceTestCase):
"""Tests for Profile.last_bounce_date"""
def test_no_bounces_returns_None(self) -> None:
assert self.profile.last_bounce_date is None
def test_soft_bounce_returns_its_date(self) -> None:
self.set_soft_bounce()
assert self.profile.last_bounce_date == self.profile.last_soft_bounce
def test_hard_bounce_returns_its_date(self) -> None:
self.set_hard_bounce()
assert self.profile.last_bounce_date == self.profile.last_hard_bounce
def test_hard_and_soft_bounces_returns_hard_date(self) -> None:
self.set_soft_bounce()
self.set_hard_bounce()
assert self.profile.last_bounce_date == self.profile.last_hard_bounce
class ProfileHasPremiumTest(ProfileTestCase):
"""Tests for Profile.has_premium"""
def test_default_False(self) -> None:
assert self.profile.has_premium is False
def test_premium_subscription_returns_True(self) -> None:
self.upgrade_to_premium()
assert self.profile.has_premium is True
def test_phone_returns_True(self) -> None:
self.upgrade_to_phone()
assert self.profile.has_premium is True
def test_vpn_bundle_returns_True(self) -> None:
self.upgrade_to_vpn_bundle()
assert self.profile.has_premium is True
class ProfileHasPhoneTest(ProfileTestCase):
"""Tests for Profile.has_phone"""
def test_default_False(self) -> None:
assert self.profile.has_phone is False
def test_premium_subscription_returns_False(self) -> None:
self.upgrade_to_premium()
assert self.profile.has_phone is False
def test_phone_returns_True(self) -> None:
self.upgrade_to_phone()
assert self.profile.has_phone is True
def test_vpn_bundle_returns_True(self) -> None:
self.upgrade_to_vpn_bundle()
assert self.profile.has_phone is True
@pytest.mark.skipif(not settings.PHONES_ENABLED, reason="PHONES_ENABLED is False")
@override_settings(PHONES_NO_CLIENT_CALLS_IN_TEST=True)
class ProfileDatePhoneRegisteredTest(ProfileTestCase):
"""Tests for Profile.date_phone_registered"""
def test_default_None(self) -> None:
assert self.profile.date_phone_registered is None
def test_real_phone_no_relay_number_returns_verified_date(self) -> None:
self.upgrade_to_phone()
datetime_now = datetime.now(UTC)
RealPhone.objects.create(
user=self.profile.user,
number="+12223334444",
verified=True,
verified_date=datetime_now,
)
assert self.profile.date_phone_registered == datetime_now
def test_real_phone_and_relay_number_w_created_at_returns_created_at_date(
self,
) -> None:
self.upgrade_to_phone()
datetime_now = datetime.now(UTC)
phone_user = self.profile.user
RealPhone.objects.create(
user=phone_user,
number="+12223334444",
verified=True,
verified_date=datetime_now,
)
relay_number = RelayNumber.objects.create(user=phone_user)
assert self.profile.date_phone_registered == relay_number.created_at
def test_real_phone_and_relay_number_wo_created_at_returns_verified_date(
self,
) -> None:
self.upgrade_to_phone()
datetime_now = datetime.now(UTC)
phone_user = self.profile.user
real_phone = RealPhone.objects.create(
user=phone_user,
number="+12223334444",
verified=True,
verified_date=datetime_now,
)
relay_number = RelayNumber.objects.create(user=phone_user)
# since created_at is auto set, update to None
relay_number.created_at = None
relay_number.save()
assert self.profile.date_phone_registered == real_phone.verified_date
class ProfileTotalMasksTest(ProfileTestCase):
"""Tests for Profile.total_masks"""
def test_total_masks(self) -> None:
self.upgrade_to_premium()
self.profile.add_subdomain("totalmasks")
assert self.profile.total_masks == 0
num_relay_addresses = random.randint(0, 2)
for _ in list(range(num_relay_addresses)):
baker.make(RelayAddress, user=self.profile.user)
num_domain_addresses = random.randint(0, 2)
for i in list(range(num_domain_addresses)):
baker.make(DomainAddress, user=self.profile.user, address=f"mask{i}")
assert self.profile.total_masks == num_relay_addresses + num_domain_addresses
class ProfileAtMaskLimitTest(ProfileTestCase):
"""Tests for Profile.at_mask_limit"""
def test_premium_user_returns_False(self) -> None:
self.upgrade_to_premium()
assert self.profile.at_mask_limit is False
baker.make(
RelayAddress,
user=self.profile.user,
_quantity=settings.MAX_NUM_FREE_ALIASES,
)
assert self.profile.at_mask_limit is False
def test_free_user(self) -> None:
assert self.profile.at_mask_limit is False
baker.make(
RelayAddress,
user=self.profile.user,
_quantity=settings.MAX_NUM_FREE_ALIASES,
)
assert self.profile.at_mask_limit is True
class ProfileAddSubdomainTest(ProfileTestCase):
"""Tests for Profile.add_subdomain()"""
def test_new_unlimited_profile(self) -> None:
self.upgrade_to_premium()
assert self.profile.add_subdomain("newpremium") == "newpremium"
def test_lowercases_subdomain_value(self) -> None:
self.upgrade_to_premium()
assert self.profile.add_subdomain("mIxEdcAsE") == "mixedcase"
def test_non_premium_user_raises_exception(self) -> None:
expected_msg = "error-premium-set-subdomain"
with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
self.profile.add_subdomain("test")
def test_calling_again_raises_exception(self) -> None:
self.upgrade_to_premium()
subdomain = "test"
self.profile.subdomain = subdomain
self.profile.save()
expected_msg = "error-premium-cannot-change-subdomain"
with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
self.profile.add_subdomain(subdomain)
def test_badword_subdomain_raises_exception(self) -> None:
self.upgrade_to_premium()
expected_msg = "error-subdomain-not-available"
with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
self.profile.add_subdomain("angry")
def test_blocked_word_subdomain_raises_exception(self) -> None:
self.upgrade_to_premium()
expected_msg = "error-subdomain-not-available"
with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
self.profile.add_subdomain("mozilla")
def test_empty_subdomain_raises(self) -> None:
self.upgrade_to_premium()
expected_msg = "error-subdomain-cannot-be-empty-or-null"
with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
self.profile.add_subdomain("")
def test_null_subdomain_raises(self) -> None:
self.upgrade_to_premium()
expected_msg = "error-subdomain-cannot-be-empty-or-null"
with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
self.profile.add_subdomain(None)
def test_subdomain_with_space_at_end_raises(self) -> None:
self.upgrade_to_premium()
expected_msg = "error-subdomain-not-available"
with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
self.profile.add_subdomain("mydomain ")
class ProfileSaveTest(ProfileTestCase):
"""Tests for Profile.save()"""
def test_lowercases_subdomain_value(self) -> None:
self.upgrade_to_premium()
self.profile.subdomain = "mIxEdcAsE"
self.profile.save()
assert self.profile.subdomain == "mixedcase"
def test_lowercases_subdomain_value_with_update_fields(self) -> None:
"""With update_fields, the subdomain is still lowercased."""
self.upgrade_to_premium()
assert self.profile.subdomain is None
# Use QuerySet.update to avoid model .save()
Profile.objects.filter(id=self.profile.id).update(subdomain="mIxEdcAsE")
self.profile.refresh_from_db()
assert self.profile.subdomain == "mIxEdcAsE"
# Update a different field with update_fields to avoid a full model save
new_date_subscribed = datetime(2023, 3, 3, tzinfo=UTC)
self.profile.date_subscribed = new_date_subscribed
self.profile.save(update_fields={"date_subscribed"})
# Since .save() added to update_fields, subdomain is now lowercase
self.profile.refresh_from_db()
assert self.profile.date_subscribed == new_date_subscribed
assert self.profile.subdomain == "mixedcase"
TEST_DESCRIPTION = "test description"
TEST_USED_ON = TEST_GENERATED_FOR = "secret.com"
def add_relay_address(self) -> RelayAddress:
return baker.make(
RelayAddress,
user=self.profile.user,
description=self.TEST_DESCRIPTION,
generated_for=self.TEST_GENERATED_FOR,
used_on=self.TEST_USED_ON,
)
def add_domain_address(self) -> DomainAddress:
self.upgrade_to_premium()
self.profile.subdomain = "somesubdomain"
self.profile.save()
return baker.make(
DomainAddress,
user=self.profile.user,
address="localpart",
description=self.TEST_DESCRIPTION,
used_on=self.TEST_USED_ON,
)
def test_save_server_storage_true_doesnt_delete_data(self) -> None:
relay_address = self.add_relay_address()
self.profile.server_storage = True
self.profile.save()
relay_address.refresh_from_db()
assert relay_address.description == self.TEST_DESCRIPTION
assert relay_address.generated_for == self.TEST_GENERATED_FOR
assert relay_address.used_on == self.TEST_USED_ON
def test_save_server_storage_false_deletes_data(self) -> None:
relay_address = self.add_relay_address()
domain_address = self.add_domain_address()
self.profile.server_storage = False
self.profile.save()
relay_address.refresh_from_db()
domain_address.refresh_from_db()
assert relay_address.description == ""
assert relay_address.generated_for == ""
assert relay_address.used_on == ""
assert domain_address.description == ""
assert domain_address.used_on == ""
def add_four_relay_addresses(self, user: User | None = None) -> list[RelayAddress]:
if user is None:
user = self.profile.user
return baker.make(
RelayAddress,
user=user,
description=self.TEST_DESCRIPTION,
generated_for=self.TEST_GENERATED_FOR,
used_on=self.TEST_USED_ON,
_quantity=4,
)
def test_save_server_storage_false_deletes_ALL_data(self) -> None:
self.add_four_relay_addresses()
self.profile.server_storage = False
self.profile.save()
for relay_address in RelayAddress.objects.filter(user=self.profile.user):
assert relay_address.description == ""
assert relay_address.generated_for == ""
def test_save_server_storage_false_only_deletes_that_profiles_data(self) -> None:
other_user = make_free_test_user()
assert other_user.profile.server_storage is True
self.add_four_relay_addresses()
self.add_four_relay_addresses(user=other_user)
self.profile.server_storage = False
self.profile.save()
for relay_address in RelayAddress.objects.filter(user=self.profile.user):
assert relay_address.description == ""
assert relay_address.generated_for == ""
assert relay_address.used_on == ""
for relay_address in RelayAddress.objects.filter(user=other_user):
assert relay_address.description == self.TEST_DESCRIPTION
assert relay_address.generated_for == self.TEST_GENERATED_FOR
assert relay_address.used_on == self.TEST_USED_ON
class ProfileDisplayNameTest(ProfileTestCase):
"""Tests for Profile.display_name"""
def test_exists(self) -> None:
display_name = "Display Name"
social_account = self.get_or_create_social_account()
social_account.extra_data["displayName"] = display_name
social_account.save()
assert self.profile.display_name == display_name
def test_display_name_does_not_exist(self) -> None:
self.get_or_create_social_account()
assert self.profile.display_name is None
class ProfileLanguageTest(ProfileTestCase):
"""Test Profile.language"""
def test_no_fxa_extra_data_locale_returns_default_en(self) -> None:
social_account = self.get_or_create_social_account()
assert "locale" not in social_account.extra_data
assert self.profile.language == "en"
def test_no_fxa_locale_returns_default_en(self) -> None:
assert self.profile.language == "en"
def test_fxa_locale_de_returns_de(self) -> None:
social_account = self.get_or_create_social_account()
social_account.extra_data["locale"] = "de,en-US;q=0.9,en;q=0.8"
social_account.save()
assert self.profile.language == "de"
class ProfileFxaLocaleInPremiumCountryTest(ProfileTestCase):
"""Tests for Profile.fxa_locale_in_premium_country"""
def set_fxa_locale(self, locale: str) -> None:
social_account = self.get_or_create_social_account()
social_account.extra_data["locale"] = locale
social_account.save()
def test_when_premium_available_returns_True(self) -> None:
self.set_fxa_locale("de-DE,en-xx;q=0.9,en;q=0.8")
assert self.profile.fxa_locale_in_premium_country is True
def test_en_implies_premium_available(self) -> None:
self.set_fxa_locale("en;q=0.8")
assert self.profile.fxa_locale_in_premium_country is True
def test_when_premium_unavailable_returns_False(self) -> None:
self.set_fxa_locale("en-IN, en;q=0.8")
assert self.profile.fxa_locale_in_premium_country is False
def test_when_premium_available_by_language_code_returns_True(self) -> None:
self.set_fxa_locale("de;q=0.8")
assert self.profile.fxa_locale_in_premium_country is True
def test_invalid_language_code_returns_False(self) -> None:
self.set_fxa_locale("xx;q=0.8")
assert self.profile.fxa_locale_in_premium_country is False
def test_when_premium_unavailable_by_language_code_returns_False(self) -> None:
self.set_fxa_locale("zh;q=0.8")
assert self.profile.fxa_locale_in_premium_country is False
def test_no_fxa_account_returns_False(self) -> None:
assert self.profile.fxa_locale_in_premium_country is False
def test_in_estonia(self):
"""Estonia (EE) was added in August 2023."""
self.set_fxa_locale("et-ee,et;q=0.8")
assert self.profile.fxa_locale_in_premium_country is True
class ProfileJoinedBeforePremiumReleaseTest(ProfileTestCase):
"""Tests for Profile.joined_before_premium_release"""
def test_returns_True(self) -> None:
before = "2021-10-18 17:00:00+00:00"
self.profile.user.date_joined = datetime.fromisoformat(before)
assert self.profile.joined_before_premium_release
def test_returns_False(self) -> None:
after = "2021-10-28 17:00:00+00:00"
self.profile.user.date_joined = datetime.fromisoformat(after)
assert self.profile.joined_before_premium_release is False
class ProfileDefaultsTest(ProfileTestCase):
"""Tests for default Profile values"""
def test_user_created_after_premium_release_server_storage_True(self) -> None:
assert self.profile.server_storage
def test_emails_replied_new_user_aggregates_sum_of_replies_to_zero(self) -> None:
assert self.profile.emails_replied == 0
class ProfileEmailsRepliedTest(ProfileTestCase):
"""Tests for Profile.emails_replied"""
def test_premium_user_aggregates_replies_from_all_addresses(self) -> None:
self.upgrade_to_premium()
self.profile.subdomain = "test"
self.profile.num_email_replied_in_deleted_address = 1
self.profile.save()
baker.make(RelayAddress, user=self.profile.user, num_replied=3)
baker.make(
DomainAddress, user=self.profile.user, address="lower-case", num_replied=5
)
assert self.profile.emails_replied == 9
def test_free_user_aggregates_replies_from_relay_addresses(self) -> None:
baker.make(RelayAddress, user=self.profile.user, num_replied=3)
baker.make(RelayAddress, user=self.profile.user, num_replied=5)
assert self.profile.emails_replied == 8
class ProfileUpdateAbuseMetricTest(ProfileTestCase):
"""Tests for Profile.update_abuse_metric()"""
def setUp(self) -> None:
super().setUp()
self.get_or_create_social_account()
self.abuse_metric = baker.make(AbuseMetrics, user=self.profile.user)
patcher_logger = patch("emails.models.abuse_logger.info")
self.mocked_abuse_info = patcher_logger.start()
self.addCleanup(patcher_logger.stop)
# Selectively patch datatime.now() for emails models
# https://docs.python.org/3/library/unittest.mock-examples.html#partial-mocking
patcher = patch("emails.models.datetime")
mocked_datetime = patcher.start()
self.addCleanup(patcher.stop)
self.expected_now = datetime.now(UTC)
mocked_datetime.combine.return_value = datetime.combine(
datetime.now(UTC).date(), datetime.min.time()
)
mocked_datetime.now.return_value = self.expected_now
mocked_datetime.side_effect = lambda *args, **kw: datetime(*args, **kw)
@override_settings(MAX_FORWARDED_PER_DAY=5)
def test_flags_profile_when_emails_forwarded_abuse_threshold_met(self) -> None:
self.abuse_metric.num_email_forwarded_per_day = 4
self.abuse_metric.save()
assert self.profile.last_account_flagged is None
self.profile.update_abuse_metric(email_forwarded=True)
self.abuse_metric.refresh_from_db()
assert self.profile.fxa
self.mocked_abuse_info.assert_called_once_with(
"Abuse flagged",
extra={
"uid": self.profile.fxa.uid,
"flagged": self.expected_now.timestamp(),
"replies": 0,
"addresses": 0,
"forwarded": 5,
"forwarded_size_in_bytes": 0,
},
)
assert self.abuse_metric.num_email_forwarded_per_day == 5
assert self.profile.last_account_flagged == self.expected_now
@override_settings(MAX_FORWARDED_EMAIL_SIZE_PER_DAY=100)
def test_flags_profile_when_forwarded_email_size_abuse_threshold_met(self) -> None:
self.abuse_metric.forwarded_email_size_per_day = 50
self.abuse_metric.save()
assert self.profile.last_account_flagged is None
self.profile.update_abuse_metric(forwarded_email_size=50)
self.abuse_metric.refresh_from_db()
assert self.profile.fxa
self.mocked_abuse_info.assert_called_once_with(
"Abuse flagged",
extra={
"uid": self.profile.fxa.uid,
"flagged": self.expected_now.timestamp(),
"replies": 0,
"addresses": 0,
"forwarded": 0,
"forwarded_size_in_bytes": 100,
},
)
assert self.abuse_metric.forwarded_email_size_per_day == 100
assert self.profile.last_account_flagged == self.expected_now
class ProfileMetricsEnabledTest(ProfileTestCase):
def test_no_fxa_means_metrics_enabled(self) -> None:
assert not self.profile.fxa
assert self.profile.metrics_enabled
def test_fxa_legacy_means_metrics_enabled(self) -> None:
self.get_or_create_social_account()
assert self.profile.fxa
assert "metricsEnabled" not in self.profile.fxa.extra_data
assert self.profile.metrics_enabled
def test_fxa_opt_in_means_metrics_enabled(self) -> None:
social_account = self.get_or_create_social_account()
social_account.extra_data["metricsEnabled"] = True
social_account.save()
assert self.profile.fxa
assert self.profile.metrics_enabled
def test_fxa_opt_out_means_metrics_disabled(self) -> None:
social_account = self.get_or_create_social_account()
social_account.extra_data["metricsEnabled"] = False
social_account.save()
assert self.profile.fxa
assert not self.profile.metrics_enabled
class ProfilePlanTest(ProfileTestCase):
def test_free_user(self) -> None:
assert self.profile.plan == "free"
def test_premium_user(self) -> None:
self.upgrade_to_premium()
assert self.profile.plan == "email"
def test_phone_user(self) -> None:
self.upgrade_to_phone()
assert self.profile.plan == "phone"
def test_vpn_bundle_user(self) -> None:
self.upgrade_to_vpn_bundle()
assert self.profile.plan == "bundle"
class ProfilePlanTermTest(ProfileTestCase):
def test_free_user(self) -> None:
assert self.profile.plan_term is None
def test_premium_user(self) -> None:
self.upgrade_to_premium()
assert self.profile.plan_term == "unknown"
def test_phone_user(self) -> None:
self.upgrade_to_phone()
assert self.profile.plan_term == "unknown"
def test_phone_user_1_month(self) -> None:
self.upgrade_to_phone()
self.profile.date_phone_subscription_start = datetime(2024, 1, 1, tzinfo=UTC)
self.profile.date_phone_subscription_end = datetime(2024, 2, 1, tzinfo=UTC)
assert self.profile.plan_term == "1_month"
def test_phone_user_1_year(self) -> None:
self.upgrade_to_phone()
self.profile.date_phone_subscription_start = datetime(2024, 1, 1, tzinfo=UTC)
self.profile.date_phone_subscription_end = datetime(2025, 1, 1, tzinfo=UTC)
assert self.profile.plan_term == "1_year"
def test_vpn_bundle_user(self) -> None:
self.upgrade_to_vpn_bundle()
assert self.profile.plan_term == "unknown"
class ProfileMetricsPremiumStatus(ProfileTestCase):
def test_free_user(self):
assert self.profile.metrics_premium_status == "free"
def test_premium_user(self) -> None:
self.upgrade_to_premium()
assert self.profile.metrics_premium_status == "email_unknown"
def test_phone_user(self) -> None:
self.upgrade_to_phone()
assert self.profile.metrics_premium_status == "phone_unknown"
def test_phone_user_1_month(self) -> None:
self.upgrade_to_phone()
self.profile.date_phone_subscription_start = datetime(2024, 1, 1, tzinfo=UTC)
self.profile.date_phone_subscription_end = datetime(2024, 2, 1, tzinfo=UTC)
assert self.profile.metrics_premium_status == "phone_1_month"
def test_phone_user_1_year(self) -> None:
self.upgrade_to_phone()
self.profile.date_phone_subscription_start = datetime(2024, 1, 1, tzinfo=UTC)
self.profile.date_phone_subscription_end = datetime(2025, 1, 1, tzinfo=UTC)
assert self.profile.metrics_premium_status == "phone_1_year"
def test_vpn_bundle_user(self) -> None:
self.upgrade_to_vpn_bundle()
assert self.profile.metrics_premium_status == "bundle_unknown"
class DomainAddressTest(TestCase):
def setUp(self):
self.subdomain = "test"

Просмотреть файл

@ -1,75 +0,0 @@
from hashlib import sha256
from unittest.mock import patch
from django.test import TestCase
from privaterelay.tests.utils import make_free_test_user
from ..models import Profile
class MeasureFeatureUsageSignalTest(TestCase):
"""Test measure_feature_usage signal handler"""
def setUp(self) -> None:
user = make_free_test_user()
self.profile = user.profile
patcher_incr = patch("emails.signals.incr_if_enabled")
self.mocked_incr = patcher_incr.start()
self.addCleanup(patcher_incr.stop)
patcher_logger = patch("emails.signals.info_logger.info")
self.mocked_events_info = patcher_logger.start()
self.addCleanup(patcher_logger.stop)
def test_remove_level_one_email_trackers_enabled(self) -> None:
self.profile.remove_level_one_email_trackers = True
self.profile.save()
assert self.profile.fxa
expected_hashed_uid = sha256(self.profile.fxa.uid.encode("utf-8")).hexdigest()
self.mocked_incr.assert_called_once_with("tracker_removal_enabled")
self.mocked_events_info.assert_called_once_with(
"tracker_removal_feature",
extra={
"enabled": True,
"hashed_uid": expected_hashed_uid,
},
)
def test_remove_level_one_email_trackers_disabled(self) -> None:
Profile.objects.filter(id=self.profile.id).update(
remove_level_one_email_trackers=True
)
self.profile.refresh_from_db()
self.profile.remove_level_one_email_trackers = False
self.profile.save()
assert self.profile.fxa
expected_hashed_uid = sha256(self.profile.fxa.uid.encode("utf-8")).hexdigest()
self.mocked_incr.assert_called_once_with("tracker_removal_disabled")
self.mocked_events_info.assert_called_once_with(
"tracker_removal_feature",
extra={
"enabled": False,
"hashed_uid": expected_hashed_uid,
},
)
def test_remove_level_one_email_trackers_unchanged(self) -> None:
self.profile.remove_level_one_email_trackers = False
self.profile.save()
self.mocked_incr.assert_not_called()
self.mocked_events_info.assert_not_called()
def test_unmonitored_field_change_does_not_emit_metric_and_logs(self) -> None:
self.profile.server_storage = False
self.profile.save()
self.mocked_incr.assert_not_called()
self.mocked_events_info.assert_not_called()
def test_profile_created_does_not_emit_metric_and_logs(self) -> None:
self.mocked_incr.assert_not_called()
self.mocked_events_info.assert_not_called()

Просмотреть файл

@ -2,27 +2,18 @@
from unittest.mock import Mock, patch
from django.contrib.auth.models import User
from django.test import TestCase
from waffle.testutils import override_flag
from privaterelay.tests.utils import make_free_test_user, make_premium_test_user
from ..exceptions import CannotMakeSubdomainException
from ..models import (
DomainAddress,
Profile,
RegisteredSubdomain,
RelayAddress,
hash_subdomain,
)
from ..models import DomainAddress, RelayAddress
from ..validators import (
has_bad_words,
is_blocklisted,
valid_address,
valid_address_pattern,
valid_available_subdomain,
)
@ -67,82 +58,6 @@ class IsBlocklistedTest(TestCase):
assert is_blocklisted("blocked-word")
class ValidAvailableSubdomainTest(TestCase):
"""Tests for valid_available_subdomain()"""
ERR_NOT_AVAIL = "error-subdomain-not-available"
ERR_EMPTY_OR_NULL = "error-subdomain-cannot-be-empty-or-null"
def reserve_subdomain_for_new_user(self, subdomain: str) -> User:
user = make_premium_test_user()
user.profile.add_subdomain(subdomain)
return user
def test_bad_word_raises(self) -> None:
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain("angry")
def test_blocked_word_raises(self) -> None:
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain("mozilla")
def test_taken_subdomain_raises(self) -> None:
subdomain = "thisisfine"
self.reserve_subdomain_for_new_user(subdomain)
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain(subdomain)
def test_taken_subdomain_different_case_raises(self) -> None:
self.reserve_subdomain_for_new_user("thIsIsfInE")
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain("THiSiSFiNe")
def test_inactive_subdomain_raises(self) -> None:
"""subdomains registered by now deleted profiles are not available."""
subdomain = "thisisfine"
user = self.reserve_subdomain_for_new_user(subdomain)
user.delete()
registered_subdomain_count = RegisteredSubdomain.objects.filter(
subdomain_hash=hash_subdomain(subdomain)
).count()
assert Profile.objects.filter(subdomain=subdomain).count() == 0
assert registered_subdomain_count == 1
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain(subdomain)
def test_subdomain_with_space_raises(self) -> None:
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain("my domain")
def test_subdomain_with_special_char_raises(self) -> None:
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain("my@domain")
def test_subdomain_with_dash_succeeds(self) -> None:
valid_available_subdomain("my-domain")
def test_subdomain_with_dash_at_front_raises(self) -> None:
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain("-mydomain")
def test_empty_subdomain_raises(self) -> None:
with self.assertRaisesMessage(
CannotMakeSubdomainException, self.ERR_EMPTY_OR_NULL
):
valid_available_subdomain("")
def test_null_subdomain_raises(self) -> None:
with self.assertRaisesMessage(
CannotMakeSubdomainException, self.ERR_EMPTY_OR_NULL
):
valid_available_subdomain(None)
def test_subdomain_with_space_at_end_raises(self) -> None:
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain("mydomain ")
class ValidAddressPatternTest(TestCase):
def test_valid_address_pattern_is_valid(self) -> None:
assert valid_address_pattern("foo")

Просмотреть файл

@ -1,7 +1,6 @@
"""Field validators for emails models."""
import re
from typing import Any
from django.contrib.auth.models import User
@ -11,7 +10,6 @@ from .apps import BadWords, emails_config
from .exceptions import (
AccountIsInactiveException,
AccountIsPausedException,
CannotMakeSubdomainException,
DomainAddrFreeTierException,
DomainAddrNeedSubdomainException,
RelayAddrFreeTierLimitException,
@ -22,11 +20,6 @@ from .exceptions import (
# must be 1-63 lowercase alphanumeric characters and/or hyphens
_re_valid_address = re.compile("^(?![-.])[a-z0-9-.]{1,63}(?<![-.])$")
# A valid subdomain:
# can't start or end with a hyphen
# must be 1-63 alphanumeric characters and/or hyphens
_re_valid_subdomain = re.compile("^(?!-)[a-z0-9-]{1,63}(?<!-)$")
def badwords() -> BadWords:
"""Allow mocking of badwords in tests."""
@ -104,29 +97,3 @@ def valid_address(address: str, domain: str, subdomain: str | None = None) -> bo
def valid_address_pattern(address: str) -> bool:
"""Return if the local/user part of an address is valid."""
return _re_valid_address.match(address) is not None
def valid_available_subdomain(subdomain: Any) -> None:
"""Raise CannotMakeSubdomainException if the subdomain fails a validation test."""
from .models import RegisteredSubdomain, hash_subdomain
if not subdomain:
raise CannotMakeSubdomainException("error-subdomain-cannot-be-empty-or-null")
subdomain = str(subdomain).lower()
# valid subdomains:
# have to meet the rules for length and characters
valid = _re_valid_subdomain.match(subdomain) is not None
# can't have "bad" words in them
bad_word = has_bad_words(subdomain)
# can't have "blocked" words in them
blocked_word = is_blocklisted(subdomain)
# can't be taken by someone else
taken = (
RegisteredSubdomain.objects.filter(
subdomain_hash=hash_subdomain(subdomain)
).count()
> 0
)
if not valid or bad_word or blocked_word or taken:
raise CannotMakeSubdomainException("error-subdomain-not-available")

6
privaterelay/admin.py Normal file
Просмотреть файл

@ -0,0 +1,6 @@
from django.contrib import admin
from .models import Profile, RegisteredSubdomain
admin.site.register(Profile)
admin.site.register(RegisteredSubdomain)

Просмотреть файл

@ -0,0 +1,14 @@
"""Exceptions raised by the privaterelay app"""
from django.core.exceptions import BadRequest
class CannotMakeSubdomainException(BadRequest):
"""Exception raised by Profile due to error on subdomain creation.
Attributes:
message -- optional explanation of the error
"""
def __init__(self, message: str | None = None) -> None:
self.message = message

Просмотреть файл

@ -0,0 +1,164 @@
# Generated by Django 4.2.13 on 2024-06-07 22:27
# Move Profile and RegisteredSubdomain to privaterelay app
# Other half is
# emails/migrations/0062_move_profile_and_registered_subdomain_models.pyo
# See https://davit.hashnode.dev/django-move-model
import uuid
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
from privaterelay.validators import valid_available_subdomain
class Migration(migrations.Migration):
initial = True
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
("privaterelay", "0009_remove_duplicate_index"),
("emails", "0062_move_profile_and_registered_subdomain_models"),
]
state_operations = [
migrations.CreateModel(
name="RegisteredSubdomain",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"subdomain_hash",
models.CharField(db_index=True, max_length=64, unique=True),
),
("registered_at", models.DateTimeField(auto_now_add=True)),
],
options={
"db_table": "emails_registeredsubdomain",
},
),
migrations.CreateModel(
name="Profile",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("api_token", models.UUIDField(default=uuid.uuid4)),
("num_address_deleted", models.PositiveIntegerField(default=0)),
("date_subscribed", models.DateTimeField(blank=True, null=True)),
("date_subscribed_phone", models.DateTimeField(blank=True, null=True)),
(
"date_phone_subscription_checked",
models.DateTimeField(blank=True, null=True),
),
(
"date_phone_subscription_start",
models.DateTimeField(blank=True, null=True),
),
(
"date_phone_subscription_reset",
models.DateTimeField(blank=True, null=True),
),
(
"date_phone_subscription_end",
models.DateTimeField(blank=True, null=True),
),
(
"address_last_deleted",
models.DateTimeField(blank=True, db_index=True, null=True),
),
(
"last_soft_bounce",
models.DateTimeField(blank=True, db_index=True, null=True),
),
(
"last_hard_bounce",
models.DateTimeField(blank=True, db_index=True, null=True),
),
(
"last_account_flagged",
models.DateTimeField(blank=True, db_index=True, null=True),
),
("num_deleted_relay_addresses", models.PositiveIntegerField(default=0)),
(
"num_deleted_domain_addresses",
models.PositiveIntegerField(default=0),
),
(
"num_email_forwarded_in_deleted_address",
models.PositiveIntegerField(default=0),
),
(
"num_email_blocked_in_deleted_address",
models.PositiveIntegerField(default=0),
),
(
"num_level_one_trackers_blocked_in_deleted_address",
models.PositiveIntegerField(default=0, null=True),
),
(
"num_email_replied_in_deleted_address",
models.PositiveIntegerField(default=0),
),
(
"num_email_spam_in_deleted_address",
models.PositiveIntegerField(default=0),
),
(
"subdomain",
models.CharField(
blank=True,
db_index=True,
max_length=63,
null=True,
unique=True,
validators=[valid_available_subdomain],
),
),
("server_storage", models.BooleanField(default=True)),
("store_phone_log", models.BooleanField(default=True)),
(
"remove_level_one_email_trackers",
models.BooleanField(default=False, null=True),
),
("onboarding_state", models.PositiveIntegerField(default=0)),
("onboarding_free_state", models.PositiveIntegerField(default=0)),
("auto_block_spam", models.BooleanField(default=False)),
("forwarded_first_reply", models.BooleanField(default=False)),
("created_by", models.CharField(blank=True, max_length=63, null=True)),
("sent_welcome_email", models.BooleanField(default=False)),
(
"last_engagement",
models.DateTimeField(blank=True, db_index=True, null=True),
),
(
"user",
models.OneToOneField(
on_delete=django.db.models.deletion.CASCADE,
to=settings.AUTH_USER_MODEL,
),
),
],
options={
"db_table": "emails_profile",
},
),
]
operations = [
migrations.SeparateDatabaseAndState(state_operations=state_operations),
]

572
privaterelay/models.py Normal file
Просмотреть файл

@ -0,0 +1,572 @@
from __future__ import annotations
import logging
import uuid
from collections import namedtuple
from datetime import UTC, datetime, timedelta
from hashlib import sha256
from typing import TYPE_CHECKING, Literal
from django.conf import settings
from django.contrib.auth.models import User
from django.db import models, transaction
from django.utils.translation.trans_real import (
get_supported_language_variant,
parse_accept_lang_header,
)
from allauth.socialaccount.models import SocialAccount
from .exceptions import CannotMakeSubdomainException
from .plans import get_premium_countries
from .utils import (
AcceptLanguageError,
flag_is_active_in_task,
guess_country_from_accept_lang,
)
from .validators import valid_available_subdomain
if TYPE_CHECKING:
from collections.abc import Iterable
from django.db.models.base import ModelBase
from django.db.models.query import QuerySet
from emails.models import DomainAddress, RelayAddress
abuse_logger = logging.getLogger("abusemetrics")
BounceStatus = namedtuple("BounceStatus", "paused type")
PREMIUM_DOMAINS = ["mozilla.com", "getpocket.com", "mozillafoundation.org"]
def hash_subdomain(subdomain: str, domain: str = settings.MOZMAIL_DOMAIN) -> str:
return sha256(f"{subdomain}.{domain}".encode()).hexdigest()
class Profile(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE)
api_token = models.UUIDField(default=uuid.uuid4)
num_address_deleted = models.PositiveIntegerField(default=0)
date_subscribed = models.DateTimeField(blank=True, null=True)
date_subscribed_phone = models.DateTimeField(blank=True, null=True)
# TODO MPP-2972: delete date_phone_subscription_checked in favor of
# date_phone_subscription_next_reset
date_phone_subscription_checked = models.DateTimeField(blank=True, null=True)
date_phone_subscription_start = models.DateTimeField(blank=True, null=True)
date_phone_subscription_reset = models.DateTimeField(blank=True, null=True)
date_phone_subscription_end = models.DateTimeField(blank=True, null=True)
address_last_deleted = models.DateTimeField(blank=True, null=True, db_index=True)
last_soft_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
last_hard_bounce = models.DateTimeField(blank=True, null=True, db_index=True)
last_account_flagged = models.DateTimeField(blank=True, null=True, db_index=True)
num_deleted_relay_addresses = models.PositiveIntegerField(default=0)
num_deleted_domain_addresses = models.PositiveIntegerField(default=0)
num_email_forwarded_in_deleted_address = models.PositiveIntegerField(default=0)
num_email_blocked_in_deleted_address = models.PositiveIntegerField(default=0)
num_level_one_trackers_blocked_in_deleted_address = models.PositiveIntegerField(
default=0, null=True
)
num_email_replied_in_deleted_address = models.PositiveIntegerField(default=0)
num_email_spam_in_deleted_address = models.PositiveIntegerField(default=0)
subdomain = models.CharField(
blank=True,
null=True,
unique=True,
max_length=63,
db_index=True,
validators=[valid_available_subdomain],
)
# Whether we store the user's alias labels in the server
server_storage = models.BooleanField(default=True)
# Whether we store the caller/sender log for the user's relay number
store_phone_log = models.BooleanField(default=True)
# TODO: Data migration to set null to false
# TODO: Schema migration to remove null=True
remove_level_one_email_trackers = models.BooleanField(null=True, default=False)
onboarding_state = models.PositiveIntegerField(default=0)
onboarding_free_state = models.PositiveIntegerField(default=0)
auto_block_spam = models.BooleanField(default=False)
forwarded_first_reply = models.BooleanField(default=False)
# Empty string means the profile was created through relying party flow
created_by = models.CharField(blank=True, null=True, max_length=63)
sent_welcome_email = models.BooleanField(default=False)
last_engagement = models.DateTimeField(blank=True, null=True, db_index=True)
class Meta:
db_table = "emails_profile"
def __str__(self):
return f"{self.user} Profile"
def save(
self,
force_insert: bool | tuple[ModelBase, ...] = False,
force_update: bool = False,
using: str | None = None,
update_fields: Iterable[str] | None = None,
) -> None:
from emails.models import DomainAddress, RelayAddress
# always lower-case the subdomain before saving it
# TODO: change subdomain field as a custom field inheriting from
# CharField to validate constraints on the field update too
if self.subdomain and not self.subdomain.islower():
self.subdomain = self.subdomain.lower()
if update_fields is not None:
update_fields = {"subdomain"}.union(update_fields)
super().save(
force_insert=force_insert,
force_update=force_update,
using=using,
update_fields=update_fields,
)
# any time a profile is saved with server_storage False, delete the
# appropriate server-stored Relay address data.
if not self.server_storage:
relay_addresses = RelayAddress.objects.filter(user=self.user)
relay_addresses.update(description="", generated_for="", used_on="")
domain_addresses = DomainAddress.objects.filter(user=self.user)
domain_addresses.update(description="", used_on="")
if settings.PHONES_ENABLED:
# any time a profile is saved with store_phone_log False, delete the
# appropriate server-stored InboundContact records
from phones.models import InboundContact, RelayNumber
if not self.store_phone_log:
try:
relay_number = RelayNumber.objects.get(user=self.user)
InboundContact.objects.filter(relay_number=relay_number).delete()
except RelayNumber.DoesNotExist:
pass
@property
def language(self):
if self.fxa and self.fxa.extra_data.get("locale"):
for accept_lang, _ in parse_accept_lang_header(
self.fxa.extra_data.get("locale")
):
try:
return get_supported_language_variant(accept_lang)
except LookupError:
continue
return "en"
# This method returns whether the locale associated with the user's Mozilla account
# includes a country code from a Premium country. This is less accurate than using
# get_countries_info_from_request_and_mapping(), which can use a GeoIP lookup, so
# prefer using that if a request context is available. In other contexts, for
# example when sending an email, this method can be useful.
@property
def fxa_locale_in_premium_country(self) -> bool:
if self.fxa and self.fxa.extra_data.get("locale"):
try:
country = guess_country_from_accept_lang(self.fxa.extra_data["locale"])
except AcceptLanguageError:
return False
premium_countries = get_premium_countries()
if country in premium_countries:
return True
return False
@property
def avatar(self) -> str | None:
if fxa := self.fxa:
return str(fxa.extra_data.get("avatar"))
return None
@property
def relay_addresses(self) -> QuerySet[RelayAddress]:
from emails.models import RelayAddress
return RelayAddress.objects.filter(user=self.user)
@property
def domain_addresses(self) -> QuerySet[DomainAddress]:
from emails.models import DomainAddress
return DomainAddress.objects.filter(user=self.user)
@property
def total_masks(self) -> int:
ra_count: int = self.relay_addresses.count()
da_count: int = self.domain_addresses.count()
return ra_count + da_count
@property
def at_mask_limit(self) -> bool:
if self.has_premium:
return False
ra_count: int = self.relay_addresses.count()
return ra_count >= settings.MAX_NUM_FREE_ALIASES
def check_bounce_pause(self) -> BounceStatus:
if self.last_hard_bounce:
last_hard_bounce_allowed = datetime.now(UTC) - timedelta(
days=settings.HARD_BOUNCE_ALLOWED_DAYS
)
if self.last_hard_bounce > last_hard_bounce_allowed:
return BounceStatus(True, "hard")
self.last_hard_bounce = None
self.save()
if self.last_soft_bounce:
last_soft_bounce_allowed = datetime.now(UTC) - timedelta(
days=settings.SOFT_BOUNCE_ALLOWED_DAYS
)
if self.last_soft_bounce > last_soft_bounce_allowed:
return BounceStatus(True, "soft")
self.last_soft_bounce = None
self.save()
return BounceStatus(False, "")
@property
def bounce_status(self) -> BounceStatus:
return self.check_bounce_pause()
@property
def next_email_try(self) -> datetime:
bounce_pause, bounce_type = self.check_bounce_pause()
if not bounce_pause:
return datetime.now(UTC)
if bounce_type == "soft":
if not self.last_soft_bounce:
raise ValueError("self.last_soft_bounce must be truthy value.")
return self.last_soft_bounce + timedelta(
days=settings.SOFT_BOUNCE_ALLOWED_DAYS
)
if bounce_type != "hard":
raise ValueError("bounce_type must be either 'soft' or 'hard'")
if not self.last_hard_bounce:
raise ValueError("self.last_hard_bounce must be truthy value.")
return self.last_hard_bounce + timedelta(days=settings.HARD_BOUNCE_ALLOWED_DAYS)
@property
def last_bounce_date(self):
if self.last_hard_bounce:
return self.last_hard_bounce
if self.last_soft_bounce:
return self.last_soft_bounce
return None
@property
def at_max_free_aliases(self) -> bool:
relay_addresses_count: int = self.relay_addresses.count()
return relay_addresses_count >= settings.MAX_NUM_FREE_ALIASES
@property
def fxa(self) -> SocialAccount | None:
# Note: we are NOT using .filter() here because it invalidates
# any profile instances that were queried with prefetch_related, which
# we use in at least the profile view to minimize queries
if not hasattr(self.user, "socialaccount_set"):
raise AttributeError("self.user must have socialaccount_set attribute")
for sa in self.user.socialaccount_set.all():
if sa.provider == "fxa":
return sa
return None
@property
def display_name(self) -> str | None:
# if display name is not set on FxA the
# displayName key will not exist on the extra_data
if fxa := self.fxa:
name = fxa.extra_data.get("displayName")
return name if name is None else str(name)
return None
@property
def custom_domain(self) -> str:
if not self.subdomain:
raise ValueError("self.subdomain must be truthy value.")
return f"@{self.subdomain}.{settings.MOZMAIL_DOMAIN}"
@property
def has_premium(self) -> bool:
if not self.user.is_active:
return False
# FIXME: as we don't have all the tiers defined we are over-defining
# this to mark the user as a premium user as well
if not self.fxa:
return False
for premium_domain in PREMIUM_DOMAINS:
if self.user.email.endswith(f"@{premium_domain}"):
return True
user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
for sub in settings.SUBSCRIPTIONS_WITH_UNLIMITED:
if sub in user_subscriptions:
return True
return False
@property
def has_phone(self) -> bool:
if not self.fxa:
return False
if settings.RELAY_CHANNEL != "prod" and not settings.IN_PYTEST:
if not flag_is_active_in_task("phones", self.user):
return False
if flag_is_active_in_task("free_phones", self.user):
return True
user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
for sub in settings.SUBSCRIPTIONS_WITH_PHONE:
if sub in user_subscriptions:
return True
return False
@property
def has_vpn(self) -> bool:
if not self.fxa:
return False
user_subscriptions = self.fxa.extra_data.get("subscriptions", [])
for sub in settings.SUBSCRIPTIONS_WITH_VPN:
if sub in user_subscriptions:
return True
return False
@property
def emails_forwarded(self) -> int:
return (
sum(ra.num_forwarded for ra in self.relay_addresses)
+ sum(da.num_forwarded for da in self.domain_addresses)
+ self.num_email_forwarded_in_deleted_address
)
@property
def emails_blocked(self) -> int:
return (
sum(ra.num_blocked for ra in self.relay_addresses)
+ sum(da.num_blocked for da in self.domain_addresses)
+ self.num_email_blocked_in_deleted_address
)
@property
def emails_replied(self) -> int:
ra_sum = self.relay_addresses.aggregate(models.Sum("num_replied", default=0))
da_sum = self.domain_addresses.aggregate(models.Sum("num_replied", default=0))
return (
int(ra_sum["num_replied__sum"])
+ int(da_sum["num_replied__sum"])
+ self.num_email_replied_in_deleted_address
)
@property
def level_one_trackers_blocked(self) -> int:
return (
sum(ra.num_level_one_trackers_blocked or 0 for ra in self.relay_addresses)
+ sum(
da.num_level_one_trackers_blocked or 0 for da in self.domain_addresses
)
+ (self.num_level_one_trackers_blocked_in_deleted_address or 0)
)
@property
def joined_before_premium_release(self):
date_created = self.user.date_joined
return date_created < datetime.fromisoformat("2021-10-22 17:00:00+00:00")
@property
def date_phone_registered(self) -> datetime | None:
if not settings.PHONES_ENABLED:
return None
from phones.models import RealPhone, RelayNumber
try:
real_phone = RealPhone.objects.get(user=self.user)
relay_number = RelayNumber.objects.get(user=self.user)
except RealPhone.DoesNotExist:
return None
except RelayNumber.DoesNotExist:
return real_phone.verified_date
return relay_number.created_at or real_phone.verified_date
def add_subdomain(self, subdomain):
# Handles if the subdomain is "" or None
if not subdomain:
raise CannotMakeSubdomainException(
"error-subdomain-cannot-be-empty-or-null"
)
# subdomain must be all lowercase
subdomain = subdomain.lower()
if not self.has_premium:
raise CannotMakeSubdomainException("error-premium-set-subdomain")
if self.subdomain is not None:
raise CannotMakeSubdomainException("error-premium-cannot-change-subdomain")
self.subdomain = subdomain
# The validator defined in the subdomain field does not get run in full_clean()
# when self.subdomain is "" or None, so we need to run the validator again to
# catch these cases.
valid_available_subdomain(subdomain)
self.full_clean()
self.save()
RegisteredSubdomain.objects.create(subdomain_hash=hash_subdomain(subdomain))
return subdomain
def update_abuse_metric(
self,
address_created: bool = False,
replied: bool = False,
email_forwarded: bool = False,
forwarded_email_size: int = 0,
) -> datetime | None:
if self.user.email in settings.ALLOWED_ACCOUNTS:
return None
with transaction.atomic():
# look for abuse metrics created on the same UTC date, regardless of time.
midnight_utc_today = datetime.combine(
datetime.now(UTC).date(), datetime.min.time()
).astimezone(UTC)
midnight_utc_tomorow = midnight_utc_today + timedelta(days=1)
abuse_metric = (
self.user.abusemetrics_set.select_for_update()
.filter(
first_recorded__gte=midnight_utc_today,
first_recorded__lt=midnight_utc_tomorow,
)
.first()
)
if not abuse_metric:
from emails.models import AbuseMetrics
abuse_metric = AbuseMetrics.objects.create(user=self.user)
AbuseMetrics.objects.filter(
first_recorded__lt=midnight_utc_today
).delete()
# increment the abuse metric
if address_created:
abuse_metric.num_address_created_per_day += 1
if replied:
abuse_metric.num_replies_per_day += 1
if email_forwarded:
abuse_metric.num_email_forwarded_per_day += 1
if forwarded_email_size > 0:
abuse_metric.forwarded_email_size_per_day += forwarded_email_size
abuse_metric.last_recorded = datetime.now(UTC)
abuse_metric.save()
# check user should be flagged for abuse
hit_max_create = False
hit_max_replies = False
hit_max_forwarded = False
hit_max_forwarded_email_size = False
hit_max_create = (
abuse_metric.num_address_created_per_day
>= settings.MAX_ADDRESS_CREATION_PER_DAY
)
hit_max_replies = (
abuse_metric.num_replies_per_day >= settings.MAX_REPLIES_PER_DAY
)
hit_max_forwarded = (
abuse_metric.num_email_forwarded_per_day
>= settings.MAX_FORWARDED_PER_DAY
)
hit_max_forwarded_email_size = (
abuse_metric.forwarded_email_size_per_day
>= settings.MAX_FORWARDED_EMAIL_SIZE_PER_DAY
)
if (
hit_max_create
or hit_max_replies
or hit_max_forwarded
or hit_max_forwarded_email_size
):
self.last_account_flagged = datetime.now(UTC)
self.save()
data = {
"uid": self.fxa.uid if self.fxa else None,
"flagged": self.last_account_flagged.timestamp(),
"replies": abuse_metric.num_replies_per_day,
"addresses": abuse_metric.num_address_created_per_day,
"forwarded": abuse_metric.num_email_forwarded_per_day,
"forwarded_size_in_bytes": (
abuse_metric.forwarded_email_size_per_day
),
}
# log for further secops review
abuse_logger.info("Abuse flagged", extra=data)
return self.last_account_flagged
@property
def is_flagged(self):
if not self.last_account_flagged:
return False
account_premium_feature_resumed = self.last_account_flagged + timedelta(
days=settings.PREMIUM_FEATURE_PAUSED_DAYS
)
if datetime.now(UTC) > account_premium_feature_resumed:
# premium feature has been resumed
return False
# user was flagged and the premium feature pause period is not yet over
return True
@property
def metrics_enabled(self) -> bool:
"""
Does the user allow us to record technical and interaction data?
This is based on the Mozilla accounts opt-out option, added around 2022. A user
can go to their Mozilla account profile settings, Data Collection and Use, and
deselect "Help improve Mozilla Account". This setting defaults to On, and is
sent as "metricsEnabled". Some older Relay accounts do not have
"metricsEnabled", and we default to On.
"""
if self.fxa:
return bool(self.fxa.extra_data.get("metricsEnabled", True))
return True
@property
def plan(self) -> Literal["free", "email", "phone", "bundle"]:
"""The user's Relay plan as a string."""
if self.has_premium:
if self.has_phone:
return "bundle" if self.has_vpn else "phone"
else:
return "email"
else:
return "free"
@property
def plan_term(self) -> Literal[None, "unknown", "1_month", "1_year"]:
"""The user's Relay plan term as a string."""
plan = self.plan
if plan == "free":
return None
if plan == "phone":
start_date = self.date_phone_subscription_start
end_date = self.date_phone_subscription_end
if start_date and end_date:
span = end_date - start_date
return "1_year" if span.days > 32 else "1_month"
return "unknown"
@property
def metrics_premium_status(self) -> str:
plan = self.plan
if plan == "free":
return "free"
return f"{plan}_{self.plan_term}"
class RegisteredSubdomain(models.Model):
subdomain_hash = models.CharField(max_length=64, db_index=True, unique=True)
registered_at = models.DateTimeField(auto_now_add=True)
def __str__(self):
return self.subdomain_hash
class Meta:
db_table = "emails_registeredsubdomain"
@classmethod
def is_taken(cls, subdomain: str) -> bool:
return cls.objects.filter(subdomain_hash=hash_subdomain(subdomain)).exists()

Просмотреть файл

@ -458,10 +458,12 @@ SUBSCRIPTIONS_WITH_VPN: list[str] = config(
MAX_ONBOARDING_AVAILABLE = config("MAX_ONBOARDING_AVAILABLE", 0, cast=int)
MAX_ONBOARDING_FREE_AVAILABLE = config("MAX_ONBOARDING_FREE_AVAILABLE", 3, cast=int)
MAX_ADDRESS_CREATION_PER_DAY = config("MAX_ADDRESS_CREATION_PER_DAY", 100, cast=int)
MAX_REPLIES_PER_DAY = config("MAX_REPLIES_PER_DAY", 100, cast=int)
MAX_FORWARDED_PER_DAY = config("MAX_FORWARDED_PER_DAY", 1000, cast=int)
MAX_FORWARDED_EMAIL_SIZE_PER_DAY = config(
MAX_ADDRESS_CREATION_PER_DAY: int = config(
"MAX_ADDRESS_CREATION_PER_DAY", 100, cast=int
)
MAX_REPLIES_PER_DAY: int = config("MAX_REPLIES_PER_DAY", 100, cast=int)
MAX_FORWARDED_PER_DAY: int = config("MAX_FORWARDED_PER_DAY", 1000, cast=int)
MAX_FORWARDED_EMAIL_SIZE_PER_DAY: int = config(
"MAX_FORWARDED_EMAIL_SIZE_PER_DAY", 1_000_000_000, cast=int
)
PREMIUM_FEATURE_PAUSED_DAYS: int = config(

Просмотреть файл

@ -1,8 +1,18 @@
import logging
from hashlib import sha256
from django.contrib.auth.models import User
from django.db.models.signals import post_save, pre_save
from django.dispatch import receiver
from allauth.account.signals import user_logged_in, user_signed_up
from rest_framework.authtoken.models import Token
from emails.utils import incr_if_enabled
from emails.utils import incr_if_enabled, set_user_group
from .models import Profile
info_logger = logging.getLogger("eventsinfo")
@receiver(user_signed_up)
@ -25,3 +35,49 @@ def record_user_logged_in(request, user, **kwargs):
event = "user_signed_up"
if response:
response.set_cookie(f"server_ga_event:{event}", event, max_age=5)
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
if created:
set_user_group(instance)
Profile.objects.create(user=instance)
@receiver(pre_save, sender=Profile)
def measure_feature_usage(sender, instance, **kwargs):
if instance._state.adding:
# if newly created Profile ignore the signal
return
curr_profile = Profile.objects.get(id=instance.id)
# measure tracker removal usage
changed_tracker_removal_setting = (
instance.remove_level_one_email_trackers
!= curr_profile.remove_level_one_email_trackers
)
if changed_tracker_removal_setting:
if instance.remove_level_one_email_trackers:
incr_if_enabled("tracker_removal_enabled")
if not instance.remove_level_one_email_trackers:
incr_if_enabled("tracker_removal_disabled")
info_logger.info(
"tracker_removal_feature",
extra={
"enabled": instance.remove_level_one_email_trackers,
# TODO create a utility function or property for hashed fxa uid
"hashed_uid": sha256(instance.fxa.uid.encode("utf-8")).hexdigest(),
},
)
@receiver(post_save, sender=Profile)
def copy_auth_token(sender, instance=None, created=False, **kwargs):
if created:
# baker triggers created during tests
# so first check the user doesn't already have a Token
try:
Token.objects.get(user=instance.user)
return
except Token.DoesNotExist:
Token.objects.create(user=instance.user, key=instance.api_token)

Просмотреть файл

@ -0,0 +1,812 @@
import random
from datetime import UTC, datetime, timedelta
from unittest.mock import patch
from uuid import uuid4
from django.conf import settings
from django.contrib.auth.models import User
from django.test import TestCase, override_settings
import pytest
from allauth.socialaccount.models import SocialAccount
from model_bakery import baker
from emails.models import AbuseMetrics, DomainAddress, RelayAddress
from ..exceptions import CannotMakeSubdomainException
from ..models import Profile
from .utils import (
make_free_test_user,
phone_subscription,
premium_subscription,
vpn_subscription,
)
if settings.PHONES_ENABLED:
from phones.models import RealPhone, RelayNumber
class ProfileTestCase(TestCase):
"""Base class for Profile tests."""
def setUp(self) -> None:
user = baker.make(User)
self.profile = user.profile
assert self.profile.server_storage is True
def get_or_create_social_account(self) -> SocialAccount:
"""Get the test user's social account, creating if needed."""
social_account, _ = SocialAccount.objects.get_or_create(
user=self.profile.user,
provider="fxa",
defaults={
"uid": str(uuid4()),
"extra_data": {"avatar": "image.png", "subscriptions": []},
},
)
return social_account
def upgrade_to_premium(self) -> None:
"""Add an unlimited emails subscription to the user."""
social_account = self.get_or_create_social_account()
social_account.extra_data["subscriptions"].append(premium_subscription())
social_account.save()
def upgrade_to_phone(self) -> None:
"""Add a phone plan to the user."""
social_account = self.get_or_create_social_account()
social_account.extra_data["subscriptions"].append(phone_subscription())
if not self.profile.has_premium:
social_account.extra_data["subscriptions"].append(premium_subscription())
social_account.save()
def upgrade_to_vpn_bundle(self) -> None:
"""Add a phone plan to the user."""
social_account = self.get_or_create_social_account()
social_account.extra_data["subscriptions"].append(vpn_subscription())
if not self.profile.has_premium:
social_account.extra_data["subscriptions"].append(premium_subscription())
if not self.profile.has_phone:
social_account.extra_data["subscriptions"].append(phone_subscription())
social_account.save()
class ProfileBounceTestCase(ProfileTestCase):
"""Base class for Profile tests that check for bounces."""
def set_hard_bounce(self) -> datetime:
"""
Set a hard bounce pause for the profile, return the bounce time.
This happens when the user's email server reports a hard bounce, such as
saying the email does not exist.
"""
self.profile.last_hard_bounce = datetime.now(UTC) - timedelta(
days=settings.HARD_BOUNCE_ALLOWED_DAYS - 1
)
self.profile.save()
return self.profile.last_hard_bounce
def set_soft_bounce(self) -> datetime:
"""
Set a soft bounce for the profile, return the bounce time.
This happens when the user's email server reports a soft bounce, such as
saying the user's mailbox is full.
"""
self.profile.last_soft_bounce = datetime.now(UTC) - timedelta(
days=settings.SOFT_BOUNCE_ALLOWED_DAYS - 1
)
self.profile.save()
return self.profile.last_soft_bounce
class ProfileCheckBouncePause(ProfileBounceTestCase):
"""Tests for Profile.check_bounce_pause()"""
def test_no_bounces(self) -> None:
bounce_paused, bounce_type = self.profile.check_bounce_pause()
assert bounce_paused is False
assert bounce_type == ""
def test_hard_bounce_pending(self) -> None:
self.set_hard_bounce()
bounce_paused, bounce_type = self.profile.check_bounce_pause()
assert bounce_paused is True
assert bounce_type == "hard"
def test_soft_bounce_pending(self) -> None:
self.set_soft_bounce()
bounce_paused, bounce_type = self.profile.check_bounce_pause()
assert bounce_paused is True
assert bounce_type == "soft"
def test_hard_and_soft_bounce_pending_shows_hard(self) -> None:
self.set_hard_bounce()
self.set_soft_bounce()
bounce_paused, bounce_type = self.profile.check_bounce_pause()
assert bounce_paused is True
assert bounce_type == "hard"
def test_hard_bounce_over_resets_timer(self) -> None:
self.profile.last_hard_bounce = datetime.now(UTC) - timedelta(
days=settings.HARD_BOUNCE_ALLOWED_DAYS + 1
)
self.profile.save()
assert self.profile.last_hard_bounce is not None
bounce_paused, bounce_type = self.profile.check_bounce_pause()
assert bounce_paused is False
assert bounce_type == ""
assert self.profile.last_hard_bounce is None
def test_soft_bounce_over_resets_timer(self) -> None:
self.profile.last_soft_bounce = datetime.now(UTC) - timedelta(
days=settings.SOFT_BOUNCE_ALLOWED_DAYS + 1
)
self.profile.save()
assert self.profile.last_soft_bounce is not None
bounce_paused, bounce_type = self.profile.check_bounce_pause()
assert bounce_paused is False
assert bounce_type == ""
assert self.profile.last_soft_bounce is None
class ProfileNextEmailTryDateTest(ProfileBounceTestCase):
"""Tests for Profile.next_email_try"""
def test_no_bounces_returns_today(self) -> None:
assert self.profile.next_email_try.date() == datetime.now(UTC).date()
def test_hard_bounce_returns_proper_datemath(self) -> None:
last_hard_bounce = self.set_hard_bounce()
expected_next_try_date = last_hard_bounce + timedelta(
days=settings.HARD_BOUNCE_ALLOWED_DAYS
)
assert self.profile.next_email_try.date() == expected_next_try_date.date()
def test_soft_bounce_returns_proper_datemath(self) -> None:
last_soft_bounce = self.set_soft_bounce()
expected_next_try_date = last_soft_bounce + timedelta(
days=settings.SOFT_BOUNCE_ALLOWED_DAYS
)
assert self.profile.next_email_try.date() == expected_next_try_date.date()
def test_hard_and_soft_bounce_returns_hard_datemath(self) -> None:
last_soft_bounce = self.set_soft_bounce()
last_hard_bounce = self.set_hard_bounce()
assert last_soft_bounce != last_hard_bounce
expected_next_try_date = last_hard_bounce + timedelta(
days=settings.HARD_BOUNCE_ALLOWED_DAYS
)
assert self.profile.next_email_try.date() == expected_next_try_date.date()
class ProfileLastBounceDateTest(ProfileBounceTestCase):
"""Tests for Profile.last_bounce_date"""
def test_no_bounces_returns_None(self) -> None:
assert self.profile.last_bounce_date is None
def test_soft_bounce_returns_its_date(self) -> None:
self.set_soft_bounce()
assert self.profile.last_bounce_date == self.profile.last_soft_bounce
def test_hard_bounce_returns_its_date(self) -> None:
self.set_hard_bounce()
assert self.profile.last_bounce_date == self.profile.last_hard_bounce
def test_hard_and_soft_bounces_returns_hard_date(self) -> None:
self.set_soft_bounce()
self.set_hard_bounce()
assert self.profile.last_bounce_date == self.profile.last_hard_bounce
class ProfileHasPremiumTest(ProfileTestCase):
"""Tests for Profile.has_premium"""
def test_default_False(self) -> None:
assert self.profile.has_premium is False
def test_premium_subscription_returns_True(self) -> None:
self.upgrade_to_premium()
assert self.profile.has_premium is True
def test_phone_returns_True(self) -> None:
self.upgrade_to_phone()
assert self.profile.has_premium is True
def test_vpn_bundle_returns_True(self) -> None:
self.upgrade_to_vpn_bundle()
assert self.profile.has_premium is True
class ProfileHasPhoneTest(ProfileTestCase):
"""Tests for Profile.has_phone"""
def test_default_False(self) -> None:
assert self.profile.has_phone is False
def test_premium_subscription_returns_False(self) -> None:
self.upgrade_to_premium()
assert self.profile.has_phone is False
def test_phone_returns_True(self) -> None:
self.upgrade_to_phone()
assert self.profile.has_phone is True
def test_vpn_bundle_returns_True(self) -> None:
self.upgrade_to_vpn_bundle()
assert self.profile.has_phone is True
@pytest.mark.skipif(not settings.PHONES_ENABLED, reason="PHONES_ENABLED is False")
@override_settings(PHONES_NO_CLIENT_CALLS_IN_TEST=True)
class ProfileDatePhoneRegisteredTest(ProfileTestCase):
"""Tests for Profile.date_phone_registered"""
def test_default_None(self) -> None:
assert self.profile.date_phone_registered is None
def test_real_phone_no_relay_number_returns_verified_date(self) -> None:
self.upgrade_to_phone()
datetime_now = datetime.now(UTC)
RealPhone.objects.create(
user=self.profile.user,
number="+12223334444",
verified=True,
verified_date=datetime_now,
)
assert self.profile.date_phone_registered == datetime_now
def test_real_phone_and_relay_number_w_created_at_returns_created_at_date(
self,
) -> None:
self.upgrade_to_phone()
datetime_now = datetime.now(UTC)
phone_user = self.profile.user
RealPhone.objects.create(
user=phone_user,
number="+12223334444",
verified=True,
verified_date=datetime_now,
)
relay_number = RelayNumber.objects.create(user=phone_user)
assert self.profile.date_phone_registered == relay_number.created_at
def test_real_phone_and_relay_number_wo_created_at_returns_verified_date(
self,
) -> None:
self.upgrade_to_phone()
datetime_now = datetime.now(UTC)
phone_user = self.profile.user
real_phone = RealPhone.objects.create(
user=phone_user,
number="+12223334444",
verified=True,
verified_date=datetime_now,
)
relay_number = RelayNumber.objects.create(user=phone_user)
# since created_at is auto set, update to None
relay_number.created_at = None
relay_number.save()
assert self.profile.date_phone_registered == real_phone.verified_date
class ProfileTotalMasksTest(ProfileTestCase):
"""Tests for Profile.total_masks"""
def test_total_masks(self) -> None:
self.upgrade_to_premium()
self.profile.add_subdomain("totalmasks")
assert self.profile.total_masks == 0
num_relay_addresses = random.randint(0, 2)
for _ in list(range(num_relay_addresses)):
baker.make(RelayAddress, user=self.profile.user)
num_domain_addresses = random.randint(0, 2)
for i in list(range(num_domain_addresses)):
baker.make(DomainAddress, user=self.profile.user, address=f"mask{i}")
assert self.profile.total_masks == num_relay_addresses + num_domain_addresses
class ProfileAtMaskLimitTest(ProfileTestCase):
"""Tests for Profile.at_mask_limit"""
def test_premium_user_returns_False(self) -> None:
self.upgrade_to_premium()
assert self.profile.at_mask_limit is False
baker.make(
RelayAddress,
user=self.profile.user,
_quantity=settings.MAX_NUM_FREE_ALIASES,
)
assert self.profile.at_mask_limit is False
def test_free_user(self) -> None:
assert self.profile.at_mask_limit is False
baker.make(
RelayAddress,
user=self.profile.user,
_quantity=settings.MAX_NUM_FREE_ALIASES,
)
assert self.profile.at_mask_limit is True
class ProfileAddSubdomainTest(ProfileTestCase):
"""Tests for Profile.add_subdomain()"""
def test_new_unlimited_profile(self) -> None:
self.upgrade_to_premium()
assert self.profile.add_subdomain("newpremium") == "newpremium"
def test_lowercases_subdomain_value(self) -> None:
self.upgrade_to_premium()
assert self.profile.add_subdomain("mIxEdcAsE") == "mixedcase"
def test_non_premium_user_raises_exception(self) -> None:
expected_msg = "error-premium-set-subdomain"
with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
self.profile.add_subdomain("test")
def test_calling_again_raises_exception(self) -> None:
self.upgrade_to_premium()
subdomain = "test"
self.profile.subdomain = subdomain
self.profile.save()
expected_msg = "error-premium-cannot-change-subdomain"
with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
self.profile.add_subdomain(subdomain)
def test_badword_subdomain_raises_exception(self) -> None:
self.upgrade_to_premium()
expected_msg = "error-subdomain-not-available"
with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
self.profile.add_subdomain("angry")
def test_blocked_word_subdomain_raises_exception(self) -> None:
self.upgrade_to_premium()
expected_msg = "error-subdomain-not-available"
with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
self.profile.add_subdomain("mozilla")
def test_empty_subdomain_raises(self) -> None:
self.upgrade_to_premium()
expected_msg = "error-subdomain-cannot-be-empty-or-null"
with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
self.profile.add_subdomain("")
def test_null_subdomain_raises(self) -> None:
self.upgrade_to_premium()
expected_msg = "error-subdomain-cannot-be-empty-or-null"
with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
self.profile.add_subdomain(None)
def test_subdomain_with_space_at_end_raises(self) -> None:
self.upgrade_to_premium()
expected_msg = "error-subdomain-not-available"
with self.assertRaisesMessage(CannotMakeSubdomainException, expected_msg):
self.profile.add_subdomain("mydomain ")
class ProfileSaveTest(ProfileTestCase):
"""Tests for Profile.save()"""
def test_lowercases_subdomain_value(self) -> None:
self.upgrade_to_premium()
self.profile.subdomain = "mIxEdcAsE"
self.profile.save()
assert self.profile.subdomain == "mixedcase"
def test_lowercases_subdomain_value_with_update_fields(self) -> None:
"""With update_fields, the subdomain is still lowercased."""
self.upgrade_to_premium()
assert self.profile.subdomain is None
# Use QuerySet.update to avoid model .save()
Profile.objects.filter(id=self.profile.id).update(subdomain="mIxEdcAsE")
self.profile.refresh_from_db()
assert self.profile.subdomain == "mIxEdcAsE"
# Update a different field with update_fields to avoid a full model save
new_date_subscribed = datetime(2023, 3, 3, tzinfo=UTC)
self.profile.date_subscribed = new_date_subscribed
self.profile.save(update_fields={"date_subscribed"})
# Since .save() added to update_fields, subdomain is now lowercase
self.profile.refresh_from_db()
assert self.profile.date_subscribed == new_date_subscribed
assert self.profile.subdomain == "mixedcase"
TEST_DESCRIPTION = "test description"
TEST_USED_ON = TEST_GENERATED_FOR = "secret.com"
def add_relay_address(self) -> RelayAddress:
return baker.make(
RelayAddress,
user=self.profile.user,
description=self.TEST_DESCRIPTION,
generated_for=self.TEST_GENERATED_FOR,
used_on=self.TEST_USED_ON,
)
def add_domain_address(self) -> DomainAddress:
self.upgrade_to_premium()
self.profile.subdomain = "somesubdomain"
self.profile.save()
return baker.make(
DomainAddress,
user=self.profile.user,
address="localpart",
description=self.TEST_DESCRIPTION,
used_on=self.TEST_USED_ON,
)
def test_save_server_storage_true_doesnt_delete_data(self) -> None:
relay_address = self.add_relay_address()
self.profile.server_storage = True
self.profile.save()
relay_address.refresh_from_db()
assert relay_address.description == self.TEST_DESCRIPTION
assert relay_address.generated_for == self.TEST_GENERATED_FOR
assert relay_address.used_on == self.TEST_USED_ON
def test_save_server_storage_false_deletes_data(self) -> None:
relay_address = self.add_relay_address()
domain_address = self.add_domain_address()
self.profile.server_storage = False
self.profile.save()
relay_address.refresh_from_db()
domain_address.refresh_from_db()
assert relay_address.description == ""
assert relay_address.generated_for == ""
assert relay_address.used_on == ""
assert domain_address.description == ""
assert domain_address.used_on == ""
def add_four_relay_addresses(self, user: User | None = None) -> list[RelayAddress]:
if user is None:
user = self.profile.user
return baker.make(
RelayAddress,
user=user,
description=self.TEST_DESCRIPTION,
generated_for=self.TEST_GENERATED_FOR,
used_on=self.TEST_USED_ON,
_quantity=4,
)
def test_save_server_storage_false_deletes_ALL_data(self) -> None:
self.add_four_relay_addresses()
self.profile.server_storage = False
self.profile.save()
for relay_address in RelayAddress.objects.filter(user=self.profile.user):
assert relay_address.description == ""
assert relay_address.generated_for == ""
def test_save_server_storage_false_only_deletes_that_profiles_data(self) -> None:
other_user = make_free_test_user()
assert other_user.profile.server_storage is True
self.add_four_relay_addresses()
self.add_four_relay_addresses(user=other_user)
self.profile.server_storage = False
self.profile.save()
for relay_address in RelayAddress.objects.filter(user=self.profile.user):
assert relay_address.description == ""
assert relay_address.generated_for == ""
assert relay_address.used_on == ""
for relay_address in RelayAddress.objects.filter(user=other_user):
assert relay_address.description == self.TEST_DESCRIPTION
assert relay_address.generated_for == self.TEST_GENERATED_FOR
assert relay_address.used_on == self.TEST_USED_ON
class ProfileDisplayNameTest(ProfileTestCase):
"""Tests for Profile.display_name"""
def test_exists(self) -> None:
display_name = "Display Name"
social_account = self.get_or_create_social_account()
social_account.extra_data["displayName"] = display_name
social_account.save()
assert self.profile.display_name == display_name
def test_display_name_does_not_exist(self) -> None:
self.get_or_create_social_account()
assert self.profile.display_name is None
class ProfileLanguageTest(ProfileTestCase):
"""Test Profile.language"""
def test_no_fxa_extra_data_locale_returns_default_en(self) -> None:
social_account = self.get_or_create_social_account()
assert "locale" not in social_account.extra_data
assert self.profile.language == "en"
def test_no_fxa_locale_returns_default_en(self) -> None:
assert self.profile.language == "en"
def test_fxa_locale_de_returns_de(self) -> None:
social_account = self.get_or_create_social_account()
social_account.extra_data["locale"] = "de,en-US;q=0.9,en;q=0.8"
social_account.save()
assert self.profile.language == "de"
class ProfileFxaLocaleInPremiumCountryTest(ProfileTestCase):
"""Tests for Profile.fxa_locale_in_premium_country"""
def set_fxa_locale(self, locale: str) -> None:
social_account = self.get_or_create_social_account()
social_account.extra_data["locale"] = locale
social_account.save()
def test_when_premium_available_returns_True(self) -> None:
self.set_fxa_locale("de-DE,en-xx;q=0.9,en;q=0.8")
assert self.profile.fxa_locale_in_premium_country is True
def test_en_implies_premium_available(self) -> None:
self.set_fxa_locale("en;q=0.8")
assert self.profile.fxa_locale_in_premium_country is True
def test_when_premium_unavailable_returns_False(self) -> None:
self.set_fxa_locale("en-IN, en;q=0.8")
assert self.profile.fxa_locale_in_premium_country is False
def test_when_premium_available_by_language_code_returns_True(self) -> None:
self.set_fxa_locale("de;q=0.8")
assert self.profile.fxa_locale_in_premium_country is True
def test_invalid_language_code_returns_False(self) -> None:
self.set_fxa_locale("xx;q=0.8")
assert self.profile.fxa_locale_in_premium_country is False
def test_when_premium_unavailable_by_language_code_returns_False(self) -> None:
self.set_fxa_locale("zh;q=0.8")
assert self.profile.fxa_locale_in_premium_country is False
def test_no_fxa_account_returns_False(self) -> None:
assert self.profile.fxa_locale_in_premium_country is False
def test_in_estonia(self):
"""Estonia (EE) was added in August 2023."""
self.set_fxa_locale("et-ee,et;q=0.8")
assert self.profile.fxa_locale_in_premium_country is True
class ProfileJoinedBeforePremiumReleaseTest(ProfileTestCase):
"""Tests for Profile.joined_before_premium_release"""
def test_returns_True(self) -> None:
before = "2021-10-18 17:00:00+00:00"
self.profile.user.date_joined = datetime.fromisoformat(before)
assert self.profile.joined_before_premium_release
def test_returns_False(self) -> None:
after = "2021-10-28 17:00:00+00:00"
self.profile.user.date_joined = datetime.fromisoformat(after)
assert self.profile.joined_before_premium_release is False
class ProfileDefaultsTest(ProfileTestCase):
"""Tests for default Profile values"""
def test_user_created_after_premium_release_server_storage_True(self) -> None:
assert self.profile.server_storage
def test_emails_replied_new_user_aggregates_sum_of_replies_to_zero(self) -> None:
assert self.profile.emails_replied == 0
class ProfileEmailsRepliedTest(ProfileTestCase):
"""Tests for Profile.emails_replied"""
def test_premium_user_aggregates_replies_from_all_addresses(self) -> None:
self.upgrade_to_premium()
self.profile.subdomain = "test"
self.profile.num_email_replied_in_deleted_address = 1
self.profile.save()
baker.make(RelayAddress, user=self.profile.user, num_replied=3)
baker.make(
DomainAddress, user=self.profile.user, address="lower-case", num_replied=5
)
assert self.profile.emails_replied == 9
def test_free_user_aggregates_replies_from_relay_addresses(self) -> None:
baker.make(RelayAddress, user=self.profile.user, num_replied=3)
baker.make(RelayAddress, user=self.profile.user, num_replied=5)
assert self.profile.emails_replied == 8
class ProfileUpdateAbuseMetricTest(ProfileTestCase):
"""Tests for Profile.update_abuse_metric()"""
def setUp(self) -> None:
super().setUp()
self.get_or_create_social_account()
self.abuse_metric = baker.make(AbuseMetrics, user=self.profile.user)
patcher_logger = patch("privaterelay.models.abuse_logger.info")
self.mocked_abuse_info = patcher_logger.start()
self.addCleanup(patcher_logger.stop)
# Selectively patch datatime.now() for emails models
# https://docs.python.org/3/library/unittest.mock-examples.html#partial-mocking
patcher = patch("privaterelay.models.datetime")
mocked_datetime = patcher.start()
self.addCleanup(patcher.stop)
self.expected_now = datetime.now(UTC)
mocked_datetime.combine.return_value = datetime.combine(
datetime.now(UTC).date(), datetime.min.time()
)
mocked_datetime.now.return_value = self.expected_now
mocked_datetime.side_effect = lambda *args, **kw: datetime(*args, **kw)
@override_settings(MAX_FORWARDED_PER_DAY=5)
def test_flags_profile_when_emails_forwarded_abuse_threshold_met(self) -> None:
self.abuse_metric.num_email_forwarded_per_day = 4
self.abuse_metric.save()
assert self.profile.last_account_flagged is None
self.profile.update_abuse_metric(email_forwarded=True)
self.abuse_metric.refresh_from_db()
assert self.profile.fxa
self.mocked_abuse_info.assert_called_once_with(
"Abuse flagged",
extra={
"uid": self.profile.fxa.uid,
"flagged": self.expected_now.timestamp(),
"replies": 0,
"addresses": 0,
"forwarded": 5,
"forwarded_size_in_bytes": 0,
},
)
assert self.abuse_metric.num_email_forwarded_per_day == 5
assert self.profile.last_account_flagged == self.expected_now
@override_settings(MAX_FORWARDED_EMAIL_SIZE_PER_DAY=100)
def test_flags_profile_when_forwarded_email_size_abuse_threshold_met(self) -> None:
self.abuse_metric.forwarded_email_size_per_day = 50
self.abuse_metric.save()
assert self.profile.last_account_flagged is None
self.profile.update_abuse_metric(forwarded_email_size=50)
self.abuse_metric.refresh_from_db()
assert self.profile.fxa
self.mocked_abuse_info.assert_called_once_with(
"Abuse flagged",
extra={
"uid": self.profile.fxa.uid,
"flagged": self.expected_now.timestamp(),
"replies": 0,
"addresses": 0,
"forwarded": 0,
"forwarded_size_in_bytes": 100,
},
)
assert self.abuse_metric.forwarded_email_size_per_day == 100
assert self.profile.last_account_flagged == self.expected_now
class ProfileMetricsEnabledTest(ProfileTestCase):
def test_no_fxa_means_metrics_enabled(self) -> None:
assert not self.profile.fxa
assert self.profile.metrics_enabled
def test_fxa_legacy_means_metrics_enabled(self) -> None:
self.get_or_create_social_account()
assert self.profile.fxa
assert "metricsEnabled" not in self.profile.fxa.extra_data
assert self.profile.metrics_enabled
def test_fxa_opt_in_means_metrics_enabled(self) -> None:
social_account = self.get_or_create_social_account()
social_account.extra_data["metricsEnabled"] = True
social_account.save()
assert self.profile.fxa
assert self.profile.metrics_enabled
def test_fxa_opt_out_means_metrics_disabled(self) -> None:
social_account = self.get_or_create_social_account()
social_account.extra_data["metricsEnabled"] = False
social_account.save()
assert self.profile.fxa
assert not self.profile.metrics_enabled
class ProfilePlanTest(ProfileTestCase):
def test_free_user(self) -> None:
assert self.profile.plan == "free"
def test_premium_user(self) -> None:
self.upgrade_to_premium()
assert self.profile.plan == "email"
def test_phone_user(self) -> None:
self.upgrade_to_phone()
assert self.profile.plan == "phone"
def test_vpn_bundle_user(self) -> None:
self.upgrade_to_vpn_bundle()
assert self.profile.plan == "bundle"
class ProfilePlanTermTest(ProfileTestCase):
def test_free_user(self) -> None:
assert self.profile.plan_term is None
def test_premium_user(self) -> None:
self.upgrade_to_premium()
assert self.profile.plan_term == "unknown"
def test_phone_user(self) -> None:
self.upgrade_to_phone()
assert self.profile.plan_term == "unknown"
def test_phone_user_1_month(self) -> None:
self.upgrade_to_phone()
self.profile.date_phone_subscription_start = datetime(2024, 1, 1, tzinfo=UTC)
self.profile.date_phone_subscription_end = datetime(2024, 2, 1, tzinfo=UTC)
assert self.profile.plan_term == "1_month"
def test_phone_user_1_year(self) -> None:
self.upgrade_to_phone()
self.profile.date_phone_subscription_start = datetime(2024, 1, 1, tzinfo=UTC)
self.profile.date_phone_subscription_end = datetime(2025, 1, 1, tzinfo=UTC)
assert self.profile.plan_term == "1_year"
def test_vpn_bundle_user(self) -> None:
self.upgrade_to_vpn_bundle()
assert self.profile.plan_term == "unknown"
class ProfileMetricsPremiumStatus(ProfileTestCase):
def test_free_user(self):
assert self.profile.metrics_premium_status == "free"
def test_premium_user(self) -> None:
self.upgrade_to_premium()
assert self.profile.metrics_premium_status == "email_unknown"
def test_phone_user(self) -> None:
self.upgrade_to_phone()
assert self.profile.metrics_premium_status == "phone_unknown"
def test_phone_user_1_month(self) -> None:
self.upgrade_to_phone()
self.profile.date_phone_subscription_start = datetime(2024, 1, 1, tzinfo=UTC)
self.profile.date_phone_subscription_end = datetime(2024, 2, 1, tzinfo=UTC)
assert self.profile.metrics_premium_status == "phone_1_month"
def test_phone_user_1_year(self) -> None:
self.upgrade_to_phone()
self.profile.date_phone_subscription_start = datetime(2024, 1, 1, tzinfo=UTC)
self.profile.date_phone_subscription_end = datetime(2025, 1, 1, tzinfo=UTC)
assert self.profile.metrics_premium_status == "phone_1_year"
def test_vpn_bundle_user(self) -> None:
self.upgrade_to_vpn_bundle()
assert self.profile.metrics_premium_status == "bundle_unknown"

Просмотреть файл

@ -1,10 +1,12 @@
from collections.abc import Iterator
from hashlib import sha256
from unittest.mock import Mock, patch
from django.contrib.auth.models import User
from django.contrib.sessions.middleware import SessionMiddleware
from django.http.request import HttpRequest
from django.http.response import HttpResponse
from django.test import TestCase
from django.test.client import RequestFactory
import pytest
@ -12,6 +14,9 @@ from model_bakery import baker
from privaterelay.signals import record_user_signed_up
from ..models import Profile
from .utils import make_free_test_user
@pytest.fixture()
def mock_ses_client() -> Iterator[Mock]:
@ -36,3 +41,70 @@ def test_record_user_signed_up_telemetry() -> None:
assert sign_up_request.session["user_created"] is True
assert sign_up_request.session.modified is True
class MeasureFeatureUsageSignalTest(TestCase):
"""Test measure_feature_usage signal handler"""
def setUp(self) -> None:
user = make_free_test_user()
self.profile = user.profile
patcher_incr = patch("privaterelay.signals.incr_if_enabled")
self.mocked_incr = patcher_incr.start()
self.addCleanup(patcher_incr.stop)
patcher_logger = patch("privaterelay.signals.info_logger.info")
self.mocked_events_info = patcher_logger.start()
self.addCleanup(patcher_logger.stop)
def test_remove_level_one_email_trackers_enabled(self) -> None:
self.profile.remove_level_one_email_trackers = True
self.profile.save()
assert self.profile.fxa
expected_hashed_uid = sha256(self.profile.fxa.uid.encode("utf-8")).hexdigest()
self.mocked_incr.assert_called_once_with("tracker_removal_enabled")
self.mocked_events_info.assert_called_once_with(
"tracker_removal_feature",
extra={
"enabled": True,
"hashed_uid": expected_hashed_uid,
},
)
def test_remove_level_one_email_trackers_disabled(self) -> None:
Profile.objects.filter(id=self.profile.id).update(
remove_level_one_email_trackers=True
)
self.profile.refresh_from_db()
self.profile.remove_level_one_email_trackers = False
self.profile.save()
assert self.profile.fxa
expected_hashed_uid = sha256(self.profile.fxa.uid.encode("utf-8")).hexdigest()
self.mocked_incr.assert_called_once_with("tracker_removal_disabled")
self.mocked_events_info.assert_called_once_with(
"tracker_removal_feature",
extra={
"enabled": False,
"hashed_uid": expected_hashed_uid,
},
)
def test_remove_level_one_email_trackers_unchanged(self) -> None:
self.profile.remove_level_one_email_trackers = False
self.profile.save()
self.mocked_incr.assert_not_called()
self.mocked_events_info.assert_not_called()
def test_unmonitored_field_change_does_not_emit_metric_and_logs(self) -> None:
self.profile.server_storage = False
self.profile.save()
self.mocked_incr.assert_not_called()
self.mocked_events_info.assert_not_called()
def test_profile_created_does_not_emit_metric_and_logs(self) -> None:
self.mocked_incr.assert_not_called()
self.mocked_events_info.assert_not_called()

Просмотреть файл

@ -0,0 +1,84 @@
from django.contrib.auth.models import User
from django.test import TestCase
from privaterelay.tests.utils import make_premium_test_user
from ..exceptions import CannotMakeSubdomainException
from ..models import Profile, RegisteredSubdomain, hash_subdomain
from ..validators import valid_available_subdomain
class ValidAvailableSubdomainTest(TestCase):
"""Tests for valid_available_subdomain()"""
ERR_NOT_AVAIL = "error-subdomain-not-available"
ERR_EMPTY_OR_NULL = "error-subdomain-cannot-be-empty-or-null"
def reserve_subdomain_for_new_user(self, subdomain: str) -> User:
user = make_premium_test_user()
user.profile.add_subdomain(subdomain)
return user
def test_bad_word_raises(self) -> None:
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain("angry")
def test_blocked_word_raises(self) -> None:
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain("mozilla")
def test_taken_subdomain_raises(self) -> None:
subdomain = "thisisfine"
self.reserve_subdomain_for_new_user(subdomain)
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain(subdomain)
def test_taken_subdomain_different_case_raises(self) -> None:
self.reserve_subdomain_for_new_user("thIsIsfInE")
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain("THiSiSFiNe")
def test_inactive_subdomain_raises(self) -> None:
"""subdomains registered by now deleted profiles are not available."""
subdomain = "thisisfine"
user = self.reserve_subdomain_for_new_user(subdomain)
user.delete()
registered_subdomain_count = RegisteredSubdomain.objects.filter(
subdomain_hash=hash_subdomain(subdomain)
).count()
assert Profile.objects.filter(subdomain=subdomain).count() == 0
assert registered_subdomain_count == 1
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain(subdomain)
def test_subdomain_with_space_raises(self) -> None:
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain("my domain")
def test_subdomain_with_special_char_raises(self) -> None:
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain("my@domain")
def test_subdomain_with_dash_succeeds(self) -> None:
valid_available_subdomain("my-domain")
def test_subdomain_with_dash_at_front_raises(self) -> None:
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain("-mydomain")
def test_empty_subdomain_raises(self) -> None:
with self.assertRaisesMessage(
CannotMakeSubdomainException, self.ERR_EMPTY_OR_NULL
):
valid_available_subdomain("")
def test_null_subdomain_raises(self) -> None:
with self.assertRaisesMessage(
CannotMakeSubdomainException, self.ERR_EMPTY_OR_NULL
):
valid_available_subdomain(None)
def test_subdomain_with_space_at_end_raises(self) -> None:
with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL):
valid_available_subdomain("mydomain ")

Просмотреть файл

@ -0,0 +1,32 @@
import re
from typing import Any
from emails.validators import has_bad_words, is_blocklisted
from .exceptions import CannotMakeSubdomainException
# A valid subdomain:
# can't start or end with a hyphen
# must be 1-63 alphanumeric characters and/or hyphens
_re_valid_subdomain = re.compile("^(?!-)[a-z0-9-]{1,63}(?<!-)$")
def valid_available_subdomain(subdomain: Any) -> None:
"""Raise CannotMakeSubdomainException if the subdomain fails a validation test."""
from privaterelay.models import RegisteredSubdomain
if not subdomain:
raise CannotMakeSubdomainException("error-subdomain-cannot-be-empty-or-null")
subdomain = str(subdomain).lower()
# valid subdomains:
# have to meet the rules for length and characters
valid = _re_valid_subdomain.match(subdomain) is not None
# can't have "bad" words in them
bad_word = has_bad_words(subdomain)
# can't have "blocked" words in them
blocked_word = is_blocklisted(subdomain)
# can't be taken by someone else
taken = RegisteredSubdomain.is_taken(subdomain)
if not valid or bad_word or blocked_word or taken:
raise CannotMakeSubdomainException("error-subdomain-not-available")

Просмотреть файл

@ -25,14 +25,15 @@ from google_measurement_protocol import event, report
from oauthlib.oauth2.rfc6749.errors import CustomOAuth2Error
from rest_framework.decorators import api_view, schema
# from silk.profiling.profiler import silk_profile
from emails.exceptions import CannotMakeSubdomainException
from emails.models import DomainAddress, RelayAddress
from emails.utils import incr_if_enabled
from emails.validators import valid_available_subdomain
from .apps import PrivateRelayConfig
# from silk.profiling.profiler import silk_profile
from .exceptions import CannotMakeSubdomainException
from .fxa_utils import NoSocialToken, _get_oauth2_session
from .validators import valid_available_subdomain
FXA_PROFILE_CHANGE_EVENT = "https://schemas.accounts.firefox.com/event/profile-change"
FXA_SUBSCRIPTION_CHANGE_EVENT = (