fx-private-relay/emails/policy.py

86 строки
3.1 KiB
Python

"""
Implement Python's email.policy.Policy for Relay
The library provides email.policy.default, which parses and generates RFC-compliant
email headers. However, Relay needs to be able to handle emails with non-compliant
headers as well.
The main change is to use a custom header_factory, which:
* For each header, adds the UnstructuredHeader variant as .as_unstructured
* Handles non-compliant Message-IDs generated by Microsoft alerts
See:
https://docs.python.org/3/library/email.headerregistry.html
https://docs.python.org/3/library/email.policy.html
https://github.com/python/cpython/blob/main/Lib/email/_policybase.py
https://github.com/python/cpython/blob/main/Lib/email/headerregistry.py
https://github.com/python/cpython/blob/main/Lib/email/policy.py
"""
from email import errors
from email._header_value_parser import InvalidMessageID, get_unstructured
from email.headerregistry import BaseHeader, UnstructuredHeader
from email.headerregistry import HeaderRegistry as PythonHeaderRegistry
from email.headerregistry import MessageIDHeader as PythonMessageIDHeader
from email.policy import EmailPolicy
from typing import TYPE_CHECKING, cast
if TYPE_CHECKING:
# _HeaderParser is a protocol from mypy's typeshed
# https://github.com/python/typeshed/blob/main/stdlib/email/headerregistry.pyi
from email.headerregistry import _HeaderParser
class RelayMessageIDHeader(PythonMessageIDHeader):
"""
Handle an IndexError raised by parsing an invalid Message-ID header.
This issue is tracked in
https://github.com/python/cpython/issues/105802
A fix is unmerged as of October 2023:
https://github.com/python/cpython/pull/108133
"""
@classmethod
def parse(cls, value, kwds):
try:
parse_tree = cls.value_parser(value)
except IndexError:
token = get_unstructured(value)
message_id = InvalidMessageID(token)
message_id.defects.append(
errors.InvalidHeaderDefect(
f"IndexError for invalid msg-id in '{value}'"
)
)
parse_tree = message_id
kwds["parse_tree"] = parse_tree
kwds["decoded"] = str(parse_tree)
kwds["defects"].extend(parse_tree.all_defects)
class RelayHeaderRegistry(PythonHeaderRegistry):
"""Extend the HeaderRegistry to store the unstructured header."""
def __call__(self, name: str, value: str) -> BaseHeader:
"""Add the unstructured header as .as_unstructured."""
header_instance = super().__call__(name, value)
as_unstructured_cls = type(
"_UnstructuredHeader", (UnstructuredHeader, self.base_class), {}
)
as_unstructured = as_unstructured_cls(name, value)
# Avoid mypy attr-defined error for setting a dynamic attribute
setattr(header_instance, "as_unstructured", as_unstructured)
setattr(header_instance, "as_raw", value)
return header_instance
relay_header_factory = RelayHeaderRegistry()
relay_header_factory.registry["message-id"] = cast(
type["_HeaderParser"], RelayMessageIDHeader
)
relay_policy = EmailPolicy(header_factory=relay_header_factory)