[AIRFLOW-3012] Fix Bug when passing emails for SLA

This commit is contained in:
Kaxil Naik 2018-09-09 15:41:34 +01:00 коммит произвёл Sid Anand
Родитель 8f601124a8
Коммит 1411245b17
3 изменённых файлов: 51 добавлений и 4 удалений

Просмотреть файл

@ -58,7 +58,7 @@ from airflow.utils.dag_processing import (AbstractDagFileProcessor,
SimpleDagBag,
list_py_file_paths)
from airflow.utils.db import create_session, provide_session
from airflow.utils.email import send_email
from airflow.utils.email import send_email, get_email_address_list
from airflow.utils.log.logging_mixin import LoggingMixin, set_context, StreamLogWriter
from airflow.utils.net import get_hostname
from airflow.utils.state import State
@ -711,7 +711,7 @@ class SchedulerJob(BaseJob):
for task in dag.tasks:
if task.email:
if isinstance(task.email, basestring):
emails.add(task.email)
emails |= set(get_email_address_list(task.email))
elif isinstance(task.email, (list, tuple)):
emails |= set(task.email)
if emails and len(slas):

Просмотреть файл

@ -47,6 +47,9 @@ def send_email(to, subject, html_content,
path, attr = configuration.conf.get('email', 'EMAIL_BACKEND').rsplit('.', 1)
module = importlib.import_module(path)
backend = getattr(module, attr)
to = get_email_address_list(to)
to = ", ".join(to)
return backend(to, subject, html_content, files=files,
dryrun=dryrun, cc=cc, bcc=bcc,
mime_subtype=mime_subtype, mime_charset=mime_charset, **kwargs)
@ -128,9 +131,9 @@ def send_MIME_email(e_from, e_to, mime_msg, dryrun=False):
def get_email_address_list(address_string):
if isinstance(address_string, basestring):
if ',' in address_string:
address_string = address_string.split(',')
address_string = [address.strip() for address in address_string.split(',')]
elif ';' in address_string:
address_string = address_string.split(';')
address_string = [address.strip() for address in address_string.split(';')]
else:
address_string = [address_string]

44
tests/utils/test_email.py Normal file
Просмотреть файл

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from airflow.utils.email import get_email_address_list
EMAILS = ['test1@example.com', 'test2@example.com']
class EmailTest(unittest.TestCase):
def test_get_email_address_comma_sep_string(self):
emails_string = 'test1@example.com, test2@example.com'
self.assertEquals(
get_email_address_list(emails_string), EMAILS)
def test_get_email_address_colon_sep_string(self):
emails_string = 'test1@example.com; test2@example.com'
self.assertEquals(
get_email_address_list(emails_string), EMAILS)
def test_get_email_address_list(self):
emails_list = ['test1@example.com', 'test2@example.com']
self.assertEquals(
get_email_address_list(emails_list), EMAILS)