import html
import json
import logging
import re
import shlex
from datetime import UTC, datetime
from email import message_from_bytes
from email.iterators import _structure
from email.message import EmailMessage
from email.utils import parseaddr
from io import StringIO
from json import JSONDecodeError
from textwrap import dedent
from typing import Any, Literal, NamedTuple, TypedDict, TypeVar
from urllib.parse import urlencode
from uuid import uuid4

from django.conf import settings
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from django.db.models import prefetch_related_objects
from django.http import HttpRequest, HttpResponse
from django.shortcuts import render
from django.template.loader import render_to_string
from django.utils.html import escape
from django.views.decorators.csrf import csrf_exempt

from botocore.exceptions import ClientError
from codetiming import Timer
from decouple import strtobool
from markus.utils import generate_tag
from sentry_sdk import capture_message
from waffle import get_waffle_flag_model, sample_is_active

from privaterelay.ftl_bundles import main as ftl_bundle
from privaterelay.models import Profile
from privaterelay.utils import (
    flag_is_active_in_task,
    get_subplat_upgrade_link_by_language,
    glean_logger,
)

from .exceptions import CannotMakeAddressException
from .models import (
    DeletedAddress,
    DomainAddress,
    RelayAddress,
    Reply,
    address_hash,
    get_domain_numerical,
)
from .policy import relay_policy
from .sns import SUPPORTED_SNS_TYPES, verify_from_sns
from .types import (
    AWS_MailJSON,
    AWS_SNSMessageJSON,
    EmailForwardingIssues,
    EmailHeaderIssues,
    OutgoingHeaders,
)
from .utils import (
    InvalidFromHeader,
    _get_bucket_and_key_from_s3_json,
    b64_lookup_key,
    count_all_trackers,
    decrypt_reply_metadata,
    derive_reply_keys,
    encode_dict_gza85,
    encrypt_reply_metadata,
    generate_from_header,
    get_domains_from_settings,
    get_message_content_from_s3,
    get_message_id_bytes,
    get_reply_to_address,
    histogram_if_enabled,
    incr_if_enabled,
    parse_email_header,
    remove_message_from_s3,
    remove_trackers,
    ses_send_raw_email,
    urlize_and_linebreaks,
)

logger = logging.getLogger("events")
info_logger = logging.getLogger("eventsinfo")


class ReplyHeadersNotFound(Exception):
    def __init__(self, message="No In-Reply-To or References headers."):
        self.message = message


def first_time_user_test(request):
    """
    Demonstrate rendering of the "First time Relay user" email.
    Settings like language can be given in the querystring, otherwise settings
    come from a random free profile.
    """
    in_bundle_country = strtobool(request.GET.get("in_bundle_country", "yes"))
    email_context = {
        "in_bundle_country": in_bundle_country,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    if request.GET.get("format", "html") == "text":
        return render(
            request,
            "emails/first_time_user.txt",
            email_context,
            "text/plain; charset=utf-8",
        )
    return render(request, "emails/first_time_user.html", email_context)


def reply_requires_premium_test(request):
    """
    Demonstrate rendering of the "Reply requires premium" email.

    Settings like language can be given in the querystring, otherwise settings
    come from a random free profile.
    """
    email_context = {
        "sender": "test@example.com",
        "forwarded": True,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    for param in request.GET:
        email_context[param] = request.GET.get(param)
        if param == "forwarded" and request.GET[param] == "True":
            email_context[param] = True

    for param in request.GET:
        if param == "content-type" and request.GET[param] == "text/plain":
            return render(
                request,
                "emails/reply_requires_premium.txt",
                email_context,
                "text/plain; charset=utf-8",
            )
    return render(request, "emails/reply_requires_premium.html", email_context)


def disabled_mask_for_spam_test(request):
    """
    Demonstrate rendering of the "Disabled mask for spam" email.

    Settings like language can be given in the querystring, otherwise settings
    come from a random free profile.
    """
    mask = "abc123456@mozmail.com"
    email_context = {
        "mask": mask,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    for param in request.GET:
        email_context[param] = request.GET.get(param)

    for param in request.GET:
        if param == "content-type" and request.GET[param] == "text/plain":
            return render(
                request,
                "emails/disabled_mask_for_spam.txt",
                email_context,
                "text/plain; charset=utf-8",
            )
    return render(request, "emails/disabled_mask_for_spam.html", email_context)


def first_forwarded_email_test(request: HttpRequest) -> HttpResponse:
    # TODO: Update with correct context when trigger is created
    first_forwarded_email_html = render_to_string(
        "emails/first_forwarded_email.html",
        {
            "SITE_ORIGIN": settings.SITE_ORIGIN,
        },
    )

    wrapped_email = wrap_html_email(
        first_forwarded_email_html,
        "en-us",
        True,
        "test@example.com",
        0,
    )

    return HttpResponse(wrapped_email)


def wrap_html_email(
    original_html: str,
    language: str,
    has_premium: bool,
    display_email: str,
    num_level_one_email_trackers_removed: int | None = None,
    tracker_report_link: str | None = None,
) -> str:
    """Add Relay banners, surveys, etc. to an HTML email"""
    subplat_upgrade_link = get_subplat_upgrade_link_by_language(language)
    email_context = {
        "original_html": original_html,
        "language": language,
        "has_premium": has_premium,
        "subplat_upgrade_link": subplat_upgrade_link,
        "display_email": display_email,
        "tracker_report_link": tracker_report_link,
        "num_level_one_email_trackers_removed": num_level_one_email_trackers_removed,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    content = render_to_string("emails/wrapped_email.html", email_context)
    # Remove empty lines
    content_lines = [line for line in content.splitlines() if line.strip()]
    return "\n".join(content_lines) + "\n"


def wrapped_email_test(request: HttpRequest) -> HttpResponse:
    """
    Demonstrate rendering of forwarded HTML emails.

    Settings like language can be given in the querystring, otherwise settings
    come from a randomly chosen profile.
    """

    if all(key in request.GET for key in ("language", "has_premium")):
        user_profile = None
    else:
        user_profile = Profile.objects.order_by("?").first()

    if "language" in request.GET:
        language = request.GET["language"]
    else:
        if user_profile is None:
            raise ValueError("user_profile must not be None")
        language = user_profile.language

    if "has_premium" in request.GET:
        has_premium = strtobool(request.GET["has_premium"])
    else:
        if user_profile is None:
            raise ValueError("user_profile must not be None")
        has_premium = user_profile.has_premium

    if "num_level_one_email_trackers_removed" in request.GET:
        num_level_one_email_trackers_removed = int(
            request.GET["num_level_one_email_trackers_removed"]
        )
    else:
        num_level_one_email_trackers_removed = 0

    if "has_tracker_report_link" in request.GET:
        has_tracker_report_link = strtobool(request.GET["has_tracker_report_link"])
    else:
        has_tracker_report_link = False
    if has_tracker_report_link:
        if num_level_one_email_trackers_removed:
            trackers = {
                "fake-tracker.example.com": num_level_one_email_trackers_removed
            }
        else:
            trackers = {}
        tracker_report_link = (
            "/tracker-report/#{"
            '"sender": "sender@example.com", '
            '"received_at": 1658434657, '
            f'"trackers": { json.dumps(trackers) }'
            "}"
        )
    else:
        tracker_report_link = ""

    path = "/emails/wrapped_email_test"
    old_query = {
        "language": language,
        "has_premium": "Yes" if has_premium else "No",
        "has_tracker_report_link": "Yes" if has_tracker_report_link else "No",
        "num_level_one_email_trackers_removed": str(
            num_level_one_email_trackers_removed
        ),
    }

    def switch_link(key, value):
        if old_query[key] == value:
            return str(value)
        new_query = old_query.copy()
        new_query[key] = value
        return f'<a href="{path}?{urlencode(new_query)}">{value}</a>'

    html_content = dedent(
        f"""\
        <p>
          <strong>Email rendering Test</strong>
        </p>
        <p>Settings: (<a href="{path}">clear all</a>)</p>
        <ul>
          <li>
            <strong>language</strong>:
            {escape(language)}
            (switch to
            {switch_link("language", "en-us")},
            {switch_link("language", "de")},
            {switch_link("language", "en-gb")},
            {switch_link("language", "fr")},
            {switch_link("language", "ru-ru")},
            {switch_link("language", "es-es")},
            {switch_link("language", "pt-br")},
            {switch_link("language", "it-it")},
            {switch_link("language", "en-ca")},
            {switch_link("language", "de-de")},
            {switch_link("language", "es-mx")})
          </li>
          <li>
            <strong>has_premium</strong>:
            {"Yes" if has_premium else "No"}
            (switch to
            {switch_link("has_premium", "Yes")},
            {switch_link("has_premium", "No")})
          </li>
          <li>
            <strong>has_tracker_report_link</strong>:
            {"Yes" if has_tracker_report_link else "No"}
            (switch to
            {switch_link("has_tracker_report_link", "Yes")},
            {switch_link("has_tracker_report_link", "No")})
          </li>
          <li>
            <strong>num_level_one_email_trackers_removed</strong>:
            {num_level_one_email_trackers_removed}
            (switch to
            {switch_link("num_level_one_email_trackers_removed", "0")},
            {switch_link("num_level_one_email_trackers_removed", "1")},
            {switch_link("num_level_one_email_trackers_removed", "2")})
          </li>
        </ul>
        """
    )

    wrapped_email = wrap_html_email(
        original_html=html_content,
        language=language,
        has_premium=has_premium,
        tracker_report_link=tracker_report_link,
        display_email="test@relay.firefox.com",
        num_level_one_email_trackers_removed=num_level_one_email_trackers_removed,
    )
    return HttpResponse(wrapped_email)


def _store_reply_record(
    mail: AWS_MailJSON, message_id: str, address: RelayAddress | DomainAddress
) -> AWS_MailJSON:
    # After relaying email, store a Reply record for it
    reply_metadata = {}
    for header in mail["headers"]:
        if header["name"].lower() in ["message-id", "from", "reply-to"]:
            reply_metadata[header["name"].lower()] = header["value"]
    message_id_bytes = get_message_id_bytes(message_id)
    (lookup_key, encryption_key) = derive_reply_keys(message_id_bytes)
    lookup = b64_lookup_key(lookup_key)
    encrypted_metadata = encrypt_reply_metadata(encryption_key, reply_metadata)
    reply_create_args: dict[str, Any] = {
        "lookup": lookup,
        "encrypted_metadata": encrypted_metadata,
    }
    if isinstance(address, DomainAddress):
        reply_create_args["domain_address"] = address
    else:
        if not isinstance(address, RelayAddress):
            raise TypeError("address must be type RelayAddress")
        reply_create_args["relay_address"] = address
    Reply.objects.create(**reply_create_args)
    return mail


@csrf_exempt
def sns_inbound(request):
    incr_if_enabled("sns_inbound", 1)
    # First thing we do is verify the signature
    json_body = json.loads(request.body)
    verified_json_body = verify_from_sns(json_body)

    # Validate ARN and message type
    topic_arn = verified_json_body.get("TopicArn", None)
    message_type = verified_json_body.get("Type", None)
    error_details = validate_sns_arn_and_type(topic_arn, message_type)
    if error_details:
        logger.error("validate_sns_arn_and_type_error", extra=error_details)
        return HttpResponse(error_details["error"], status=400)

    return _sns_inbound_logic(topic_arn, message_type, verified_json_body)


def validate_sns_arn_and_type(
    topic_arn: str | None, message_type: str | None
) -> dict[str, Any] | None:
    """
    Validate Topic ARN and SNS Message Type.

    If an error is detected, the return is a dictionary of error details.
    If no error is detected, the return is None.
    """
    if not topic_arn:
        error = "Received SNS request without Topic ARN."
    elif topic_arn not in settings.AWS_SNS_TOPIC:
        error = "Received SNS message for wrong topic."
    elif not message_type:
        error = "Received SNS request without Message Type."
    elif message_type not in SUPPORTED_SNS_TYPES:
        error = "Received SNS message for unsupported Type."
    else:
        error = None

    if error:
        return {
            "error": error,
            "received_topic_arn": shlex.quote(topic_arn) if topic_arn else topic_arn,
            "supported_topic_arn": sorted(settings.AWS_SNS_TOPIC),
            "received_sns_type": (
                shlex.quote(message_type) if message_type else message_type
            ),
            "supported_sns_types": SUPPORTED_SNS_TYPES,
        }
    return None


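# Illustrative sketch, not part of the original module: assuming the settings
# subscribe only to a topic such as "arn:aws:sns:us-east-1:111122223333:relay",
# a call like
#   validate_sns_arn_and_type("arn:aws:sns:us-east-1:111122223333:other", "Notification")
# returns a dict whose "error" key is "Received SNS message for wrong topic.",
# which sns_inbound() above logs and turns into a 400 response.
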
def _sns_inbound_logic(topic_arn, message_type, json_body):
    if message_type == "SubscriptionConfirmation":
        info_logger.info(
            "SNS SubscriptionConfirmation",
            extra={"SubscribeURL": json_body["SubscribeURL"]},
        )
        return HttpResponse("Logged SubscribeURL", status=200)
    if message_type == "Notification":
        incr_if_enabled("sns_inbound_Notification", 1)
        return _sns_notification(json_body)

    logger.error(
        "SNS message type did not fall under the SNS inbound logic",
        extra={"message_type": shlex.quote(message_type)},
    )
    capture_message(
        "Received SNS message with type not handled in inbound log",
        level="error",
        stack=True,
    )
    return HttpResponse(
        "Received SNS message with type not handled in inbound log", status=400
    )


def _sns_notification(json_body):
    try:
        message_json = json.loads(json_body["Message"])
    except JSONDecodeError:
        logger.error(
            "SNS notification has non-JSON message body",
            extra={"content": shlex.quote(json_body["Message"])},
        )
        return HttpResponse("Received SNS notification with non-JSON body", status=400)

    event_type = message_json.get("eventType")
    notification_type = message_json.get("notificationType")
    if notification_type not in {
        "Complaint",
        "Received",
        "Bounce",
    } and event_type not in {"Complaint", "Bounce"}:
        logger.error(
            "SNS notification for unsupported type",
            extra={
                "notification_type": shlex.quote(notification_type),
                "event_type": shlex.quote(event_type),
                "keys": [shlex.quote(key) for key in message_json.keys()],
            },
        )
        return HttpResponse(
            (
                "Received SNS notification for unsupported Type: "
                f"{html.escape(shlex.quote(notification_type))}"
            ),
            status=400,
        )
    response = _sns_message(message_json)
    bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
    if response.status_code < 500:
        remove_message_from_s3(bucket, object_key)

    return response


def _get_recipient_with_relay_domain(recipients):
    domains_to_check = get_domains_from_settings().values()
    for recipient in recipients:
        for domain in domains_to_check:
            if domain in recipient:
                return recipient
    return None


def _get_relay_recipient_from_message_json(message_json):
    # Go thru all To, Cc, and Bcc fields and
    # return the one that has a Relay domain

    # First check common headers for to or cc match
    headers_to_check = "to", "cc"
    common_headers = message_json["mail"]["commonHeaders"]
    for header in headers_to_check:
        if header in common_headers:
            recipient = _get_recipient_with_relay_domain(common_headers[header])
            if recipient is not None:
                return parseaddr(recipient)[1]

    # SES-SNS sends bcc in a different part of the message
    recipients = message_json["receipt"]["recipients"]
    return _get_recipient_with_relay_domain(recipients)


def _sns_message(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    incr_if_enabled("sns_inbound_Notification_Received", 1)
    init_waffle_flags()
    notification_type = message_json.get("notificationType")
    event_type = message_json.get("eventType")
    if notification_type == "Bounce" or event_type == "Bounce":
        return _handle_bounce(message_json)
    if notification_type == "Complaint" or event_type == "Complaint":
        return _handle_complaint(message_json)
    if notification_type != "Received":
        raise ValueError('notification_type must be "Received"')
    if event_type is not None:
        raise ValueError("event_type must be None")
    return _handle_received(message_json)


# Enumerate the reasons that an email was not forwarded.
# This excludes emails dropped due to mask forwarding settings,
# such as "block all" and "block promotional". Those are logged
# as Glean email_blocked events.
EmailDroppedReason = Literal[
    "auto_block_spam",  # Email identified as spam, user has the auto_block_spam flag
    "dmarc_reject_failed",  # Email failed DMARC check with a reject policy
    "hard_bounce_pause",  # The user recently had a hard bounce
    "soft_bounce_pause",  # The user recently had a soft bounce
    "abuse_flag",  # The user exceeded an abuse limit, like mails forwarded
    "user_deactivated",  # The user account is deactivated
    "reply_requires_premium",  # The email is a reply from a free user
    "content_missing",  # Could not load the email from storage
    "error_from_header",  # Error generating the From: header, retryable
    "error_storage",  # Error fetching the email contents from storage (S3), retryable
    "error_sending",  # Error sending the forwarded email (SES), retryable
]


def log_email_dropped(
    reason: EmailDroppedReason,
    mask: RelayAddress | DomainAddress,
    is_reply: bool = False,
    can_retry: bool = False,
) -> None:
    """
    Log that an email was dropped for a reason other than a mask blocking setting.

    This mirrors the interface of glean_logger().log_email_blocked(), which
    records emails dropped due to the mask's blocking setting.
    """
    extra: dict[str, str | int | bool] = {"reason": reason}
    if mask.user.profile.metrics_enabled:
        if mask.user.profile.fxa is not None:
            extra["fxa_id"] = mask.user.profile.fxa.uid
        extra["mask_id"] = mask.metrics_id
    extra |= {
        "is_random_mask": isinstance(mask, RelayAddress),
        "is_reply": is_reply,
        "can_retry": can_retry,
    }
    info_logger.info("email_dropped", extra=extra)


def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES received notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html

    Returns (may be incomplete):
    * 200 if the email was sent, the Relay address is disabled, the Relay user is
      flagged for abuse, the email is under a bounce pause, the email was suppressed
      for spam, the list email was blocked, or the noreply address was the recipient.
    * 400 if the commonHeaders entry is missing, the Relay recipient address is
      malformed, the email failed DMARC with reject policy, or the email is a reply
      chain to a non-premium user.
    * 404 if an S3-stored email was not found, no Relay address was found in the "To",
      "CC", or "BCC" fields, or the Relay address is not in the database.
    * 503 if the "From" address is malformed, the S3 client returned an error different
      from "not found", or the SES client fails.

    And many other return conditions if the email is a reply. The HTTP returns are an
    artifact from an earlier time when emails were sent to a webhook. Currently,
    production instead pulls events from a queue.

    TODO: Return a more appropriate status object
    TODO: Document the metrics emitted
    """
    mail = message_json["mail"]
    if "commonHeaders" not in mail:
        logger.error("SNS message without commonHeaders")
        return HttpResponse(
            "Received SNS notification without commonHeaders.", status=400
        )
    common_headers = mail["commonHeaders"]
    receipt = message_json["receipt"]

    _record_receipt_verdicts(receipt, "all")
    to_address = _get_relay_recipient_from_message_json(message_json)
    if to_address is None:
        incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
        return HttpResponse("Address does not exist", status=404)

    _record_receipt_verdicts(receipt, "relay_recipient")
    from_addresses = parse_email_header(common_headers["from"][0])
    if not from_addresses:
        info_logger.error(
            "_handle_received: no from address",
            extra={
                "source": mail["source"],
                "common_headers_from": common_headers["from"],
            },
        )
        return HttpResponse("Unable to parse From address", status=400)
    from_address = from_addresses[0][1]

    try:
        [to_local_portion, to_domain_portion] = to_address.split("@")
    except ValueError:
        # TODO: Add metric
        return HttpResponse("Malformed to field.", status=400)

    if to_local_portion.lower() == "noreply":
        incr_if_enabled("email_for_noreply_address", 1)
        return HttpResponse("noreply address is not supported.")
    try:
        # FIXME: this ambiguous return of either
        # RelayAddress or DomainAddress types makes the Rustacean in me throw
        # up a bit.
        address = _get_address(to_address)
        prefetch_related_objects([address.user], "socialaccount_set", "profile")
        user_profile = address.user.profile
    except (
        ObjectDoesNotExist,
        CannotMakeAddressException,
        DeletedAddress.MultipleObjectsReturned,
    ):
        if to_local_portion.lower() == "replies":
            response = _handle_reply(from_address, message_json, to_address)
        else:
            response = HttpResponse("Address does not exist", status=404)
        return response

    _record_receipt_verdicts(receipt, "valid_user")
    # if this is spam and the user is set to auto-block spam, early return
    if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
        incr_if_enabled("email_auto_suppressed_for_spam", 1)
        log_email_dropped(reason="auto_block_spam", mask=address)
        return HttpResponse("Address rejects spam.")

    if _get_verdict(receipt, "dmarc") == "FAIL":
        policy = receipt.get("dmarcPolicy", "none")
        # TODO: determine action on dmarcPolicy "quarantine"
        if policy == "reject":
            log_email_dropped(reason="dmarc_reject_failed", mask=address)
            incr_if_enabled(
                "email_suppressed_for_dmarc_failure",
                tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
            )
            return HttpResponse("DMARC failure, policy is reject", status=400)

    # if this user is over bounce limits, early return
    bounce_paused, bounce_type = user_profile.check_bounce_pause()
    if bounce_paused:
        _record_receipt_verdicts(receipt, "user_bounce_paused")
        incr_if_enabled(f"email_suppressed_for_{bounce_type}_bounce", 1)
        reason: Literal["soft_bounce_pause", "hard_bounce_pause"] = (
            "soft_bounce_pause" if bounce_type == "soft" else "hard_bounce_pause"
        )
        log_email_dropped(reason=reason, mask=address)
        return HttpResponse("Address is temporarily disabled.")

    # check if this is a reply from an external sender to a Relay user
    try:
        (lookup_key, _) = _get_keys_from_headers(mail["headers"])
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
        user_address = address
        address = reply_record.address
        message_id = _get_message_id_from_headers(mail["headers"])
        # make sure the relay user is premium
        if not _reply_allowed(from_address, to_address, reply_record, message_id):
            log_email_dropped(reason="reply_requires_premium", mask=user_address)
            return HttpResponse("Relay replies require a premium account", status=403)
    except (ReplyHeadersNotFound, Reply.DoesNotExist):
        # if there's no In-Reply-To header, or the In-Reply-To value doesn't
        # match a Reply record, continue to treat this as a regular email from
        # an external sender to a relay user
        pass

    # if account flagged for abuse, early return
    if user_profile.is_flagged:
        log_email_dropped(reason="abuse_flag", mask=address)
        return HttpResponse("Address is temporarily disabled.")

    if not user_profile.user.is_active:
        log_email_dropped(reason="user_deactivated", mask=address)
        return HttpResponse("Account is deactivated.")

    # if address is set to block, early return
    if not address.enabled:
        incr_if_enabled("email_for_disabled_address", 1)
        address.num_blocked += 1
        address.save(update_fields=["num_blocked"])
        _record_receipt_verdicts(receipt, "disabled_alias")
        user_profile.last_engagement = datetime.now(UTC)
        user_profile.save()
        glean_logger().log_email_blocked(mask=address, reason="block_all")
        return HttpResponse("Address is temporarily disabled.")

    _record_receipt_verdicts(receipt, "active_alias")
    incr_if_enabled("email_for_active_address", 1)

    # if address is blocking list emails, and email is from list, early return
    if (
        address
        and address.block_list_emails
        and user_profile.has_premium
        and _check_email_from_list(mail["headers"])
    ):
        incr_if_enabled("list_email_for_address_blocking_lists", 1)
        address.num_blocked += 1
        address.save(update_fields=["num_blocked"])
        user_profile.last_engagement = datetime.now(UTC)
        user_profile.save()
        glean_logger().log_email_blocked(mask=address, reason="block_promotional")
        return HttpResponse("Address is not accepting list emails.")

    # Collect new headers
    subject = common_headers.get("subject", "")
    destination_address = user_profile.user.email
    reply_address = get_reply_to_address()
    try:
        from_header = generate_from_header(from_address, to_address)
    except InvalidFromHeader:
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
        header_from = []
        for header in mail["headers"]:
            if header["name"].lower() == "from":
                header_from.append(header)
        info_logger.error(
            "generate_from_header",
            extra={
                "from_address": from_address,
                "source": mail["source"],
                "common_headers_from": common_headers["from"],
                "headers_from": header_from,
            },
        )
        log_email_dropped(reason="error_from_header", mask=address, can_retry=True)
        return HttpResponse("Cannot parse the From address", status=503)

    # Get incoming email
    try:
        (incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            log_email_dropped(reason="content_missing", mask=address)
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        log_email_dropped(reason="error_storage", mask=address, can_retry=True)
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    # Handle developer overrides, logging
    dev_action = _get_developer_mode_action(address)
    if dev_action:
        if dev_action.new_destination_address:
            destination_address = dev_action.new_destination_address
        _log_dev_notification(
            "_handle_received: developer_mode", dev_action, message_json
        )

    # Convert to new email
    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": from_header,
        "To": destination_address,
        "Reply-To": reply_address,
        "Resent-From": from_address,
    }
    sample_trackers = bool(sample_is_active("tracker_sample"))
    tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
    remove_level_one_trackers = bool(
        tracker_removal_flag and user_profile.remove_level_one_email_trackers
    )
    (
        forwarded_email,
        issues,
        level_one_trackers_removed,
        has_html,
        has_text,
    ) = _convert_to_forwarded_email(
        incoming_email_bytes=incoming_email_bytes,
        headers=headers,
        to_address=to_address,
        from_address=from_address,
        language=user_profile.language,
        has_premium=user_profile.has_premium,
        sample_trackers=sample_trackers,
        remove_level_one_trackers=remove_level_one_trackers,
    )
    if has_html:
        incr_if_enabled("email_with_html_content", 1)
    if has_text:
        incr_if_enabled("email_with_text_content", 1)
    if issues:
        info_logger.info(
            "_handle_received: forwarding issues", extra={"issues": issues}
        )

    # Send new email
    try:
        ses_response = ses_send_raw_email(
            source_address=reply_address,
            destination_address=destination_address,
            message=forwarded_email,
        )
    except ClientError:
        # 503 service unavailable response to SNS so it can retry
        log_email_dropped(reason="error_sending", mask=address, can_retry=True)
        return HttpResponse("SES client error on Raw Email", status=503)

    message_id = ses_response["MessageId"]
    _store_reply_record(mail, message_id, address)

    user_profile.update_abuse_metric(
        email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
    )
    user_profile.last_engagement = datetime.now(UTC)
    user_profile.save()
    address.num_forwarded += 1
    address.last_used_at = datetime.now(UTC)
    if level_one_trackers_removed:
        address.num_level_one_trackers_blocked = (
            address.num_level_one_trackers_blocked or 0
        ) + level_one_trackers_removed
    address.save(
        update_fields=[
            "num_forwarded",
            "last_used_at",
            "block_list_emails",
            "num_level_one_trackers_blocked",
        ]
    )
    glean_logger().log_email_forwarded(mask=address, is_reply=False)
    return HttpResponse("Sent email to final recipient.", status=200)


class DeveloperModeAction(NamedTuple):
    mask_id: str
    action: Literal["log", "simulate_complaint"] = "log"
    new_destination_address: str | None = None


def _get_verdict(receipt, verdict_type):
    return receipt[f"{verdict_type}Verdict"]["status"]


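# Illustrative example, not in the original module:
#   _get_verdict({"spamVerdict": {"status": "PASS"}}, "spam") == "PASS"
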
def _check_email_from_list(headers):
    for header in headers:
        if header["name"].lower().startswith("list-"):
            return True
    return False


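# Illustrative example, not in the original module: any "List-*" header, such as
# List-Id or List-Unsubscribe, marks the email as a list email, e.g.
#   _check_email_from_list([{"name": "List-Unsubscribe", "value": "<mailto:a@b>"}])
# returns True.
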
def _record_receipt_verdicts(receipt, state):
    verdict_tags = []
    for key in sorted(receipt.keys()):
        if key.endswith("Verdict"):
            value = receipt[key]["status"]
            verdict_tags.append(f"{key}:{value}")
            incr_if_enabled(f"relay.emails.verdicts.{key}", 1, [f"state:{state}"])
        elif key == "dmarcPolicy":
            value = receipt[key]
            verdict_tags.append(f"{key}:{value}")
    incr_if_enabled(f"relay.emails.state.{state}", 1, verdict_tags)


def _get_message_id_from_headers(headers):
    message_id = None
    for header in headers:
        if header["name"].lower() == "message-id":
            message_id = header["value"]
    return message_id


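# Illustrative sketch, not in the original module, assuming a minimal receipt:
#   _record_receipt_verdicts(
#       {"spamVerdict": {"status": "PASS"}, "dmarcPolicy": "none"}, "all"
#   )
# increments "relay.emails.verdicts.spamVerdict" tagged with "state:all", then
# "relay.emails.state.all" tagged with ["dmarcPolicy:none", "spamVerdict:PASS"].
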
def _get_keys_from_headers(headers):
    in_reply_to = None
    for header in headers:
        if header["name"].lower() == "in-reply-to":
            in_reply_to = header["value"]
            message_id_bytes = get_message_id_bytes(in_reply_to)
            return derive_reply_keys(message_id_bytes)

        if header["name"].lower() == "references":
            message_ids = header["value"]
            for message_id in message_ids.split(" "):
                message_id_bytes = get_message_id_bytes(message_id)
                lookup_key, encryption_key = derive_reply_keys(message_id_bytes)
                try:
                    # FIXME: calling code is likely to duplicate this query
                    _get_reply_record_from_lookup_key(lookup_key)
                    return lookup_key, encryption_key
                except Reply.DoesNotExist:
                    pass
            raise Reply.DoesNotExist
    incr_if_enabled("mail_to_replies_without_reply_headers", 1)
    raise ReplyHeadersNotFound


def _get_reply_record_from_lookup_key(lookup_key):
    lookup = b64_lookup_key(lookup_key)
    return Reply.objects.get(lookup=lookup)


def _strip_localpart_tag(address):
    [localpart, domain] = address.split("@")
    subaddress_parts = localpart.split("+")
    return f"{subaddress_parts[0]}@{domain}"


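# For example (illustrative comment, not in the original file):
#   _strip_localpart_tag("user+tag@example.com") == "user@example.com"
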
_TransportType = Literal["sns", "s3"]


def _get_email_bytes(
    message_json: AWS_SNSMessageJSON,
) -> tuple[bytes, _TransportType, float]:
    with Timer(logger=None) as load_timer:
        if "content" in message_json:
            # email content in sns message
            message_content = message_json["content"].encode("utf-8")
            transport: Literal["sns", "s3"] = "sns"
        else:
            # assume email content in S3
            bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
            message_content = get_message_content_from_s3(bucket, object_key)
            transport = "s3"
        histogram_if_enabled("relayed_email.size", len(message_content))
    load_time_s = round(load_timer.last, 3)
    return (message_content, transport, load_time_s)


def _get_developer_mode_action(
    mask: RelayAddress | DomainAddress,
) -> DeveloperModeAction | None:
    """Get the developer mode actions for this mask, if enabled."""

    if not (
        flag_is_active_in_task("developer_mode", mask.user)
        and "DEV:" in mask.description
    ):
        return None

    if "DEV:simulate_complaint" in mask.description:
        action = DeveloperModeAction(
            mask_id=mask.metrics_id,
            action="simulate_complaint",
            new_destination_address=f"complaint+{mask.metrics_id}@simulator.amazonses.com",
        )
    else:
        action = DeveloperModeAction(mask_id=mask.metrics_id, action="log")
    return action


def _log_dev_notification(
    log_message: str, dev_action: DeveloperModeAction, notification: dict[str, Any]
) -> None:
    """
    Log notification JSON

    This will log information beyond our privacy policy, so it should only be used on
    Relay staff accounts with prior permission.

    The notification JSON will be compressed, Ascii85-encoded with padding, and broken
    into 1024-byte chunks. This ensures it fits into GCP's log entry, which has a
    64KB limit per label value.
    """

    notification_gza85 = encode_dict_gza85(notification)
    total_parts = notification_gza85.count("\n") + 1
    log_group_id = str(uuid4())
    for partnum, part in enumerate(notification_gza85.splitlines()):
        info_logger.info(
            log_message,
            extra={
                "mask_id": dev_action.mask_id,
                "dev_action": dev_action.action,
                "log_group_id": log_group_id,
                "part": partnum,
                "parts": total_parts,
                "notification_gza85": part,
            },
        )


def _convert_to_forwarded_email(
    incoming_email_bytes: bytes,
    headers: OutgoingHeaders,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[EmailMessage, EmailForwardingIssues, int, bool, bool]:
    """
    Convert an email (as bytes) to a forwarded email.

    Return is a tuple:
    - email - The forwarded email
    - issues - Any detected issues in conversion
    - level_one_trackers_removed (int) - Number of trackers removed
    - has_html - True if the email has an HTML representation
    - has_text - True if the email has a plain text representation
    """
    email = message_from_bytes(incoming_email_bytes, policy=relay_policy)
    # python/typeshed issue 2418
    # The Python 3.2 default was Message, 3.6 uses policy.message_factory, and
    # policy.default.message_factory is EmailMessage
    if not isinstance(email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    # Replace headers in the original email
    header_issues = _replace_headers(email, headers)

    # Find and replace text content
    text_body = email.get_body("plain")
    text_content = None
    has_text = False
    if text_body:
        has_text = True
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        text_content = text_body.get_content()
        new_text_content = _convert_text_content(text_content, to_address)
        text_body.set_content(new_text_content)

    # Find and replace HTML content
    html_body = email.get_body("html")
    level_one_trackers_removed = 0
    has_html = False
    if html_body:
        has_html = True
        if not isinstance(html_body, EmailMessage):
            raise TypeError("html_body must be type EmailMessage")
        html_content = html_body.get_content()
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
        )
        html_body.set_content(new_content, subtype="html")
    elif text_content:
        # Try to use the text content to generate HTML content
        html_content = urlize_and_linebreaks(text_content)
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
        )
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        try:
            text_body.add_alternative(new_content, subtype="html")
        except TypeError as e:
            out = StringIO()
            _structure(email, fp=out)
            info_logger.error(
                "Adding HTML alternate failed",
                extra={"exception": str(e), "structure": out.getvalue()},
            )

    issues: EmailForwardingIssues = {}
    if header_issues:
        issues["headers"] = header_issues
    return (email, issues, level_one_trackers_removed, has_html, has_text)


def _replace_headers(
    email: EmailMessage, headers: OutgoingHeaders
) -> EmailHeaderIssues:
    """
    Replace the headers in email with new headers.

    This replaces headers in the passed email object, rather than returns an altered
    copy. The primary reason is that the Python email package can read an email with
    non-compliant headers or content, but can't write it. A read/write is required to
    create a copy that we then alter. This code instead alters the passed EmailMessage
    object, making header-specific changes in try / except statements.

    The other reason is the object size. An Email can be up to 10 MB, and we hope to
    support 40 MB emails someday. Modern servers may be OK with this, but it would be
    nice to handle the non-compliant headers without crashing before we add a source of
    memory-related crashes.
    """
    # Look for headers to drop
    to_drop: list[str] = []
    replacements: set[str] = {_k.lower() for _k in headers.keys()}
    issues: EmailHeaderIssues = []

    # Detect non-compliant headers in incoming emails
    for header in email.keys():
        try:
            value = email[header]
        except Exception as e:
            issues.append(
                {"header": header, "direction": "in", "exception_on_read": repr(e)}
            )
            value = None
        if getattr(value, "defects", None):
            issues.append(
                {
                    "header": header,
                    "direction": "in",
                    "defect_count": len(value.defects),
                    "parsed_value": str(value),
                    "raw_value": str(value.as_raw),
                }
            )
        elif getattr(getattr(value, "_parse_tree", None), "all_defects", []):
            issues.append(
                {
                    "header": header,
                    "direction": "in",
                    "defect_count": len(value._parse_tree.all_defects),
                    "parsed_value": str(value),
                    "raw_value": str(value.as_raw),
                }
            )

    # Collect headers that will not be forwarded
    for header in email.keys():
        header_lower = header.lower()
        if (
            header_lower not in replacements
            and header_lower != "mime-version"
            and not header_lower.startswith("content-")
        ):
            to_drop.append(header)

    # Drop headers that should be dropped
    for header in to_drop:
        del email[header]

    # Replace the requested headers
    for header, value in headers.items():
        del email[header]
        try:
            email[header] = value.rstrip("\r\n")
        except Exception as e:
            issues.append(
                {
                    "header": header,
                    "direction": "out",
                    "exception_on_write": repr(e),
                    "value": value,
                }
            )
            continue
        try:
            parsed_value = email[header]
        except Exception as e:
            issues.append(
                {
                    "header": header,
                    "direction": "out",
                    "exception_on_write": repr(e),
                    "value": value,
                }
            )
            continue
        if parsed_value.defects:
            issues.append(
                {
                    "header": header,
                    "direction": "out",
                    "defect_count": len(parsed_value.defects),
                    "parsed_value": str(parsed_value),
                    "raw_value": str(parsed_value.as_raw),
                },
            )

    return issues


def _convert_html_content(
    html_content: str,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[str, int]:
    # frontend expects a timestamp in milliseconds
    now = now or datetime.now(UTC)
    datetime_now_ms = int(now.timestamp() * 1000)

    # scramble alias so that clients don't recognize it
    # and apply default link styles
    display_email = re.sub("([@.:])", r"<span>\1</span>", to_address)

    # sample tracker numbers
    if sample_trackers:
        count_all_trackers(html_content)

    tracker_report_link = ""
    removed_count = 0
    if remove_level_one_trackers:
        html_content, tracker_details = remove_trackers(
            html_content, from_address, datetime_now_ms
        )
        removed_count = tracker_details["tracker_removed"]
        tracker_report_details = {
            "sender": from_address,
            "received_at": datetime_now_ms,
            "trackers": tracker_details["level_one"]["trackers"],
        }
        tracker_report_link = f"{settings.SITE_ORIGIN}/tracker-report/#" + json.dumps(
            tracker_report_details
        )

    wrapped_html = wrap_html_email(
        original_html=html_content,
        language=language,
        has_premium=has_premium,
        display_email=display_email,
        tracker_report_link=tracker_report_link,
        num_level_one_email_trackers_removed=removed_count,
    )
    return wrapped_html, removed_count


def _convert_text_content(text_content: str, to_address: str) -> str:
    relay_header_text = (
        "This email was sent to your alias "
        f"{to_address}. To stop receiving emails sent to this alias, "
        "update the forwarding settings in your dashboard.\n"
        "---Begin Email---\n"
    )
    wrapped_text = relay_header_text + text_content
    return wrapped_text


def _build_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> EmailMessage:
    # If we haven't forwarded a first reply for this user yet, _reply_allowed
    # will forward. So, tell the user we forwarded it.
    forwarded = not reply_record.address.user.profile.forwarded_first_reply
    sender: str | None = ""
    if decrypted_metadata is not None:
        sender = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
    ctx = {
        "sender": sender or "",
        "forwarded": forwarded,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    html_body = render_to_string("emails/reply_requires_premium.html", ctx)
    text_body = render_to_string("emails/reply_requires_premium.txt", ctx)

    # Create the message
    msg = EmailMessage()
    msg["Subject"] = ftl_bundle.format("replies-not-included-in-free-account-header")
    msg["From"] = get_reply_to_address()
    msg["To"] = from_address
    if message_id:
        msg["In-Reply-To"] = message_id
        msg["References"] = message_id
    msg.set_content(text_body)
    msg.add_alternative(html_body, subtype="html")
    return msg


def _set_forwarded_first_reply(profile):
    profile.forwarded_first_reply = True
    profile.save()


def _send_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> None:
    msg = _build_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    try:
        ses_send_raw_email(
            source_address=get_reply_to_address(premium=False),
            destination_address=from_address,
            message=msg,
        )
        # If we haven't forwarded a first reply for this user yet, _reply_allowed will.
        # So, update the DB.
        _set_forwarded_first_reply(reply_record.address.user.profile)
    except ClientError as e:
        logger.error("reply_not_allowed_ses_client_error", extra=e.response["Error"])
    incr_if_enabled("free_user_reply_attempt", 1)


def _reply_allowed(
    from_address, to_address, reply_record, message_id=None, decrypted_metadata=None
):
    stripped_from_address = _strip_localpart_tag(from_address)
    reply_record_email = reply_record.address.user.email
    stripped_reply_record_address = _strip_localpart_tag(reply_record_email)
    if (from_address == reply_record_email) or (
        stripped_from_address == stripped_reply_record_address
    ):
        # This is a Relay user replying to an external sender;

        if not reply_record.profile.user.is_active:
            return False

        if reply_record.profile.is_flagged:
            return False

        if reply_record.owner_has_premium:
            return True

        # if we haven't forwarded a first reply for this user, return True to allow
        # this first reply
        allow_first_reply = not reply_record.address.user.profile.forwarded_first_reply
        _send_reply_requires_premium_email(
            from_address, reply_record, message_id, decrypted_metadata
        )
        return allow_first_reply
    else:
        # The From: is not a Relay user, so make sure this is a reply *TO* a
        # premium Relay user
        try:
            address = _get_address(to_address)
            if address.user.profile.has_premium:
                return True
        except ObjectDoesNotExist:
            return False
    incr_if_enabled("free_user_reply_attempt", 1)
    return False


def _handle_reply(
    from_address: str, message_json: AWS_SNSMessageJSON, to_address: str
) -> HttpResponse:
    """
    Handle a reply from a Relay user to an external email.

    Returns (may be incomplete):
    * 200 if the reply was sent
    * 400 if the In-Reply-To and References headers are missing, none of the References
      headers are a reply record, or the SES client raises an error
    * 403 if the Relay user is not allowed to reply
    * 404 if the S3-stored email is not found, or there is no matching Reply record in
      the database
    * 503 if the S3 client returns an error (other than not found), or the SES client
      returns an error

    TODO: Return a more appropriate status object (see _handle_received)
    TODO: Document metrics emitted
    """
    mail = message_json["mail"]
    try:
        (lookup_key, encryption_key) = _get_keys_from_headers(mail["headers"])
    except ReplyHeadersNotFound:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-header"])
        return HttpResponse("No In-Reply-To header", status=400)

    try:
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
    except Reply.DoesNotExist:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
        return HttpResponse("Unknown or stale In-Reply-To header", status=404)

    address = reply_record.address
    message_id = _get_message_id_from_headers(mail["headers"])
    decrypted_metadata = json.loads(
        decrypt_reply_metadata(encryption_key, reply_record.encrypted_metadata)
    )
    if not _reply_allowed(
        from_address, to_address, reply_record, message_id, decrypted_metadata
    ):
        log_email_dropped(reason="reply_requires_premium", mask=address, is_reply=True)
        return HttpResponse("Relay replies require a premium account", status=403)

    outbound_from_address = address.full_address
    incr_if_enabled("reply_email", 1)
    subject = mail["commonHeaders"].get("subject", "")
    to_address = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": outbound_from_address,
        "To": to_address,
        "Reply-To": outbound_from_address,
    }

    try:
        (email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            log_email_dropped(reason="content_missing", mask=address, is_reply=True)
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        log_email_dropped(
            reason="error_storage", mask=address, is_reply=True, can_retry=True
        )
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    email = message_from_bytes(email_bytes, policy=relay_policy)
    if not isinstance(email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    # Convert to a reply email
    # TODO: Issue #1747 - Remove wrapper / prefix in replies
    _replace_headers(email, headers)

    try:
        ses_send_raw_email(
            source_address=outbound_from_address,
            destination_address=to_address,
            message=email,
        )
    except ClientError:
        log_email_dropped(reason="error_sending", mask=address, is_reply=True)
        return HttpResponse("SES client error", status=400)

    reply_record.increment_num_replied()
    profile = address.user.profile
    profile.update_abuse_metric(replied=True)
    profile.last_engagement = datetime.now(UTC)
    profile.save()
    glean_logger().log_email_forwarded(mask=address, is_reply=True)
    return HttpResponse("Sent email to final recipient.", status=200)


def _get_domain_address(
    local_portion: str, domain_portion: str, create: bool = True
) -> DomainAddress:
    """
    Find or create the DomainAddress for the parts of an email address.

    If the domain_portion is for a valid subdomain, and create=True, a new DomainAddress
    will be created and returned. If create=False, DomainAddress.DoesNotExist is raised.

    If the domain_portion is for an unknown domain, ObjectDoesNotExist is raised.

    If the domain_portion is for an unclaimed subdomain, Profile.DoesNotExist is raised.
    """

    [address_subdomain, address_domain] = domain_portion.split(".", 1)
    if address_domain != get_domains_from_settings()["MOZMAIL_DOMAIN"]:
        if create:
            incr_if_enabled("email_for_not_supported_domain", 1)
        raise ObjectDoesNotExist("Address does not exist")
    try:
        with transaction.atomic():
            locked_profile = Profile.objects.select_for_update().get(
                subdomain=address_subdomain
            )
            domain_numerical = get_domain_numerical(address_domain)
            # filter DomainAddress because it may not exist
            # which will throw an error with get()
            domain_address = DomainAddress.objects.filter(
                user=locked_profile.user, address=local_portion, domain=domain_numerical
            ).first()
            if domain_address is None:
                if not create:
                    raise DomainAddress.DoesNotExist()
                # TODO: Consider the flow where a user generating an alias on the
                # fly could not receive an email because they are no longer a
                # premium user, as seen in the exception thrown by
                # make_domain_address.
                domain_address = DomainAddress.make_domain_address(
                    locked_profile.user, local_portion, True
                )
                glean_logger().log_email_mask_created(
                    mask=domain_address,
                    created_by_api=False,
                )
            domain_address.last_used_at = datetime.now(UTC)
            domain_address.save()
            return domain_address
    except Profile.DoesNotExist as e:
        if create:
            incr_if_enabled("email_for_dne_subdomain", 1)
        raise e


def _get_address(address: str, create: bool = True) -> RelayAddress | DomainAddress:
    """
    Find or create the RelayAddress or DomainAddress for an email address.

    If an unknown email address is for a valid subdomain, and create is True,
    a new DomainAddress will be created.

    On failure, raises exception based on Django's ObjectDoesNotExist:
    * RelayAddress.DoesNotExist - looks like RelayAddress, deleted or does not exist
    * Profile.DoesNotExist - looks like DomainAddress, no subdomain match
    * DomainAddress.DoesNotExist - looks like unknown DomainAddress, create is False
    * ObjectDoesNotExist - Unknown domain
    """

    local_portion, domain_portion = address.split("@")
    local_address = local_portion.lower()
    domain = domain_portion.lower()

    # if the domain is not the site's 'top' relay domain,
    # it may be for a user's subdomain
    email_domains = get_domains_from_settings().values()
    if domain not in email_domains:
        return _get_domain_address(local_address, domain, create)

    # the domain is the site's 'top' relay domain, so look up the RelayAddress
    try:
        domain_numerical = get_domain_numerical(domain)
        relay_address = RelayAddress.objects.get(
            address=local_address, domain=domain_numerical
        )
        return relay_address
    except RelayAddress.DoesNotExist as e:
        if not create:
            raise e
        try:
            DeletedAddress.objects.get(
                address_hash=address_hash(local_address, domain=domain)
            )
            incr_if_enabled("email_for_deleted_address", 1)
            # TODO: create a hard bounce receipt rule in SES
        except DeletedAddress.DoesNotExist:
            incr_if_enabled("email_for_unknown_address", 1)
        except DeletedAddress.MultipleObjectsReturned:
            # not sure why this happens on stage but let's handle it
            incr_if_enabled("email_for_deleted_address_multiple", 1)
        raise e


def _get_address_if_exists(address: str) -> RelayAddress | DomainAddress | None:
    """Get the matching RelayAddress or DomainAddress, or None if it doesn't exist."""
    try:
        return _get_address(address, create=False)
    except (RelayAddress.DoesNotExist, Profile.DoesNotExist, ObjectDoesNotExist):
        return None


def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES bounce notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_bounce" with these tags:
    * bounce_type: 'permanent', 'transient', 'undetermined', or 'none' if omitted
    * bounce_subtype: 'undetermined', 'general', etc., or 'none' if omitted
    * user_match: 'found', 'missing', or error states 'no_address' and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'

    Emits an info log "bounce_notification", same data as metric, plus:
    * bounce_action: 'action' from bounced recipient data, or None
    * bounce_status: 'status' from bounced recipient data, or None
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data, or None
    * bounce_extra: extra data from the bounced recipient data, if any
    * domain: User's real email address domain, if an address was given
    * fxa_id: The Mozilla account (previously known as Firefox Account) ID of the user
    """
    bounce = message_json.get("bounce", {})
    bounce_type = bounce.get("bounceType", "none")
    bounce_subtype = bounce.get("bounceSubType", "none")
    bounced_recipients = bounce.get("bouncedRecipients", [])

    now = datetime.now(UTC)
    bounce_data = []
    for recipient in bounced_recipients:
        recipient_address = recipient.pop("emailAddress", None)
        data = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "bounce_action": recipient.pop("action", ""),
            "bounce_status": recipient.pop("status", ""),
            "bounce_diagnostic": recipient.pop("diagnosticCode", ""),
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if recipient:
            data["bounce_extra"] = recipient.copy()
        bounce_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        recipient_domain = recipient_address.split("@")[1]
        data["domain"] = recipient_domain

        try:
            user = User.objects.get(email=recipient_address)
            profile = user.profile
            data["user_match"] = "found"
            if (fxa := profile.fxa) and profile.metrics_enabled:
                data["fxa_id"] = fxa.uid
            else:
                data["fxa_id"] = ""
        except User.DoesNotExist:
            # TODO: handle bounce for a user who no longer exists
            # add to SES account-wide suppression list?
            data["user_match"] = "missing"
            continue

        action = None
        if "spam" in data["bounce_diagnostic"].lower():
            # if an email bounced as spam, set to auto block spam for this user
            # and DON'T set them into bounce pause state
            action = "auto_block_spam"
            profile.auto_block_spam = True
        elif bounce_type == "Permanent":
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
            action = "hard_bounce"
            profile.last_hard_bounce = now
        elif bounce_type == "Transient":
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
            action = "soft_bounce"
            profile.last_soft_bounce = now
        if action:
            data["relay_action"] = action
            profile.save()

    if not bounce_data:
        # Data when there are no identified recipients
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    for data in bounce_data:
        tags = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_bounce",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("bounce_notification", extra=data)

    if any(data["user_match"] == "missing" for data in bounce_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)


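# Illustrative sketch (hypothetical payload, not used at runtime): the subset of
# an SES bounce notification that _handle_bounce() reads. For this input it would
# record a hard bounce for the user matching real-user@example.com (relay_action
# "hard_bounce", user_match "found"), or user_match "missing" if no user matches,
# and emit one "email_bounce" counter tagged with the bounce type and subtype.
_EXAMPLE_BOUNCE_MESSAGE_JSON: dict[str, Any] = {
    "bounce": {
        "bounceType": "Permanent",
        "bounceSubType": "General",
        "bouncedRecipients": [
            {
                "emailAddress": "real-user@example.com",
                "action": "failed",
                "status": "5.1.1",
                "diagnosticCode": "smtp; 550 5.1.1 user unknown",
            }
        ],
    }
}

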
def _build_disabled_mask_for_spam_email(
    mask: RelayAddress | DomainAddress,
) -> EmailMessage:
    ctx = {"mask": mask.full_address, "SITE_ORIGIN": settings.SITE_ORIGIN}
    html_body = render_to_string("emails/disabled_mask_for_spam.html", ctx)
    text_body = render_to_string("emails/disabled_mask_for_spam.txt", ctx)

    # Create the message
    msg = EmailMessage()
    msg["Subject"] = ftl_bundle.format("relay-deactivated-mask-email-subject")
    msg["From"] = settings.RELAY_FROM_ADDRESS
    msg["To"] = mask.user.email
    msg.set_content(text_body)
    msg.add_alternative(html_body, subtype="html")
    return msg


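# Minimal debugging sketch (not called in production code paths): renders the MIME
# layout of the notification built above. With set_content() followed by
# add_alternative(), the expected output is a multipart/alternative containing
# text/plain, then text/html. Uses the private email.iterators._structure() helper
# already imported in this module.
def _debug_disabled_mask_email_structure(mask: RelayAddress | DomainAddress) -> str:
    out = StringIO()
    _structure(_build_disabled_mask_for_spam_email(mask), fp=out)
    return out.getvalue()

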
def _send_disabled_mask_for_spam_email(mask: RelayAddress | DomainAddress) -> None:
    msg = _build_disabled_mask_for_spam_email(mask)
    if not settings.RELAY_FROM_ADDRESS:
        raise ValueError(
            "Must set settings.RELAY_FROM_ADDRESS to send disabled_mask_for_spam email."
        )
    try:
        ses_send_raw_email(
            source_address=settings.RELAY_FROM_ADDRESS,
            destination_address=mask.user.email,
            message=msg,
        )
    except ClientError as e:
        logger.error("send_disabled_mask_ses_client_error", extra=e.response["Error"])
    incr_if_enabled("send_disabled_mask_email", 1)


def _handle_complaint(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES complaint notification.

    This looks for Relay users in the complainedRecipients (real email address)
    and the From: header (mask address). We expect both to match the same Relay user,
    and return a 200. If one or the other does not match, a 404 is returned, and
    errors may be logged.

    The first time a user complains, this sets the user's auto_block_spam flag to True.

    The second time a user complains, this disables the mask through which the spam
    mail was forwarded, and sends an email to the user to notify them the mask is
    disabled and can be re-enabled on their dashboard.

    For more information on the complaint notification, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_complaint" with these tags:
    * complaint_subtype: 'onaccountsuppressionlist', or 'none' if omitted
    * complaint_feedback: feedback enumeration from ISP (usually 'abuse') or 'none'
    * user_match: 'found' or 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', or 'disable_mask'

    Emits an info log "complaint_notification", same data as metric, plus:
    * complaint_user_agent: identifies the client used to file the complaint
    * complaint_extra: extra data from complainedRecipients data, if any
    * domain: User's domain, if an address was given
    * found_in: "complained_recipients" (real email), "from_header" (email mask),
      or "all" (matching records found in both)
    * fxa_id: The Mozilla account (previously known as Firefox Account) ID of the user
    * mask_match: "found" if "From" header contains an email mask, or "not_found"
    """
    complaint_data = _get_complaint_data(message_json)
    complainers, unknown_count = _gather_complainers(complaint_data)

    # Reduce future complaints from complaining Relay users
    actions: list[ComplaintAction] = []
    for complainer in complainers:
        action = _reduce_future_complaints(complainer)
        actions.append(action)

        if (
            flag_is_active_in_task("developer_mode", complainer["user"])
            and action.mask_id
        ):
            _log_dev_notification(
                "_handle_complaint: developer_mode",
                DeveloperModeAction(mask_id=action.mask_id, action="log"),
                message_json,
            )

    # Log complaint and actions taken
    if not actions:
        # Log the complaint, noting that no action was taken
        actions.append(ComplaintAction(user_match="no_recipients"))
    for action in actions:
        tags = [
            generate_tag(key, val)
            for key, val in {
                "complaint_subtype": complaint_data.subtype or "none",
                "complaint_feedback": complaint_data.feedback_type or "none",
                "user_match": action.user_match,
                "relay_action": action.relay_action,
            }.items()
        ]
        incr_if_enabled("email_complaint", tags=tags)

        log_extra = {
            "complaint_subtype": complaint_data.subtype or None,
            "complaint_user_agent": complaint_data.user_agent or None,
            "complaint_feedback": complaint_data.feedback_type or None,
        }
        log_extra.update(
            {
                key: value
                for key, value in action._asdict().items()
                if (value is not None and key != "mask_id")
            }
        )
        info_logger.info("complaint_notification", extra=log_extra)

    if unknown_count:
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)


class RawComplaintData(NamedTuple):
    complained_recipients: list[tuple[str, dict[str, Any]]]
    from_addresses: list[str]
    subtype: str
    user_agent: str
    feedback_type: str


def _get_complaint_data(message_json: AWS_SNSMessageJSON) -> RawComplaintData:
    """
    Extract complaint data from an AWS SES Complaint Notification.

    This extracts only the data used by _handle_complaint(). It also works on
    complaint events, which have a similar structure and the same data needed
    by _handle_complaint.

    For more information on the complaint notification, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object
    """
    complaint = message_json["complaint"]

    T = TypeVar("T")

    def get_or_log(
        key: str, source: dict[str, T], data_type: type[T]
    ) -> tuple[T, bool]:
        """Get a value from a dictionary, or log if not found"""
        if key in source:
            return source[key], True
        logger.error(
            "_get_complaint_data: Unexpected message format",
            extra={"missing_key": key, "found_keys": ",".join(sorted(source.keys()))},
        )
        return data_type(), False

    raw_recipients, has_cr = get_or_log("complainedRecipients", complaint, list)
    complained_recipients = []
    no_entries = True
    for entry in raw_recipients:
        no_entries = False
        raw_email_address, has_email = get_or_log("emailAddress", entry, str)
        if has_email:
            email_address = parseaddr(raw_email_address)[1]
            extra = {
                key: value for key, value in entry.items() if key != "emailAddress"
            }
            complained_recipients.append((email_address, extra))
    if has_cr and no_entries:
        logger.error("_get_complaint_data: Empty complainedRecipients")

    mail, has_mail = get_or_log("mail", message_json, dict)
    if has_mail:
        commonHeaders, has_ch = get_or_log("commonHeaders", mail, dict)
    else:
        commonHeaders, has_ch = {}, False
    if has_ch:
        raw_from_addresses, _ = get_or_log("from", commonHeaders, list)
    else:
        raw_from_addresses = []
    from_addresses = [parseaddr(addr)[1] for addr in raw_from_addresses]

    # Only present when set
    feedback_type = complaint.get("complaintFeedbackType", "")
    # Only present when destination is on account suppression list
    subtype = complaint.get("complaintSubType", "")
    # Only present for feedback reports
    user_agent = complaint.get("userAgent", "")

    return RawComplaintData(
        complained_recipients, from_addresses, subtype, user_agent, feedback_type
    )


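# Illustrative sketch (hypothetical addresses, not used at runtime): the minimal
# notification shape consumed by _get_complaint_data(). For this input it returns
# RawComplaintData with one complained recipient (and no extra data), one From
# address, an empty subtype, the reporting user agent, and feedback type "abuse".
_EXAMPLE_COMPLAINT_MESSAGE_JSON: dict[str, Any] = {
    "complaint": {
        "complainedRecipients": [{"emailAddress": "real-user@example.com"}],
        "complaintFeedbackType": "abuse",
        "userAgent": "ExampleISP-FeedbackLoop/1.0",
    },
    "mail": {"commonHeaders": {"from": ["mask1234@relay.example.com"]}},
}

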
class Complainer(TypedDict):
    user: User
    found_in: Literal["complained_recipients", "from_header", "all"]
    domain: str
    extra: dict[str, Any] | None
    masks: list[RelayAddress | DomainAddress]


def _gather_complainers(
    complaint_data: RawComplaintData,
) -> tuple[list[Complainer], int]:
    """
    Fetch Relay Users and masks from the complaint data.

    This matches data from an AWS SES Complaint Notification (as extracted by
    _get_complaint_data()) to the Relay database, and returns the Users,
    RelayAddresses, and DomainAddresses, as well as status and extra data.

    If the complaint came from the AWS SES complaint simulator, detect
    developer_mode and move forward with the developer's User data.
    """

    users: dict[int, Complainer] = {}
    unknown_complainer_count = 0
    for email_address, extra_data in complaint_data.complained_recipients:
        local, domain = email_address.split("@", 1)

        # If the complainer is the AWS SES complaint simulator, assume the
        # complaint was sent by a user with the developer_mode flag. Look for
        # a mask that matches the embedded mask metrics_id, and use
        # the related user's email instead of the AWS simulator address.
        # See docs/developer_mode.md
        if domain == "simulator.amazonses.com" and local.startswith("complaint+"):
            mask_metrics_id = local.removeprefix("complaint+")
            mask = _get_mask_by_metrics_id(mask_metrics_id)
            if mask:
                email_address = mask.user.email
                domain = mask.user.email.split("@")[1]

        try:
            user = User.objects.get(email=email_address)
        except User.DoesNotExist:
            logger.error("_gather_complainers: unknown complainedRecipient")
            unknown_complainer_count += 1
            continue

        if user.id in users:
            logger.error("_gather_complainers: complainer appears twice")
            continue

        users[user.id] = {
            "user": user,
            "found_in": "complained_recipients",
            "domain": domain,
            "extra": extra_data or None,
            "masks": [],
        }

    # Collect From: addresses and their users
    unknown_sender_count = 0
    for email_address in complaint_data.from_addresses:
        mask = _get_address_if_exists(email_address)
        if not mask:
            logger.error("_gather_complainers: unknown mask, maybe deleted?")
            unknown_sender_count += 1
            continue

        if mask.user.id not in users:
            # Add mask-only entry to users
            users[mask.user.id] = {
                "user": mask.user,
                "found_in": "from_header",
                "domain": mask.user.email.split("@")[1],
                "extra": None,
                "masks": [mask],
            }
            continue

        user_data = users[mask.user.id]
        if mask in user_data["masks"]:
            logger.error("_gather_complainers: mask appears twice")
            continue

        user_data["masks"].append(mask)
        if user_data["found_in"] in ("all", "complained_recipients"):
            user_data["found_in"] = "all"
        else:
            logger.error("_gather_complainers: no complainer, multi-mask")

    return (list(users.values()), unknown_complainer_count + unknown_sender_count)


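# Example (hypothetical mask id): in developer_mode, a complaint from the AWS SES
# complaint simulator shows up with a complainedRecipients address like
# "complaint+R123@simulator.amazonses.com". _gather_complainers() strips the
# "complaint+" prefix and resolves "R123" with _get_mask_by_metrics_id() below,
# so the complaint is attributed to the mask owner's real account instead of the
# simulator address.

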
def _get_mask_by_metrics_id(metrics_id: str) -> RelayAddress | DomainAddress | None:
    """Look up a mask by metrics ID, or None if not found."""
    if not metrics_id or metrics_id[0] not in ("R", "D"):
        return None
    mask_type_id = metrics_id[0]
    mask_raw_id = metrics_id[1:]
    try:
        mask_id = int(mask_raw_id)
    except ValueError:
        return None  # ID is not an int, do not try to match to Relay mask

    if mask_type_id == "R":
        try:
            return RelayAddress.objects.get(id=mask_id)
        except RelayAddress.DoesNotExist:
            return None
    try:
        return DomainAddress.objects.get(id=mask_id)
    except DomainAddress.DoesNotExist:
        return None


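# Illustrative sketch (hypothetical primary keys, never called): the metrics ID
# format handled above is a mask type prefix ("R" for RelayAddress, "D" for
# DomainAddress) followed by the mask's database id.
def _example_metrics_id_lookups() -> list[RelayAddress | DomainAddress | None]:
    return [
        _get_mask_by_metrics_id("R123"),  # RelayAddress with id 123, or None
        _get_mask_by_metrics_id("D45"),  # DomainAddress with id 45, or None
        _get_mask_by_metrics_id("X7"),  # unknown prefix -> None
        _get_mask_by_metrics_id("Rabc"),  # non-integer id -> None
    ]

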
class ComplaintAction(NamedTuple):
    user_match: Literal["found", "no_recipients"]
    relay_action: Literal["no_action", "auto_block_spam", "disable_mask"] = "no_action"
    mask_match: Literal["found", "not_found"] = "not_found"
    mask_id: str | None = None
    found_in: Literal["complained_recipients", "from_header", "all"] | None = None
    fxa_id: str | None = None
    domain: str | None = None
    complaint_extra: str | None = None


def _reduce_future_complaints(complainer: Complainer) -> ComplaintAction:
    """Take action to reduce future complaints from a complaining user."""

    user = complainer["user"]
    mask_match: Literal["found", "not_found"] = "not_found"
    relay_action: Literal["no_action", "auto_block_spam", "disable_mask"] = "no_action"
    mask_id = None

    if not user.profile.auto_block_spam:
        relay_action = "auto_block_spam"
        user.profile.auto_block_spam = True
        user.profile.save()

    for mask in complainer["masks"]:
        mask_match = "found"
        mask_id = mask.metrics_id
        if (
            flag_is_active_in_task("disable_mask_on_complaint", user)
            and mask.enabled
            and relay_action != "auto_block_spam"
        ):
            relay_action = "disable_mask"
            mask.enabled = False
            mask.save()
            _send_disabled_mask_for_spam_email(mask)

    return ComplaintAction(
        user_match="found",
        relay_action=relay_action,
        mask_match=mask_match,
        mask_id=mask_id,
        fxa_id=user.profile.metrics_fxa_id,
        domain=complainer["domain"],
        found_in=complainer["found_in"],
        complaint_extra=(
            json.dumps(complainer["extra"]) if complainer["extra"] else None
        ),
    )


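# Note on escalation (derived from _reduce_future_complaints() above): the first
# complaint from a user only turns on auto_block_spam. A mask is disabled (and the
# notification email sent) only on a later complaint, when the
# "disable_mask_on_complaint" flag is active for that user, the mask is still
# enabled, and auto_block_spam was not just enabled in the same call.

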
_WAFFLE_FLAGS_INITIALIZED = False


def init_waffle_flags() -> None:
    """Initialize waffle flags for email tasks"""
    global _WAFFLE_FLAGS_INITIALIZED
    if _WAFFLE_FLAGS_INITIALIZED:
        return

    flags: list[tuple[str, str]] = [
        (
            "disable_mask_on_complaint",
            "MPP-3119: When a Relay user marks an email as spam, disable the mask.",
        ),
        (
            "developer_mode",
            "MPP-3932: Enable logging and overrides for Relay developers.",
        ),
    ]
    waffle_flag_table = get_waffle_flag_model().objects
    for name, note in flags:
        waffle_flag_table.get_or_create(name=name, defaults={"note": note})
    _WAFFLE_FLAGS_INITIALIZED = True