[AIRFLOW-4858] Deprecate "Historical convenience functions" in airflow.configuration (#5495)

1. Issue deprecation warnings properly for the old conf methods and remove their existing usages.
2. Unify conf access as `from airflow.configuration import conf`.
Hao Liang 2019-09-04 00:08:55 +08:00 committed by Ash Berlin-Taylor
Parent c5c6448ff9
Commit f497d1d5aa
76 changed files with 330 additions and 321 deletions
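
For illustration, the unified access pattern this commit standardises on looks roughly like the following (a minimal sketch; 'core'/'executor' and 'parallelism' are just example options, and the warning on the old module-level helpers is what point 1 above refers to):

# New, supported style: import the conf object directly.
from airflow.configuration import conf

executor = conf.get('core', 'executor')
parallelism = conf.getint('core', 'parallelism')

# Old styles removed or deprecated by this commit (shown for contrast):
#   from airflow import configuration; configuration.conf.get('core', 'executor')
#   from airflow import configuration as conf; conf.conf.get('core', 'executor')
#   configuration.get('core', 'executor')   # module-level helper, now warns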

View file

@ -20,8 +20,8 @@
from importlib import import_module
from airflow.exceptions import AirflowException
from airflow import configuration as conf
from airflow.exceptions import AirflowException, AirflowConfigException
from airflow.configuration import conf
from airflow.utils.log.logging_mixin import LoggingMixin
@ -42,7 +42,7 @@ def load_auth():
auth_backend = 'airflow.api.auth.backend.default'
try:
auth_backend = conf.get("api", "auth_backend")
except conf.AirflowConfigException:
except AirflowConfigException:
pass
try:

View file

@ -40,7 +40,7 @@ import kerberos
from requests_kerberos import HTTPKerberosAuth
from airflow import configuration as conf
from airflow.configuration import conf
from airflow.utils.log.logging_mixin import LoggingMixin

View file

@ -21,14 +21,14 @@
import os
import argcomplete
from airflow import configuration
from airflow.configuration import conf
from airflow.bin.cli import CLIFactory
if __name__ == '__main__':
if configuration.conf.get("core", "security") == 'kerberos':
os.environ['KRB5CCNAME'] = configuration.conf.get('kerberos', 'ccache')
os.environ['KRB5_KTNAME'] = configuration.conf.get('kerberos', 'keytab')
if conf.get("core", "security") == 'kerberos':
os.environ['KRB5CCNAME'] = conf.get('kerberos', 'ccache')
os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab')
parser = CLIFactory.get_parser()
argcomplete.autocomplete(parser)

View file

@ -53,7 +53,7 @@ from typing import Any
import airflow
from airflow import api
from airflow import jobs, settings
from airflow import configuration as conf
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowWebServerTimeout
from airflow.executors import get_default_executor
from airflow.models import (
@ -507,7 +507,7 @@ def run(args, dag=None):
if os.path.exists(args.cfg_path):
os.remove(args.cfg_path)
conf.conf.read_dict(conf_dict, source=args.cfg_path)
conf.read_dict(conf_dict, source=args.cfg_path)
settings.configure_vars()
# IMPORTANT, have to use the NullPool, otherwise, each "run" command may leave

View file

@ -20,7 +20,7 @@
import os
from typing import Dict, Any
from airflow import configuration as conf
from airflow.configuration import conf
from airflow.utils.file import mkdirs
# TODO: Logging format and level should be configured

View file

@ -19,7 +19,7 @@
import ssl
from airflow import configuration
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
@ -30,9 +30,9 @@ def _broker_supports_visibility_timeout(url):
log = LoggingMixin().log
broker_url = configuration.conf.get('celery', 'BROKER_URL')
broker_url = conf.get('celery', 'BROKER_URL')
broker_transport_options = configuration.conf.getsection(
broker_transport_options = conf.getsection(
'celery_broker_transport_options'
)
if 'visibility_timeout' not in broker_transport_options:
@ -44,31 +44,31 @@ DEFAULT_CELERY_CONFIG = {
'event_serializer': 'json',
'worker_prefetch_multiplier': 1,
'task_acks_late': True,
'task_default_queue': configuration.conf.get('celery', 'DEFAULT_QUEUE'),
'task_default_exchange': configuration.conf.get('celery', 'DEFAULT_QUEUE'),
'task_default_queue': conf.get('celery', 'DEFAULT_QUEUE'),
'task_default_exchange': conf.get('celery', 'DEFAULT_QUEUE'),
'broker_url': broker_url,
'broker_transport_options': broker_transport_options,
'result_backend': configuration.conf.get('celery', 'RESULT_BACKEND'),
'worker_concurrency': configuration.conf.getint('celery', 'WORKER_CONCURRENCY'),
'result_backend': conf.get('celery', 'RESULT_BACKEND'),
'worker_concurrency': conf.getint('celery', 'WORKER_CONCURRENCY'),
}
celery_ssl_active = False
try:
celery_ssl_active = configuration.conf.getboolean('celery', 'SSL_ACTIVE')
celery_ssl_active = conf.getboolean('celery', 'SSL_ACTIVE')
except AirflowConfigException:
log.warning("Celery Executor will run without SSL")
try:
if celery_ssl_active:
if 'amqp://' in broker_url:
broker_use_ssl = {'keyfile': configuration.conf.get('celery', 'SSL_KEY'),
'certfile': configuration.conf.get('celery', 'SSL_CERT'),
'ca_certs': configuration.conf.get('celery', 'SSL_CACERT'),
broker_use_ssl = {'keyfile': conf.get('celery', 'SSL_KEY'),
'certfile': conf.get('celery', 'SSL_CERT'),
'ca_certs': conf.get('celery', 'SSL_CACERT'),
'cert_reqs': ssl.CERT_REQUIRED}
elif 'redis://' in broker_url:
broker_use_ssl = {'ssl_keyfile': configuration.conf.get('celery', 'SSL_KEY'),
'ssl_certfile': configuration.conf.get('celery', 'SSL_CERT'),
'ssl_ca_certs': configuration.conf.get('celery', 'SSL_CACERT'),
broker_use_ssl = {'ssl_keyfile': conf.get('celery', 'SSL_KEY'),
'ssl_certfile': conf.get('celery', 'SSL_CERT'),
'ssl_ca_certs': conf.get('celery', 'SSL_CACERT'),
'ssl_cert_reqs': ssl.CERT_REQUIRED}
else:
raise AirflowException('The broker you configured does not support SSL_ACTIVE to be True. '

View file

@ -595,7 +595,7 @@ set = conf.set # noqa
for func in [load_test_config, get, getboolean, getfloat, getint, has_option,
remove_option, as_dict, set]:
deprecated(
func,
func.__name__,
"Accessing configuration method '{f.__name__}' directly from "
"the configuration module is deprecated. Please access the "
"configuration from the 'configuration.conf' object via "

View file

@ -26,8 +26,8 @@ from flask import url_for, redirect, request
from flask_oauthlib.client import OAuth
from airflow import models, configuration
from airflow.configuration import AirflowConfigException
from airflow import models
from airflow.configuration import AirflowConfigException, conf
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
@ -35,7 +35,7 @@ log = LoggingMixin().log
def get_config_param(param):
return str(configuration.conf.get('github_enterprise', param))
return str(conf.get('github_enterprise', param))
class GHEUser(models.User):

View file

@ -26,7 +26,8 @@ from flask import url_for, redirect, request
from flask_oauthlib.client import OAuth
from airflow import models, configuration
from airflow import models
from airflow.configuration import conf
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
@ -34,7 +35,7 @@ log = LoggingMixin().log
def get_config_param(param):
return str(configuration.conf.get('google', param))
return str(conf.get('google', param))
class GoogleUser(models.User):

View file

@ -33,7 +33,7 @@ from airflow.security import utils
from flask import url_for, redirect
from airflow import models
from airflow import configuration
from airflow.configuration import conf
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
@ -55,13 +55,13 @@ class KerberosUser(models.User, LoggingMixin):
@staticmethod
def authenticate(username, password):
service_principal = "%s/%s" % (
configuration.conf.get('kerberos', 'principal'),
conf.get('kerberos', 'principal'),
utils.get_fqdn()
)
realm = configuration.conf.get("kerberos", "default_realm")
realm = conf.get("kerberos", "default_realm")
try:
user_realm = configuration.conf.get("security", "default_realm")
user_realm = conf.get("security", "default_realm")
except AirflowConfigException:
user_realm = realm

View file

@ -29,7 +29,7 @@ import ssl
from flask import url_for, redirect
from airflow import models
from airflow import configuration
from airflow.configuration import conf
from airflow.configuration import AirflowConfigException
from airflow.utils.db import provide_session
@ -55,12 +55,12 @@ class LdapException(Exception):
def get_ldap_connection(dn=None, password=None):
try:
cacert = configuration.conf.get("ldap", "cacert")
cacert = conf.get("ldap", "cacert")
except AirflowConfigException:
pass
try:
ignore_malformed_schema = configuration.conf.get("ldap", "ignore_malformed_schema")
ignore_malformed_schema = conf.get("ldap", "ignore_malformed_schema")
except AirflowConfigException:
pass
@ -70,7 +70,7 @@ def get_ldap_connection(dn=None, password=None):
tls_configuration = Tls(validate=ssl.CERT_REQUIRED,
ca_certs_file=cacert)
server = Server(configuration.conf.get("ldap", "uri"),
server = Server(conf.get("ldap", "uri"),
use_ssl=True,
tls=tls_configuration)
@ -100,7 +100,7 @@ def group_contains_user(conn, search_base, group_filter, user_name_attr, usernam
def groups_user(conn, search_base, user_filter, user_name_att, username):
search_filter = "(&({0})({1}={2}))".format(user_filter, user_name_att, username)
try:
memberof_attr = configuration.conf.get("ldap", "group_member_attr")
memberof_attr = conf.get("ldap", "group_member_attr")
except Exception:
memberof_attr = "memberOf"
res = conn.search(search_base, search_filter, attributes=[memberof_attr])
@ -135,13 +135,13 @@ class LdapUser(models.User):
self.ldap_groups = []
# Load and cache superuser and data_profiler settings.
conn = get_ldap_connection(configuration.conf.get("ldap", "bind_user"),
configuration.conf.get("ldap", "bind_password"))
conn = get_ldap_connection(conf.get("ldap", "bind_user"),
conf.get("ldap", "bind_password"))
superuser_filter = None
data_profiler_filter = None
try:
superuser_filter = configuration.conf.get("ldap", "superuser_filter")
superuser_filter = conf.get("ldap", "superuser_filter")
except AirflowConfigException:
pass
@ -150,14 +150,14 @@ class LdapUser(models.User):
log.debug("Missing configuration for superuser settings or empty. Skipping.")
else:
self.superuser = group_contains_user(conn,
configuration.conf.get("ldap", "basedn"),
conf.get("ldap", "basedn"),
superuser_filter,
configuration.conf.get("ldap",
"user_name_attr"),
conf.get("ldap",
"user_name_attr"),
user.username)
try:
data_profiler_filter = configuration.conf.get("ldap", "data_profiler_filter")
data_profiler_filter = conf.get("ldap", "data_profiler_filter")
except AirflowConfigException:
pass
@ -168,10 +168,10 @@ class LdapUser(models.User):
else:
self.data_profiler = group_contains_user(
conn,
configuration.conf.get("ldap", "basedn"),
conf.get("ldap", "basedn"),
data_profiler_filter,
configuration.conf.get("ldap",
"user_name_attr"),
conf.get("ldap",
"user_name_attr"),
user.username
)
@ -179,9 +179,9 @@ class LdapUser(models.User):
try:
self.ldap_groups = groups_user(
conn,
configuration.conf.get("ldap", "basedn"),
configuration.conf.get("ldap", "user_filter"),
configuration.conf.get("ldap", "user_name_attr"),
conf.get("ldap", "basedn"),
conf.get("ldap", "user_filter"),
conf.get("ldap", "user_name_attr"),
user.username
)
except AirflowConfigException:
@ -189,25 +189,25 @@ class LdapUser(models.User):
@staticmethod
def try_login(username, password):
conn = get_ldap_connection(configuration.conf.get("ldap", "bind_user"),
configuration.conf.get("ldap", "bind_password"))
conn = get_ldap_connection(conf.get("ldap", "bind_user"),
conf.get("ldap", "bind_password"))
search_filter = "(&({0})({1}={2}))".format(
configuration.conf.get("ldap", "user_filter"),
configuration.conf.get("ldap", "user_name_attr"),
conf.get("ldap", "user_filter"),
conf.get("ldap", "user_name_attr"),
username
)
search_scope = LEVEL
if configuration.conf.has_option("ldap", "search_scope"):
if configuration.conf.get("ldap", "search_scope") == "SUBTREE":
if conf.has_option("ldap", "search_scope"):
if conf.get("ldap", "search_scope") == "SUBTREE":
search_scope = SUBTREE
else:
search_scope = LEVEL
# todo: BASE or ONELEVEL?
res = conn.search(configuration.conf.get("ldap", "basedn"), search_filter, search_scope=search_scope)
res = conn.search(conf.get("ldap", "basedn"), search_filter, search_scope=search_scope)
# todo: use list or result?
if not res:

View file

@ -26,7 +26,7 @@ import re
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow import configuration
from airflow.configuration import conf
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
from airflow.models import TaskInstance
@ -174,7 +174,7 @@ class QuboleHook(BaseHook):
if fp is None:
iso = datetime.datetime.utcnow().isoformat()
logpath = os.path.expanduser(
configuration.conf.get('core', 'BASE_LOG_FOLDER')
conf.get('core', 'BASE_LOG_FOLDER')
)
resultpath = logpath + '/' + self.dag_id + '/' + self.task_id + '/results'
pathlib.Path(resultpath).mkdir(parents=True, exist_ok=True)

View file

@ -20,7 +20,7 @@
from base64 import b64encode
from select import select
from airflow import configuration
from airflow.configuration import conf
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
@ -152,7 +152,7 @@ class SSHOperator(BaseOperator):
exit_status = stdout.channel.recv_exit_status()
if exit_status == 0:
enable_pickling = configuration.conf.getboolean(
enable_pickling = conf.getboolean(
'core', 'enable_xcom_pickling'
)
if enable_pickling:

View file

@ -22,7 +22,7 @@ import logging
from winrm.exceptions import WinRMOperationTimeoutError
from airflow import configuration
from airflow.configuration import conf
from airflow.contrib.hooks.winrm_hook import WinRMHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
@ -124,7 +124,7 @@ class WinRMOperator(BaseOperator):
if return_code == 0:
# returning output if do_xcom_push is set
enable_pickling = configuration.conf.getboolean(
enable_pickling = conf.getboolean(
'core', 'enable_xcom_pickling'
)
if enable_pickling:

View file

@ -20,7 +20,7 @@
import logging
from logging import StreamHandler
from airflow import configuration as conf
from airflow.configuration import conf
from airflow.utils.helpers import parse_template_string

View file

@ -20,7 +20,7 @@
from typing import Optional
import sys
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow import configuration
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow.executors.local_executor import LocalExecutor
@ -44,7 +44,7 @@ def get_default_executor():
if DEFAULT_EXECUTOR is not None:
return DEFAULT_EXECUTOR
executor_name = configuration.conf.get('core', 'EXECUTOR')
executor_name = conf.get('core', 'EXECUTOR')
DEFAULT_EXECUTOR = _get_executor(executor_name)

View file

@ -21,12 +21,12 @@ from collections import OrderedDict
# To avoid circular imports
import airflow.utils.dag_processing
from airflow import configuration
from airflow.configuration import conf
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
PARALLELISM = configuration.conf.getint('core', 'PARALLELISM')
PARALLELISM = conf.getint('core', 'PARALLELISM')
class BaseExecutor(LoggingMixin):

View file

@ -27,7 +27,7 @@ from multiprocessing import Pool, cpu_count
from celery import Celery
from celery import states as celery_states
from airflow import configuration
from airflow.configuration import conf
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
@ -45,15 +45,15 @@ To start the celery worker, run the command:
airflow worker
'''
if configuration.conf.has_option('celery', 'celery_config_options'):
if conf.has_option('celery', 'celery_config_options'):
celery_configuration = import_string(
configuration.conf.get('celery', 'celery_config_options')
conf.get('celery', 'celery_config_options')
)
else:
celery_configuration = DEFAULT_CELERY_CONFIG
app = Celery(
configuration.conf.get('celery', 'CELERY_APP_NAME'),
conf.get('celery', 'CELERY_APP_NAME'),
config_source=celery_configuration)
@ -141,7 +141,7 @@ class CeleryExecutor(BaseExecutor):
# (which can become a bottleneck on bigger clusters) so we use
# a multiprocessing pool to speed this up.
# How many worker processes are created for checking celery task state.
self._sync_parallelism = configuration.getint('celery', 'SYNC_PARALLELISM')
self._sync_parallelism = conf.getint('celery', 'SYNC_PARALLELISM')
if self._sync_parallelism == 0:
self._sync_parallelism = max(1, cpu_count() - 1)

View file

@ -21,7 +21,7 @@ import distributed
import subprocess
import warnings
from airflow import configuration
from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
@ -31,15 +31,15 @@ class DaskExecutor(BaseExecutor):
"""
def __init__(self, cluster_address=None):
if cluster_address is None:
cluster_address = configuration.conf.get('dask', 'cluster_address')
cluster_address = conf.get('dask', 'cluster_address')
if not cluster_address:
raise ValueError(
'Please provide a Dask cluster address in airflow.cfg')
self.cluster_address = cluster_address
# ssl / tls parameters
self.tls_ca = configuration.get('dask', 'tls_ca')
self.tls_key = configuration.get('dask', 'tls_key')
self.tls_cert = configuration.get('dask', 'tls_cert')
self.tls_ca = conf.get('dask', 'tls_ca')
self.tls_key = conf.get('dask', 'tls_key')
self.tls_cert = conf.get('dask', 'tls_cert')
super().__init__(parallelism=0)
def start(self):

View file

@ -27,7 +27,6 @@ from uuid import uuid4
import kubernetes
from kubernetes import watch, client
from kubernetes.client.rest import ApiException
from airflow.configuration import conf
from airflow.kubernetes.pod_launcher import PodLauncher
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.worker_configuration import WorkerConfiguration
@ -36,7 +35,8 @@ from airflow.executors import Executors
from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance
from airflow.utils.state import State
from airflow.utils.db import provide_session, create_session
from airflow import configuration, settings
from airflow import settings
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
@ -127,24 +127,24 @@ class KubeConfig:
kubernetes_section = 'kubernetes'
def __init__(self):
configuration_dict = configuration.as_dict(display_sensitive=True)
configuration_dict = conf.as_dict(display_sensitive=True)
self.core_configuration = configuration_dict['core']
self.kube_secrets = configuration_dict.get('kubernetes_secrets', {})
self.kube_env_vars = configuration_dict.get('kubernetes_environment_variables', {})
self.env_from_configmap_ref = configuration.get(self.kubernetes_section,
'env_from_configmap_ref')
self.env_from_secret_ref = configuration.get(self.kubernetes_section,
'env_from_secret_ref')
self.env_from_configmap_ref = conf.get(self.kubernetes_section,
'env_from_configmap_ref')
self.env_from_secret_ref = conf.get(self.kubernetes_section,
'env_from_secret_ref')
self.airflow_home = settings.AIRFLOW_HOME
self.dags_folder = configuration.get(self.core_section, 'dags_folder')
self.parallelism = configuration.getint(self.core_section, 'parallelism')
self.worker_container_repository = configuration.get(
self.dags_folder = conf.get(self.core_section, 'dags_folder')
self.parallelism = conf.getint(self.core_section, 'parallelism')
self.worker_container_repository = conf.get(
self.kubernetes_section, 'worker_container_repository')
self.worker_container_tag = configuration.get(
self.worker_container_tag = conf.get(
self.kubernetes_section, 'worker_container_tag')
self.kube_image = '{}:{}'.format(
self.worker_container_repository, self.worker_container_tag)
self.kube_image_pull_policy = configuration.get(
self.kube_image_pull_policy = conf.get(
self.kubernetes_section, "worker_container_image_pull_policy"
)
self.kube_node_selectors = configuration_dict.get('kubernetes_node_selectors', {})
@ -216,7 +216,7 @@ class KubeConfig:
self.logs_volume_host = conf.get(self.kubernetes_section, 'logs_volume_host')
# This prop may optionally be set for PV Claims and is used to write logs
self.base_log_folder = configuration.get(self.core_section, 'base_log_folder')
self.base_log_folder = conf.get(self.core_section, 'base_log_folder')
# The Kubernetes Namespace in which the Scheduler and Webserver reside. Note
# that if your
@ -277,7 +277,7 @@ class KubeConfig:
# pod security context items should return integers
# and only return a blank string if contexts are not set.
def _get_security_context_val(self, scontext):
val = configuration.get(self.kubernetes_section, scontext)
val = conf.get(self.kubernetes_section, scontext)
if len(val) == 0:
return val
else:

View file

@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.
"""Hook for HDFS operations"""
from airflow import configuration
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
@ -64,7 +64,7 @@ class HDFSHook(BaseHook):
# take the first.
effective_user = self.proxy_user
autoconfig = self.autoconfig
use_sasl = configuration.conf.get('core', 'security') == 'kerberos'
use_sasl = conf.get('core', 'security') == 'kerberos'
try:
connections = self.get_connections(self.hdfs_conn_id)

View file

@ -28,7 +28,7 @@ from tempfile import NamedTemporaryFile
import unicodecsv as csv
from airflow import configuration
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.security import utils
@ -97,8 +97,8 @@ class HiveCliHook(BaseHook):
"Invalid Mapred Queue Priority. Valid values are: "
"{}".format(', '.join(HIVE_QUEUE_PRIORITIES)))
self.mapred_queue = mapred_queue or configuration.get('hive',
'default_hive_mapred_queue')
self.mapred_queue = mapred_queue or conf.get('hive',
'default_hive_mapred_queue')
self.mapred_queue_priority = mapred_queue_priority
self.mapred_job_name = mapred_job_name
@ -129,7 +129,7 @@ class HiveCliHook(BaseHook):
hive_bin = 'beeline'
jdbc_url = "jdbc:hive2://{host}:{port}/{schema}".format(
host=conn.host, port=conn.port, schema=conn.schema)
if configuration.conf.get('core', 'security') == 'kerberos':
if conf.get('core', 'security') == 'kerberos':
template = conn.extra_dejson.get(
'principal', "hive/_HOST@EXAMPLE.COM")
if "_HOST" in template:
@ -511,13 +511,13 @@ class HiveMetastoreHook(BaseHook):
auth_mechanism = ms.extra_dejson.get('authMechanism', 'NOSASL')
if configuration.conf.get('core', 'security') == 'kerberos':
if conf.get('core', 'security') == 'kerberos':
auth_mechanism = ms.extra_dejson.get('authMechanism', 'GSSAPI')
kerberos_service_name = ms.extra_dejson.get('kerberos_service_name', 'hive')
conn_socket = TSocket.TSocket(ms.host, ms.port)
if configuration.conf.get('core', 'security') == 'kerberos' \
if conf.get('core', 'security') == 'kerberos' \
and auth_mechanism == 'GSSAPI':
try:
import saslwrapper as sasl
@ -798,7 +798,7 @@ class HiveServer2Hook(BaseHook):
# we need to give a username
username = 'airflow'
kerberos_service_name = None
if configuration.conf.get('core', 'security') == 'kerberos':
if conf.get('core', 'security') == 'kerberos':
auth_mechanism = db.extra_dejson.get('authMechanism', 'KERBEROS')
kerberos_service_name = db.extra_dejson.get('kerberos_service_name', 'hive')

View file

@ -19,12 +19,12 @@
"""Hook for Web HDFS"""
from hdfs import InsecureClient, HdfsError
from airflow import configuration
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.utils.log.logging_mixin import LoggingMixin
_kerberos_security_mode = configuration.conf.get("core", "security") == "kerberos"
_kerberos_security_mode = conf.get("core", "security") == "kerberos"
if _kerberos_security_mode:
try:
from hdfs.ext.kerberos import KerberosClient # pylint: disable=ungrouped-imports

View file

@ -26,7 +26,7 @@ from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import make_transient
from typing import Optional
from airflow import configuration as conf
from airflow.configuration import conf
from airflow import executors, models
from airflow.exceptions import (
AirflowException,

View file

@ -22,7 +22,7 @@ import os
import signal
import time
from airflow import configuration as conf
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.stats import Stats
from airflow.task.task_runner import get_task_runner

View file

@ -34,7 +34,7 @@ from setproctitle import setproctitle
from sqlalchemy import and_, func, not_, or_
from sqlalchemy.orm.session import make_transient
from airflow import configuration as conf
from airflow.configuration import conf
from airflow import executors, models, settings
from airflow.exceptions import AirflowException
from airflow.jobs.base_job import BaseJob

View file

@ -18,7 +18,7 @@
# under the License.
from functools import wraps
from airflow import configuration as conf
from airflow.configuration import conf
from airflow.lineage.datasets import DataSet
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string

View file

@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.
#
from airflow import configuration as conf
from airflow.configuration import conf
from airflow.lineage import datasets
from airflow.lineage.backend import LineageBackend
from airflow.lineage.backend.atlas.typedefs import operator_typedef

View file

@ -21,7 +21,7 @@ import logging
import warnings
from logging.config import dictConfig
from airflow import configuration as conf
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.utils.module_loading import import_string

View file

@ -21,9 +21,9 @@ from typing import Any
from sqlalchemy import MetaData
from sqlalchemy.ext.declarative import declarative_base
import airflow
from airflow.configuration import conf
SQL_ALCHEMY_SCHEMA = airflow.configuration.get("core", "SQL_ALCHEMY_SCHEMA")
SQL_ALCHEMY_SCHEMA = conf.get("core", "SQL_ALCHEMY_SCHEMA")
metadata = (
None

View file

@ -29,7 +29,8 @@ from typing import Callable, Dict, Iterable, List, Optional, Set, Any
import jinja2
from airflow import configuration, settings
from airflow import settings
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.lineage import prepare_lineage, apply_lineage, DataSet
from airflow.models.dag import DAG
@ -261,7 +262,7 @@ class BaseOperator(LoggingMixin):
def __init__(
self,
task_id: str,
owner: str = configuration.conf.get('operators', 'DEFAULT_OWNER'),
owner: str = conf.get('operators', 'DEFAULT_OWNER'),
email: Optional[str] = None,
email_on_retry: bool = True,
email_on_failure: bool = True,
@ -279,7 +280,7 @@ class BaseOperator(LoggingMixin):
default_args: Optional[Dict] = None,
priority_weight: int = 1,
weight_rule: str = WeightRule.DOWNSTREAM,
queue: str = configuration.conf.get('celery', 'default_queue'),
queue: str = conf.get('celery', 'default_queue'),
pool: str = Pool.DEFAULT_POOL_NAME,
sla: Optional[timedelta] = None,
execution_timeout: Optional[timedelta] = None,
@ -348,7 +349,7 @@ class BaseOperator(LoggingMixin):
)
self._schedule_interval = schedule_interval
self.retries = retries if retries is not None else \
configuration.conf.getint('core', 'default_task_retries', fallback=0)
conf.getint('core', 'default_task_retries', fallback=0)
self.queue = queue
self.pool = pool
self.sla = sla

View file

@ -19,7 +19,7 @@
from airflow.typing import Protocol
from typing import Optional
from airflow import configuration
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
@ -87,7 +87,7 @@ def get_fernet():
return _fernet
try:
fernet_key = configuration.conf.get('core', 'FERNET_KEY')
fernet_key = conf.get('core', 'FERNET_KEY')
if not fernet_key:
log.warning(
"empty cryptography key - values will not be stored encrypted."

View file

@ -35,7 +35,8 @@ from croniter import croniter
from dateutil.relativedelta import relativedelta
from sqlalchemy import Column, String, Boolean, Integer, Text, func, or_
from airflow import configuration, settings, utils
from airflow import settings, utils
from airflow.configuration import conf
from airflow.dag.base_dag import BaseDag
from airflow.exceptions import AirflowException, AirflowDagCycleException
from airflow.executors import LocalExecutor, get_default_executor
@ -208,13 +209,13 @@ class DAG(BaseDag, LoggingMixin):
user_defined_macros: Optional[Dict] = None,
user_defined_filters: Optional[Dict] = None,
default_args: Optional[Dict] = None,
concurrency: int = configuration.conf.getint('core', 'dag_concurrency'),
max_active_runs: int = configuration.conf.getint('core', 'max_active_runs_per_dag'),
concurrency: int = conf.getint('core', 'dag_concurrency'),
max_active_runs: int = conf.getint('core', 'max_active_runs_per_dag'),
dagrun_timeout: Optional[timedelta] = None,
sla_miss_callback: Optional[Callable] = None,
default_view: Optional[str] = None,
orientation: str = configuration.conf.get('webserver', 'dag_orientation'),
catchup: bool = configuration.conf.getboolean('scheduler', 'catchup_by_default'),
orientation: str = conf.get('webserver', 'dag_orientation'),
catchup: bool = conf.getboolean('scheduler', 'catchup_by_default'),
on_success_callback: Optional[Callable] = None,
on_failure_callback: Optional[Callable] = None,
doc_md: Optional[str] = None,
@ -360,7 +361,7 @@ class DAG(BaseDag, LoggingMixin):
def get_default_view(self):
"""This is only there for backward compatible jinja2 templates"""
if self._default_view is None:
return configuration.conf.get('webserver', 'dag_default_view').lower()
return conf.get('webserver', 'dag_default_view').lower()
else:
return self._default_view
@ -1202,7 +1203,7 @@ class DAG(BaseDag, LoggingMixin):
mark_success=False,
local=False,
executor=None,
donot_pickle=configuration.conf.getboolean('core', 'donot_pickle'),
donot_pickle=conf.getboolean('core', 'donot_pickle'),
ignore_task_deps=False,
ignore_first_depends_on_past=False,
pool=None,
@ -1493,7 +1494,7 @@ class DagModel(Base):
dag_id = Column(String(ID_LEN), primary_key=True)
# A DAG can be paused from the UI / DB
# Set this default value of is_paused based on a configuration value!
is_paused_at_creation = configuration.conf\
is_paused_at_creation = conf\
.getboolean('core',
'dags_are_paused_at_creation')
is_paused = Column(Boolean, default=is_paused_at_creation)
@ -1545,7 +1546,7 @@ class DagModel(Base):
def get_default_view(self):
if self.default_view is None:
return configuration.conf.get('webserver', 'dag_default_view').lower()
return conf.get('webserver', 'dag_default_view').lower()
else:
return self.default_view

View file

@ -30,7 +30,8 @@ from datetime import datetime, timedelta
from croniter import croniter, CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError
from sqlalchemy import or_
from airflow import configuration, settings
from airflow import settings
from airflow.configuration import conf
from airflow.dag.base_dag import BaseDagBag
from airflow.exceptions import AirflowDagCycleException
from airflow.executors import get_default_executor
@ -76,8 +77,8 @@ class DagBag(BaseDagBag, LoggingMixin):
self,
dag_folder=None,
executor=None,
include_examples=configuration.conf.getboolean('core', 'LOAD_EXAMPLES'),
safe_mode=configuration.conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')):
include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')):
# do not use default arg in signature, to fix import cycle on plugin load
if executor is None:
@ -193,7 +194,7 @@ class DagBag(BaseDagBag, LoggingMixin):
if mod_name in sys.modules:
del sys.modules[mod_name]
with timeout(configuration.conf.getint('core', "DAGBAG_IMPORT_TIMEOUT")):
with timeout(conf.getint('core', "DAGBAG_IMPORT_TIMEOUT")):
try:
m = imp.load_source(mod_name, filepath)
mods.append(m)
@ -283,7 +284,7 @@ class DagBag(BaseDagBag, LoggingMixin):
# How many seconds do we wait for tasks to heartbeat before mark them as zombies.
zombie_threshold_secs = (
configuration.getint('scheduler', 'scheduler_zombie_task_threshold'))
conf.getint('scheduler', 'scheduler_zombie_task_threshold'))
limit_dttm = timezone.utcnow() - timedelta(
seconds=zombie_threshold_secs)
self.log.debug("Failing jobs without heartbeat after %s", limit_dttm)
@ -303,7 +304,7 @@ class DagBag(BaseDagBag, LoggingMixin):
for ti in tis:
self.log.info("Detected zombie job with dag_id %s, task_id %s, and execution date %s",
ti.dag_id, ti.task_id, ti.execution_date.isoformat())
ti.test_mode = configuration.getboolean('core', 'unit_test_mode')
ti.test_mode = conf.getboolean('core', 'unit_test_mode')
ti.task = self.dags[ti.dag_id].get_task(ti.task_id)
ti.handle_failure("{} detected as zombie".format(ti),
ti.test_mode, ti.get_template_context())
@ -351,8 +352,8 @@ class DagBag(BaseDagBag, LoggingMixin):
self,
dag_folder=None,
only_if_updated=True,
include_examples=configuration.conf.getboolean('core', 'LOAD_EXAMPLES'),
safe_mode=configuration.conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')):
include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')):
"""
Given a file path or a folder, this method looks for python modules,
imports them and adds them to the dagbag collection.

View file

@ -37,9 +37,10 @@ from sqlalchemy import Column, Float, Index, Integer, PickleType, String, func
from sqlalchemy.orm import reconstructor
from sqlalchemy.orm.session import Session
from airflow import configuration, settings
from airflow.exceptions import (AirflowException, AirflowRescheduleException,
AirflowSkipException, AirflowTaskTimeout)
from airflow import settings
from airflow.configuration import conf
from airflow.models.base import Base, ID_LEN
from airflow.models.log import Log
from airflow.models.pool import Pool
@ -367,14 +368,14 @@ class TaskInstance(Base, LoggingMixin):
@property
def log_filepath(self):
iso = self.execution_date.isoformat()
log = os.path.expanduser(configuration.conf.get('core', 'BASE_LOG_FOLDER'))
log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
return ("{log}/{dag_id}/{task_id}/{iso}.log".format(
log=log, dag_id=self.dag_id, task_id=self.task_id, iso=iso))
@property
def log_url(self):
iso = quote(self.execution_date.isoformat())
base_url = configuration.conf.get('webserver', 'BASE_URL')
base_url = conf.get('webserver', 'BASE_URL')
return base_url + (
"/log?"
"execution_date={iso}"
@ -385,7 +386,7 @@ class TaskInstance(Base, LoggingMixin):
@property
def mark_success_url(self):
iso = quote(self.execution_date.isoformat())
base_url = configuration.conf.get('webserver', 'BASE_URL')
base_url = conf.get('webserver', 'BASE_URL')
return base_url + (
"/success"
"?task_id={task_id}"
@ -1159,7 +1160,7 @@ class TaskInstance(Base, LoggingMixin):
if task.params:
params.update(task.params)
if configuration.getboolean('core', 'dag_run_conf_overrides_params'):
if conf.getboolean('core', 'dag_run_conf_overrides_params'):
self.overwrite_params_with_dag_run_conf(params=params, dag_run=dag_run)
class VariableAccessor:
@ -1193,7 +1194,7 @@ class TaskInstance(Base, LoggingMixin):
return str(self.var)
return {
'conf': configuration,
'conf': conf,
'dag': task.dag,
'dag_run': dag_run,
'ds': ds,
@ -1269,8 +1270,8 @@ class TaskInstance(Base, LoggingMixin):
)
def render(key, content):
if configuration.has_option('email', key):
path = configuration.get('email', key)
if conf.has_option('email', key):
path = conf.get('email', key)
with open(path) as file:
content = file.read()

View file

@ -23,7 +23,7 @@ import pickle
from sqlalchemy import Column, Integer, String, Index, LargeBinary, and_
from sqlalchemy.orm import reconstructor
from airflow import configuration
from airflow.configuration import conf
from airflow.models.base import Base, ID_LEN
from airflow.utils import timezone
from airflow.utils.db import provide_session
@ -65,7 +65,7 @@ class XCom(Base, LoggingMixin):
"""
@reconstructor
def init_on_load(self):
enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling')
enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
if enable_pickling:
self.value = pickle.loads(self.value)
else:
@ -157,7 +157,7 @@ class XCom(Base, LoggingMixin):
result = query.first()
if result:
enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling')
enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
if enable_pickling:
return pickle.loads(result.value)
else:
@ -222,7 +222,7 @@ class XCom(Base, LoggingMixin):
def serialize_value(value):
# TODO: "pickling" has been deprecated and JSON is preferred.
# "pickling" will be removed in Airflow 2.0.
if configuration.getboolean('core', 'enable_xcom_pickling'):
if conf.getboolean('core', 'enable_xcom_pickling'):
return pickle.dumps(value)
try:

View file

@ -21,7 +21,7 @@ import re
from typing import Dict
from airflow.hooks.hive_hooks import HiveCliHook
from airflow import configuration
from airflow.configuration import conf
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.operator_helpers import context_to_airflow_vars
@ -95,8 +95,8 @@ class HiveOperator(BaseOperator):
self.mapred_queue = mapred_queue
self.mapred_queue_priority = mapred_queue_priority
self.mapred_job_name = mapred_job_name
self.mapred_job_name_template = configuration.get('hive',
'mapred_job_name_template')
self.mapred_job_name_template = conf.get('hive',
'mapred_job_name_template')
# assigned lazily - just for consistency we can create the attribute with a
# `None` initial value, later it will be populated by the execute method.

View file

@ -21,7 +21,8 @@ import subprocess
import sys
import time
from airflow import configuration, LoggingMixin
from airflow import LoggingMixin
from airflow.configuration import conf
NEED_KRB181_WORKAROUND = None # type: Optional[bool]
@ -32,18 +33,18 @@ def renew_from_kt(principal, keytab):
# The config is specified in seconds. But we ask for that same amount in
# minutes to give ourselves a large renewal buffer.
renewal_lifetime = "%sm" % configuration.conf.getint('kerberos', 'reinit_frequency')
renewal_lifetime = "%sm" % conf.getint('kerberos', 'reinit_frequency')
cmd_principal = principal or configuration.conf.get('kerberos', 'principal').replace(
cmd_principal = principal or conf.get('kerberos', 'principal').replace(
"_HOST", socket.getfqdn()
)
cmdv = [
configuration.conf.get('kerberos', 'kinit_path'),
conf.get('kerberos', 'kinit_path'),
"-r", renewal_lifetime,
"-k", # host ticket
"-t", keytab, # specify keytab
"-c", configuration.conf.get('kerberos', 'ccache'), # specify credentials cache
"-c", conf.get('kerberos', 'ccache'), # specify credentials cache
cmd_principal
]
log.info("Reinitting kerberos from keytab: %s", " ".join(cmdv))
@ -73,8 +74,8 @@ def renew_from_kt(principal, keytab):
def perform_krb181_workaround(principal):
cmdv = [configuration.conf.get('kerberos', 'kinit_path'),
"-c", configuration.conf.get('kerberos', 'ccache'),
cmdv = [conf.get('kerberos', 'kinit_path'),
"-c", conf.get('kerberos', 'ccache'),
"-R"] # Renew ticket_cache
log.info(
@ -84,10 +85,10 @@ def perform_krb181_workaround(principal):
ret = subprocess.call(cmdv, close_fds=True)
if ret != 0:
principal = "%s/%s" % (principal or configuration.conf.get('kerberos', 'principal'),
principal = "%s/%s" % (principal or conf.get('kerberos', 'principal'),
socket.getfqdn())
princ = principal
ccache = configuration.conf.get('kerberos', 'principal')
ccache = conf.get('kerberos', 'principal')
log.error(
"Couldn't renew kerberos ticket in order to work around Kerberos 1.8.1 issue. Please check that "
"the ticket for '%s' is still renewable:\n $ kinit -f -c %s\nIf the 'renew until' date is the "
@ -104,7 +105,7 @@ def detect_conf_var() -> bool:
Sun Java Krb5LoginModule in Java6, so we need to take an action to work
around it.
"""
ticket_cache = configuration.conf.get('kerberos', 'ccache')
ticket_cache = conf.get('kerberos', 'ccache')
with open(ticket_cache, 'rb') as file:
# Note: this file is binary, so we check against a bytearray.
@ -118,4 +119,4 @@ def run(principal, keytab):
while True:
renew_from_kt(principal, keytab)
time.sleep(configuration.conf.getint('kerberos', 'reinit_frequency'))
time.sleep(conf.getint('kerberos', 'reinit_frequency'))

View file

@ -25,7 +25,7 @@ import string
import textwrap
from typing import Any
from airflow import configuration as conf
from airflow.configuration import conf
from airflow.exceptions import InvalidStatsNameException
log = logging.getLogger(__name__)

View file

@ -17,11 +17,11 @@
# specific language governing permissions and limitations
# under the License.
from airflow import configuration
from airflow.configuration import conf
from airflow.task.task_runner.standard_task_runner import StandardTaskRunner
from airflow.exceptions import AirflowException
_TASK_RUNNER = configuration.conf.get('core', 'TASK_RUNNER')
_TASK_RUNNER = conf.get('core', 'TASK_RUNNER')
def get_task_runner(local_task_job):

View file

@ -24,7 +24,7 @@ import threading
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow import configuration as conf
from airflow.configuration import conf
from airflow.utils.configuration import tmp_configuration_copy

View file

@ -21,7 +21,7 @@ import os
import json
from tempfile import mkstemp
from airflow import configuration as conf
from airflow.configuration import conf
def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True):

View file

@ -39,7 +39,7 @@ from tabulate import tabulate
# To avoid circular imports
import airflow.models
from airflow import configuration as conf
from airflow.configuration import conf
from airflow.dag.base_dag import BaseDag, BaseDagBag
from airflow.exceptions import AirflowException
from airflow.models import errors

View file

@ -26,7 +26,7 @@ from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.utils import formatdate
from airflow import configuration
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.utils.log.logging_mixin import LoggingMixin
@ -37,7 +37,7 @@ def send_email(to, subject, html_content,
"""
Send email using backend specified in EMAIL_BACKEND.
"""
path, attr = configuration.conf.get('email', 'EMAIL_BACKEND').rsplit('.', 1)
path, attr = conf.get('email', 'EMAIL_BACKEND').rsplit('.', 1)
module = importlib.import_module(path)
backend = getattr(module, attr)
to = get_email_address_list(to)
@ -57,7 +57,7 @@ def send_email_smtp(to, subject, html_content, files=None,
>>> send_email('test@example.com', 'foo', '<b>Foo</b> bar', ['/dev/null'], dryrun=True)
"""
smtp_mail_from = configuration.conf.get('smtp', 'SMTP_MAIL_FROM')
smtp_mail_from = conf.get('smtp', 'SMTP_MAIL_FROM')
to = get_email_address_list(to)
@ -97,16 +97,16 @@ def send_email_smtp(to, subject, html_content, files=None,
def send_MIME_email(e_from, e_to, mime_msg, dryrun=False):
log = LoggingMixin().log
SMTP_HOST = configuration.conf.get('smtp', 'SMTP_HOST')
SMTP_PORT = configuration.conf.getint('smtp', 'SMTP_PORT')
SMTP_STARTTLS = configuration.conf.getboolean('smtp', 'SMTP_STARTTLS')
SMTP_SSL = configuration.conf.getboolean('smtp', 'SMTP_SSL')
SMTP_HOST = conf.get('smtp', 'SMTP_HOST')
SMTP_PORT = conf.getint('smtp', 'SMTP_PORT')
SMTP_STARTTLS = conf.getboolean('smtp', 'SMTP_STARTTLS')
SMTP_SSL = conf.getboolean('smtp', 'SMTP_SSL')
SMTP_USER = None
SMTP_PASSWORD = None
try:
SMTP_USER = configuration.conf.get('smtp', 'SMTP_USER')
SMTP_PASSWORD = configuration.conf.get('smtp', 'SMTP_PASSWORD')
SMTP_USER = conf.get('smtp', 'SMTP_USER')
SMTP_PASSWORD = conf.get('smtp', 'SMTP_PASSWORD')
except AirflowConfigException:
log.debug("No user/password found for SMTP, so logging in with no authentication.")

View file

@ -35,12 +35,12 @@ import signal
from jinja2 import Template
from airflow import configuration
from airflow.configuration import conf
from airflow.exceptions import AirflowException
# When killing processes, time to wait after issuing a SIGTERM before issuing a
# SIGKILL.
DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = configuration.conf.getint(
DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = conf.getint(
'core', 'KILLED_TASK_CLEANUP_TIME'
)

View file

@ -21,7 +21,7 @@ import logging
import os
import requests
from airflow import configuration as conf
from airflow.configuration import conf
from airflow.configuration import AirflowConfigException
from airflow.utils.file import mkdirs
from airflow.utils.helpers import parse_template_string

View file

@ -21,7 +21,7 @@ import os
from cached_property import cached_property
from urllib.parse import urlparse
from airflow import configuration
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin
@ -44,7 +44,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
@cached_property
def hook(self):
remote_conn_id = configuration.conf.get('core', 'REMOTE_LOG_CONN_ID')
remote_conn_id = conf.get('core', 'REMOTE_LOG_CONN_ID')
try:
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
return GoogleCloudStorageHook(

View file

@ -20,7 +20,7 @@ import os
from cached_property import cached_property
from airflow import configuration
from airflow.configuration import conf
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.file_task_handler import FileTaskHandler
@ -41,7 +41,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
@cached_property
def hook(self):
remote_conn_id = configuration.conf.get('core', 'REMOTE_LOG_CONN_ID')
remote_conn_id = conf.get('core', 'REMOTE_LOG_CONN_ID')
try:
from airflow.hooks.S3_hook import S3Hook
return S3Hook(remote_conn_id)
@ -164,7 +164,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
log,
key=remote_log_location,
replace=True,
encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
encrypt=conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
)
except Exception:
self.log.exception('Could not write logs to %s', remote_log_location)

View file

@ -21,7 +21,7 @@ import shutil
from cached_property import cached_property
from airflow import configuration
from airflow.configuration import conf
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.file_task_handler import FileTaskHandler
from azure.common import AzureHttpError
@ -47,7 +47,7 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
@cached_property
def hook(self):
remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
remote_conn_id = conf.get('core', 'REMOTE_LOG_CONN_ID')
try:
from airflow.contrib.hooks.wasb_hook import WasbHook
return WasbHook(remote_conn_id)

View file

@ -17,7 +17,7 @@
# specific language governing permissions and limitations
# under the License.
from airflow import configuration
from airflow.configuration import conf
from airflow.exceptions import AirflowException
# Constants for resources (megabytes are the base unit)
@ -105,10 +105,10 @@ class Resources:
:type gpus: long
"""
def __init__(self,
cpus=configuration.conf.getint('operators', 'default_cpus'),
ram=configuration.conf.getint('operators', 'default_ram'),
disk=configuration.conf.getint('operators', 'default_disk'),
gpus=configuration.conf.getint('operators', 'default_gpus')
cpus=conf.getint('operators', 'default_cpus'),
ram=conf.getint('operators', 'default_ram'),
disk=conf.getint('operators', 'default_disk'),
gpus=conf.getint('operators', 'default_gpus')
):
self.cpus = CpuResource(cpus)
self.ram = RamResource(ram)

View file

@ -30,7 +30,7 @@ from urllib.parse import urlparse
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from airflow import configuration as conf
from airflow.configuration import conf
from airflow import settings
from airflow.logging_config import configure_logging
from airflow.utils.json import AirflowJsonEncoder

View file

@ -36,7 +36,7 @@ import flask_appbuilder.models.sqla.filters as fab_sqlafilters
import sqlalchemy as sqla
from urllib.parse import urlencode
from airflow import configuration
from airflow.configuration import conf
from airflow.models import BaseOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils import timezone
@ -57,8 +57,8 @@ DEFAULT_SENSITIVE_VARIABLE_FIELDS = (
def should_hide_value_for_key(key_name):
# It is possible via importing variables from file that a key is empty.
if key_name:
config_set = configuration.conf.getboolean('admin',
'hide_sensitive_variable_fields')
config_set = conf.getboolean('admin',
'hide_sensitive_variable_fields')
field_comp = any(s in key_name.lower() for s in DEFAULT_SENSITIVE_VARIABLE_FIELDS)
return config_set and field_comp
return False

View file

@ -46,7 +46,7 @@ from urllib.parse import quote
from wtforms import SelectField, validators
import airflow
from airflow import configuration as conf
from airflow.configuration import conf, AIRFLOW_CONFIG
from airflow import jobs, models
from airflow import settings
from airflow._vendor import nvd3
@ -1952,10 +1952,10 @@ class ConfigurationView(AirflowBaseView):
def conf(self):
raw = request.args.get('raw') == "true"
title = "Airflow Configuration"
subtitle = conf.AIRFLOW_CONFIG
subtitle = AIRFLOW_CONFIG
# Don't show config when expose_config variable is False in airflow config
if conf.getboolean("webserver", "expose_config"):
with open(conf.AIRFLOW_CONFIG, 'r') as file:
with open(AIRFLOW_CONFIG, 'r') as file:
config = file.read()
table = [(section, key, value, source)
for section, parameters in conf.as_dict(True, True).items()

View file

@ -21,7 +21,8 @@ import logging
import pandas as pd
import sys
from airflow import configuration, settings
from airflow import settings
from airflow.configuration import conf
from airflow.jobs import SchedulerJob
from airflow.models import DagBag, DagModel, DagRun, TaskInstance
from airflow.utils import timezone
@ -191,7 +192,7 @@ def main():
logging.error('Specify a positive integer for timeout.')
sys.exit(1)
configuration.load_test_config()
conf.load_test_config()
set_dags_paused_state(False)
clear_dag_runs()

View file

@ -21,7 +21,8 @@ import unittest
import time
from datetime import datetime, timedelta
from airflow import configuration, models
from airflow import models
from airflow.configuration import conf
from airflow.api.common.experimental.mark_tasks import (
set_state, _create_dagruns, set_dag_run_state_to_success, set_dag_run_state_to_failed,
set_dag_run_state_to_running)
@ -251,7 +252,7 @@ class TestMarkTasks(unittest.TestCase):
# TODO: this skipIf should be removed once a fixing solution is found later
# We skip it here because this test case is working with Postgres & SQLite
# but not with MySQL
@unittest.skipIf('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), "Flaky with MySQL")
@unittest.skipIf('mysql' in conf.get('core', 'sql_alchemy_conn'), "Flaky with MySQL")
def test_mark_tasks_subdag(self):
# set one task to success towards end of scheduled dag runs
task = self.dag2.get_task("section-1")

View file

@ -19,7 +19,7 @@
import unittest
from airflow import configuration
from airflow.configuration import conf
from airflow import models
from airflow.contrib.operators.s3_to_sftp_operator import S3ToSFTPOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
@ -88,7 +88,7 @@ class TestS3ToSFTPOperator(unittest.TestCase):
@mock_s3
def test_s3_to_sftp_operation(self):
# Setting
configuration.conf.set("core", "enable_xcom_pickling", "True")
conf.set("core", "enable_xcom_pickling", "True")
test_remote_file_content = \
"This is remote file content \n which is also multiline " \
"another line here \n this is last line. EOF"

View file

@ -25,7 +25,7 @@ from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONF
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.timezone import datetime
from airflow.utils.log.logging_mixin import set_context
from airflow import configuration as conf
from airflow.configuration import conf
DEFAULT_DATE = datetime(2019, 1, 1)
TASK_LOGGER = 'airflow.task'

View file

@ -45,7 +45,7 @@ from pendulum import utcnow
from airflow import configuration, models
from airflow import jobs, DAG, utils, settings, exceptions
from airflow.bin import cli
from airflow.configuration import AirflowConfigException, run_command
from airflow.configuration import AirflowConfigException, run_command, conf
from airflow.exceptions import AirflowException
from airflow.executors import SequentialExecutor
from airflow.hooks.base_hook import BaseHook
@ -364,7 +364,7 @@ class TestCore(unittest.TestCase):
self.assertIsNone(additional_dag_run)
def test_confirm_unittest_mod(self):
self.assertTrue(configuration.conf.get('core', 'unit_test_mode'))
self.assertTrue(conf.get('core', 'unit_test_mode'))
def test_pickling(self):
dp = self.dag.pickle()
@ -793,13 +793,13 @@ class TestCore(unittest.TestCase):
self.assertNotIn("{FERNET_KEY}", cfg)
def test_config_use_original_when_original_and_fallback_are_present(self):
self.assertTrue(configuration.conf.has_option("core", "FERNET_KEY"))
self.assertFalse(configuration.conf.has_option("core", "FERNET_KEY_CMD"))
self.assertTrue(conf.has_option("core", "FERNET_KEY"))
self.assertFalse(conf.has_option("core", "FERNET_KEY_CMD"))
FERNET_KEY = configuration.conf.get('core', 'FERNET_KEY')
FERNET_KEY = conf.get('core', 'FERNET_KEY')
with conf_vars({('core', 'FERNET_KEY_CMD'): 'printf HELLO'}):
FALLBACK_FERNET_KEY = configuration.conf.get(
FALLBACK_FERNET_KEY = conf.get(
"core",
"FERNET_KEY"
)
@ -807,12 +807,12 @@ class TestCore(unittest.TestCase):
self.assertEqual(FERNET_KEY, FALLBACK_FERNET_KEY)
def test_config_throw_error_when_original_and_fallback_is_absent(self):
self.assertTrue(configuration.conf.has_option("core", "FERNET_KEY"))
self.assertFalse(configuration.conf.has_option("core", "FERNET_KEY_CMD"))
self.assertTrue(conf.has_option("core", "FERNET_KEY"))
self.assertFalse(conf.has_option("core", "FERNET_KEY_CMD"))
with conf_vars({('core', 'fernet_key'): None}):
with self.assertRaises(AirflowConfigException) as cm:
configuration.conf.get("core", "FERNET_KEY")
conf.get("core", "FERNET_KEY")
exception = str(cm.exception)
message = "section/key [core/fernet_key] not found in config"
@ -824,7 +824,7 @@ class TestCore(unittest.TestCase):
self.assertNotIn(key, os.environ)
os.environ[key] = value
FERNET_KEY = configuration.conf.get('core', 'FERNET_KEY')
FERNET_KEY = conf.get('core', 'FERNET_KEY')
self.assertEqual(value, FERNET_KEY)
# restore the envvar back to the original state
@ -836,7 +836,7 @@ class TestCore(unittest.TestCase):
self.assertNotIn(key, os.environ)
os.environ[key] = value
FERNET_KEY = configuration.conf.get('core', 'FERNET_KEY')
FERNET_KEY = conf.get('core', 'FERNET_KEY')
self.assertEqual(value, FERNET_KEY)
# restore the envvar back to the original state
@ -2226,7 +2226,7 @@ send_email_test = mock.Mock()
class TestEmail(unittest.TestCase):
def setUp(self):
configuration.conf.remove_option('email', 'EMAIL_BACKEND')
conf.remove_option('email', 'EMAIL_BACKEND')
@mock.patch('airflow.utils.email.send_email')
def test_default_backend(self, mock_send_email):
@ -2246,7 +2246,7 @@ class TestEmail(unittest.TestCase):
class TestEmailSmtp(unittest.TestCase):
def setUp(self):
configuration.conf.set('smtp', 'SMTP_SSL', 'False')
conf.set('smtp', 'SMTP_SSL', 'False')
@mock.patch('airflow.utils.email.send_MIME_email')
def test_send_smtp(self, mock_send_mime):
@ -2256,11 +2256,11 @@ class TestEmailSmtp(unittest.TestCase):
utils.email.send_email_smtp('to', 'subject', 'content', files=[attachment.name])
self.assertTrue(mock_send_mime.called)
call_args = mock_send_mime.call_args[0]
self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), call_args[0])
self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), call_args[0])
self.assertEqual(['to'], call_args[1])
msg = call_args[2]
self.assertEqual('subject', msg['Subject'])
self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
self.assertEqual(2, len(msg.get_payload()))
filename = 'attachment; filename="' + os.path.basename(attachment.name) + '"'
self.assertEqual(filename, msg.get_payload()[-1].get('Content-Disposition'))
@ -2284,11 +2284,11 @@ class TestEmailSmtp(unittest.TestCase):
utils.email.send_email_smtp('to', 'subject', 'content', files=[attachment.name], cc='cc', bcc='bcc')
self.assertTrue(mock_send_mime.called)
call_args = mock_send_mime.call_args[0]
self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), call_args[0])
self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), call_args[0])
self.assertEqual(['to', 'cc', 'bcc'], call_args[1])
msg = call_args[2]
self.assertEqual('subject', msg['Subject'])
self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
self.assertEqual(2, len(msg.get_payload()))
self.assertEqual('attachment; filename="' + os.path.basename(attachment.name) + '"',
msg.get_payload()[-1].get('Content-Disposition'))
@ -2303,13 +2303,13 @@ class TestEmailSmtp(unittest.TestCase):
msg = MIMEMultipart()
utils.email.send_MIME_email('from', 'to', msg, dryrun=False)
mock_smtp.assert_called_once_with(
configuration.conf.get('smtp', 'SMTP_HOST'),
configuration.conf.getint('smtp', 'SMTP_PORT'),
conf.get('smtp', 'SMTP_HOST'),
conf.getint('smtp', 'SMTP_PORT'),
)
self.assertTrue(mock_smtp.return_value.starttls.called)
mock_smtp.return_value.login.assert_called_once_with(
configuration.conf.get('smtp', 'SMTP_USER'),
configuration.conf.get('smtp', 'SMTP_PASSWORD'),
conf.get('smtp', 'SMTP_USER'),
conf.get('smtp', 'SMTP_PASSWORD'),
)
mock_smtp.return_value.sendmail.assert_called_once_with('from', 'to', msg.as_string())
self.assertTrue(mock_smtp.return_value.quit.called)
@ -2323,8 +2323,8 @@ class TestEmailSmtp(unittest.TestCase):
utils.email.send_MIME_email('from', 'to', MIMEMultipart(), dryrun=False)
self.assertFalse(mock_smtp.called)
mock_smtp_ssl.assert_called_once_with(
configuration.conf.get('smtp', 'SMTP_HOST'),
configuration.conf.getint('smtp', 'SMTP_PORT'),
conf.get('smtp', 'SMTP_HOST'),
conf.getint('smtp', 'SMTP_PORT'),
)
@mock.patch('smtplib.SMTP_SSL')
@ -2339,8 +2339,8 @@ class TestEmailSmtp(unittest.TestCase):
utils.email.send_MIME_email('from', 'to', MIMEMultipart(), dryrun=False)
self.assertFalse(mock_smtp_ssl.called)
mock_smtp.assert_called_once_with(
configuration.conf.get('smtp', 'SMTP_HOST'),
configuration.conf.getint('smtp', 'SMTP_PORT'),
conf.get('smtp', 'SMTP_HOST'),
conf.getint('smtp', 'SMTP_PORT'),
)
self.assertFalse(mock_smtp.login.called)
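
The SMTP assertions above all read settings through the single shared parser object. A minimal sketch of the access pattern this commit standardises on (section and key names are taken from the hunks above; the actual values depend on the local airflow.cfg):

from airflow.configuration import conf

smtp_host = conf.get('smtp', 'SMTP_HOST')        # plain string lookup
smtp_port = conf.getint('smtp', 'SMTP_PORT')     # typed integer lookup
smtp_ssl = conf.getboolean('smtp', 'SMTP_SSL')   # typed boolean lookup
mail_from = conf.get('smtp', 'SMTP_MAIL_FROM')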

Просмотреть файл

@ -47,7 +47,7 @@ def print_today():
def check_hive_conf():
from airflow import configuration as conf
from airflow.configuration import conf
assert conf.get('hive', 'default_hive_mapred_queue') == 'airflow'

Просмотреть файл

@ -33,7 +33,7 @@ from parameterized import parameterized
from airflow.utils.state import State
from airflow.executors import celery_executor
from airflow import configuration
from airflow.configuration import conf
# leave this; it is used by the test worker
import celery.contrib.testing.tasks # noqa: F401 pylint: disable=ungrouped-imports
@ -45,14 +45,14 @@ def _prepare_test_bodies():
(url, )
for url in os.environ['CELERY_BROKER_URLS'].split(',')
]
return [(configuration.conf.get('celery', 'BROKER_URL'))]
return [(conf.get('celery', 'BROKER_URL'))]
class TestCeleryExecutor(unittest.TestCase):
@contextlib.contextmanager
def _prepare_app(self, broker_url=None, execute=None):
broker_url = broker_url or configuration.conf.get('celery', 'BROKER_URL')
broker_url = broker_url or conf.get('celery', 'BROKER_URL')
execute = execute or celery_executor.execute_command.__wrapped__
test_config = dict(celery_executor.celery_configuration)
@ -70,7 +70,7 @@ class TestCeleryExecutor(unittest.TestCase):
set_event_loop(None)
@parameterized.expand(_prepare_test_bodies())
@unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'),
@unittest.skipIf('sqlite' in conf.get('core', 'sql_alchemy_conn'),
"sqlite is configured with SequentialExecutor")
def test_celery_integration(self, broker_url):
with self._prepare_app(broker_url) as app:
@ -123,7 +123,7 @@ class TestCeleryExecutor(unittest.TestCase):
self.assertNotIn('success', executor.last_state)
self.assertNotIn('fail', executor.last_state)
@unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'),
@unittest.skipIf('sqlite' in conf.get('core', 'sql_alchemy_conn'),
"sqlite is configured with SequentialExecutor")
def test_error_sending_task(self):
def fake_execute_command():

Просмотреть файл

@ -20,7 +20,7 @@
import unittest
from unittest import mock
from airflow import configuration
from airflow.configuration import conf
from airflow.models import DagBag
from airflow.jobs import BackfillJob
from airflow.utils import timezone
@ -41,7 +41,7 @@ try:
except ImportError:
SKIP_DASK = True
if 'sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'):
if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
SKIP_DASK = True
# Always skip due to issues on python 3 issues
@ -140,9 +140,9 @@ class TestDaskExecutorTLS(TestBaseDask):
# These use test certs that ship with dask/distributed and should not be
# used in production
configuration.set('dask', 'tls_ca', get_cert('tls-ca-cert.pem'))
configuration.set('dask', 'tls_cert', get_cert('tls-key-cert.pem'))
configuration.set('dask', 'tls_key', get_cert('tls-key.pem'))
conf.set('dask', 'tls_ca', get_cert('tls-ca-cert.pem'))
conf.set('dask', 'tls_cert', get_cert('tls-key-cert.pem'))
conf.set('dask', 'tls_key', get_cert('tls-key.pem'))
try:
executor = DaskExecutor(cluster_address=s['address'])
@ -153,9 +153,9 @@ class TestDaskExecutorTLS(TestBaseDask):
# and tasks to have completed.
executor.client.close()
finally:
configuration.set('dask', 'tls_ca', '')
configuration.set('dask', 'tls_key', '')
configuration.set('dask', 'tls_cert', '')
conf.set('dask', 'tls_ca', '')
conf.set('dask', 'tls_key', '')
conf.set('dask', 'tls_cert', '')
@unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
@mock.patch('airflow.executors.dask_executor.DaskExecutor.sync')

Просмотреть файл

@ -28,7 +28,7 @@ import sqlalchemy
from parameterized import parameterized
from airflow import AirflowException, settings
from airflow import configuration
from airflow.configuration import conf
from airflow.bin import cli
from airflow.exceptions import AirflowTaskTimeout
from airflow.exceptions import DagConcurrencyLimitReached, NoAvailablePoolSlot, TaskConcurrencyLimitReached
@ -129,7 +129,7 @@ class TestBackfillJob(unittest.TestCase):
self.assertEqual(State.SUCCESS, dag_run.state)
@unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'),
@unittest.skipIf('sqlite' in conf.get('core', 'sql_alchemy_conn'),
"concurrent access not supported in sqlite")
def test_trigger_controller_dag(self):
dag = self.dagbag.get_dag('example_trigger_controller_dag')
@ -155,7 +155,7 @@ class TestBackfillJob(unittest.TestCase):
self.assertTrue(task_instances_list.append.called)
@unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'),
@unittest.skipIf('sqlite' in conf.get('core', 'sql_alchemy_conn'),
"concurrent access not supported in sqlite")
def test_backfill_multi_dates(self):
dag = self.dagbag.get_dag('example_bash_operator')
@ -209,7 +209,7 @@ class TestBackfillJob(unittest.TestCase):
session.close()
@unittest.skipIf(
"sqlite" in configuration.conf.get("core", "sql_alchemy_conn"),
"sqlite" in conf.get("core", "sql_alchemy_conn"),
"concurrent access not supported in sqlite",
)
@parameterized.expand(

Просмотреть файл

@ -23,7 +23,7 @@ import time
import unittest
from airflow import AirflowException, models, settings
from airflow import configuration
from airflow.configuration import conf
from airflow.executors import SequentialExecutor
from airflow.jobs import LocalTaskJob
from airflow.models import DAG, TaskInstance as TI
@ -161,9 +161,9 @@ class TestLocalTaskJob(unittest.TestCase):
time2 = heartbeat_records[i]
self.assertGreaterEqual((time2 - time1).total_seconds(), job.heartrate)
@unittest.skipIf('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
@unittest.skipIf('mysql' in conf.get('core', 'sql_alchemy_conn'),
"flaky when run on mysql")
@unittest.skipIf('postgresql' in configuration.conf.get('core', 'sql_alchemy_conn'),
@unittest.skipIf('postgresql' in conf.get('core', 'sql_alchemy_conn'),
'flaky when run on postgresql')
def test_mark_success_no_kill(self):
"""

Просмотреть файл

@ -30,7 +30,7 @@ from parameterized import parameterized
import airflow.example_dags
from airflow import AirflowException, models, settings
from airflow import configuration
from airflow.configuration import conf
from airflow.executors import BaseExecutor
from airflow.jobs import BackfillJob, SchedulerJob
from airflow.models import DAG, DagBag, DagModel, DagRun, Pool, SlaMiss, \
@ -79,20 +79,17 @@ class TestSchedulerJob(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.dagbag = DagBag()
def getboolean(section, key):
if section.lower() == 'core' and key.lower() == 'load_examples':
return False
else:
return configuration.conf.getboolean(section, key)
cls.patcher = mock.patch('airflow.jobs.scheduler_job.conf.getboolean')
cls.mock_getboolean = cls.patcher.start()
cls.mock_getboolean.side_effect = getboolean
cls.old_val = None
if conf.has_option('core', 'load_examples'):
cls.old_val = conf.get('core', 'load_examples')
conf.set('core', 'load_examples', 'false')
@classmethod
def tearDownClass(cls):
cls.patcher.stop()
if cls.old_val is not None:
conf.set('core', 'load_examples', cls.old_val)
else:
conf.remove_option('core', 'load_examples')
def test_is_alive(self):
job = SchedulerJob(None, heartrate=10, state=State.RUNNING)
@ -2276,7 +2273,7 @@ class TestSchedulerJob(unittest.TestCase):
schedule_interval='* * * * *',
start_date=six_hours_ago_to_the_hour,
catchup=True)
default_catchup = configuration.conf.getboolean('scheduler', 'catchup_by_default')
default_catchup = conf.getboolean('scheduler', 'catchup_by_default')
self.assertEqual(default_catchup, True)
self.assertEqual(dag1.catchup, True)
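
The scheduler tests now save and restore core.load_examples by hand in setUpClass/tearDownClass. For per-test overrides, other files touched by this change rely on the conf_vars helper instead; a hedged sketch of that context-manager form (the test class and method names below are illustrative, not part of the diff):

import unittest

from airflow.configuration import conf
from tests.test_utils.config import conf_vars


class TestLoadExamplesOverride(unittest.TestCase):
    def test_examples_disabled(self):
        # conf_vars swaps the value in for the duration of the block and
        # restores the previous state (or removes the key) on exit, so no
        # manual tearDown bookkeeping is required.
        with conf_vars({('core', 'load_examples'): 'False'}):
            self.assertFalse(conf.getboolean('core', 'load_examples'))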

Просмотреть файл

@ -21,7 +21,8 @@ import datetime
import os
import unittest
from airflow import settings, configuration
from airflow import settings
from airflow.configuration import conf
from airflow.models import DAG, TaskInstance as TI, clear_task_instances, XCom
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils import timezone
@ -239,7 +240,7 @@ class TestClearTasks(unittest.TestCase):
dag_id = "test_dag1"
task_id = "test_task1"
configuration.set("core", "enable_xcom_pickling", "False")
conf.set("core", "enable_xcom_pickling", "False")
XCom.set(key=key,
value=json_obj,
@ -269,7 +270,7 @@ class TestClearTasks(unittest.TestCase):
dag_id = "test_dag2"
task_id = "test_task2"
configuration.set("core", "enable_xcom_pickling", "True")
conf.set("core", "enable_xcom_pickling", "True")
XCom.set(key=key,
value=json_obj,
@ -297,7 +298,7 @@ class TestClearTasks(unittest.TestCase):
def __reduce__(self):
return os.system, ("ls -alt",)
configuration.set("core", "xcom_enable_pickling", "False")
conf.set("core", "xcom_enable_pickling", "False")
self.assertRaises(TypeError, XCom.set,
key="xcom_test3",
@ -315,7 +316,7 @@ class TestClearTasks(unittest.TestCase):
dag_id2 = "test_dag5"
task_id2 = "test_task5"
configuration.set("core", "xcom_enable_pickling", "True")
conf.set("core", "xcom_enable_pickling", "True")
XCom.set(key=key,
value=json_obj,

Просмотреть файл

@ -29,7 +29,8 @@ from unittest.mock import patch
import pendulum
from airflow import models, settings, configuration
from airflow import models, settings
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowDagCycleException
from airflow.models import DAG, DagModel, TaskInstance as TI
from airflow.operators.dummy_operator import DummyOperator
@ -722,7 +723,7 @@ class TestDag(unittest.TestCase):
self.assertTrue(orm_dag.is_active)
self.assertIsNone(orm_dag.default_view)
self.assertEqual(orm_dag.get_default_view(),
configuration.conf.get('webserver', 'dag_default_view').lower())
conf.get('webserver', 'dag_default_view').lower())
self.assertEqual(orm_dag.safe_dag_id, 'dag')
orm_subdag = session.query(DagModel).filter(

Просмотреть файл

@ -26,7 +26,8 @@ import unittest
from unittest.mock import patch, ANY
from tempfile import mkdtemp, NamedTemporaryFile
from airflow import models, configuration
from airflow import models
from airflow.configuration import conf
from airflow.jobs import LocalTaskJob as LJ
from airflow.models import DagModel, DagBag, TaskInstance as TI
from airflow.utils.db import create_session
@ -625,7 +626,7 @@ class TestDagBag(unittest.TestCase):
dagbag.kill_zombies()
mock_ti_handle_failure.assert_called_once_with(
ANY,
configuration.getboolean('core', 'unit_test_mode'),
conf.getboolean('core', 'unit_test_mode'),
ANY
)
@ -635,7 +636,7 @@ class TestDagBag(unittest.TestCase):
Test that kill zombies calls TI's failure handler with proper context
"""
zombie_threshold_secs = (
configuration.getint('scheduler', 'scheduler_zombie_task_threshold'))
conf.getint('scheduler', 'scheduler_zombie_task_threshold'))
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True)
with create_session() as session:
session.query(TI).delete()
@ -657,7 +658,7 @@ class TestDagBag(unittest.TestCase):
dagbag.kill_zombies()
mock_ti_handle_failure.assert_called_once_with(
ANY,
configuration.getboolean('core', 'unit_test_mode'),
conf.getboolean('core', 'unit_test_mode'),
ANY
)

Просмотреть файл

@ -27,7 +27,8 @@ import pendulum
from freezegun import freeze_time
from parameterized import parameterized, param
from sqlalchemy.orm.session import Session
from airflow import models, settings, configuration
from airflow import models, settings
from airflow.configuration import conf
from airflow.contrib.sensors.python_sensor import PythonSensor
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models import DAG, DagRun, Pool, TaskFail, TaskInstance as TI, TaskReschedule
@ -1085,8 +1086,8 @@ class TestTaskInstance(unittest.TestCase):
ti = TI(
task=task, execution_date=datetime.datetime.now())
configuration.set('email', 'subject_template', '/subject/path')
configuration.set('email', 'html_content_template', '/html_content/path')
conf.set('email', 'subject_template', '/subject/path')
conf.set('email', 'html_content_template', '/html_content/path')
opener = mock_open(read_data='template: {{ti.task_id}}')
with patch('airflow.models.taskinstance.open', opener, create=True):

Просмотреть файл

@ -24,7 +24,8 @@ import unittest
from unittest import mock
import nose
from airflow import DAG, configuration, operators
from airflow import DAG, operators
from airflow.configuration import conf
from airflow.models import TaskInstance
from airflow.operators.hive_operator import HiveOperator
from airflow.utils import timezone
@ -93,7 +94,7 @@ class HiveOperatorConfigTest(TestHiveEnvironment):
dag=self.dag)
# just check that the correct default value in test_default.cfg is used
test_config_hive_mapred_queue = configuration.conf.get(
test_config_hive_mapred_queue = conf.get(
'hive',
'default_hive_mapred_queue'
)

Просмотреть файл

@ -21,7 +21,7 @@ import os
import unittest
from argparse import Namespace
from airflow import configuration
from airflow.configuration import conf
from airflow.security.kerberos import renew_from_kt
from airflow import LoggingMixin
from tests.test_utils.config import conf_vars
@ -31,11 +31,12 @@ from tests.test_utils.config import conf_vars
'Skipping Kerberos API tests due to missing KRB5_KTNAME')
class TestKerberos(unittest.TestCase):
def setUp(self):
if not configuration.conf.has_section("kerberos"):
configuration.conf.add_section("kerberos")
configuration.conf.set("kerberos", "keytab",
os.environ['KRB5_KTNAME'])
keytab_from_cfg = configuration.conf.get("kerberos", "keytab")
if not conf.has_section("kerberos"):
conf.add_section("kerberos")
conf.set("kerberos", "keytab",
os.environ['KRB5_KTNAME'])
keytab_from_cfg = conf.get("kerberos", "keytab")
self.args = Namespace(keytab=keytab_from_cfg, principal=None, pid=None,
daemon=None, stdout=None, stderr=None, log_file=None)

Просмотреть файл

@ -20,7 +20,7 @@ from unittest import mock
import unittest
from airflow import DAG
from airflow import configuration
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.sensors.sql_sensor import SqlSensor
from airflow.utils.timezone import datetime
@ -50,7 +50,7 @@ class TestSqlSensor(unittest.TestCase):
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
@unittest.skipUnless(
'mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), "this is a mysql test")
'mysql' in conf.get('core', 'sql_alchemy_conn'), "this is a mysql test")
def test_sql_sensor_mysql(self):
t1 = SqlSensor(
task_id='sql_sensor_check',
@ -70,7 +70,7 @@ class TestSqlSensor(unittest.TestCase):
t2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
@unittest.skipUnless(
'postgresql' in configuration.conf.get('core', 'sql_alchemy_conn'), "this is a postgres test")
'postgresql' in conf.get('core', 'sql_alchemy_conn'), "this is a postgres test")
def test_sql_sensor_postgres(self):
t1 = SqlSensor(
task_id='sql_sensor_check',

Просмотреть файл

@ -26,6 +26,7 @@ from airflow import configuration
from airflow.configuration import conf, AirflowConfigParser, parameterized_config
import unittest
from unittest import mock
@contextlib.contextmanager
@ -426,3 +427,10 @@ AIRFLOW_HOME = /root/airflow
self.assertEqual(test_conf.get('core', 'task_runner'), 'NotBashTaskRunner')
self.assertListEqual([], w)
def test_deprecated_funcs(self):
for func in ['load_test_config', 'get', 'getboolean', 'getfloat', 'getint', 'has_option',
'remove_option', 'as_dict', 'set']:
with mock.patch('airflow.configuration.{}'.format(func)):
with self.assertWarns(DeprecationWarning):
getattr(configuration, func)()
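
The new test_deprecated_funcs only asserts that touching the old module-level helpers raises a DeprecationWarning. One common shape for such a shim is sketched below; this is an illustration, not the literal implementation added by the commit, and the warning text is invented for the example:

import warnings

from airflow.configuration import conf


def get(*args, **kwargs):
    # Deprecated module-level accessor kept for backwards compatibility;
    # new code should call conf.get() directly.
    warnings.warn(
        "Reading configuration through airflow.configuration.get is "
        "deprecated; use 'from airflow.configuration import conf' and "
        "call conf.get() instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return conf.get(*args, **kwargs)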

Просмотреть файл

@ -23,9 +23,8 @@ import pathlib
import sys
import tempfile
from airflow import configuration as conf
from airflow.exceptions import AirflowConfigException
from tests.compat import mock, patch
from airflow.configuration import conf
from tests.compat import patch
from tests.test_utils.config import conf_vars
import unittest
@ -213,21 +212,12 @@ class TestLoggingSettings(unittest.TestCase):
# When the key is not available in the configuration
def test_when_the_config_key_does_not_exists(self):
from airflow import logging_config
conf_get = conf.get
def side_effect(*args):
if args[1] == 'logging_config_class':
raise AirflowConfigException
else:
return conf_get(*args)
logging_config.conf.get = mock.Mock(side_effect=side_effect)
with patch.object(logging_config.log, 'debug') as mock_debug:
logging_config.configure_logging()
mock_debug.assert_any_call(
'Could not find key logging_config_class in config'
)
with conf_vars({('core', 'logging_config_class'): None}):
with patch.object(logging_config.log, 'debug') as mock_debug:
logging_config.configure_logging()
mock_debug.assert_any_call(
'Could not find key logging_config_class in config'
)
# Just default
def test_loading_local_settings_without_logging_config(self):

Просмотреть файл

@ -26,7 +26,7 @@ import unittest
from unittest import mock
from unittest.mock import (MagicMock, PropertyMock)
from airflow import configuration as conf
from airflow.configuration import conf
from airflow.jobs import DagFileProcessor
from airflow.utils import timezone
from airflow.utils.dag_processing import (DagFileProcessorAgent, DagFileProcessorManager,

Просмотреть файл

@ -25,7 +25,7 @@ import socket
from datetime import datetime
from airflow import configuration
from airflow.configuration import conf
from airflow.api.auth.backend.kerberos_auth import CLIENT_AUTH
from airflow.www import app as application
@ -35,19 +35,19 @@ from airflow.www import app as application
class TestApiKerberos(unittest.TestCase):
def setUp(self):
try:
configuration.conf.add_section("api")
conf.add_section("api")
except Exception:
pass
configuration.conf.set("api",
"auth_backend",
"airflow.api.auth.backend.kerberos_auth")
conf.set("api",
"auth_backend",
"airflow.api.auth.backend.kerberos_auth")
try:
configuration.conf.add_section("kerberos")
conf.add_section("kerberos")
except Exception:
pass
configuration.conf.set("kerberos",
"keytab",
os.environ['KRB5_KTNAME'])
conf.set("kerberos",
"keytab",
os.environ['KRB5_KTNAME'])
self.app, _ = application.create_app(testing=True)
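
Both Kerberos test suites above have to guard against the target section not existing before writing to it (has_section/add_section in one file, try/except around add_section in the other). A hedged sketch of a small helper that captures that pattern; the helper name and the keytab path are illustrative and not part of the commit:

from airflow.configuration import conf


def set_option(section, option, value):
    # Create the section on first use so conf.set() does not fail with
    # configparser.NoSectionError, mirroring the setUp code above.
    if not conf.has_section(section):
        conf.add_section(section)
    conf.set(section, option, value)


set_option("kerberos", "keytab", "/tmp/airflow.keytab")  # placeholder path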

Просмотреть файл

@ -37,7 +37,7 @@ from parameterized import parameterized
from werkzeug.test import Client
from werkzeug.wrappers import BaseResponse
from airflow import configuration as conf
from airflow.configuration import conf
from airflow import models, settings
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.jobs import BaseJob
@ -1702,7 +1702,7 @@ class TestTriggerDag(TestBase):
self.assertIn('/trigger?dag_id=example_bash_operator', resp.data.decode('utf-8'))
self.assertIn("return confirmDeleteDag(this, 'example_bash_operator')", resp.data.decode('utf-8'))
@unittest.skipIf('mysql' in conf.conf.get('core', 'sql_alchemy_conn'),
@unittest.skipIf('mysql' in conf.get('core', 'sql_alchemy_conn'),
"flaky when run on mysql")
def test_trigger_dag_button(self):