diff --git a/airflow/api/__init__.py b/airflow/api/__init__.py
index 4f4a31f402..9162b0709b 100644
--- a/airflow/api/__init__.py
+++ b/airflow/api/__init__.py
@@ -20,8 +20,8 @@
 
 from importlib import import_module
 
-from airflow.exceptions import AirflowException
-from airflow import configuration as conf
+from airflow.exceptions import AirflowException, AirflowConfigException
+from airflow.configuration import conf
 
 from airflow.utils.log.logging_mixin import LoggingMixin
 
@@ -42,7 +42,7 @@ def load_auth():
     auth_backend = 'airflow.api.auth.backend.default'
     try:
         auth_backend = conf.get("api", "auth_backend")
-    except conf.AirflowConfigException:
+    except AirflowConfigException:
         pass
 
     try:
diff --git a/airflow/api/auth/backend/kerberos_auth.py b/airflow/api/auth/backend/kerberos_auth.py
index c2edc514ea..6db971383a 100644
--- a/airflow/api/auth/backend/kerberos_auth.py
+++ b/airflow/api/auth/backend/kerberos_auth.py
@@ -40,7 +40,7 @@ import kerberos
 
 from requests_kerberos import HTTPKerberosAuth
 
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 
diff --git a/airflow/bin/airflow b/airflow/bin/airflow
index d3e1255dc0..dfb60dfb65 100755
--- a/airflow/bin/airflow
+++ b/airflow/bin/airflow
@@ -21,14 +21,14 @@
 import os
 import argcomplete
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.bin.cli import CLIFactory
 
 if __name__ == '__main__':
 
