* Add Amazon SES hook

* Add SES Hook to operators-and-hooks documentation.

* Fix arguments for parent class constructor call (PR feedback)

* Fix indentation in operators-and-hooks documentation

* Fix mypy error for argument on call to parent class constructor

* Simplify logic on constructor (PR feedback)

* Add custom headers and other relevant options to hook

* Change pylint exception rule to apply it only to function instead of module (PR feedback)

* Fix spellcheck error

* Vendorize airflow.utils.emaail

* fixup! Vendorize airflow.utils.emaail

Co-authored-by: Kamil Breguła <kamil.bregula@polidea.com>
This commit is contained in:
Ignacio Peluffo 2020-08-21 08:32:25 +01:00 коммит произвёл GitHub
Родитель 88c7d2e526
Коммит 27d08b76a2
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 284 добавлений и 11 удалений

Просмотреть файл

@ -0,0 +1,100 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains AWS SES Hook
"""
from typing import Any, Dict, Iterable, List, Optional, Union
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.utils.email import build_mime_message
class SESHook(AwsBaseHook):
"""
Interact with Amazon Simple Email Service.
Additional arguments (such as ``aws_conn_id``) may be specified and
are passed down to the underlying AwsBaseHook.
.. seealso::
:class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
"""
def __init__(self, *args, **kwargs) -> None:
kwargs['client_type'] = 'ses'
super().__init__(*args, **kwargs)
def send_email( # pylint: disable=too-many-arguments
self,
mail_from: str,
to: Union[str, Iterable[str]],
subject: str,
html_content: str,
files: Optional[List[str]] = None,
cc: Optional[Union[str, Iterable[str]]] = None,
bcc: Optional[Union[str, Iterable[str]]] = None,
mime_subtype: str = 'mixed',
mime_charset: str = 'utf-8',
reply_to: Optional[str] = None,
return_path: Optional[str] = None,
custom_headers: Optional[Dict[str, Any]] = None
) -> dict:
"""
Send email using Amazon Simple Email Service
:param mail_from: Email address to set as email's from
:param to: List of email addresses to set as email's to
:param subject: Email's subject
:param html_content: Content of email in HTML format
:param files: List of paths of files to be attached
:param cc: List of email addresses to set as email's CC
:param bcc: List of email addresses to set as email's BCC
:param mime_subtype: Can be used to specify the sub-type of the message. Default = mixed
:param mime_charset: Email's charset. Default = UTF-8.
:param return_path: The email address to which replies will be sent. By default, replies
are sent to the original sender's email address.
:param reply_to: The email address to which message bounces and complaints should be sent.
"Return-Path" is sometimes called "envelope from," "envelope sender," or "MAIL FROM."
:param custom_headers: Additional headers to add to the MIME message.
No validations are run on these values and they should be able to be encoded.
:return: Response from Amazon SES service with unique message identifier.
"""
ses_client = self.get_conn()
custom_headers = custom_headers or {}
if reply_to:
custom_headers['Reply-To'] = reply_to
if return_path:
custom_headers['Return-Path'] = return_path
message, recipients = build_mime_message(
mail_from=mail_from,
to=to,
subject=subject,
html_content=html_content,
files=files,
cc=cc,
bcc=bcc,
mime_subtype=mime_subtype,
mime_charset=mime_charset,
custom_headers=custom_headers,
)
return ses_client.send_raw_email(
Source=mail_from, Destinations=recipients, RawMessage={'Data': message.as_string()}
)

Просмотреть файл

@ -24,7 +24,7 @@ from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
from typing import Iterable, List, Union
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
@ -47,10 +47,18 @@ def send_email(to: Union[List[str], Iterable[str]], subject: str, html_content:
mime_subtype=mime_subtype, mime_charset=mime_charset, **kwargs)
def send_email_smtp(to, subject, html_content, files=None,
dryrun=False, cc=None, bcc=None,
mime_subtype='mixed', mime_charset='utf-8',
**kwargs):
def send_email_smtp(
to: Union[str, Iterable[str]],
subject: str,
html_content: str,
files: Optional[List[str]] = None,
dryrun: bool = False,
cc: Optional[Union[str, Iterable[str]]] = None,
bcc: Optional[Union[str, Iterable[str]]] = None,
mime_subtype: str = 'mixed',
mime_charset: str = 'utf-8',
**kwargs,
):
"""
Send an email with html content
@ -58,11 +66,55 @@ def send_email_smtp(to, subject, html_content, files=None,
"""
smtp_mail_from = conf.get('smtp', 'SMTP_MAIL_FROM')
msg, recipients = build_mime_message(
mail_from=smtp_mail_from,
to=to,
subject=subject,
html_content=html_content,
files=files,
cc=cc,
bcc=bcc,
mime_subtype=mime_subtype,
mime_charset=mime_charset,
)
send_mime_email(e_from=smtp_mail_from, e_to=recipients, mime_msg=msg, dryrun=dryrun)
def build_mime_message(
mail_from: str,
to: Union[str, Iterable[str]],
subject: str,
html_content: str,
files: Optional[List[str]] = None,
cc: Optional[Union[str, Iterable[str]]] = None,
bcc: Optional[Union[str, Iterable[str]]] = None,
mime_subtype: str = 'mixed',
mime_charset: str = 'utf-8',
custom_headers: Optional[Dict[str, Any]] = None,
) -> Tuple[MIMEMultipart, List[str]]:
"""
Build a MIME message that can be used to send an email and
returns full list of recipients.
:param mail_from: Email address to set as email's from
:param to: List of email addresses to set as email's to
:param subject: Email's subject
:param html_content: Content of email in HTML format
:param files: List of paths of files to be attached
:param cc: List of email addresses to set as email's CC
:param bcc: List of email addresses to set as email's BCC
:param mime_subtype: Can be used to specify the subtype of the message. Default = mixed
:param mime_charset: Email's charset. Default = UTF-8.
:param custom_headers: Additional headers to add to the MIME message.
No validations are run on these values and they should be able to be encoded.
:return: Email as MIMEMultipart and list of recipients' addresses.
"""
to = get_email_address_list(to)
msg = MIMEMultipart(mime_subtype)
msg['Subject'] = subject
msg['From'] = smtp_mail_from
msg['From'] = mail_from
msg['To'] = ", ".join(to)
recipients = to
if cc:
@ -86,14 +138,18 @@ def send_email_smtp(to, subject, html_content, files=None,
file.read(),
Name=basename
)
part['Content-Disposition'] = 'attachment; filename="%s"' % basename
part['Content-ID'] = '<%s>' % basename
part['Content-Disposition'] = f'attachment; filename="{basename}"'
part['Content-ID'] = f'<{basename}>'
msg.attach(part)
send_mime_email(smtp_mail_from, recipients, msg, dryrun)
if custom_headers:
for header_key, header_value in custom_headers.items():
msg[header_key] = header_value
return msg, recipients
def send_mime_email(e_from, e_to, mime_msg, dryrun=False):
def send_mime_email(e_from: str, e_to: List[str], mime_msg: MIMEMultipart, dryrun: bool = False) -> None:
"""
Send MIME email.
"""

Просмотреть файл

@ -430,6 +430,17 @@ class RefactorBackportPackages:
rename("airflow.providers.amazon.common.utils.typing_compat")
)
copyfile(
os.path.join(get_source_airflow_folder(), "airflow", "utils", "email.py"),
os.path.join(get_target_providers_package_folder("amazon"), "common", "utils", "email.py")
)
(
self.qry.
select_module("airflow.utils.email").
filter(callback=amazon_package_filter).
rename("airflow.providers.amazon.common.utils.email")
)
def refactor_google_package(self):
r"""
Fixes to "google" providers package.

Просмотреть файл

@ -486,6 +486,12 @@ These integrations allow you to perform various operations within the Amazon Web
:mod:`airflow.providers.amazon.aws.sensors.sagemaker_transform`,
:mod:`airflow.providers.amazon.aws.sensors.sagemaker_tuning`
* - `Amazon Simple Email Service (SES) <https://aws.amazon.com/ses/>`__
-
- :mod:`airflow.providers.amazon.aws.hooks.ses`
-
-
* - `Amazon Simple Notification Service (SNS) <https://aws.amazon.com/sns/>`__
-
- :mod:`airflow.providers.amazon.aws.hooks.sns`

Просмотреть файл

@ -0,0 +1,78 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import boto3
import pytest
from moto import mock_ses
from airflow.providers.amazon.aws.hooks.ses import SESHook
boto3.setup_default_session()
@mock_ses
def test_get_conn():
hook = SESHook(aws_conn_id='aws_default')
assert hook.get_conn() is not None
@mock_ses
@pytest.mark.parametrize('to',
[
'to@domain.com',
['to1@domain.com', 'to2@domain.com'],
'to1@domain.com,to2@domain.com'
])
@pytest.mark.parametrize('cc',
[
'cc@domain.com',
['cc1@domain.com', 'cc2@domain.com'],
'cc1@domain.com,cc2@domain.com'
])
@pytest.mark.parametrize('bcc',
[
'bcc@domain.com',
['bcc1@domain.com', 'bcc2@domain.com'],
'bcc1@domain.com,bcc2@domain.com'
])
def test_send_email(to, cc, bcc):
# Given
hook = SESHook()
ses_client = hook.get_conn()
mail_from = 'test_from@domain.com'
# Amazon only allows to send emails from verified addresses,
# then we need to validate the from address before sending the email,
# otherwise this test would raise a `botocore.errorfactory.MessageRejected` exception
ses_client.verify_email_identity(EmailAddress=mail_from)
# When
response = hook.send_email(
mail_from=mail_from,
to=to,
subject='subject',
html_content='<html>Test</html>',
cc=cc,
bcc=bcc,
reply_to='reply_to@domain.com',
return_path='return_path@domain.com',
)
# Then
assert response is not None
assert isinstance(response, dict)
assert 'MessageId' in response

Просмотреть файл

@ -26,7 +26,7 @@ import mock
from airflow import utils
from airflow.configuration import conf
from airflow.utils.email import get_email_address_list
from airflow.utils.email import build_mime_message, get_email_address_list
from tests.test_utils.config import conf_vars
EMAILS = ['test1@example.com', 'test2@example.com']
@ -96,6 +96,28 @@ class TestEmail(unittest.TestCase):
cc=None, bcc=None, mime_charset='utf-8', mime_subtype='mixed')
self.assertFalse(mock_send_email.called)
def test_build_mime_message(self):
mail_from = 'from@example.com'
mail_to = 'to@example.com'
subject = 'test subject'
html_content = '<html>Test</html>'
custom_headers = {'Reply-To': 'reply_to@example.com'}
msg, recipients = build_mime_message(
mail_from=mail_from,
to=mail_to,
subject=subject,
html_content=html_content,
custom_headers=custom_headers,
)
self.assertIn('From', msg)
self.assertIn('To', msg)
self.assertIn('Subject', msg)
self.assertIn('Reply-To', msg)
self.assertListEqual([mail_to], recipients)
self.assertEqual(msg['To'], ','.join(recipients))
@conf_vars({('smtp', 'SMTP_SSL'): 'False'})
class TestEmailSmtp(unittest.TestCase):