[AIRFLOW-6708] Set unique logger names (#7330)

Kamil Breguła 2020-02-02 20:48:03 +01:00 committed by GitHub
Parent 1e576f1234
Commit cf141506a2
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
56 changed files with 223 additions and 200 deletions
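The commit applies one pattern across the code base: module-level loggers obtained through LoggingMixin are replaced with standard loggers named after the module that owns them. A minimal before/after sketch of that pattern (the module and host names are illustrative, not taken from the diff):

    # before: every module shared the logger of the mixin class,
    # "airflow.utils.log.logging_mixin.LoggingMixin"
    from airflow.utils.log.logging_mixin import LoggingMixin
    log = LoggingMixin().log

    # after: each module owns a uniquely named logger, e.g. "airflow.api.auth.backend"
    import logging
    log = logging.getLogger(__name__)

    log.info("Using connection to: %s", "example-host")  # call sites are unchanged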


@ -243,6 +243,12 @@ repos:
entry: "from airflow\\.utils\\.db import.* (provide_session|create_session)"
files: \.py$
pass_filenames: true
- id: incorrect-use-of-LoggingMixin
language: pygrep
name: Make sure LoggingMixin is not used alone
entry: "LoggingMixin\\(\\)"
files: \.py$
pass_filenames: true
- id: build
name: Check if image build is needed
entry: ./scripts/ci/pre_commit_ci_build.sh
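The new incorrect-use-of-LoggingMixin hook is a plain pygrep check: it fails whenever a staged Python file contains the literal text LoggingMixin(). A rough sketch of what it would and would not flag (illustrative snippets, not from the diff):

    # flagged: instantiating the mixin on its own just to borrow its logger
    log = LoggingMixin().log

    # not flagged: the text "LoggingMixin()" never appears when the mixin is subclassed
    class MyHook(LoggingMixin):
        def run(self):
            self.log.info("running")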


@ -17,12 +17,13 @@
# specific language governing permissions and limitations
# under the License.
"""Authentication backend"""
import logging
from importlib import import_module
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
log = logging.getLogger(__name__)
class ApiAuth: # pylint: disable=too-few-public-methods
@ -33,8 +34,6 @@ class ApiAuth: # pylint: disable=too-few-public-methods
API_AUTH = ApiAuth()
LOG = LoggingMixin().log
def load_auth():
"""Loads authentication backend"""
@ -47,7 +46,7 @@ def load_auth():
try:
API_AUTH.api_auth = import_module(auth_backend)
except ImportError as err:
LOG.critical(
log.critical(
"Cannot import %s for API authentication due to: %s",
auth_backend, err
)


@ -41,6 +41,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Kerberos authentication module"""
import logging
import os
from functools import wraps
from socket import getfqdn
@ -51,15 +52,13 @@ from flask import Response, _request_ctx_stack as stack, g, make_response, reque
from requests_kerberos import HTTPKerberosAuth
from airflow.configuration import conf
from airflow.utils.log.logging_mixin import LoggingMixin
log = logging.getLogger(__name__)
# pylint: disable=c-extension-no-member
CLIENT_AUTH = HTTPKerberosAuth(service='airflow')
LOG = LoggingMixin().log
class KerberosService: # pylint: disable=too-few-public-methods
"""Class to keep information about the Kerberos Service initialized """
def __init__(self):
@ -76,7 +75,7 @@ def init_app(app):
hostname = app.config.get('SERVER_NAME')
if not hostname:
hostname = getfqdn()
LOG.info("Kerberos: hostname %s", hostname)
log.info("Kerberos: hostname %s", hostname)
service = 'airflow'
@ -86,12 +85,12 @@ def init_app(app):
os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab')
try:
LOG.info("Kerberos init: %s %s", service, hostname)
log.info("Kerberos init: %s %s", service, hostname)
principal = kerberos.getServerPrincipalDetails(service, hostname)
except kerberos.KrbError as err:
LOG.warning("Kerberos: %s", err)
log.warning("Kerberos: %s", err)
else:
LOG.info("Kerberos API: server is %s", principal)
log.info("Kerberos API: server is %s", principal)
def _unauthorized():


@ -17,6 +17,7 @@
# specific language governing permissions and limitations
# under the License.
"""Delete DAGs APIs."""
import logging
from sqlalchemy import or_
@ -25,9 +26,10 @@ from airflow.exceptions import DagNotFound
from airflow.models import DagModel, TaskFail
from airflow.models.serialized_dag import SerializedDagModel
from airflow.settings import STORE_SERIALIZED_DAGS
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
log = logging.getLogger(__name__)
@provide_session
def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> int:
@ -39,8 +41,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
:param session: session used
:return count of deleted dags
"""
logger = LoggingMixin()
logger.log.info("Deleting DAG: %s", dag_id)
log.info("Deleting DAG: %s", dag_id)
dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
if dag is None:
raise DagNotFound("Dag id {} not found".format(dag_id))


@ -16,6 +16,7 @@
# under the License.
"""Webserver command"""
import logging
import os
import signal
import subprocess
@ -27,13 +28,13 @@ import daemon
import psutil
from daemon.pidfile import TimeoutPIDLockFile
from airflow import AirflowException, LoggingMixin, conf, settings
from airflow import AirflowException, conf, settings
from airflow.exceptions import AirflowWebServerTimeout
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations, setup_logging
from airflow.www.app import cached_app, create_app
LOG = LoggingMixin().log
log = logging.getLogger(__name__)
def get_num_ready_workers_running(gunicorn_master_proc):
@ -95,7 +96,7 @@ def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
def start_refresh(gunicorn_master_proc):
batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
LOG.debug('%s doing a refresh of %s workers', state, batch_size)
log.debug('%s doing a refresh of %s workers', state, batch_size)
sys.stdout.flush()
sys.stderr.flush()
@ -120,14 +121,14 @@ def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
# Whenever some workers are not ready, wait until all workers are ready
if num_ready_workers_running < num_workers_running:
LOG.debug('%s some workers are starting up, waiting...', state)
log.debug('%s some workers are starting up, waiting...', state)
sys.stdout.flush()
time.sleep(1)
# Kill a worker gracefully by asking gunicorn to reduce number of workers
elif num_workers_running > num_workers_expected:
excess = num_workers_running - num_workers_expected
LOG.debug('%s killing %s workers', state, excess)
log.debug('%s killing %s workers', state, excess)
for _ in range(excess):
gunicorn_master_proc.send_signal(signal.SIGTTOU)
@ -139,7 +140,7 @@ def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
# Start a new worker by asking gunicorn to increase number of workers
elif num_workers_running == num_workers_expected:
refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
LOG.debug(
log.debug(
'%s sleeping for %ss starting doing a refresh...',
state, refresh_interval
)
@ -148,7 +149,7 @@ def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
else:
# num_ready_workers_running == num_workers_running < num_workers_expected
LOG.error((
log.error((
"%s some workers seem to have died and gunicorn"
"did not restart them as expected"
), state)
@ -158,8 +159,8 @@ def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
) < num_workers_expected:
start_refresh(gunicorn_master_proc)
except (AirflowWebServerTimeout, OSError) as err:
LOG.error(err)
LOG.error("Shutting down webserver")
log.error(err)
log.error("Shutting down webserver")
try:
gunicorn_master_proc.terminate()
gunicorn_master_proc.wait()
@ -285,7 +286,7 @@ def webserver(args):
gunicorn_master_proc_pid = int(file.read())
break
except OSError:
LOG.debug("Waiting for gunicorn's pid file to be created.")
log.debug("Waiting for gunicorn's pid file to be created.")
time.sleep(0.1)
gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)


@ -17,18 +17,18 @@
# specific language governing permissions and limitations
# under the License.
"""Default celery configuration."""
import logging
import ssl
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
def _broker_supports_visibility_timeout(url):
return url.startswith("redis://") or url.startswith("sqs://")
log = LoggingMixin().log
log = logging.getLogger(__name__)
broker_url = conf.get('celery', 'BROKER_URL')


@ -17,6 +17,7 @@
# under the License.
import copy
import logging
import os
import pathlib
import shlex
@ -34,9 +35,8 @@ from cryptography.fernet import Fernet
from zope.deprecation import deprecated
from airflow.exceptions import AirflowConfigException
from airflow.utils.log.logging_mixin import LoggingMixin
log = LoggingMixin().log
log = logging.getLogger(__name__)
# show Airflow's deprecation warnings
warnings.filterwarnings(


@ -16,6 +16,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import flask_login
from flask import redirect, request, url_for
# Need to expose these downstream
@ -25,10 +27,9 @@ from flask_oauthlib.client import OAuth
from airflow import models
from airflow.configuration import AirflowConfigException, conf
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
log = LoggingMixin().log
log = logging.getLogger(__name__)
def get_config_param(param):


@ -16,6 +16,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import flask_login
from flask import redirect, request, url_for
# Need to expose these downstream
@ -25,10 +27,9 @@ from flask_oauthlib.client import OAuth
from airflow import models
from airflow.configuration import conf
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
log = LoggingMixin().log
log = logging.getLogger(__name__)
def get_config_param(param):


@ -16,7 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import re
import ssl
import traceback
@ -30,14 +30,13 @@ from wtforms.validators import InputRequired
from airflow import models
from airflow.configuration import AirflowConfigException, conf
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
login_manager = flask_login.LoginManager()
login_manager.login_view = 'airflow.login' # Calls login() below
login_manager.login_message = None
log = LoggingMixin().log
log = logging.getLogger(__name__)
class AuthenticationError(Exception):


@ -18,6 +18,7 @@
# under the License.
"""Password authentication backend"""
import base64
import logging
from functools import wraps
import flask_login
@ -32,14 +33,13 @@ from wtforms import Form, PasswordField, StringField
from wtforms.validators import InputRequired
from airflow import models
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import create_session, provide_session
LOGIN_MANAGER = flask_login.LoginManager()
LOGIN_MANAGER.login_view = 'airflow.login' # Calls login() below
LOGIN_MANAGER.login_message = None
LOG = LoggingMixin().log
log = logging.getLogger(__name__)
CLIENT_AUTH = None
@ -108,7 +108,7 @@ class PasswordUser(models.User):
@provide_session
def load_user(userid, session=None):
"""Loads user from the database"""
LOG.debug("Loading user %s", userid)
log.debug("Loading user %s", userid)
if not userid or userid == 'None':
return None
@ -140,7 +140,7 @@ def authenticate(session, username, password):
if not user.authenticate(password):
raise AuthenticationError()
LOG.info("User %s successfully authenticated", username)
log.info("User %s successfully authenticated", username)
return user


@ -21,6 +21,7 @@ Airflow module for emailer using sendgrid
"""
import base64
import logging
import mimetypes
import os
@ -30,7 +31,8 @@ from sendgrid.helpers.mail import (
)
from airflow.utils.email import get_email_address_list
from airflow.utils.log.logging_mixin import LoggingMixin
log = logging.getLogger(__name__)
def send_email(to, subject, html_content, files=None, cc=None,
@ -114,7 +116,6 @@ def send_email(to, subject, html_content, files=None, cc=None,
def _post_sendgrid_mail(mail_data):
log = LoggingMixin().log
sendgrid_client = sendgrid.SendGridAPIClient(api_key=os.environ.get('SENDGRID_API_KEY'))
response = sendgrid_client.client.mail.send.post(request_body=mail_data)
# 2xx status code.


@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""Celery executor."""
import logging
import math
import os
import subprocess
@ -32,10 +33,11 @@ from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor, CommandType
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKeyType, TaskInstanceStateType
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string
from airflow.utils.timeout import timeout
log = logging.getLogger(__name__)
# Make it constant for unit test.
CELERY_FETCH_ERR_MSG_HEADER = 'Error fetching Celery task state'
@ -63,7 +65,6 @@ app = Celery(
@app.task
def execute_command(command_to_exec: CommandType) -> None:
"""Executes command."""
log = LoggingMixin().log
log.info("Executing command in Celery: %s", command_to_exec)
env = os.environ.copy()
try:
@ -224,7 +225,7 @@ class CeleryExecutor(BaseExecutor):
for key, command, result in key_and_async_results:
if isinstance(result, ExceptionWithTraceback):
self.log.error(
self.log.error( # pylint: disable=logging-not-lazy
CELERY_SEND_ERR_MSG_HEADER + ":%s\n%s\n", result.exception, result.traceback
)
elif result is not None:
@ -269,7 +270,7 @@ class CeleryExecutor(BaseExecutor):
"""Updates states of the tasks."""
for key_and_state in task_keys_to_states:
if isinstance(key_and_state, ExceptionWithTraceback):
self.log.error(
self.log.error( # pylint: disable=logging-not-lazy
CELERY_FETCH_ERR_MSG_HEADER + ", ignoring it:%s\n%s\n",
repr(key_and_state.exception), key_and_state.traceback
)


@ -16,10 +16,13 @@
# under the License.
"""All executors."""
import importlib
import logging
from typing import Optional
from airflow.executors.base_executor import BaseExecutor
log = logging.getLogger(__name__)
class ExecutorLoader:
"""
@ -54,8 +57,6 @@ class ExecutorLoader:
cls._default_executor = ExecutorLoader._get_executor(executor_name)
from airflow import LoggingMixin
log = LoggingMixin().log
log.info("Using executor %s", executor_name)
return cls._default_executor


@ -819,8 +819,10 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
try:
self._change_state(key, state, pod_id)
except Exception as e: # pylint: disable=broad-except
self.log.exception('Exception: %s when attempting ' +
'to change state of %s to %s, re-queueing.', e, results, state)
self.log.exception(
"Exception: %s when attempting to change state of %s to %s, re-queueing.",
e, results, state
)
self.result_queue.put(results)
finally:
self.result_queue.task_done()
@ -837,7 +839,7 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
self.kube_scheduler.run_next(task)
except ApiException as e:
self.log.warning('ApiException when attempting to run task, re-queueing. '
'Message: %s' % json.loads(e.body)['message'])
'Message: %s', json.loads(e.body)['message'])
self.task_queue.put(task)
finally:
self.task_queue.task_done()


@ -17,6 +17,7 @@
# specific language governing permissions and limitations
# under the License.
"""Base class for all hooks"""
import logging
import os
import random
from typing import Iterable
@ -28,6 +29,8 @@ from airflow.utils.session import provide_session
CONN_ENV_PREFIX = 'AIRFLOW_CONN_'
log = logging.getLogger(__name__)
class BaseHook(LoggingMixin):
"""
@ -85,7 +88,6 @@ class BaseHook(LoggingMixin):
"""
conn = random.choice(list(cls.get_connections(conn_id)))
if conn.host:
log = LoggingMixin().log
log.info("Using connection to: %s", conn.log_info())
return conn


@ -20,6 +20,7 @@
Provides lineage support functions
"""
import json
import logging
from functools import wraps
from typing import Any, Dict, Optional
@ -28,7 +29,6 @@ import jinja2
from cattr import structure, unstructure
from airflow.models.base import Operator
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string
ENV = jinja2.Environment()
@ -37,7 +37,7 @@ PIPELINE_OUTLETS = "pipeline_outlets"
PIPELINE_INLETS = "pipeline_inlets"
AUTO = "auto"
log = LoggingMixin().log
log = logging.getLogger(__name__)
@attr.s(auto_attribs=True)


@ -16,7 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from typing import Optional
from cryptography.fernet import Fernet, MultiFernet
@ -24,7 +24,8 @@ from cryptography.fernet import Fernet, MultiFernet
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.typing_compat import Protocol
from airflow.utils.log.logging_mixin import LoggingMixin
log = logging.getLogger(__name__)
class FernetProtocol(Protocol):
@ -67,7 +68,6 @@ def get_fernet():
:raises: airflow.exceptions.AirflowException if there's a problem trying to load Fernet
"""
global _fernet
log = LoggingMixin().log
if _fernet:
return _fernet


@ -19,6 +19,7 @@
import copy
import functools
import logging
import os
import pickle
import re
@ -57,6 +58,8 @@ from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import Interval, UtcDateTime
from airflow.utils.state import State
log = logging.getLogger(__name__)
ScheduleInterval = Union[str, timedelta, relativedelta]
@ -1558,7 +1561,6 @@ class DAG(BaseDag, LoggingMixin):
:type expiration_date: datetime
:return: None
"""
log = LoggingMixin().log
for dag in session.query(
DagModel).filter(DagModel.last_scheduler_run < expiration_date,
DagModel.is_active).all():
@ -1839,7 +1841,6 @@ class DagModel(Base):
:param alive_dag_filelocs: file paths of alive DAGs
:param session: ORM Session
"""
log = LoggingMixin().log
log.debug("Deactivating DAGs (for which DAG files are deleted) from %s table ",
cls.__tablename__)


@ -20,6 +20,7 @@
"""Serialzed DAG table in database."""
import hashlib
import logging
from datetime import timedelta
from typing import Any, Dict, List, Optional
@ -32,11 +33,10 @@ from airflow.models.base import ID_LEN, Base
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.settings import json
from airflow.utils import timezone
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import UtcDateTime
log = LoggingMixin().log
log = logging.getLogger(__name__)
class SerializedDagModel(Base):


@ -45,16 +45,15 @@ class Variable(Base, LoggingMixin):
return '{} : {}'.format(self.key, self._val)
def get_val(self):
log = LoggingMixin().log
if self._val and self.is_encrypted:
try:
fernet = get_fernet()
return fernet.decrypt(bytes(self._val, 'utf-8')).decode()
except InvalidFernetToken:
log.error("Can't decrypt _val for key=%s, invalid token or value", self.key)
self.log.error("Can't decrypt _val for key=%s, invalid token or value", self.key)
return None
except Exception:
log.error("Can't decrypt _val for key=%s, FERNET_KEY configuration missing", self.key)
self.log.error("Can't decrypt _val for key=%s, FERNET_KEY configuration missing", self.key)
return None
else:
return self._val


@ -18,6 +18,7 @@
# under the License.
import json
import logging
import pickle
from typing import Any, Iterable, Optional, Union
@ -33,6 +34,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import UtcDateTime
log = logging.getLogger(__name__)
# MAX XCOM Size is 48KB
# https://github.com/apache/airflow/pull/1618#discussion_r68249677
MAX_XCOM_SIZE = 49344
@ -205,7 +208,6 @@ class XCom(Base, LoggingMixin):
try:
return json.dumps(value).encode('UTF-8')
except ValueError:
log = LoggingMixin().log
log.error("Could not serialize the XCOM value into JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "
@ -223,7 +225,6 @@ class XCom(Base, LoggingMixin):
try:
return json.loads(result.value.decode('UTF-8'))
except ValueError:
log = LoggingMixin().log
log.error("Could not deserialize the XCOM value from JSON. "
"If you are using pickles instead of JSON "
"for XCOM, then you need to enable pickle "


@ -19,6 +19,7 @@
# noinspection PyDeprecation
import importlib
import inspect
import logging
import os
import re
import sys
@ -28,9 +29,8 @@ from typing import Any, Callable, Dict, List, Optional, Type
import pkg_resources
from airflow import settings
from airflow.utils.log.logging_mixin import LoggingMixin
log = LoggingMixin().log
log = logging.getLogger(__name__)
import_errors = {}


@ -92,7 +92,7 @@ class AWSAthenaHook(AwsHook):
try:
state = response['QueryExecution']['Status']['State']
except Exception as ex: # pylint: disable=broad-except
self.log.error('Exception while getting query state', ex)
self.log.error('Exception while getting query state %s', ex)
finally:
# The error is being absorbed here and is being handled by the caller.
# The error is being absorbed to implement retries.
@ -111,7 +111,7 @@ class AWSAthenaHook(AwsHook):
try:
reason = response['QueryExecution']['Status']['StateChangeReason']
except Exception as ex: # pylint: disable=broad-except
self.log.error('Exception while getting query state change reason', ex)
self.log.error('Exception while getting query state change reason: %s', ex)
finally:
# The error is being absorbed here and is being handled by the caller.
# The error is being absorbed to implement retries.
@ -131,7 +131,7 @@ class AWSAthenaHook(AwsHook):
self.log.error('Invalid Query state')
return None
elif query_state in self.INTERMEDIATE_STATES or query_state in self.FAILURE_STATES:
self.log.error('Query is in {state} state. Cannot fetch results'.format(state=query_state))
self.log.error('Query is in "%s" state. Cannot fetch results', query_state)
return None
return self.get_conn().get_query_results(QueryExecutionId=query_execution_id)
@ -151,14 +151,15 @@ class AWSAthenaHook(AwsHook):
while True:
query_state = self.check_query_status(query_execution_id)
if query_state is None:
self.log.info('Trial {try_number}: Invalid query state. Retrying again'.format(
try_number=try_number))
self.log.info('Trial %s: Invalid query state. Retrying again', try_number)
elif query_state in self.INTERMEDIATE_STATES:
self.log.info('Trial {try_number}: Query is still in an intermediate state - {state}'
.format(try_number=try_number, state=query_state))
self.log.info(
'Trial %s: Query is still in an intermediate state - %s', try_number, query_state
)
else:
self.log.info('Trial {try_number}: Query execution completed. Final state is {state}'
.format(try_number=try_number, state=query_state))
self.log.info(
'Trial %s: Query execution completed. Final state is %s', try_number, query_state
)
final_query_state = query_state
break
if max_tries and try_number >= max_tries: # Break loop if max_tries reached


@ -279,7 +279,7 @@ class AwsBatchClient(LoggingMixin):
job_status = job.get("status")
if job_status == "SUCCEEDED":
self.log.info("AWS batch job ({}) succeeded: {}".format(job_id, job))
self.log.info("AWS batch job (%s) succeeded: %s", job_id, job)
return True
if job_status == "FAILED":


@ -17,19 +17,21 @@
# specific language governing permissions and limitations
# under the License.
"""Hook for Web HDFS"""
import logging
from hdfs import HdfsError, InsecureClient
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.utils.log.logging_mixin import LoggingMixin
log = logging.getLogger(__name__)
_kerberos_security_mode = conf.get("core", "security") == "kerberos"
if _kerberos_security_mode:
try:
from hdfs.ext.kerberos import KerberosClient # pylint: disable=ungrouped-imports
except ImportError:
log = LoggingMixin().log
log.error("Could not load the Kerberos extension for the WebHDFSHook.")
raise


@ -16,6 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import re
import sys
@ -23,7 +24,8 @@ from airflow import settings
from airflow.providers.apache.hdfs.hooks.hdfs import HDFSHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.log.logging_mixin import LoggingMixin
log = logging.getLogger(__name__)
class HdfsSensor(BaseSensorOperator):
@ -63,7 +65,6 @@ class HdfsSensor(BaseSensorOperator):
:return: (bool) depending on the matching criteria
"""
if size:
log = LoggingMixin().log
log.debug(
'Filtering for file size >= %s in files: %s',
size, map(lambda x: x['path'], result)
@ -88,7 +89,6 @@ class HdfsSensor(BaseSensorOperator):
:rtype: list[dict]
"""
if ignore_copying:
log = LoggingMixin().log
regex_builder = r"^.*\.(%s$)$" % '$|'.join(ignored_ext)
ignored_extensions_regex = re.compile(regex_builder)
log.debug(


@ -19,11 +19,12 @@
"""
This is an example dag for using the KubernetesPodOperator.
"""
import logging
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.utils.log.logging_mixin import LoggingMixin
log = LoggingMixin().log
log = logging.getLogger(__name__)
try:
# Kubernetes is optional, so not available in vanilla Airflow
@ -65,6 +66,6 @@ try:
)
except ImportError as e:
log.warning("Could not import KubernetesPodOperator: " + str(e))
log.warning("Install kubernetes dependencies with: "
log.warning("Could not import KubernetesPodOperator: %s, ", str(e))
log.warning("Install kubernetes dependencies with: \n"
" pip install 'apache-airflow[kubernetes]'")


@ -42,11 +42,12 @@ from google.auth.environment_vars import CREDENTIALS
from googleapiclient.errors import HttpError
from googleapiclient.http import set_user_agent
from airflow import LoggingMixin, version
from airflow import version
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
logger = LoggingMixin().log
log = logging.getLogger(__name__)
_DEFAULT_SCOPES = ('https://www.googleapis.com/auth/cloud-platform',) # type: Sequence[str]
@ -150,21 +151,23 @@ class CloudBaseHook(BaseHook):
"Please provide only one value."
)
if not key_path and not keyfile_dict:
self.log.info('Getting connection using `google.auth.default()` '
'since no key file is defined for hook.')
self.log.info(
'Getting connection using `google.auth.default()` since no key file is defined for hook.'
)
credentials, project_id = google.auth.default(scopes=self.scopes)
elif key_path:
# Get credentials from a JSON file.
if key_path.endswith('.json'):
self.log.debug('Getting connection using JSON key file %s' % key_path)
self.log.debug('Getting connection using JSON key file %s', key_path)
credentials = (
google.oauth2.service_account.Credentials.from_service_account_file(
key_path, scopes=self.scopes)
)
project_id = credentials.project_id
elif key_path.endswith('.p12'):
raise AirflowException('Legacy P12 key file are not supported, '
'use a JSON key file.')
raise AirflowException(
'Legacy P12 key file are not supported, use a JSON key file.'
)
else:
raise AirflowException('Unrecognised extension for key file.')
else:
@ -291,8 +294,8 @@ class CloudBaseHook(BaseHook):
default_kwargs = {
'wait': tenacity.wait_exponential(multiplier=1, max=100),
'retry': retry_if_temporary_quota(),
'before': tenacity.before_log(logger, logging.DEBUG),
'after': tenacity.after_log(logger, logging.DEBUG),
'before': tenacity.before_log(log, logging.DEBUG),
'after': tenacity.after_log(log, logging.DEBUG),
}
default_kwargs.update(**kwargs)
return tenacity.retry(


@ -21,7 +21,7 @@
This module contains a BigQuery Hook, as well as a very basic PEP 249
implementation for BigQuery.
"""
import logging
import time
import warnings
from copy import deepcopy
@ -41,6 +41,8 @@ from airflow.hooks.dbapi_hook import DbApiHook
from airflow.providers.google.cloud.hooks.base import CloudBaseHook
from airflow.utils.log.logging_mixin import LoggingMixin
log = logging.getLogger(__name__)
# pylint: disable=too-many-public-methods
class BigQueryHook(CloudBaseHook, DbApiHook):
@ -2638,7 +2640,6 @@ def _split_tablename(table_input: str, default_project_id: str,
if project_id is None:
if var_name is not None:
log = LoggingMixin().log
log.info(
'Project not included in %s: %s; using project "%s"',
var_name, table_input, default_project_id


@ -282,8 +282,9 @@ class DatastoreHook(CloudBaseHook):
state = result['metadata']['common']['state'] # type: str
if state == 'PROCESSING':
self.log.info('Operation is processing. Re-polling state in {} seconds'
.format(polling_interval_in_seconds))
self.log.info(
'Operation is processing. Re-polling state in %s seconds', polling_interval_in_seconds
)
time.sleep(polling_interval_in_seconds)
else:
return result


@ -237,7 +237,7 @@ class CloudDLPHook(CloudBaseHook):
while wait_until_finished:
job = self.get_dlp_job(dlp_job_id=job_name, project_id=project_id)
self.log.info("DLP job {} state: {}.".format(job.name, DlpJob.JobState.Name(job.state)))
self.log.info("DLP job %s state: %s.", job.name, DlpJob.JobState.Name(job.state))
if job.state == DlpJob.JobState.DONE:
return job


@ -18,7 +18,7 @@
"""
This module contains a Google ML Engine Hook.
"""
import logging
import random
import time
from typing import Callable, Dict, List, Optional
@ -27,14 +27,14 @@ from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from airflow.providers.google.cloud.hooks.base import CloudBaseHook
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.version import version as airflow_version
log = logging.getLogger(__name__)
_AIRFLOW_VERSION = 'v' + airflow_version.replace('.', '-').replace('+', '-')
def _poll_with_exponential_delay(request, max_n, is_done_func, is_error_func):
log = LoggingMixin().log
for i in range(0, max_n):
try:
@ -143,7 +143,7 @@ class MLEngineHook(CloudBaseHook):
job_id
)
else:
self.log.error('Failed to create MLEngine job: {}'.format(e))
self.log.error('Failed to create MLEngine job: %s', e)
raise
return self._wait_for_job_done(project_id, job_id)
@ -172,7 +172,7 @@ class MLEngineHook(CloudBaseHook):
# polling after 30 seconds when quota failure occurs
time.sleep(30)
else:
self.log.error('Failed to get MLEngine job: {}'.format(e))
self.log.error('Failed to get MLEngine job: %s', e)
raise
def _wait_for_job_done(self, project_id: str, job_id: str, interval: int = 30):


@ -81,5 +81,5 @@ class CloudSpeechToTextHook(CloudBaseHook):
"""
client = self.get_conn()
response = client.recognize(config=config, audio=audio, retry=retry, timeout=timeout)
self.log.info("Recognised speech: %s" % response)
self.log.info("Recognised speech: %s", response)
return response


@ -18,6 +18,7 @@
"""
This module contains GCP MLEngine operators.
"""
import logging
import re
import warnings
from typing import List, Optional
@ -26,9 +27,8 @@ from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.mlengine import MLEngineHook
from airflow.utils.decorators import apply_defaults
from airflow.utils.log.logging_mixin import LoggingMixin
log = LoggingMixin().log
log = logging.getLogger(__name__)
def _normalize_mlengine_job_id(job_id: str) -> str:


@ -112,12 +112,13 @@ class GrpcHook(BaseHook):
except grpc.RpcError as ex:
# noinspection PyUnresolvedReferences
self.log.exception(
"Error occurred when calling the grpc service: {0}, method: {1} \
status code: {2}, error details: {3}"
.format(stub.__class__.__name__,
call_func,
ex.code(), # pylint: disable=no-member
ex.details())) # pylint: disable=no-member
"Error occurred when calling the grpc service: %s, method: %s \
status code: %s, error details: %s",
stub.__class__.__name__,
call_func,
ex.code(), # pylint: disable=no-member
ex.details() # pylint: disable=no-member
)
raise ex
def _get_field(self, field_name, default=None):


@ -293,7 +293,7 @@ class Mail(LoggingMixin):
else attachment.has_equal_name(name)
if found_attachment:
file_name, file_payload = attachment.get_file()
self.log.info('Found attachment: {}'.format(file_name))
self.log.info('Found attachment: %s', file_name)
attachments.append((file_name, file_payload))
if find_first:
break


@ -19,6 +19,7 @@
#
"""Qubole hook"""
import datetime
import logging
import os
import pathlib
import time
@ -32,9 +33,10 @@ from qds_sdk.qubole import Qubole
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
log = logging.getLogger(__name__)
COMMAND_CLASSES = {
"hivecmd": HiveCommand,
"prestocmd": PrestoCommand,
@ -118,7 +120,6 @@ class QuboleHook(BaseHook):
if cmd_id is not None:
cmd = Command.find(cmd_id)
if cmd is not None:
log = LoggingMixin().log
if cmd.status == 'done':
log.info('Command ID: %s has been succeeded, hence marking this '
'TI as Success.', cmd_id)


@ -17,14 +17,15 @@
# specific language governing permissions and limitations
# under the License.
#
import logging
from io import StringIO
from qds_sdk.commands import Command
from airflow.exceptions import AirflowException
from airflow.providers.qubole.hooks.qubole import QuboleHook
from airflow.utils.log.logging_mixin import LoggingMixin
log = logging.getLogger(__name__)
COL_DELIM = '\t'
ROW_DELIM = '\r\n'
@ -90,7 +91,6 @@ class QuboleCheckHook(QuboleHook):
cmd = Command.find(cmd_id)
if cmd is not None:
if cmd.status == 'running':
log = LoggingMixin().log
log.info('Cancelling the Qubole Command Id: %s', cmd_id)
cmd.cancel()
@ -102,14 +102,13 @@ class QuboleCheckHook(QuboleHook):
return record_list
def get_query_results(self):
log = LoggingMixin().log
if self.cmd is not None:
cmd_id = self.cmd.id
log.info("command id: " + str(cmd_id))
self.log.info("command id: " + str(cmd_id))
query_result_buffer = StringIO()
self.cmd.get_results(fp=query_result_buffer, inline=True, delim=COL_DELIM)
query_result = query_result_buffer.getvalue()
query_result_buffer.close()
return query_result
else:
log.info("Qubole command not found")
self.log.info("Qubole command not found")


@ -24,13 +24,15 @@ retrieve data from it, and write that data to a file for other uses.
.. note:: this hook also relies on the simple_salesforce package:
https://github.com/simple-salesforce/simple-salesforce
"""
import logging
import time
import pandas as pd
from simple_salesforce import Salesforce
from airflow.hooks.base_hook import BaseHook
from airflow.utils.log.logging_mixin import LoggingMixin
log = logging.getLogger(__name__)
class SalesforceHook(BaseHook):
@ -169,7 +171,6 @@ class SalesforceHook(BaseHook):
try:
column = pd.to_datetime(column)
except ValueError:
log = LoggingMixin().log
log.warning("Could not convert field to timestamps: %s", column.name)
return column


@ -32,19 +32,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Kerberos security provider"""
import logging
import socket
import subprocess
import sys
import time
from typing import Optional
from airflow import LoggingMixin
from airflow.configuration import conf
NEED_KRB181_WORKAROUND = None # type: Optional[bool]
log = LoggingMixin().log
log = logging.getLogger(__name__)
def renew_from_kt(principal: str, keytab: str):


@ -18,16 +18,14 @@
# under the License.
"""Sentry Integration"""
import logging
from functools import wraps
from airflow.configuration import conf
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.state import State
log = LoggingMixin().log
log = logging.getLogger(__name__)
class DummySentry:


@ -26,7 +26,7 @@ import cattr
import pendulum
from dateutil import relativedelta
from airflow import DAG, AirflowException, LoggingMixin
from airflow import DAG, AirflowException
from airflow.models import Connection
from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
@ -35,6 +35,9 @@ from airflow.settings import json
from airflow.utils.module_loading import import_string
from airflow.www.utils import get_python_source
log = logging.getLogger(__name__)
FAILED = 'serialization_failed'
BUILTIN_OPERATOR_EXTRA_LINKS: List[str] = [
"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink",
"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink",
@ -196,10 +199,10 @@ class BaseSerialization:
return cls._encode(
[cls._serialize(v) for v in var], type_=DAT.TUPLE)
else:
LOG.debug('Cast type %s to str in serialization.', type(var))
log.debug('Cast type %s to str in serialization.', type(var))
return str(var)
except Exception: # pylint: disable=broad-except
LOG.warning('Failed to stringify.', exc_info=True)
log.warning('Failed to stringify.', exc_info=True)
return FAILED
# pylint: enable=too-many-return-statements
@ -581,7 +584,3 @@ class SerializedDAG(DAG, BaseSerialization):
if ver != cls.SERIALIZER_VERSION:
raise ValueError("Unsure how to deserialize version {!r}".format(ver))
return cls.deserialize_dag(serialized_obj['dag'])
LOG = LoggingMixin().log
FAILED = 'serialization_failed'


@ -16,7 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import os
from airflow import models, settings
@ -26,10 +26,9 @@ from airflow.models import Connection
from airflow.models.pool import Pool
# We need to add this model manually to get reset working well
from airflow.models.serialized_dag import SerializedDagModel # noqa: F401 # pylint: disable=unused-import
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import create_session, provide_session # noqa # pylint: disable=unused-import
log = LoggingMixin().log
log = logging.getLogger(__name__)
@provide_session


@ -19,6 +19,7 @@
import collections
import importlib
import logging
import os
import smtplib
from email.mime.application import MIMEApplication
@ -29,7 +30,8 @@ from typing import Iterable, List, Union
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.utils.log.logging_mixin import LoggingMixin
log = logging.getLogger(__name__)
def send_email(to, subject, html_content,
@ -99,7 +101,6 @@ def send_mime_email(e_from, e_to, mime_msg, dryrun=False):
"""
Send MIME email.
"""
log = LoggingMixin().log
smtp_host = conf.get('smtp', 'SMTP_HOST')
smtp_port = conf.getint('smtp', 'SMTP_PORT')


@ -16,13 +16,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import os
import re
import zipfile
from typing import Dict, List, Optional, Pattern
from airflow import LoggingMixin, conf
from airflow import conf
log = logging.getLogger(__name__)
def TemporaryDirectory(*args, **kwargs): # pylint: disable=invalid-name
@ -149,7 +151,6 @@ def find_dag_file_paths(file_paths, files, patterns, root, safe_mode):
file_paths.append(file_path)
except Exception: # pylint: disable=broad-except
log = LoggingMixin().log
log.exception("Error while examining %s", f)


@ -186,7 +186,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
else:
metadata['max_offset'] = 0
except Exception: # pylint: disable=broad-except
self.log.exception('Could not get current log size with log_id: {}'.format(log_id))
self.log.exception('Could not get current log size with log_id: %s', log_id)
logs = []
if max_log_line != 0:


@ -49,7 +49,7 @@ class LoggingMixin:
# FIXME: LoggingMixin should have a default _log field.
return self._log # type: ignore
except AttributeError:
self._log = logging.root.getChild(
self._log = logging.getLogger(
self.__class__.__module__ + '.' + self.__class__.__name__
)
return self._log
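After this change, the logger a LoggingMixin subclass exposes via self.log is created with logging.getLogger and named after the subclass's dotted path, so class loggers sit in the same hierarchy as the module-level getLogger(__name__) loggers introduced elsewhere in this commit. A self-contained sketch of that behaviour (a simplified mixin, not the full Airflow class):

    import logging

    class LoggingMixin:
        @property
        def log(self):
            try:
                return self._log
            except AttributeError:
                # unique, hierarchical name: "<module>.<ClassName>"
                self._log = logging.getLogger(
                    self.__class__.__module__ + '.' + self.__class__.__name__
                )
                return self._log

    class MyHook(LoggingMixin):
        pass

    # prints "__main__.MyHook" when run as a script,
    # or "<package>.<module>.MyHook" once imported from a package
    print(MyHook().log.name)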


@ -20,13 +20,12 @@
"""
Utilities for running process with writing output to logger
"""
import logging
import shlex
import subprocess
from typing import List
from airflow import LoggingMixin
log = LoggingMixin().log
log = logging.getLogger(__name__)
def execute_in_subprocess(cmd: List[str]):


@ -19,6 +19,7 @@
import datetime
import json
import logging
import os
import pendulum
@ -26,9 +27,7 @@ from dateutil import relativedelta
from sqlalchemy import event, exc
from sqlalchemy.types import DateTime, Text, TypeDecorator
from airflow.utils.log.logging_mixin import LoggingMixin
log = LoggingMixin().log
log = logging.getLogger(__name__)
utc = pendulum.timezone('UTC')


@ -16,6 +16,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from flask import Blueprint, g, jsonify, request, url_for
import airflow.api
@ -29,12 +31,11 @@ from airflow.api.common.experimental.get_task import get_task
from airflow.api.common.experimental.get_task_instance import get_task_instance
from airflow.exceptions import AirflowException
from airflow.utils import timezone
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.strings import to_boolean
from airflow.version import version
from airflow.www.app import csrf
_log = LoggingMixin().log
log = logging.getLogger(__name__)
requires_authentication = airflow.api.API_AUTH.api_auth.requires_authentication
@ -71,7 +72,7 @@ def trigger_dag(dag_id):
'Given execution date, {}, could not be identified '
'as a date. Example date format: 2015-11-16T14:34:15+00:00'
.format(execution_date))
_log.info(error_message)
log.info(error_message)
response = jsonify({'error': error_message})
response.status_code = 400
@ -84,13 +85,13 @@ def trigger_dag(dag_id):
try:
dr = trigger.trigger_dag(dag_id, run_id, conf, execution_date, replace_microseconds)
except AirflowException as err:
_log.error(err)
log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
if getattr(g, 'user', None):
_log.info("User %s created %s", g.user, dr)
log.info("User %s created %s", g.user, dr)
response = jsonify(message="Created {}".format(dr), execution_date=dr.execution_date.isoformat())
return response
@ -106,7 +107,7 @@ def delete_dag(dag_id):
try:
count = delete.delete_dag(dag_id)
except AirflowException as err:
_log.error(err)
log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@ -128,7 +129,7 @@ def dag_runs(dag_id):
state = request.args.get('state')
dagruns = get_dag_runs(dag_id, state)
except AirflowException as err:
_log.info(err)
log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = 400
return response
@ -155,7 +156,7 @@ def get_dag_code(dag_id):
try:
return get_code(dag_id)
except AirflowException as err:
_log.info(err)
log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@ -168,7 +169,7 @@ def task_info(dag_id, task_id):
try:
info = get_task(dag_id, task_id)
except AirflowException as err:
_log.info(err)
log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@ -215,7 +216,7 @@ def task_instance_info(dag_id, execution_date, task_id):
'Given execution date, {}, could not be identified '
'as a date. Example date format: 2015-11-16T14:34:15+00:00'
.format(execution_date))
_log.info(error_message)
log.info(error_message)
response = jsonify({'error': error_message})
response.status_code = 400
@ -224,7 +225,7 @@ def task_instance_info(dag_id, execution_date, task_id):
try:
info = get_task_instance(dag_id, task_id, execution_date)
except AirflowException as err:
_log.info(err)
log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@ -256,7 +257,7 @@ def dag_run_status(dag_id, execution_date):
'Given execution date, {}, could not be identified '
'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
execution_date))
_log.info(error_message)
log.info(error_message)
response = jsonify({'error': error_message})
response.status_code = 400
@ -265,7 +266,7 @@ def dag_run_status(dag_id, execution_date):
try:
info = get_dag_run_state(dag_id, execution_date)
except AirflowException as err:
_log.info(err)
log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@ -300,7 +301,7 @@ def get_pool(name):
try:
pool = pool_api.get_pool(name=name)
except AirflowException as err:
_log.error(err)
log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@ -315,7 +316,7 @@ def get_pools():
try:
pools = pool_api.get_pools()
except AirflowException as err:
_log.error(err)
log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@ -332,7 +333,7 @@ def create_pool():
try:
pool = pool_api.create_pool(**params)
except AirflowException as err:
_log.error(err)
log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@ -348,7 +349,7 @@ def delete_pool(name):
try:
pool = pool_api.delete_pool(name=name)
except AirflowException as err:
_log.error(err)
log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response
@ -369,7 +370,7 @@ def get_lineage(dag_id: str, execution_date: str):
'Given execution date, {}, could not be identified '
'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
execution_date))
_log.info(error_message)
log.info(error_message)
response = jsonify({'error': error_message})
response.status_code = 400
@ -378,7 +379,7 @@ def get_lineage(dag_id: str, execution_date: str):
try:
lineage = get_lineage_api(dag_id=dag_id, execution_date=execution_date)
except AirflowException as err:
_log.error(err)
log.error(err)
response = jsonify(error=f"{err}")
response.status_code = err.status_code
return response


@ -26,32 +26,32 @@ class LoggingCommandExecutor(LoggingMixin):
def execute_cmd(self, cmd, silent=False, cwd=None):
if silent:
self.log.info("Executing in silent mode: '{}'".format(" ".join(cmd)))
self.log.info("Executing in silent mode: '%s'", " ".join(cmd))
with open(os.devnull, 'w') as dev_null:
return subprocess.call(args=cmd, stdout=dev_null, stderr=subprocess.STDOUT)
else:
self.log.info("Executing: '{}'".format(" ".join(cmd)))
self.log.info("Executing: '%s'", " ".join(cmd))
process = subprocess.Popen(
args=cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, cwd=cwd
)
output, err = process.communicate()
retcode = process.poll()
self.log.info("Stdout: {}".format(output))
self.log.info("Stderr: {}".format(err))
self.log.info("Stdout: %s", output)
self.log.info("Stderr: %s", err)
if retcode:
self.log.warning("Error when executing %s", " ".join(cmd))
return retcode
def check_output(self, cmd):
self.log.info("Executing for output: '{}'".format(" ".join(cmd)))
self.log.info("Executing for output: '%s'", " ".join(cmd))
process = subprocess.Popen(args=cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
output, err = process.communicate()
retcode = process.poll()
if retcode:
self.log.info("Error when executing '{}'".format(" ".join(cmd)))
self.log.info("Stdout: {}".format(output))
self.log.info("Stderr: {}".format(err))
self.log.info("Error when executing '%s'", " ".join(cmd))
self.log.info("Stdout: %s", output)
self.log.info("Stderr: %s", err)
raise AirflowException("Retcode {} on {} with stdout: {}, stderr: {}".
format(retcode, " ".join(cmd), output, err))
return output


@ -214,16 +214,17 @@ class CloudSqlQueryTestHelper(LoggingCommandExecutor):
for member in binding['members']:
if member.startswith('serviceAccount:gcp-storage-account'):
self.log.info("Skip removing member {}".format(member))
self.log.info("Skip removing member %s", member)
continue
self.log.info("Remove member: {}".format(member))
self.log.info("Remove member: %s", member)
member_type, member_email = member.split(':')
if member_type != 'serviceAccount':
self.log.warning("Skip removing member {} as the type {} is "
"not service account".format(member,
member_type))
self.log.warning(
"Skip removing member %s as the type %s is not service account",
member, member_type
)
self.execute_cmd(['gsutil', 'acl', 'ch', '-d', member_email,
"gs://{}".format(export_bucket_name)])


@ -100,7 +100,7 @@ class GcpAuthenticator(LoggingCommandExecutor):
conn.extra = json.dumps(extras)
session.commit()
except BaseException as ex:
self.log.info('Airflow DB Session error:' + str(ex))
self.log.info('Airflow DB Session error: %s', str(ex))
session.rollback()
raise
finally:
@ -126,7 +126,7 @@ class GcpAuthenticator(LoggingCommandExecutor):
conn.extra = json.dumps(extras)
session.commit()
except BaseException as ex:
self.log.info('Airflow DB Session error:' + str(ex))
self.log.info('Airflow DB Session error: %s', str(ex))
session.rollback()
raise
finally:
@ -147,14 +147,14 @@ class GcpAuthenticator(LoggingCommandExecutor):
else:
gcp_config_dir = os.path.join(AIRFLOW_MAIN_FOLDER, os.pardir, "config")
if not os.path.isdir(gcp_config_dir):
self.log.info("The {} is not a directory".format(gcp_config_dir))
self.log.info("The %s is not a directory", gcp_config_dir)
key_dir = os.path.join(gcp_config_dir, "keys")
if not os.path.isdir(key_dir):
self.log.info("The {} is not a directory".format(key_dir))
self.log.info("The %s is not a directory", key_dir)
return
key_path = os.path.join(key_dir, self.gcp_key)
if not os.path.isfile(key_path):
self.log.info("The {} is missing".format(key_path))
self.log.info("The %s file is missing", key_path)
self.full_key_path = key_path
def _validate_key_set(self):
@ -172,7 +172,7 @@ class GcpAuthenticator(LoggingCommandExecutor):
Authenticate with service account specified via key name.
"""
self._validate_key_set()
self.log.info("Setting the GCP key to {}".format(self.full_key_path))
self.log.info("Setting the GCP key to %s", self.full_key_path)
# Checking if we can authenticate using service account credentials provided
self.execute_cmd(
[
@ -206,7 +206,7 @@ class GcpAuthenticator(LoggingCommandExecutor):
GcpAuthenticator.original_account = self.check_output(
['gcloud', 'config', 'get-value', 'account', '--project={}'.format(self.project_id)]
).decode('utf-8')
self.log.info("Storing account: to restore it later {}".format(GcpAuthenticator.original_account))
self.log.info("Storing account: to restore it later %s", GcpAuthenticator.original_account)
def gcp_restore_authentication(self):
"""
@ -214,7 +214,7 @@ class GcpAuthenticator(LoggingCommandExecutor):
"""
self._validate_key_set()
if GcpAuthenticator.original_account:
self.log.info("Restoring original account stored: {}".format(GcpAuthenticator.original_account))
self.log.info("Restoring original account stored: %s", GcpAuthenticator.original_account)
subprocess.call(
[
'gcloud',


@ -21,8 +21,8 @@ import os
import unittest
from argparse import Namespace
from airflow import LoggingMixin
from airflow.configuration import conf
from airflow.security import kerberos
from airflow.security.kerberos import renew_from_kt
from tests.test_utils.config import conf_vars
@ -58,7 +58,7 @@ class TestKerberos(unittest.TestCase):
renew_from_kt(principal=self.args.principal, # pylint: disable=no-member
keytab=self.args.keytab)
with self.assertLogs(LoggingMixin().log) as log:
with self.assertLogs(kerberos.log) as log:
self.assertIn(
'kinit: krb5_init_creds_set_keytab: Failed to find '
'airflow@LUPUS.GRIDDYNAMICS.NET in keytab FILE:{} '


@ -26,7 +26,7 @@ from airflow.utils import process_utils
class TestExecuteInSubProcess(unittest.TestCase):
def test_should_print_all_messages1(self):
with self.assertLogs("airflow.utils.log.logging_mixin.LoggingMixin") as logs:
with self.assertLogs(process_utils.log) as logs:
process_utils.execute_in_subprocess(["bash", "-c", "echo CAT; echo KITTY;"])
msgs = [record.getMessage() for record in logs.records]
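Because process_utils now logs through its own module-level logger, the test can pass that logger object straight to assertLogs instead of targeting the shared LoggingMixin logger name. A sketch of the same technique against a stand-alone logger (the logger name here is hypothetical):

    import logging
    import unittest

    log = logging.getLogger("example.process_utils")  # hypothetical module logger

    class TestLogCapture(unittest.TestCase):
        def test_messages_are_captured(self):
            # assertLogs accepts a logger object (or name) and records what it emits
            with self.assertLogs(log) as captured:
                log.info("CAT")
                log.info("KITTY")
            self.assertEqual(
                ["CAT", "KITTY"],
                [record.getMessage() for record in captured.records],
            )

    if __name__ == "__main__":
        unittest.main()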