-    if configuration.conf.get("core", "security") == 'kerberos':
-        os.environ['KRB5CCNAME'] = configuration.conf.get('kerberos', 'ccache')
-        os.environ['KRB5_KTNAME'] = configuration.conf.get('kerberos', 'keytab')
+    if conf.get("core", "security") == 'kerberos':
+        os.environ['KRB5CCNAME'] = conf.get('kerberos', 'ccache')
+        os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab')
 
     parser = CLIFactory.get_parser()
     argcomplete.autocomplete(parser)
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 4af4b5af82..f2134c262d 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -53,7 +53,7 @@ from typing import Any
 import airflow
 from airflow import api
 from airflow import jobs, settings
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowWebServerTimeout
 from airflow.executors import get_default_executor
 from airflow.models import (
@@ -507,7 +507,7 @@ def run(args, dag=None):
         if os.path.exists(args.cfg_path):
             os.remove(args.cfg_path)
 
-        conf.conf.read_dict(conf_dict, source=args.cfg_path)
+        conf.read_dict(conf_dict, source=args.cfg_path)
         settings.configure_vars()
 
     # IMPORTANT, have to use the NullPool, otherwise, each "run" command may leave
diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index 318e32b49d..3e97c03dfd 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -20,7 +20,7 @@
 import os
 from typing import Dict, Any
 
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow.utils.file import mkdirs
 
 # TODO: Logging format and level should be configured
diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py
index 7a9fd25064..4a6da2a507 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -19,7 +19,7 @@
 
 import ssl
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException, AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
 
@@ -30,9 +30,9 @@ def _broker_supports_visibility_timeout(url):
 
 log = LoggingMixin().log
 
-broker_url = configuration.conf.get('celery', 'BROKER_URL')
+broker_url = conf.get('celery', 'BROKER_URL')
 
-broker_transport_options = configuration.conf.getsection(
+broker_transport_options = conf.getsection(
     'celery_broker_transport_options'
 )
 if 'visibility_timeout' not in broker_transport_options:
@@ -44,31 +44,31 @@ DEFAULT_CELERY_CONFIG = {
     'event_serializer': 'json',
     'worker_prefetch_multiplier': 1,
     'task_acks_late': True,
-    'task_default_queue': configuration.conf.get('celery', 'DEFAULT_QUEUE'),
-    'task_default_exchange': configuration.conf.get('celery', 'DEFAULT_QUEUE'),
+    'task_default_queue': conf.get('celery', 'DEFAULT_QUEUE'),
+    'task_default_exchange': conf.get('celery', 'DEFAULT_QUEUE'),
     'broker_url': broker_url,
     'broker_transport_options': broker_transport_options,
-    'result_backend': configuration.conf.get('celery', 'RESULT_BACKEND'),
-    'worker_concurrency': configuration.conf.getint('celery', 'WORKER_CONCURRENCY'),
+    'result_backend': conf.get('celery', 'RESULT_BACKEND'),
+    'worker_concurrency': conf.getint('celery', 'WORKER_CONCURRENCY'),
 }
 
 celery_ssl_active = False
 try:
-    celery_ssl_active = configuration.conf.getboolean('celery', 'SSL_ACTIVE')
+    celery_ssl_active = conf.getboolean('celery', 'SSL_ACTIVE')
 except AirflowConfigException:
     log.warning("Celery Executor will run without SSL")
 
 try:
     if celery_ssl_active:
         if 'amqp://' in broker_url:
-            broker_use_ssl = {'keyfile': configuration.conf.get('celery', 'SSL_KEY'),
-                              'certfile': configuration.conf.get('celery', 'SSL_CERT'),
-                              'ca_certs': configuration.conf.get('celery', 'SSL_CACERT'),
+            broker_use_ssl = {'keyfile': conf.get('celery', 'SSL_KEY'),
+                              'certfile': conf.get('celery', 'SSL_CERT'),
+                              'ca_certs': conf.get('celery', 'SSL_CACERT'),
                               'cert_reqs': ssl.CERT_REQUIRED}
         elif 'redis://' in broker_url:
-            broker_use_ssl = {'ssl_keyfile': configuration.conf.get('celery', 'SSL_KEY'),
-                              'ssl_certfile': configuration.conf.get('celery', 'SSL_CERT'),
-                              'ssl_ca_certs': configuration.conf.get('celery', 'SSL_CACERT'),
+            broker_use_ssl = {'ssl_keyfile': conf.get('celery', 'SSL_KEY'),
+                              'ssl_certfile': conf.get('celery', 'SSL_CERT'),
+                              'ssl_ca_certs': conf.get('celery', 'SSL_CACERT'),
                               'ssl_cert_reqs': ssl.CERT_REQUIRED}
         else:
             raise AirflowException('The broker you configured does not support SSL_ACTIVE to be True. '
diff --git a/airflow/configuration.py b/airflow/configuration.py
index a46c813bb0..b36062d19c 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -595,7 +595,7 @@ set = conf.set # noqa
 for func in [load_test_config, get, getboolean, getfloat, getint, has_option,
              remove_option, as_dict, set]:
     deprecated(
-        func,
+        func.__name__,
         "Accessing configuration method '{f.__name__}' directly from "
         "the configuration module is deprecated. Please access the "
         "configuration from the 'configuration.conf' object via "
diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py
index f9b1af7d61..0426a00f9b 100644
--- a/airflow/contrib/auth/backends/github_enterprise_auth.py
+++ b/airflow/contrib/auth/backends/github_enterprise_auth.py
@@ -26,8 +26,8 @@ from flask import url_for, redirect, request
 
 from flask_oauthlib.client import OAuth
 
-from airflow import models, configuration
-from airflow.configuration import AirflowConfigException
+from airflow import models
+from airflow.configuration import AirflowConfigException, conf
 from airflow.utils.db import provide_session
 from airflow.utils.log.logging_mixin import LoggingMixin
 
@@ -35,7 +35,7 @@ log = LoggingMixin().log
 
 
 def get_config_param(param):
-    return str(configuration.conf.get('github_enterprise', param))
+    return str(conf.get('github_enterprise', param))
 
 
 class GHEUser(models.User):
diff --git a/airflow/contrib/auth/backends/google_auth.py b/airflow/contrib/auth/backends/google_auth.py
index 9f880a51c5..8179c7448e 100644
--- a/airflow/contrib/auth/backends/google_auth.py
+++ b/airflow/contrib/auth/backends/google_auth.py
@@ -26,7 +26,8 @@ from flask import url_for, redirect, request
 
 from flask_oauthlib.client import OAuth
 
-from airflow import models, configuration
+from airflow import models
+from airflow.configuration import conf
 from airflow.utils.db import provide_session
 from airflow.utils.log.logging_mixin import LoggingMixin
 
@@ -34,7 +35,7 @@ log = LoggingMixin().log
 
 
 def get_config_param(param):
-    return str(configuration.conf.get('google', param))
+    return str(conf.get('google', param))
 
 
 class GoogleUser(models.User):
diff --git a/airflow/contrib/auth/backends/kerberos_auth.py b/airflow/contrib/auth/backends/kerberos_auth.py
index 4d1fbe079d..7f67848c09 100644
--- a/airflow/contrib/auth/backends/kerberos_auth.py
+++ b/airflow/contrib/auth/backends/kerberos_auth.py
@@ -33,7 +33,7 @@ from airflow.security import utils
 from flask import url_for, redirect
 
 from airflow import models
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.utils.db import provide_session
 from airflow.utils.log.logging_mixin import LoggingMixin
 
@@ -55,13 +55,13 @@ class KerberosUser(models.User, LoggingMixin):
     @staticmethod
     def authenticate(username, password):
         service_principal = "%s/%s" % (
-            configuration.conf.get('kerberos', 'principal'),
+            conf.get('kerberos', 'principal'),
             utils.get_fqdn()
         )
-        realm = configuration.conf.get("kerberos", "default_realm")
+        realm = conf.get("kerberos", "default_realm")
 
         try:
-            user_realm = configuration.conf.get("security", "default_realm")
+            user_realm = conf.get("security", "default_realm")
         except AirflowConfigException:
             user_realm = realm
 
diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py
index 6729f9df2c..eeadccaffe 100644
--- a/airflow/contrib/auth/backends/ldap_auth.py
+++ b/airflow/contrib/auth/backends/ldap_auth.py
@@ -29,7 +29,7 @@ import ssl
 from flask import url_for, redirect
 
 from airflow import models
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.configuration import AirflowConfigException
 from airflow.utils.db import provide_session
 
@@ -55,12 +55,12 @@ class LdapException(Exception):
 
 def get_ldap_connection(dn=None, password=None):
     try:
-        cacert = configuration.conf.get("ldap", "cacert")
+        cacert = conf.get("ldap", "cacert")
     except AirflowConfigException:
         pass
 
     try:
-        ignore_malformed_schema = configuration.conf.get("ldap", "ignore_malformed_schema")
+        ignore_malformed_schema = conf.get("ldap", "ignore_malformed_schema")
     except AirflowConfigException:
         pass
 
@@ -70,7 +70,7 @@ def get_ldap_connection(dn=None, password=None):
     tls_configuration = Tls(validate=ssl.CERT_REQUIRED,
                             ca_certs_file=cacert)
 
-    server = Server(configuration.conf.get("ldap", "uri"),
+    server = Server(conf.get("ldap", "uri"),
                     use_ssl=True,
                     tls=tls_configuration)
 
@@ -100,7 +100,7 @@ def group_contains_user(conn, search_base, group_filter, user_name_attr, usernam
 def groups_user(conn, search_base, user_filter, user_name_att, username):
     search_filter = "(&({0})({1}={2}))".format(user_filter, user_name_att, username)
     try:
-        memberof_attr = configuration.conf.get("ldap", "group_member_attr")
+        memberof_attr = conf.get("ldap", "group_member_attr")
     except Exception:
         memberof_attr = "memberOf"
     res = conn.search(search_base, search_filter, attributes=[memberof_attr])
@@ -135,13 +135,13 @@ class LdapUser(models.User):
         self.ldap_groups = []
 
         # Load and cache superuser and data_profiler settings.
-        conn = get_ldap_connection(configuration.conf.get("ldap", "bind_user"),
-                                   configuration.conf.get("ldap", "bind_password"))
+        conn = get_ldap_connection(conf.get("ldap", "bind_user"),
+                                   conf.get("ldap", "bind_password"))
 
         superuser_filter = None
         data_profiler_filter = None
         try:
-            superuser_filter = configuration.conf.get("ldap", "superuser_filter")
+            superuser_filter = conf.get("ldap", "superuser_filter")
         except AirflowConfigException:
             pass
 
@@ -150,14 +150,14 @@ class LdapUser(models.User):
             log.debug("Missing configuration for superuser settings or empty. Skipping.")
         else:
             self.superuser = group_contains_user(conn,
-                                                 configuration.conf.get("ldap", "basedn"),
+                                                 conf.get("ldap", "basedn"),
                                                  superuser_filter,
-                                                 configuration.conf.get("ldap",
-                                                                        "user_name_attr"),
+                                                 conf.get("ldap",
+                                                          "user_name_attr"),
                                                  user.username)
 
         try:
-            data_profiler_filter = configuration.conf.get("ldap", "data_profiler_filter")
+            data_profiler_filter = conf.get("ldap", "data_profiler_filter")
         except AirflowConfigException:
             pass
 
@@ -168,10 +168,10 @@ class LdapUser(models.User):
         else:
             self.data_profiler = group_contains_user(
                 conn,
-                configuration.conf.get("ldap", "basedn"),
+                conf.get("ldap", "basedn"),
                 data_profiler_filter,
-                configuration.conf.get("ldap",
-                                       "user_name_attr"),
+                conf.get("ldap",
+                         "user_name_attr"),
                 user.username
             )
 
@@ -179,9 +179,9 @@ class LdapUser(models.User):
         try:
             self.ldap_groups = groups_user(
                 conn,
-                configuration.conf.get("ldap", "basedn"),
-                configuration.conf.get("ldap", "user_filter"),
-                configuration.conf.get("ldap", "user_name_attr"),
+                conf.get("ldap", "basedn"),
+                conf.get("ldap", "user_filter"),
+                conf.get("ldap", "user_name_attr"),
                 user.username
             )
         except AirflowConfigException:
@@ -189,25 +189,25 @@ class LdapUser(models.User):
 
     @staticmethod
     def try_login(username, password):
-        conn = get_ldap_connection(configuration.conf.get("ldap", "bind_user"),
-                                   configuration.conf.get("ldap", "bind_password"))
+        conn = get_ldap_connection(conf.get("ldap", "bind_user"),
+                                   conf.get("ldap", "bind_password"))
 
         search_filter = "(&({0})({1}={2}))".format(
-            configuration.conf.get("ldap", "user_filter"),
-            configuration.conf.get("ldap", "user_name_attr"),
+            conf.get("ldap", "user_filter"),
+            conf.get("ldap", "user_name_attr"),
             username
         )
 
         search_scope = LEVEL
-        if configuration.conf.has_option("ldap", "search_scope"):
-            if configuration.conf.get("ldap", "search_scope") == "SUBTREE":
+        if conf.has_option("ldap", "search_scope"):
+            if conf.get("ldap", "search_scope") == "SUBTREE":
                 search_scope = SUBTREE
             else:
                 search_scope = LEVEL
 
         # todo: BASE or ONELEVEL?
 
-        res = conn.search(configuration.conf.get("ldap", "basedn"), search_filter, search_scope=search_scope)
+        res = conn.search(conf.get("ldap", "basedn"), search_filter, search_scope=search_scope)
 
         # todo: use list or result?
         if not res:
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index 572f1e385d..c2177c1974 100644
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -26,7 +26,7 @@ import re
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 from airflow.models import TaskInstance
@@ -174,7 +174,7 @@ class QuboleHook(BaseHook):
         if fp is None:
             iso = datetime.datetime.utcnow().isoformat()
             logpath = os.path.expanduser(
-                configuration.conf.get('core', 'BASE_LOG_FOLDER')
+                conf.get('core', 'BASE_LOG_FOLDER')
             )
             resultpath = logpath + '/' + self.dag_id + '/' + self.task_id + '/results'
             pathlib.Path(resultpath).mkdir(parents=True, exist_ok=True)
diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py
index 63bf249645..bddea63bcc 100644
--- a/airflow/contrib/operators/ssh_operator.py
+++ b/airflow/contrib/operators/ssh_operator.py
@@ -20,7 +20,7 @@
 from base64 import b64encode
 from select import select
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.contrib.hooks.ssh_hook import SSHHook
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
@@ -152,7 +152,7 @@ class SSHOperator(BaseOperator):
 
                 exit_status = stdout.channel.recv_exit_status()
                 if exit_status == 0:
-                    enable_pickling = configuration.conf.getboolean(
+                    enable_pickling = conf.getboolean(
                         'core', 'enable_xcom_pickling'
                     )
                     if enable_pickling:
diff --git a/airflow/contrib/operators/winrm_operator.py b/airflow/contrib/operators/winrm_operator.py
index 03acb358b1..7bf1bab935 100644
--- a/airflow/contrib/operators/winrm_operator.py
+++ b/airflow/contrib/operators/winrm_operator.py
@@ -22,7 +22,7 @@ import logging
 
 from winrm.exceptions import WinRMOperationTimeoutError
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.contrib.hooks.winrm_hook import WinRMHook
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
@@ -124,7 +124,7 @@ class WinRMOperator(BaseOperator):
 
         if return_code == 0:
             # returning output if do_xcom_push is set
-            enable_pickling = configuration.conf.getboolean(
+            enable_pickling = conf.getboolean(
                 'core', 'enable_xcom_pickling'
             )
             if enable_pickling:
diff --git a/airflow/contrib/utils/log/task_handler_with_custom_formatter.py b/airflow/contrib/utils/log/task_handler_with_custom_formatter.py
index 3fd690ccd8..af29502126 100644
--- a/airflow/contrib/utils/log/task_handler_with_custom_formatter.py
+++ b/airflow/contrib/utils/log/task_handler_with_custom_formatter.py
@@ -20,7 +20,7 @@
 import logging
 
 from logging import StreamHandler
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow.utils.helpers import parse_template_string
 
 
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index 5196883421..258682b790 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -20,7 +20,7 @@
 from typing import Optional
 import sys
 from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import BaseExecutor
 from airflow.executors.local_executor import LocalExecutor
@@ -44,7 +44,7 @@ def get_default_executor():
     if DEFAULT_EXECUTOR is not None:
         return DEFAULT_EXECUTOR
 
-    executor_name = configuration.conf.get('core', 'EXECUTOR')
+    executor_name = conf.get('core', 'EXECUTOR')
 
     DEFAULT_EXECUTOR = _get_executor(executor_name)
 
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index a8856bb93c..506713a741 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -21,12 +21,12 @@ from collections import OrderedDict
 
 # To avoid circular imports
 import airflow.utils.dag_processing
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.stats import Stats
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
-PARALLELISM = configuration.conf.getint('core', 'PARALLELISM')
+PARALLELISM = conf.getint('core', 'PARALLELISM')
 
 
 class BaseExecutor(LoggingMixin):
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 070593998a..8558918ee1 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -27,7 +27,7 @@ from multiprocessing import Pool, cpu_count
 from celery import Celery
 from celery import states as celery_states
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
 from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import BaseExecutor
@@ -45,15 +45,15 @@ To start the celery worker, run the command:
 airflow worker
 '''
 
-if configuration.conf.has_option('celery', 'celery_config_options'):
+if conf.has_option('celery', 'celery_config_options'):
     celery_configuration = import_string(
-        configuration.conf.get('celery', 'celery_config_options')
+        conf.get('celery', 'celery_config_options')
     )
 else:
     celery_configuration = DEFAULT_CELERY_CONFIG
 
 app = Celery(
-    configuration.conf.get('celery', 'CELERY_APP_NAME'),
+    conf.get('celery', 'CELERY_APP_NAME'),
     config_source=celery_configuration)
 
 
@@ -141,7 +141,7 @@ class CeleryExecutor(BaseExecutor):
         # (which can become a bottleneck on bigger clusters) so we use
         # a multiprocessing pool to speed this up.
         # How many worker processes are created for checking celery task state.
-        self._sync_parallelism = configuration.getint('celery', 'SYNC_PARALLELISM')
+        self._sync_parallelism = conf.getint('celery', 'SYNC_PARALLELISM')
         if self._sync_parallelism == 0:
             self._sync_parallelism = max(1, cpu_count() - 1)
 
diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py
index 41cf110a47..2ddf2ec311 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -21,7 +21,7 @@ import distributed
 import subprocess
 import warnings
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.executors.base_executor import BaseExecutor
 
 
@@ -31,15 +31,15 @@ class DaskExecutor(BaseExecutor):
     """
     def __init__(self, cluster_address=None):
         if cluster_address is None:
-            cluster_address = configuration.conf.get('dask', 'cluster_address')
+            cluster_address = conf.get('dask', 'cluster_address')
         if not cluster_address:
             raise ValueError(
                 'Please provide a Dask cluster address in airflow.cfg')
         self.cluster_address = cluster_address
         # ssl / tls parameters
-        self.tls_ca = configuration.get('dask', 'tls_ca')
-        self.tls_key = configuration.get('dask', 'tls_key')
-        self.tls_cert = configuration.get('dask', 'tls_cert')
+        self.tls_ca = conf.get('dask', 'tls_ca')
+        self.tls_key = conf.get('dask', 'tls_key')
+        self.tls_cert = conf.get('dask', 'tls_cert')
         super().__init__(parallelism=0)
 
     def start(self):
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index d20eeb2aae..6ebd0495ce 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -27,7 +27,6 @@ from uuid import uuid4
 import kubernetes
 from kubernetes import watch, client
 from kubernetes.client.rest import ApiException
-from airflow.configuration import conf
 from airflow.kubernetes.pod_launcher import PodLauncher
 from airflow.kubernetes.kube_client import get_kube_client
 from airflow.kubernetes.worker_configuration import WorkerConfiguration
@@ -36,7 +35,8 @@ from airflow.executors import Executors
 from airflow.models import KubeResourceVersion, KubeWorkerIdentifier, TaskInstance
 from airflow.utils.state import State
 from airflow.utils.db import provide_session, create_session
-from airflow import configuration, settings
+from airflow import settings
+from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException, AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
 
@@ -127,24 +127,24 @@ class KubeConfig:
     kubernetes_section = 'kubernetes'
 
     def __init__(self):
-        configuration_dict = configuration.as_dict(display_sensitive=True)
+        configuration_dict = conf.as_dict(display_sensitive=True)
         self.core_configuration = configuration_dict['core']
         self.kube_secrets = configuration_dict.get('kubernetes_secrets', {})
         self.kube_env_vars = configuration_dict.get('kubernetes_environment_variables', {})
-        self.env_from_configmap_ref = configuration.get(self.kubernetes_section,
-                                                        'env_from_configmap_ref')
-        self.env_from_secret_ref = configuration.get(self.kubernetes_section,
-                                                     'env_from_secret_ref')
+        self.env_from_configmap_ref = conf.get(self.kubernetes_section,
+                                               'env_from_configmap_ref')
+        self.env_from_secret_ref = conf.get(self.kubernetes_section,
+                                            'env_from_secret_ref')
         self.airflow_home = settings.AIRFLOW_HOME
-        self.dags_folder = configuration.get(self.core_section, 'dags_folder')
-        self.parallelism = configuration.getint(self.core_section, 'parallelism')
-        self.worker_container_repository = configuration.get(
+        self.dags_folder = conf.get(self.core_section, 'dags_folder')
+        self.parallelism = conf.getint(self.core_section, 'parallelism')
+        self.worker_container_repository = conf.get(
             self.kubernetes_section, 'worker_container_repository')
-        self.worker_container_tag = configuration.get(
+        self.worker_container_tag = conf.get(
             self.kubernetes_section, 'worker_container_tag')
         self.kube_image = '{}:{}'.format(
             self.worker_container_repository, self.worker_container_tag)
-        self.kube_image_pull_policy = configuration.get(
+        self.kube_image_pull_policy = conf.get(
             self.kubernetes_section, "worker_container_image_pull_policy"
         )
         self.kube_node_selectors = configuration_dict.get('kubernetes_node_selectors', {})
@@ -216,7 +216,7 @@ class KubeConfig:
         self.logs_volume_host = conf.get(self.kubernetes_section, 'logs_volume_host')
 
         # This prop may optionally be set for PV Claims and is used to write logs
-        self.base_log_folder = configuration.get(self.core_section, 'base_log_folder')
+        self.base_log_folder = conf.get(self.core_section, 'base_log_folder')
 
         # The Kubernetes Namespace in which the Scheduler and Webserver reside. Note
         # that if your
@@ -277,7 +277,7 @@ class KubeConfig:
     # pod security context items should return integers
     # and only return a blank string if contexts are not set.
     def _get_security_context_val(self, scontext):
-        val = configuration.get(self.kubernetes_section, scontext)
+        val = conf.get(self.kubernetes_section, scontext)
         if len(val) == 0:
             return val
         else:
diff --git a/airflow/hooks/hdfs_hook.py b/airflow/hooks/hdfs_hook.py
index 8dbd5e7030..f9b3bebcc0 100644
--- a/airflow/hooks/hdfs_hook.py
+++ b/airflow/hooks/hdfs_hook.py
@@ -17,7 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """Hook for HDFS operations"""
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 
@@ -64,7 +64,7 @@ class HDFSHook(BaseHook):
         # take the first.
         effective_user = self.proxy_user
         autoconfig = self.autoconfig
-        use_sasl = configuration.conf.get('core', 'security') == 'kerberos'
+        use_sasl = conf.get('core', 'security') == 'kerberos'
 
         try:
             connections = self.get_connections(self.hdfs_conn_id)
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 7708b4bd67..8308d14082 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -28,7 +28,7 @@ from tempfile import NamedTemporaryFile
 
 import unicodecsv as csv
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 from airflow.security import utils
@@ -97,8 +97,8 @@ class HiveCliHook(BaseHook):
                     "Invalid Mapred Queue Priority.  Valid values are: "
                     "{}".format(', '.join(HIVE_QUEUE_PRIORITIES)))
 
-        self.mapred_queue = mapred_queue or configuration.get('hive',
-                                                              'default_hive_mapred_queue')
+        self.mapred_queue = mapred_queue or conf.get('hive',
+                                                     'default_hive_mapred_queue')
         self.mapred_queue_priority = mapred_queue_priority
         self.mapred_job_name = mapred_job_name
 
@@ -129,7 +129,7 @@ class HiveCliHook(BaseHook):
             hive_bin = 'beeline'
             jdbc_url = "jdbc:hive2://{host}:{port}/{schema}".format(
                 host=conn.host, port=conn.port, schema=conn.schema)
-            if configuration.conf.get('core', 'security') == 'kerberos':
+            if conf.get('core', 'security') == 'kerberos':
                 template = conn.extra_dejson.get(
                     'principal', "hive/_HOST@EXAMPLE.COM")
                 if "_HOST" in template:
@@ -511,13 +511,13 @@ class HiveMetastoreHook(BaseHook):
 
         auth_mechanism = ms.extra_dejson.get('authMechanism', 'NOSASL')
 
-        if configuration.conf.get('core', 'security') == 'kerberos':
+        if conf.get('core', 'security') == 'kerberos':
             auth_mechanism = ms.extra_dejson.get('authMechanism', 'GSSAPI')
             kerberos_service_name = ms.extra_dejson.get('kerberos_service_name', 'hive')
 
         conn_socket = TSocket.TSocket(ms.host, ms.port)
 
-        if configuration.conf.get('core', 'security') == 'kerberos' \
+        if conf.get('core', 'security') == 'kerberos' \
                 and auth_mechanism == 'GSSAPI':
             try:
                 import saslwrapper as sasl
@@ -798,7 +798,7 @@ class HiveServer2Hook(BaseHook):
             # we need to give a username
             username = 'airflow'
         kerberos_service_name = None
-        if configuration.conf.get('core', 'security') == 'kerberos':
+        if conf.get('core', 'security') == 'kerberos':
             auth_mechanism = db.extra_dejson.get('authMechanism', 'KERBEROS')
             kerberos_service_name = db.extra_dejson.get('kerberos_service_name', 'hive')
 
diff --git a/airflow/hooks/webhdfs_hook.py b/airflow/hooks/webhdfs_hook.py
index 73314dc14e..1998f56ed6 100644
--- a/airflow/hooks/webhdfs_hook.py
+++ b/airflow/hooks/webhdfs_hook.py
@@ -19,12 +19,12 @@
 """Hook for Web HDFS"""
 from hdfs import InsecureClient, HdfsError
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 from airflow.utils.log.logging_mixin import LoggingMixin
 
-_kerberos_security_mode = configuration.conf.get("core", "security") == "kerberos"
+_kerberos_security_mode = conf.get("core", "security") == "kerberos"
 if _kerberos_security_mode:
     try:
         from hdfs.ext.kerberos import KerberosClient  # pylint: disable=ungrouped-imports
diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 4d5e5e1d43..12b051e6ed 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -26,7 +26,7 @@ from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import make_transient
 from typing import Optional
 
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow import executors, models
 from airflow.exceptions import (
     AirflowException,
diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index c0f2b96420..c34bc58d7f 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -22,7 +22,7 @@ import os
 import signal
 import time
 
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.stats import Stats
 from airflow.task.task_runner import get_task_runner
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 83ad2ef411..5f2e63a843 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -34,7 +34,7 @@ from setproctitle import setproctitle
 from sqlalchemy import and_, func, not_, or_
 from sqlalchemy.orm.session import make_transient
 
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow import executors, models, settings
 from airflow.exceptions import AirflowException
 from airflow.jobs.base_job import BaseJob
diff --git a/airflow/lineage/__init__.py b/airflow/lineage/__init__.py
index e3f7e8489d..f444139f00 100644
--- a/airflow/lineage/__init__.py
+++ b/airflow/lineage/__init__.py
@@ -18,7 +18,7 @@
 # under the License.
 from functools import wraps
 
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow.lineage.datasets import DataSet
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.module_loading import import_string
diff --git a/airflow/lineage/backend/atlas/__init__.py b/airflow/lineage/backend/atlas/__init__.py
index 0f626a0844..8e0dfc5dc3 100644
--- a/airflow/lineage/backend/atlas/__init__.py
+++ b/airflow/lineage/backend/atlas/__init__.py
@@ -17,7 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow.lineage import datasets
 from airflow.lineage.backend import LineageBackend
 from airflow.lineage.backend.atlas.typedefs import operator_typedef
diff --git a/airflow/logging_config.py b/airflow/logging_config.py
index 0cf8f4db0d..c835098274 100644
--- a/airflow/logging_config.py
+++ b/airflow/logging_config.py
@@ -21,7 +21,7 @@ import logging
 import warnings
 from logging.config import dictConfig
 
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException
 from airflow.utils.module_loading import import_string
 
diff --git a/airflow/models/base.py b/airflow/models/base.py
index 97c6b77798..3dacb8adce 100644
--- a/airflow/models/base.py
+++ b/airflow/models/base.py
@@ -21,9 +21,9 @@ from typing import Any
 from sqlalchemy import MetaData
 from sqlalchemy.ext.declarative import declarative_base
 
-import airflow
+from airflow.configuration import conf
 
-SQL_ALCHEMY_SCHEMA = airflow.configuration.get("core", "SQL_ALCHEMY_SCHEMA")
+SQL_ALCHEMY_SCHEMA = conf.get("core", "SQL_ALCHEMY_SCHEMA")
 
 metadata = (
     None
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 51753b573f..951cab4151 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -29,7 +29,8 @@ from typing import Callable, Dict, Iterable, List, Optional, Set, Any
 
 import jinja2
 
-from airflow import configuration, settings
+from airflow import settings
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.lineage import prepare_lineage, apply_lineage, DataSet
 from airflow.models.dag import DAG
@@ -261,7 +262,7 @@ class BaseOperator(LoggingMixin):
     def __init__(
         self,
         task_id: str,
-        owner: str = configuration.conf.get('operators', 'DEFAULT_OWNER'),
+        owner: str = conf.get('operators', 'DEFAULT_OWNER'),
         email: Optional[str] = None,
         email_on_retry: bool = True,
         email_on_failure: bool = True,
@@ -279,7 +280,7 @@ class BaseOperator(LoggingMixin):
         default_args: Optional[Dict] = None,
         priority_weight: int = 1,
         weight_rule: str = WeightRule.DOWNSTREAM,
-        queue: str = configuration.conf.get('celery', 'default_queue'),
+        queue: str = conf.get('celery', 'default_queue'),
         pool: str = Pool.DEFAULT_POOL_NAME,
         sla: Optional[timedelta] = None,
         execution_timeout: Optional[timedelta] = None,
@@ -348,7 +349,7 @@ class BaseOperator(LoggingMixin):
             )
         self._schedule_interval = schedule_interval
         self.retries = retries if retries is not None else \
-            configuration.conf.getint('core', 'default_task_retries', fallback=0)
+            conf.getint('core', 'default_task_retries', fallback=0)
         self.queue = queue
         self.pool = pool
         self.sla = sla
diff --git a/airflow/models/crypto.py b/airflow/models/crypto.py
index 327ef4f837..8ba656e52f 100644
--- a/airflow/models/crypto.py
+++ b/airflow/models/crypto.py
@@ -19,7 +19,7 @@
 
 from airflow.typing import Protocol
 from typing import Optional
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
 
@@ -87,7 +87,7 @@ def get_fernet():
         return _fernet
 
     try:
-        fernet_key = configuration.conf.get('core', 'FERNET_KEY')
+        fernet_key = conf.get('core', 'FERNET_KEY')
         if not fernet_key:
             log.warning(
                 "empty cryptography key - values will not be stored encrypted."
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 1df5f2bb1d..419525ae94 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -35,7 +35,8 @@ from croniter import croniter
 from dateutil.relativedelta import relativedelta
 from sqlalchemy import Column, String, Boolean, Integer, Text, func, or_
 
-from airflow import configuration, settings, utils
+from airflow import settings, utils
+from airflow.configuration import conf
 from airflow.dag.base_dag import BaseDag
 from airflow.exceptions import AirflowException, AirflowDagCycleException
 from airflow.executors import LocalExecutor, get_default_executor
@@ -208,13 +209,13 @@ class DAG(BaseDag, LoggingMixin):
         user_defined_macros: Optional[Dict] = None,
         user_defined_filters: Optional[Dict] = None,
         default_args: Optional[Dict] = None,
-        concurrency: int = configuration.conf.getint('core', 'dag_concurrency'),
-        max_active_runs: int = configuration.conf.getint('core', 'max_active_runs_per_dag'),
+        concurrency: int = conf.getint('core', 'dag_concurrency'),
+        max_active_runs: int = conf.getint('core', 'max_active_runs_per_dag'),
         dagrun_timeout: Optional[timedelta] = None,
         sla_miss_callback: Optional[Callable] = None,
         default_view: Optional[str] = None,
-        orientation: str = configuration.conf.get('webserver', 'dag_orientation'),
-        catchup: bool = configuration.conf.getboolean('scheduler', 'catchup_by_default'),
+        orientation: str = conf.get('webserver', 'dag_orientation'),
+        catchup: bool = conf.getboolean('scheduler', 'catchup_by_default'),
         on_success_callback: Optional[Callable] = None,
         on_failure_callback: Optional[Callable] = None,
         doc_md: Optional[str] = None,
@@ -360,7 +361,7 @@ class DAG(BaseDag, LoggingMixin):
     def get_default_view(self):
         """This is only there for backward compatible jinja2 templates"""
         if self._default_view is None:
-            return configuration.conf.get('webserver', 'dag_default_view').lower()
+            return conf.get('webserver', 'dag_default_view').lower()
         else:
             return self._default_view
 
@@ -1202,7 +1203,7 @@ class DAG(BaseDag, LoggingMixin):
             mark_success=False,
             local=False,
             executor=None,
-            donot_pickle=configuration.conf.getboolean('core', 'donot_pickle'),
+            donot_pickle=conf.getboolean('core', 'donot_pickle'),
             ignore_task_deps=False,
             ignore_first_depends_on_past=False,
             pool=None,
@@ -1493,7 +1494,7 @@ class DagModel(Base):
     dag_id = Column(String(ID_LEN), primary_key=True)
     # A DAG can be paused from the UI / DB
     # Set this default value of is_paused based on a configuration value!
-    is_paused_at_creation = configuration.conf\
+    is_paused_at_creation = conf\
         .getboolean('core',
                     'dags_are_paused_at_creation')
     is_paused = Column(Boolean, default=is_paused_at_creation)
@@ -1545,7 +1546,7 @@ class DagModel(Base):
 
     def get_default_view(self):
         if self.default_view is None:
-            return configuration.conf.get('webserver', 'dag_default_view').lower()
+            return conf.get('webserver', 'dag_default_view').lower()
         else:
             return self.default_view
 
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 2edfb94639..b3ff663c94 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -30,7 +30,8 @@ from datetime import datetime, timedelta
 from croniter import croniter, CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError
 from sqlalchemy import or_
 
-from airflow import configuration, settings
+from airflow import settings
+from airflow.configuration import conf
 from airflow.dag.base_dag import BaseDagBag
 from airflow.exceptions import AirflowDagCycleException
 from airflow.executors import get_default_executor
@@ -76,8 +77,8 @@ class DagBag(BaseDagBag, LoggingMixin):
             self,
             dag_folder=None,
             executor=None,
-            include_examples=configuration.conf.getboolean('core', 'LOAD_EXAMPLES'),
-            safe_mode=configuration.conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')):
+            include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
+            safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')):
 
         # do not use default arg in signature, to fix import cycle on plugin load
         if executor is None:
@@ -193,7 +194,7 @@ class DagBag(BaseDagBag, LoggingMixin):
             if mod_name in sys.modules:
                 del sys.modules[mod_name]
 
-            with timeout(configuration.conf.getint('core', "DAGBAG_IMPORT_TIMEOUT")):
+            with timeout(conf.getint('core', "DAGBAG_IMPORT_TIMEOUT")):
                 try:
                     m = imp.load_source(mod_name, filepath)
                     mods.append(m)
@@ -283,7 +284,7 @@ class DagBag(BaseDagBag, LoggingMixin):
 
         # How many seconds do we wait for tasks to heartbeat before mark them as zombies.
         zombie_threshold_secs = (
-            configuration.getint('scheduler', 'scheduler_zombie_task_threshold'))
+            conf.getint('scheduler', 'scheduler_zombie_task_threshold'))
         limit_dttm = timezone.utcnow() - timedelta(
             seconds=zombie_threshold_secs)
         self.log.debug("Failing jobs without heartbeat after %s", limit_dttm)
@@ -303,7 +304,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         for ti in tis:
             self.log.info("Detected zombie job with dag_id %s, task_id %s, and execution date %s",
                           ti.dag_id, ti.task_id, ti.execution_date.isoformat())
-            ti.test_mode = configuration.getboolean('core', 'unit_test_mode')
+            ti.test_mode = conf.getboolean('core', 'unit_test_mode')
             ti.task = self.dags[ti.dag_id].get_task(ti.task_id)
             ti.handle_failure("{} detected as zombie".format(ti),
                               ti.test_mode, ti.get_template_context())
@@ -351,8 +352,8 @@ class DagBag(BaseDagBag, LoggingMixin):
             self,
             dag_folder=None,
             only_if_updated=True,
-            include_examples=configuration.conf.getboolean('core', 'LOAD_EXAMPLES'),
-            safe_mode=configuration.conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')):
+            include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
+            safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')):
         """
         Given a file path or a folder, this method looks for python modules,
         imports them and adds them to the dagbag collection.
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index d6ef4097a1..f58ce863d2 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -37,9 +37,10 @@ from sqlalchemy import Column, Float, Index, Integer, PickleType, String, func
 from sqlalchemy.orm import reconstructor
 from sqlalchemy.orm.session import Session
 
-from airflow import configuration, settings
 from airflow.exceptions import (AirflowException, AirflowRescheduleException,
                                 AirflowSkipException, AirflowTaskTimeout)
+from airflow import settings
+from airflow.configuration import conf
 from airflow.models.base import Base, ID_LEN
 from airflow.models.log import Log
 from airflow.models.pool import Pool
@@ -367,14 +368,14 @@ class TaskInstance(Base, LoggingMixin):
     @property
     def log_filepath(self):
         iso = self.execution_date.isoformat()
-        log = os.path.expanduser(configuration.conf.get('core', 'BASE_LOG_FOLDER'))
+        log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
         return ("{log}/{dag_id}/{task_id}/{iso}.log".format(
             log=log, dag_id=self.dag_id, task_id=self.task_id, iso=iso))
 
     @property
     def log_url(self):
         iso = quote(self.execution_date.isoformat())
-        base_url = configuration.conf.get('webserver', 'BASE_URL')
+        base_url = conf.get('webserver', 'BASE_URL')
         return base_url + (
             "/log?"
             "execution_date={iso}"
@@ -385,7 +386,7 @@ class TaskInstance(Base, LoggingMixin):
     @property
     def mark_success_url(self):
         iso = quote(self.execution_date.isoformat())
-        base_url = configuration.conf.get('webserver', 'BASE_URL')
+        base_url = conf.get('webserver', 'BASE_URL')
         return base_url + (
             "/success"
             "?task_id={task_id}"
@@ -1159,7 +1160,7 @@ class TaskInstance(Base, LoggingMixin):
         if task.params:
             params.update(task.params)
 
-        if configuration.getboolean('core', 'dag_run_conf_overrides_params'):
+        if conf.getboolean('core', 'dag_run_conf_overrides_params'):
             self.overwrite_params_with_dag_run_conf(params=params, dag_run=dag_run)
 
         class VariableAccessor:
@@ -1193,7 +1194,7 @@ class TaskInstance(Base, LoggingMixin):
                 return str(self.var)
 
         return {
-            'conf': configuration,
+            'conf': conf,
             'dag': task.dag,
             'dag_run': dag_run,
             'ds': ds,
@@ -1269,8 +1270,8 @@ class TaskInstance(Base, LoggingMixin):
         )
 
         def render(key, content):
-            if configuration.has_option('email', key):
-                path = configuration.get('email', key)
+            if conf.has_option('email', key):
+                path = conf.get('email', key)
                 with open(path) as file:
                     content = file.read()
 
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 034b69bc6e..eb7ced6137 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -23,7 +23,7 @@ import pickle
 from sqlalchemy import Column, Integer, String, Index, LargeBinary, and_
 from sqlalchemy.orm import reconstructor
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.models.base import Base, ID_LEN
 from airflow.utils import timezone
 from airflow.utils.db import provide_session
@@ -65,7 +65,7 @@ class XCom(Base, LoggingMixin):
     """
     @reconstructor
     def init_on_load(self):
-        enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling')
+        enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
         if enable_pickling:
             self.value = pickle.loads(self.value)
         else:
@@ -157,7 +157,7 @@ class XCom(Base, LoggingMixin):
 
         result = query.first()
         if result:
-            enable_pickling = configuration.getboolean('core', 'enable_xcom_pickling')
+            enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
             if enable_pickling:
                 return pickle.loads(result.value)
             else:
@@ -222,7 +222,7 @@ class XCom(Base, LoggingMixin):
     def serialize_value(value):
         # TODO: "pickling" has been deprecated and JSON is preferred.
         # "pickling" will be removed in Airflow 2.0.
-        if configuration.getboolean('core', 'enable_xcom_pickling'):
+        if conf.getboolean('core', 'enable_xcom_pickling'):
             return pickle.dumps(value)
 
         try:
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index 16e6a9325f..dbefb96144 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -21,7 +21,7 @@ import re
 from typing import Dict
 
 from airflow.hooks.hive_hooks import HiveCliHook
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.operator_helpers import context_to_airflow_vars
@@ -95,8 +95,8 @@ class HiveOperator(BaseOperator):
         self.mapred_queue = mapred_queue
         self.mapred_queue_priority = mapred_queue_priority
         self.mapred_job_name = mapred_job_name
-        self.mapred_job_name_template = configuration.get('hive',
-                                                          'mapred_job_name_template')
+        self.mapred_job_name_template = conf.get('hive',
+                                                 'mapred_job_name_template')
 
         # assigned lazily - just for consistency we can create the attribute with a
         # `None` initial value, later it will be populated by the execute method.
diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py
index bd79027c3e..1c891bef38 100644
--- a/airflow/security/kerberos.py
+++ b/airflow/security/kerberos.py
@@ -21,7 +21,8 @@ import subprocess
 import sys
 import time
 
-from airflow import configuration, LoggingMixin
+from airflow import LoggingMixin
+from airflow.configuration import conf
 
 NEED_KRB181_WORKAROUND = None  # type: Optional[bool]
 
@@ -32,18 +33,18 @@ def renew_from_kt(principal, keytab):
     # The config is specified in seconds. But we ask for that same amount in
     # minutes to give ourselves a large renewal buffer.
 
-    renewal_lifetime = "%sm" % configuration.conf.getint('kerberos', 'reinit_frequency')
+    renewal_lifetime = "%sm" % conf.getint('kerberos', 'reinit_frequency')
 
-    cmd_principal = principal or configuration.conf.get('kerberos', 'principal').replace(
+    cmd_principal = principal or conf.get('kerberos', 'principal').replace(
         "_HOST", socket.getfqdn()
     )
 
     cmdv = [
-        configuration.conf.get('kerberos', 'kinit_path'),
+        conf.get('kerberos', 'kinit_path'),
         "-r", renewal_lifetime,
         "-k",  # host ticket
         "-t", keytab,  # specify keytab
-        "-c", configuration.conf.get('kerberos', 'ccache'),  # specify credentials cache
+        "-c", conf.get('kerberos', 'ccache'),  # specify credentials cache
         cmd_principal
     ]
     log.info("Reinitting kerberos from keytab: %s", " ".join(cmdv))
@@ -73,8 +74,8 @@ def renew_from_kt(principal, keytab):
 
 
 def perform_krb181_workaround(principal):
-    cmdv = [configuration.conf.get('kerberos', 'kinit_path'),
-            "-c", configuration.conf.get('kerberos', 'ccache'),
+    cmdv = [conf.get('kerberos', 'kinit_path'),
+            "-c", conf.get('kerberos', 'ccache'),
             "-R"]  # Renew ticket_cache
 
     log.info(
@@ -84,10 +85,10 @@ def perform_krb181_workaround(principal):
     ret = subprocess.call(cmdv, close_fds=True)
 
     if ret != 0:
-        principal = "%s/%s" % (principal or configuration.conf.get('kerberos', 'principal'),
+        principal = "%s/%s" % (principal or conf.get('kerberos', 'principal'),
                                socket.getfqdn())
         princ = principal
-        ccache = configuration.conf.get('kerberos', 'principal')
+        ccache = conf.get('kerberos', 'principal')
         log.error(
             "Couldn't renew kerberos ticket in order to work around Kerberos 1.8.1 issue. Please check that "
             "the ticket for '%s' is still renewable:\n  $ kinit -f -c %s\nIf the 'renew until' date is the "
@@ -104,7 +105,7 @@ def detect_conf_var() -> bool:
     Sun Java Krb5LoginModule in Java6, so we need to take an action to work
     around it.
     """
-    ticket_cache = configuration.conf.get('kerberos', 'ccache')
+    ticket_cache = conf.get('kerberos', 'ccache')
 
     with open(ticket_cache, 'rb') as file:
         # Note: this file is binary, so we check against a bytearray.
@@ -118,4 +119,4 @@ def run(principal, keytab):
 
     while True:
         renew_from_kt(principal, keytab)
-        time.sleep(configuration.conf.getint('kerberos', 'reinit_frequency'))
+        time.sleep(conf.getint('kerberos', 'reinit_frequency'))
diff --git a/airflow/stats.py b/airflow/stats.py
index 7869244343..4d713579f6 100644
--- a/airflow/stats.py
+++ b/airflow/stats.py
@@ -25,7 +25,7 @@ import string
 import textwrap
 from typing import Any
 
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow.exceptions import InvalidStatsNameException
 
 log = logging.getLogger(__name__)
diff --git a/airflow/task/task_runner/__init__.py b/airflow/task/task_runner/__init__.py
index 960a9e9e8e..0c422013db 100644
--- a/airflow/task/task_runner/__init__.py
+++ b/airflow/task/task_runner/__init__.py
@@ -17,11 +17,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.task.task_runner.standard_task_runner import StandardTaskRunner
 from airflow.exceptions import AirflowException
 
-_TASK_RUNNER = configuration.conf.get('core', 'TASK_RUNNER')
+_TASK_RUNNER = conf.get('core', 'TASK_RUNNER')
 
 
 def get_task_runner(local_task_job):
diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py
index 15f0b4f2c5..a46a3bfe19 100644
--- a/airflow/task/task_runner/base_task_runner.py
+++ b/airflow/task/task_runner/base_task_runner.py
@@ -24,7 +24,7 @@ import threading
 
 from airflow.utils.log.logging_mixin import LoggingMixin
 
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow.utils.configuration import tmp_configuration_copy
 
 
diff --git a/airflow/utils/configuration.py b/airflow/utils/configuration.py
index 3ad6bc1c6d..79e5f1bb1f 100644
--- a/airflow/utils/configuration.py
+++ b/airflow/utils/configuration.py
@@ -21,7 +21,7 @@ import os
 import json
 from tempfile import mkstemp
 
-from airflow import configuration as conf
+from airflow.configuration import conf
 
 
 def tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True):
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index ae62d89ea1..c96f574600 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -39,7 +39,7 @@ from tabulate import tabulate
 
 # To avoid circular imports
 import airflow.models
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.exceptions import AirflowException
 from airflow.models import errors
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index 3057b8b986..46153b0195 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -26,7 +26,7 @@ from email.mime.multipart import MIMEMultipart
 from email.mime.application import MIMEApplication
 from email.utils import formatdate
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException
 from airflow.utils.log.logging_mixin import LoggingMixin
 
@@ -37,7 +37,7 @@ def send_email(to, subject, html_content,
     """
     Send email using backend specified in EMAIL_BACKEND.
     """
-    path, attr = configuration.conf.get('email', 'EMAIL_BACKEND').rsplit('.', 1)
+    path, attr = conf.get('email', 'EMAIL_BACKEND').rsplit('.', 1)
     module = importlib.import_module(path)
     backend = getattr(module, attr)
     to = get_email_address_list(to)
@@ -57,7 +57,7 @@ def send_email_smtp(to, subject, html_content, files=None,
 
     >>> send_email('test@example.com', 'foo', '<b>Foo</b> bar', ['/dev/null'], dryrun=True)
     """
-    smtp_mail_from = configuration.conf.get('smtp', 'SMTP_MAIL_FROM')
+    smtp_mail_from = conf.get('smtp', 'SMTP_MAIL_FROM')
 
     to = get_email_address_list(to)
 
@@ -97,16 +97,16 @@ def send_email_smtp(to, subject, html_content, files=None,
 def send_MIME_email(e_from, e_to, mime_msg, dryrun=False):
     log = LoggingMixin().log
 
-    SMTP_HOST = configuration.conf.get('smtp', 'SMTP_HOST')
-    SMTP_PORT = configuration.conf.getint('smtp', 'SMTP_PORT')
-    SMTP_STARTTLS = configuration.conf.getboolean('smtp', 'SMTP_STARTTLS')
-    SMTP_SSL = configuration.conf.getboolean('smtp', 'SMTP_SSL')
+    SMTP_HOST = conf.get('smtp', 'SMTP_HOST')
+    SMTP_PORT = conf.getint('smtp', 'SMTP_PORT')
+    SMTP_STARTTLS = conf.getboolean('smtp', 'SMTP_STARTTLS')
+    SMTP_SSL = conf.getboolean('smtp', 'SMTP_SSL')
     SMTP_USER = None
     SMTP_PASSWORD = None
 
     try:
-        SMTP_USER = configuration.conf.get('smtp', 'SMTP_USER')
-        SMTP_PASSWORD = configuration.conf.get('smtp', 'SMTP_PASSWORD')
+        SMTP_USER = conf.get('smtp', 'SMTP_USER')
+        SMTP_PASSWORD = conf.get('smtp', 'SMTP_PASSWORD')
     except AirflowConfigException:
         log.debug("No user/password found for SMTP, so logging in with no authentication.")
 
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 042253e2d8..96dfb78944 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -35,12 +35,12 @@ import signal
 
 from jinja2 import Template
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 
 # When killing processes, time to wait after issuing a SIGTERM before issuing a
 # SIGKILL.
-DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = configuration.conf.getint(
+DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = conf.getint(
     'core', 'KILLED_TASK_CLEANUP_TIME'
 )
 
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6c17ac0df8..1e65539fb1 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -21,7 +21,7 @@ import logging
 import os
 import requests
 
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow.configuration import AirflowConfigException
 from airflow.utils.file import mkdirs
 from airflow.utils.helpers import parse_template_string
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index 7b04e5f972..c80e53e031 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -21,7 +21,7 @@ import os
 from cached_property import cached_property
 from urllib.parse import urlparse
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -44,7 +44,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
 
     @cached_property
     def hook(self):
-        remote_conn_id = configuration.conf.get('core', 'REMOTE_LOG_CONN_ID')
+        remote_conn_id = conf.get('core', 'REMOTE_LOG_CONN_ID')
         try:
             from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
             return GoogleCloudStorageHook(
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index ae8a52975d..848a5bc859 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -20,7 +20,7 @@ import os
 
 from cached_property import cached_property
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.log.file_task_handler import FileTaskHandler
 
@@ -41,7 +41,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
 
     @cached_property
     def hook(self):
-        remote_conn_id = configuration.conf.get('core', 'REMOTE_LOG_CONN_ID')
+        remote_conn_id = conf.get('core', 'REMOTE_LOG_CONN_ID')
         try:
             from airflow.hooks.S3_hook import S3Hook
             return S3Hook(remote_conn_id)
@@ -164,7 +164,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
                 log,
                 key=remote_log_location,
                 replace=True,
-                encrypt=configuration.conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
+                encrypt=conf.getboolean('core', 'ENCRYPT_S3_LOGS'),
             )
         except Exception:
             self.log.exception('Could not write logs to %s', remote_log_location)
diff --git a/airflow/utils/log/wasb_task_handler.py b/airflow/utils/log/wasb_task_handler.py
index 1813fc9b50..d480a86931 100644
--- a/airflow/utils/log/wasb_task_handler.py
+++ b/airflow/utils/log/wasb_task_handler.py
@@ -21,7 +21,7 @@ import shutil
 
 from cached_property import cached_property
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from azure.common import AzureHttpError
@@ -47,7 +47,7 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
 
     @cached_property
     def hook(self):
-        remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
+        remote_conn_id = conf.get('core', 'REMOTE_LOG_CONN_ID')
         try:
             from airflow.contrib.hooks.wasb_hook import WasbHook
             return WasbHook(remote_conn_id)
diff --git a/airflow/utils/operator_resources.py b/airflow/utils/operator_resources.py
index 0d024eb6e0..08220e1f30 100644
--- a/airflow/utils/operator_resources.py
+++ b/airflow/utils/operator_resources.py
@@ -17,7 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 
 # Constants for resources (megabytes are the base unit)
@@ -105,10 +105,10 @@ class Resources:
     :type gpus: long
     """
     def __init__(self,
-                 cpus=configuration.conf.getint('operators', 'default_cpus'),
-                 ram=configuration.conf.getint('operators', 'default_ram'),
-                 disk=configuration.conf.getint('operators', 'default_disk'),
-                 gpus=configuration.conf.getint('operators', 'default_gpus')
+                 cpus=conf.getint('operators', 'default_cpus'),
+                 ram=conf.getint('operators', 'default_ram'),
+                 disk=conf.getint('operators', 'default_disk'),
+                 gpus=conf.getint('operators', 'default_gpus')
                  ):
         self.cpus = CpuResource(cpus)
         self.ram = RamResource(ram)
diff --git a/airflow/www/app.py b/airflow/www/app.py
index e0b4b00153..543ac41189 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -30,7 +30,7 @@ from urllib.parse import urlparse
 from werkzeug.middleware.proxy_fix import ProxyFix
 from werkzeug.middleware.dispatcher import DispatcherMiddleware
 
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow import settings
 from airflow.logging_config import configure_logging
 from airflow.utils.json import AirflowJsonEncoder
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 4211526d25..5a77822c23 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -36,7 +36,7 @@ import flask_appbuilder.models.sqla.filters as fab_sqlafilters
 import sqlalchemy as sqla
 from urllib.parse import urlencode
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.models import BaseOperator
 from airflow.operators.subdag_operator import SubDagOperator
 from airflow.utils import timezone
@@ -57,8 +57,8 @@ DEFAULT_SENSITIVE_VARIABLE_FIELDS = (
 def should_hide_value_for_key(key_name):
     # It is possible via importing variables from file that a key is empty.
     if key_name:
-        config_set = configuration.conf.getboolean('admin',
-                                                   'hide_sensitive_variable_fields')
+        config_set = conf.getboolean('admin',
+                                     'hide_sensitive_variable_fields')
         field_comp = any(s in key_name.lower() for s in DEFAULT_SENSITIVE_VARIABLE_FIELDS)
         return config_set and field_comp
     return False
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 1aae6cbc60..1c12c5aa53 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -46,7 +46,7 @@ from urllib.parse import quote
 from wtforms import SelectField, validators
 
 import airflow
-from airflow import configuration as conf
+from airflow.configuration import conf, AIRFLOW_CONFIG
 from airflow import jobs, models
 from airflow import settings
 from airflow._vendor import nvd3
@@ -1952,10 +1952,10 @@ class ConfigurationView(AirflowBaseView):
     def conf(self):
         raw = request.args.get('raw') == "true"
         title = "Airflow Configuration"
-        subtitle = conf.AIRFLOW_CONFIG
+        subtitle = AIRFLOW_CONFIG
         # Don't show config when expose_config variable is False in airflow config
         if conf.getboolean("webserver", "expose_config"):
-            with open(conf.AIRFLOW_CONFIG, 'r') as file:
+            with open(AIRFLOW_CONFIG, 'r') as file:
                 config = file.read()
             table = [(section, key, value, source)
                      for section, parameters in conf.as_dict(True, True).items()
diff --git a/scripts/perf/scheduler_ops_metrics.py b/scripts/perf/scheduler_ops_metrics.py
index 34cbf22d47..c44ade0e41 100644
--- a/scripts/perf/scheduler_ops_metrics.py
+++ b/scripts/perf/scheduler_ops_metrics.py
@@ -21,7 +21,8 @@ import logging
 import pandas as pd
 import sys
 
-from airflow import configuration, settings
+from airflow import settings
+from airflow.configuration import conf
 from airflow.jobs import SchedulerJob
 from airflow.models import DagBag, DagModel, DagRun, TaskInstance
 from airflow.utils import timezone
@@ -191,7 +192,7 @@ def main():
             logging.error('Specify a positive integer for timeout.')
             sys.exit(1)
 
-    configuration.load_test_config()
+    conf.load_test_config()
 
     set_dags_paused_state(False)
     clear_dag_runs()
diff --git a/tests/api/common/experimental/test_mark_tasks.py b/tests/api/common/experimental/test_mark_tasks.py
index 0922f03a05..2685cc6694 100644
--- a/tests/api/common/experimental/test_mark_tasks.py
+++ b/tests/api/common/experimental/test_mark_tasks.py
@@ -21,7 +21,8 @@ import unittest
 import time
 from datetime import datetime, timedelta
 
-from airflow import configuration, models
+from airflow import models
+from airflow.configuration import conf
 from airflow.api.common.experimental.mark_tasks import (
     set_state, _create_dagruns, set_dag_run_state_to_success, set_dag_run_state_to_failed,
     set_dag_run_state_to_running)
@@ -251,7 +252,7 @@ class TestMarkTasks(unittest.TestCase):
     # TODO: this skipIf should be removed once a fixing solution is found later
     #       We skip it here because this test case is working with Postgres & SQLite
     #       but not with MySQL
-    @unittest.skipIf('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), "Flaky with MySQL")
+    @unittest.skipIf('mysql' in conf.get('core', 'sql_alchemy_conn'), "Flaky with MySQL")
     def test_mark_tasks_subdag(self):
         # set one task to success towards end of scheduled dag runs
         task = self.dag2.get_task("section-1")
diff --git a/tests/contrib/operators/test_s3_to_sftp_operator.py b/tests/contrib/operators/test_s3_to_sftp_operator.py
index c6751cd34f..c6cd036955 100644
--- a/tests/contrib/operators/test_s3_to_sftp_operator.py
+++ b/tests/contrib/operators/test_s3_to_sftp_operator.py
@@ -19,7 +19,7 @@
 
 import unittest
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow import models
 from airflow.contrib.operators.s3_to_sftp_operator import S3ToSFTPOperator
 from airflow.contrib.operators.ssh_operator import SSHOperator
@@ -88,7 +88,7 @@ class TestS3ToSFTPOperator(unittest.TestCase):
     @mock_s3
     def test_s3_to_sftp_operation(self):
         # Setting
-        configuration.conf.set("core", "enable_xcom_pickling", "True")
+        conf.set("core", "enable_xcom_pickling", "True")
         test_remote_file_content = \
             "This is remote file content \n which is also multiline " \
             "another line here \n this is last line. EOF"
diff --git a/tests/contrib/utils/test_task_handler_with_custom_formatter.py b/tests/contrib/utils/test_task_handler_with_custom_formatter.py
index b695db3a8f..e69c434395 100644
--- a/tests/contrib/utils/test_task_handler_with_custom_formatter.py
+++ b/tests/contrib/utils/test_task_handler_with_custom_formatter.py
@@ -25,7 +25,7 @@ from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONF
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.utils.timezone import datetime
 from airflow.utils.log.logging_mixin import set_context
-from airflow import configuration as conf
+from airflow.configuration import conf
 
 DEFAULT_DATE = datetime(2019, 1, 1)
 TASK_LOGGER = 'airflow.task'
diff --git a/tests/core.py b/tests/core.py
index afadbf2c6e..0dfb9ba3d0 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -45,7 +45,7 @@ from pendulum import utcnow
 from airflow import configuration, models
 from airflow import jobs, DAG, utils, settings, exceptions
 from airflow.bin import cli
-from airflow.configuration import AirflowConfigException, run_command
+from airflow.configuration import AirflowConfigException, run_command, conf
 from airflow.exceptions import AirflowException
 from airflow.executors import SequentialExecutor
 from airflow.hooks.base_hook import BaseHook
@@ -364,7 +364,7 @@ class TestCore(unittest.TestCase):
         self.assertIsNone(additional_dag_run)
 
     def test_confirm_unittest_mod(self):
-        self.assertTrue(configuration.conf.get('core', 'unit_test_mode'))
+        self.assertTrue(conf.get('core', 'unit_test_mode'))
 
     def test_pickling(self):
         dp = self.dag.pickle()
@@ -793,13 +793,13 @@ class TestCore(unittest.TestCase):
         self.assertNotIn("{FERNET_KEY}", cfg)
 
     def test_config_use_original_when_original_and_fallback_are_present(self):
-        self.assertTrue(configuration.conf.has_option("core", "FERNET_KEY"))
-        self.assertFalse(configuration.conf.has_option("core", "FERNET_KEY_CMD"))
+        self.assertTrue(conf.has_option("core", "FERNET_KEY"))
+        self.assertFalse(conf.has_option("core", "FERNET_KEY_CMD"))
 
-        FERNET_KEY = configuration.conf.get('core', 'FERNET_KEY')
+        FERNET_KEY = conf.get('core', 'FERNET_KEY')
 
         with conf_vars({('core', 'FERNET_KEY_CMD'): 'printf HELLO'}):
-            FALLBACK_FERNET_KEY = configuration.conf.get(
+            FALLBACK_FERNET_KEY = conf.get(
                 "core",
                 "FERNET_KEY"
             )
@@ -807,12 +807,12 @@ class TestCore(unittest.TestCase):
         self.assertEqual(FERNET_KEY, FALLBACK_FERNET_KEY)
 
     def test_config_throw_error_when_original_and_fallback_is_absent(self):
-        self.assertTrue(configuration.conf.has_option("core", "FERNET_KEY"))
-        self.assertFalse(configuration.conf.has_option("core", "FERNET_KEY_CMD"))
+        self.assertTrue(conf.has_option("core", "FERNET_KEY"))
+        self.assertFalse(conf.has_option("core", "FERNET_KEY_CMD"))
 
         with conf_vars({('core', 'fernet_key'): None}):
             with self.assertRaises(AirflowConfigException) as cm:
-                configuration.conf.get("core", "FERNET_KEY")
+                conf.get("core", "FERNET_KEY")
 
         exception = str(cm.exception)
         message = "section/key [core/fernet_key] not found in config"
@@ -824,7 +824,7 @@ class TestCore(unittest.TestCase):
         self.assertNotIn(key, os.environ)
 
         os.environ[key] = value
-        FERNET_KEY = configuration.conf.get('core', 'FERNET_KEY')
+        FERNET_KEY = conf.get('core', 'FERNET_KEY')
         self.assertEqual(value, FERNET_KEY)
 
         # restore the envvar back to the original state
@@ -836,7 +836,7 @@ class TestCore(unittest.TestCase):
         self.assertNotIn(key, os.environ)
 
         os.environ[key] = value
-        FERNET_KEY = configuration.conf.get('core', 'FERNET_KEY')
+        FERNET_KEY = conf.get('core', 'FERNET_KEY')
         self.assertEqual(value, FERNET_KEY)
 
         # restore the envvar back to the original state
@@ -2226,7 +2226,7 @@ send_email_test = mock.Mock()
 
 class TestEmail(unittest.TestCase):
     def setUp(self):
-        configuration.conf.remove_option('email', 'EMAIL_BACKEND')
+        conf.remove_option('email', 'EMAIL_BACKEND')
 
     @mock.patch('airflow.utils.email.send_email')
     def test_default_backend(self, mock_send_email):
@@ -2246,7 +2246,7 @@ class TestEmail(unittest.TestCase):
 
 class TestEmailSmtp(unittest.TestCase):
     def setUp(self):
-        configuration.conf.set('smtp', 'SMTP_SSL', 'False')
+        conf.set('smtp', 'SMTP_SSL', 'False')
 
     @mock.patch('airflow.utils.email.send_MIME_email')
     def test_send_smtp(self, mock_send_mime):
@@ -2256,11 +2256,11 @@ class TestEmailSmtp(unittest.TestCase):
         utils.email.send_email_smtp('to', 'subject', 'content', files=[attachment.name])
         self.assertTrue(mock_send_mime.called)
         call_args = mock_send_mime.call_args[0]
-        self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), call_args[0])
+        self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), call_args[0])
         self.assertEqual(['to'], call_args[1])
         msg = call_args[2]
         self.assertEqual('subject', msg['Subject'])
-        self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
+        self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
         self.assertEqual(2, len(msg.get_payload()))
         filename = 'attachment; filename="' + os.path.basename(attachment.name) + '"'
         self.assertEqual(filename, msg.get_payload()[-1].get('Content-Disposition'))
@@ -2284,11 +2284,11 @@ class TestEmailSmtp(unittest.TestCase):
         utils.email.send_email_smtp('to', 'subject', 'content', files=[attachment.name], cc='cc', bcc='bcc')
         self.assertTrue(mock_send_mime.called)
         call_args = mock_send_mime.call_args[0]
-        self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), call_args[0])
+        self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), call_args[0])
         self.assertEqual(['to', 'cc', 'bcc'], call_args[1])
         msg = call_args[2]
         self.assertEqual('subject', msg['Subject'])
-        self.assertEqual(configuration.conf.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
+        self.assertEqual(conf.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
         self.assertEqual(2, len(msg.get_payload()))
         self.assertEqual('attachment; filename="' + os.path.basename(attachment.name) + '"',
                          msg.get_payload()[-1].get('Content-Disposition'))
@@ -2303,13 +2303,13 @@ class TestEmailSmtp(unittest.TestCase):
         msg = MIMEMultipart()
         utils.email.send_MIME_email('from', 'to', msg, dryrun=False)
         mock_smtp.assert_called_once_with(
-            configuration.conf.get('smtp', 'SMTP_HOST'),
-            configuration.conf.getint('smtp', 'SMTP_PORT'),
+            conf.get('smtp', 'SMTP_HOST'),
+            conf.getint('smtp', 'SMTP_PORT'),
         )
         self.assertTrue(mock_smtp.return_value.starttls.called)
         mock_smtp.return_value.login.assert_called_once_with(
-            configuration.conf.get('smtp', 'SMTP_USER'),
-            configuration.conf.get('smtp', 'SMTP_PASSWORD'),
+            conf.get('smtp', 'SMTP_USER'),
+            conf.get('smtp', 'SMTP_PASSWORD'),
         )
         mock_smtp.return_value.sendmail.assert_called_once_with('from', 'to', msg.as_string())
         self.assertTrue(mock_smtp.return_value.quit.called)
@@ -2323,8 +2323,8 @@ class TestEmailSmtp(unittest.TestCase):
             utils.email.send_MIME_email('from', 'to', MIMEMultipart(), dryrun=False)
         self.assertFalse(mock_smtp.called)
         mock_smtp_ssl.assert_called_once_with(
-            configuration.conf.get('smtp', 'SMTP_HOST'),
-            configuration.conf.getint('smtp', 'SMTP_PORT'),
+            conf.get('smtp', 'SMTP_HOST'),
+            conf.getint('smtp', 'SMTP_PORT'),
         )
 
     @mock.patch('smtplib.SMTP_SSL')
@@ -2339,8 +2339,8 @@ class TestEmailSmtp(unittest.TestCase):
             utils.email.send_MIME_email('from', 'to', MIMEMultipart(), dryrun=False)
         self.assertFalse(mock_smtp_ssl.called)
         mock_smtp.assert_called_once_with(
-            configuration.conf.get('smtp', 'SMTP_HOST'),
-            configuration.conf.getint('smtp', 'SMTP_PORT'),
+            conf.get('smtp', 'SMTP_HOST'),
+            conf.getint('smtp', 'SMTP_PORT'),
         )
         self.assertFalse(mock_smtp.login.called)
 
diff --git a/tests/dags/test_impersonation_custom.py b/tests/dags/test_impersonation_custom.py
index c6d0039ded..c82e2159d1 100644
--- a/tests/dags/test_impersonation_custom.py
+++ b/tests/dags/test_impersonation_custom.py
@@ -47,7 +47,7 @@ def print_today():
 
 
 def check_hive_conf():
-    from airflow import configuration as conf
+    from airflow.configuration import conf
     assert conf.get('hive', 'default_hive_mapred_queue') == 'airflow'
 
 
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 52a0f5ab56..f86ff25e39 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -33,7 +33,7 @@ from parameterized import parameterized
 from airflow.utils.state import State
 from airflow.executors import celery_executor
 
-from airflow import configuration
+from airflow.configuration import conf
 
 # leave this it is used by the test worker
 import celery.contrib.testing.tasks  # noqa: F401 pylint: disable=ungrouped-imports
@@ -45,14 +45,14 @@ def _prepare_test_bodies():
             (url, )
             for url in os.environ['CELERY_BROKER_URLS'].split(',')
         ]
-    return [(configuration.conf.get('celery', 'BROKER_URL'))]
+    return [(conf.get('celery', 'BROKER_URL'))]
 
 
 class TestCeleryExecutor(unittest.TestCase):
 
     @contextlib.contextmanager
     def _prepare_app(self, broker_url=None, execute=None):
-        broker_url = broker_url or configuration.conf.get('celery', 'BROKER_URL')
+        broker_url = broker_url or conf.get('celery', 'BROKER_URL')
         execute = execute or celery_executor.execute_command.__wrapped__
 
         test_config = dict(celery_executor.celery_configuration)
@@ -70,7 +70,7 @@ class TestCeleryExecutor(unittest.TestCase):
                 set_event_loop(None)
 
     @parameterized.expand(_prepare_test_bodies())
-    @unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'),
+    @unittest.skipIf('sqlite' in conf.get('core', 'sql_alchemy_conn'),
                      "sqlite is configured with SequentialExecutor")
     def test_celery_integration(self, broker_url):
         with self._prepare_app(broker_url) as app:
@@ -123,7 +123,7 @@ class TestCeleryExecutor(unittest.TestCase):
         self.assertNotIn('success', executor.last_state)
         self.assertNotIn('fail', executor.last_state)
 
-    @unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'),
+    @unittest.skipIf('sqlite' in conf.get('core', 'sql_alchemy_conn'),
                      "sqlite is configured with SequentialExecutor")
     def test_error_sending_task(self):
         def fake_execute_command():
diff --git a/tests/executors/test_dask_executor.py b/tests/executors/test_dask_executor.py
index 8e246efb86..08f1ce373b 100644
--- a/tests/executors/test_dask_executor.py
+++ b/tests/executors/test_dask_executor.py
@@ -20,7 +20,7 @@
 import unittest
 from unittest import mock
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.models import DagBag
 from airflow.jobs import BackfillJob
 from airflow.utils import timezone
@@ -41,7 +41,7 @@ try:
 except ImportError:
     SKIP_DASK = True
 
-if 'sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'):
+if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
     SKIP_DASK = True
 
 # Always skip due to issues on python 3 issues
@@ -140,9 +140,9 @@ class TestDaskExecutorTLS(TestBaseDask):
 
             # These use test certs that ship with dask/distributed and should not be
             #  used in production
-            configuration.set('dask', 'tls_ca', get_cert('tls-ca-cert.pem'))
-            configuration.set('dask', 'tls_cert', get_cert('tls-key-cert.pem'))
-            configuration.set('dask', 'tls_key', get_cert('tls-key.pem'))
+            conf.set('dask', 'tls_ca', get_cert('tls-ca-cert.pem'))
+            conf.set('dask', 'tls_cert', get_cert('tls-key-cert.pem'))
+            conf.set('dask', 'tls_key', get_cert('tls-key.pem'))
             try:
                 executor = DaskExecutor(cluster_address=s['address'])
 
@@ -153,9 +153,9 @@ class TestDaskExecutorTLS(TestBaseDask):
                 # and tasks to have completed.
                 executor.client.close()
             finally:
-                configuration.set('dask', 'tls_ca', '')
-                configuration.set('dask', 'tls_key', '')
-                configuration.set('dask', 'tls_cert', '')
+                conf.set('dask', 'tls_ca', '')
+                conf.set('dask', 'tls_key', '')
+                conf.set('dask', 'tls_cert', '')
 
     @unittest.skipIf(SKIP_DASK, 'Dask unsupported by this configuration')
     @mock.patch('airflow.executors.dask_executor.DaskExecutor.sync')
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 606af90bc0..abb35bec5f 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -28,7 +28,7 @@ import sqlalchemy
 from parameterized import parameterized
 
 from airflow import AirflowException, settings
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.bin import cli
 from airflow.exceptions import AirflowTaskTimeout
 from airflow.exceptions import DagConcurrencyLimitReached, NoAvailablePoolSlot, TaskConcurrencyLimitReached
@@ -129,7 +129,7 @@ class TestBackfillJob(unittest.TestCase):
 
         self.assertEqual(State.SUCCESS, dag_run.state)
 
-    @unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'),
+    @unittest.skipIf('sqlite' in conf.get('core', 'sql_alchemy_conn'),
                      "concurrent access not supported in sqlite")
     def test_trigger_controller_dag(self):
         dag = self.dagbag.get_dag('example_trigger_controller_dag')
@@ -155,7 +155,7 @@ class TestBackfillJob(unittest.TestCase):
 
         self.assertTrue(task_instances_list.append.called)
 
-    @unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'),
+    @unittest.skipIf('sqlite' in conf.get('core', 'sql_alchemy_conn'),
                      "concurrent access not supported in sqlite")
     def test_backfill_multi_dates(self):
         dag = self.dagbag.get_dag('example_bash_operator')
@@ -209,7 +209,7 @@ class TestBackfillJob(unittest.TestCase):
         session.close()
 
     @unittest.skipIf(
-        "sqlite" in configuration.conf.get("core", "sql_alchemy_conn"),
+        "sqlite" in conf.get("core", "sql_alchemy_conn"),
         "concurrent access not supported in sqlite",
     )
     @parameterized.expand(
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 284b43d43e..2b47829461 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -23,7 +23,7 @@ import time
 import unittest
 
 from airflow import AirflowException, models, settings
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.executors import SequentialExecutor
 from airflow.jobs import LocalTaskJob
 from airflow.models import DAG, TaskInstance as TI
@@ -161,9 +161,9 @@ class TestLocalTaskJob(unittest.TestCase):
                 time2 = heartbeat_records[i]
                 self.assertGreaterEqual((time2 - time1).total_seconds(), job.heartrate)
 
-    @unittest.skipIf('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
+    @unittest.skipIf('mysql' in conf.get('core', 'sql_alchemy_conn'),
                      "flaky when run on mysql")
-    @unittest.skipIf('postgresql' in configuration.conf.get('core', 'sql_alchemy_conn'),
+    @unittest.skipIf('postgresql' in conf.get('core', 'sql_alchemy_conn'),
                      'flaky when run on postgresql')
     def test_mark_success_no_kill(self):
         """
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 70bf0c34b2..68fcdd6c13 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -30,7 +30,7 @@ from parameterized import parameterized
 
 import airflow.example_dags
 from airflow import AirflowException, models, settings
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.executors import BaseExecutor
 from airflow.jobs import BackfillJob, SchedulerJob
 from airflow.models import DAG, DagBag, DagModel, DagRun, Pool, SlaMiss, \
@@ -79,20 +79,17 @@ class TestSchedulerJob(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         cls.dagbag = DagBag()
-
-        def getboolean(section, key):
-            if section.lower() == 'core' and key.lower() == 'load_examples':
-                return False
-            else:
-                return configuration.conf.getboolean(section, key)
-
-        cls.patcher = mock.patch('airflow.jobs.scheduler_job.conf.getboolean')
-        cls.mock_getboolean = cls.patcher.start()
-        cls.mock_getboolean.side_effect = getboolean
+        cls.old_val = None
+        if conf.has_option('core', 'load_examples'):
+            cls.old_val = conf.get('core', 'load_examples')
+        conf.set('core', 'load_examples', 'false')
 
     @classmethod
     def tearDownClass(cls):
-        cls.patcher.stop()
+        if cls.old_val is not None:
+            conf.set('core', 'load_examples', cls.old_val)
+        else:
+            conf.remove_option('core', 'load_examples')
 
     def test_is_alive(self):
         job = SchedulerJob(None, heartrate=10, state=State.RUNNING)
@@ -2276,7 +2273,7 @@ class TestSchedulerJob(unittest.TestCase):
                          schedule_interval='* * * * *',
                          start_date=six_hours_ago_to_the_hour,
                          catchup=True)
-        default_catchup = configuration.conf.getboolean('scheduler', 'catchup_by_default')
+        default_catchup = conf.getboolean('scheduler', 'catchup_by_default')
         self.assertEqual(default_catchup, True)
         self.assertEqual(dag1.catchup, True)
 
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index d3f7912b31..dd69084f2f 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -21,7 +21,8 @@ import datetime
 import os
 import unittest
 
-from airflow import settings, configuration
+from airflow import settings
+from airflow.configuration import conf
 from airflow.models import DAG, TaskInstance as TI, clear_task_instances, XCom
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.utils import timezone
@@ -239,7 +240,7 @@ class TestClearTasks(unittest.TestCase):
         dag_id = "test_dag1"
         task_id = "test_task1"
 
-        configuration.set("core", "enable_xcom_pickling", "False")
+        conf.set("core", "enable_xcom_pickling", "False")
 
         XCom.set(key=key,
                  value=json_obj,
@@ -269,7 +270,7 @@ class TestClearTasks(unittest.TestCase):
         dag_id = "test_dag2"
         task_id = "test_task2"
 
-        configuration.set("core", "enable_xcom_pickling", "True")
+        conf.set("core", "enable_xcom_pickling", "True")
 
         XCom.set(key=key,
                  value=json_obj,
@@ -297,7 +298,7 @@ class TestClearTasks(unittest.TestCase):
             def __reduce__(self):
                 return os.system, ("ls -alt",)
 
-        configuration.set("core", "xcom_enable_pickling", "False")
+        conf.set("core", "xcom_enable_pickling", "False")
 
         self.assertRaises(TypeError, XCom.set,
                           key="xcom_test3",
@@ -315,7 +316,7 @@ class TestClearTasks(unittest.TestCase):
         dag_id2 = "test_dag5"
         task_id2 = "test_task5"
 
-        configuration.set("core", "xcom_enable_pickling", "True")
+        conf.set("core", "xcom_enable_pickling", "True")
 
         XCom.set(key=key,
                  value=json_obj,
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index f2c6ad34b4..1416d35349 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -29,7 +29,8 @@ from unittest.mock import patch
 
 import pendulum
 
-from airflow import models, settings, configuration
+from airflow import models, settings
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowDagCycleException
 from airflow.models import DAG, DagModel, TaskInstance as TI
 from airflow.operators.dummy_operator import DummyOperator
@@ -722,7 +723,7 @@ class TestDag(unittest.TestCase):
         self.assertTrue(orm_dag.is_active)
         self.assertIsNone(orm_dag.default_view)
         self.assertEqual(orm_dag.get_default_view(),
-                         configuration.conf.get('webserver', 'dag_default_view').lower())
+                         conf.get('webserver', 'dag_default_view').lower())
         self.assertEqual(orm_dag.safe_dag_id, 'dag')
 
         orm_subdag = session.query(DagModel).filter(
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index 0c343b5694..11a140ace1 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -26,7 +26,8 @@ import unittest
 from unittest.mock import patch, ANY
 from tempfile import mkdtemp, NamedTemporaryFile
 
-from airflow import models, configuration
+from airflow import models
+from airflow.configuration import conf
 from airflow.jobs import LocalTaskJob as LJ
 from airflow.models import DagModel, DagBag, TaskInstance as TI
 from airflow.utils.db import create_session
@@ -625,7 +626,7 @@ class TestDagBag(unittest.TestCase):
             dagbag.kill_zombies()
             mock_ti_handle_failure.assert_called_once_with(
                 ANY,
-                configuration.getboolean('core', 'unit_test_mode'),
+                conf.getboolean('core', 'unit_test_mode'),
                 ANY
             )
 
@@ -635,7 +636,7 @@ class TestDagBag(unittest.TestCase):
         Test that kill zombies calls TI's failure handler with proper context
         """
         zombie_threshold_secs = (
-            configuration.getint('scheduler', 'scheduler_zombie_task_threshold'))
+            conf.getint('scheduler', 'scheduler_zombie_task_threshold'))
         dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True)
         with create_session() as session:
             session.query(TI).delete()
@@ -657,7 +658,7 @@ class TestDagBag(unittest.TestCase):
             dagbag.kill_zombies()
             mock_ti_handle_failure.assert_called_once_with(
                 ANY,
-                configuration.getboolean('core', 'unit_test_mode'),
+                conf.getboolean('core', 'unit_test_mode'),
                 ANY
             )
 
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 16b5719079..ebcdedb6be 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -27,7 +27,8 @@ import pendulum
 from freezegun import freeze_time
 from parameterized import parameterized, param
 from sqlalchemy.orm.session import Session
-from airflow import models, settings, configuration
+from airflow import models, settings
+from airflow.configuration import conf
 from airflow.contrib.sensors.python_sensor import PythonSensor
 from airflow.exceptions import AirflowException, AirflowSkipException
 from airflow.models import DAG, DagRun, Pool, TaskFail, TaskInstance as TI, TaskReschedule
@@ -1085,8 +1086,8 @@ class TestTaskInstance(unittest.TestCase):
         ti = TI(
             task=task, execution_date=datetime.datetime.now())
 
-        configuration.set('email', 'subject_template', '/subject/path')
-        configuration.set('email', 'html_content_template', '/html_content/path')
+        conf.set('email', 'subject_template', '/subject/path')
+        conf.set('email', 'html_content_template', '/html_content/path')
 
         opener = mock_open(read_data='template: {{ti.task_id}}')
         with patch('airflow.models.taskinstance.open', opener, create=True):
diff --git a/tests/operators/test_hive_operator.py b/tests/operators/test_hive_operator.py
index 7cdd5ff32b..833e5207b2 100644
--- a/tests/operators/test_hive_operator.py
+++ b/tests/operators/test_hive_operator.py
@@ -24,7 +24,8 @@ import unittest
 from unittest import mock
 import nose
 
-from airflow import DAG, configuration, operators
+from airflow import DAG, operators
+from airflow.configuration import conf
 from airflow.models import TaskInstance
 from airflow.operators.hive_operator import HiveOperator
 from airflow.utils import timezone
@@ -93,7 +94,7 @@ class HiveOperatorConfigTest(TestHiveEnvironment):
             dag=self.dag)
 
         # just check that the correct default value in test_default.cfg is used
-        test_config_hive_mapred_queue = configuration.conf.get(
+        test_config_hive_mapred_queue = conf.get(
             'hive',
             'default_hive_mapred_queue'
         )
diff --git a/tests/security/test_kerberos.py b/tests/security/test_kerberos.py
index bf15104bd1..c5aeef20aa 100644
--- a/tests/security/test_kerberos.py
+++ b/tests/security/test_kerberos.py
@@ -21,7 +21,7 @@ import os
 import unittest
 from argparse import Namespace
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.security.kerberos import renew_from_kt
 from airflow import LoggingMixin
 from tests.test_utils.config import conf_vars
@@ -31,11 +31,12 @@ from tests.test_utils.config import conf_vars
                  'Skipping Kerberos API tests due to missing KRB5_KTNAME')
 class TestKerberos(unittest.TestCase):
     def setUp(self):
-        if not configuration.conf.has_section("kerberos"):
-            configuration.conf.add_section("kerberos")
-        configuration.conf.set("kerberos", "keytab",
-                               os.environ['KRB5_KTNAME'])
-        keytab_from_cfg = configuration.conf.get("kerberos", "keytab")
+
+        if not conf.has_section("kerberos"):
+            conf.add_section("kerberos")
+        conf.set("kerberos", "keytab",
+                 os.environ['KRB5_KTNAME'])
+        keytab_from_cfg = conf.get("kerberos", "keytab")
         self.args = Namespace(keytab=keytab_from_cfg, principal=None, pid=None,
                               daemon=None, stdout=None, stderr=None, log_file=None)
 
diff --git a/tests/sensors/test_sql_sensor.py b/tests/sensors/test_sql_sensor.py
index 2b607f3bb3..c7987970d7 100644
--- a/tests/sensors/test_sql_sensor.py
+++ b/tests/sensors/test_sql_sensor.py
@@ -20,7 +20,7 @@ from unittest import mock
 import unittest
 
 from airflow import DAG
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.sensors.sql_sensor import SqlSensor
 from airflow.utils.timezone import datetime
@@ -50,7 +50,7 @@ class TestSqlSensor(unittest.TestCase):
             t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     @unittest.skipUnless(
-        'mysql' in configuration.conf.get('core', 'sql_alchemy_conn'), "this is a mysql test")
+        'mysql' in conf.get('core', 'sql_alchemy_conn'), "this is a mysql test")
     def test_sql_sensor_mysql(self):
         t1 = SqlSensor(
             task_id='sql_sensor_check',
@@ -70,7 +70,7 @@ class TestSqlSensor(unittest.TestCase):
         t2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     @unittest.skipUnless(
-        'postgresql' in configuration.conf.get('core', 'sql_alchemy_conn'), "this is a postgres test")
+        'postgresql' in conf.get('core', 'sql_alchemy_conn'), "this is a postgres test")
     def test_sql_sensor_postgres(self):
         t1 = SqlSensor(
             task_id='sql_sensor_check',
diff --git a/tests/test_configuration.py b/tests/test_configuration.py
index 94483dd2ec..2f1066ef29 100644
--- a/tests/test_configuration.py
+++ b/tests/test_configuration.py
@@ -26,6 +26,7 @@ from airflow import configuration
 from airflow.configuration import conf, AirflowConfigParser, parameterized_config
 
 import unittest
+from unittest import mock
 
 
 @contextlib.contextmanager
@@ -426,3 +427,10 @@ AIRFLOW_HOME = /root/airflow
                 self.assertEqual(test_conf.get('core', 'task_runner'), 'NotBashTaskRunner')
 
                 self.assertListEqual([], w)
+
+    def test_deprecated_funcs(self):
+        for func in ['load_test_config', 'get', 'getboolean', 'getfloat', 'getint', 'has_option',
+                     'remove_option', 'as_dict', 'set']:
+            with mock.patch('airflow.configuration.{}'.format(func)):
+                with self.assertWarns(DeprecationWarning):
+                    getattr(configuration, func)()
diff --git a/tests/test_logging_config.py b/tests/test_logging_config.py
index 4203920264..2b31e4e40a 100644
--- a/tests/test_logging_config.py
+++ b/tests/test_logging_config.py
@@ -23,9 +23,8 @@ import pathlib
 import sys
 import tempfile
 
-from airflow import configuration as conf
-from airflow.exceptions import AirflowConfigException
-from tests.compat import mock, patch
+from airflow.configuration import conf
+from tests.compat import patch
 from tests.test_utils.config import conf_vars
 
 import unittest
@@ -213,21 +212,12 @@ class TestLoggingSettings(unittest.TestCase):
     # When the key is not available in the configuration
     def test_when_the_config_key_does_not_exists(self):
         from airflow import logging_config
-        conf_get = conf.get
-
-        def side_effect(*args):
-            if args[1] == 'logging_config_class':
-                raise AirflowConfigException
-            else:
-                return conf_get(*args)
-
-        logging_config.conf.get = mock.Mock(side_effect=side_effect)
-
-        with patch.object(logging_config.log, 'debug') as mock_debug:
-            logging_config.configure_logging()
-            mock_debug.assert_any_call(
-                'Could not find key logging_config_class in config'
-            )
+        with conf_vars({('core', 'logging_config_class'): None}):
+            with patch.object(logging_config.log, 'debug') as mock_debug:
+                logging_config.configure_logging()
+                mock_debug.assert_any_call(
+                    'Could not find key logging_config_class in config'
+                )
 
     # Just default
     def test_loading_local_settings_without_logging_config(self):
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index 054672a453..513565ead7 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -26,7 +26,7 @@ import unittest
 from unittest import mock
 from unittest.mock import (MagicMock, PropertyMock)
 
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow.jobs import DagFileProcessor
 from airflow.utils import timezone
 from airflow.utils.dag_processing import (DagFileProcessorAgent, DagFileProcessorManager,
diff --git a/tests/www/api/experimental/test_kerberos_endpoints.py b/tests/www/api/experimental/test_kerberos_endpoints.py
index 8505c9cbdf..ecd8fedf13 100644
--- a/tests/www/api/experimental/test_kerberos_endpoints.py
+++ b/tests/www/api/experimental/test_kerberos_endpoints.py
@@ -25,7 +25,7 @@ import socket
 
 from datetime import datetime
 
-from airflow import configuration
+from airflow.configuration import conf
 from airflow.api.auth.backend.kerberos_auth import CLIENT_AUTH
 from airflow.www import app as application
 
@@ -35,19 +35,19 @@ from airflow.www import app as application
 class TestApiKerberos(unittest.TestCase):
     def setUp(self):
         try:
-            configuration.conf.add_section("api")
+            conf.add_section("api")
         except Exception:
             pass
-        configuration.conf.set("api",
-                               "auth_backend",
-                               "airflow.api.auth.backend.kerberos_auth")
+        conf.set("api",
+                 "auth_backend",
+                 "airflow.api.auth.backend.kerberos_auth")
         try:
-            configuration.conf.add_section("kerberos")
+            conf.add_section("kerberos")
         except Exception:
             pass
-        configuration.conf.set("kerberos",
-                               "keytab",
-                               os.environ['KRB5_KTNAME'])
+        conf.set("kerberos",
+                 "keytab",
+                 os.environ['KRB5_KTNAME'])
 
         self.app, _ = application.create_app(testing=True)
 
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 85242b26f5..021d9fe8ac 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -37,7 +37,7 @@ from parameterized import parameterized
 from werkzeug.test import Client
 from werkzeug.wrappers import BaseResponse
 
-from airflow import configuration as conf
+from airflow.configuration import conf
 from airflow import models, settings
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
 from airflow.jobs import BaseJob
@@ -1702,7 +1702,7 @@ class TestTriggerDag(TestBase):
         self.assertIn('/trigger?dag_id=example_bash_operator', resp.data.decode('utf-8'))
         self.assertIn("return confirmDeleteDag(this, 'example_bash_operator')", resp.data.decode('utf-8'))
 
-    @unittest.skipIf('mysql' in conf.conf.get('core', 'sql_alchemy_conn'),
+    @unittest.skipIf('mysql' in conf.get('core', 'sql_alchemy_conn'),
                      "flaky when run on mysql")
     def test_trigger_dag_button(self):