[AIRFLOW-7076] Add support for HashiCorp Vault as Secrets Backend (#7741)
Parent: d027b872ce
Commit: a44beaf5bd
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
@@ -0,0 +1,212 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Objects relating to sourcing connections from Hashicorp Vault
"""
from typing import List, Optional

import hvac
from cached_property import cached_property
from hvac.exceptions import InvalidPath, VaultError

from airflow import AirflowException
from airflow.models import Connection
from airflow.secrets import BaseSecretsBackend
from airflow.utils.log.logging_mixin import LoggingMixin


class VaultSecrets(BaseSecretsBackend, LoggingMixin):
    """
    Retrieves Connection objects from Hashicorp Vault

    Configurable via ``airflow.cfg`` as follows:

    .. code-block:: ini

        [secrets]
        backend = airflow.providers.hashicorp.secrets.vault.VaultSecrets
        backend_kwargs = {"connections_path": "connections", "url": "http://127.0.0.1:8200", "mount_point": "airflow"}

    For example, if your keys are under the ``connections`` path in the ``airflow`` mount_point, this
    would be accessible if you provide ``{"connections_path": "connections"}`` and request
    conn_id ``smtp_default``.

    :param connections_path: Specifies the path of the secret to read to get Connections.
    :type connections_path: str
    :param url: Base URL for the Vault instance being addressed.
    :type url: str
    :param auth_type: Authentication Type for Vault (one of 'token', 'ldap', 'userpass', 'approle',
        'github', 'gcp'). Default is ``token``.
    :type auth_type: str
    :param mount_point: The "path" the secret engine was mounted on. (Default: ``secret``)
    :type mount_point: str
    :param token: Authentication token to include in requests sent to Vault.
        (for ``token`` and ``github`` auth_type)
    :type token: str
    :param kv_engine_version: Select the version of the engine to run (``1`` or ``2``, default: ``2``)
    :type kv_engine_version: int
    :param username: Username for Authentication (for ``ldap`` and ``userpass`` auth_type)
    :type username: str
    :param password: Password for Authentication (for ``ldap`` and ``userpass`` auth_type)
    :type password: str
    :param role_id: Role ID for Authentication (for ``approle`` auth_type)
    :type role_id: str
    :param secret_id: Secret ID for Authentication (for ``approle`` auth_type)
    :type secret_id: str
    :param gcp_key_path: Path to GCP Credential JSON file (for ``gcp`` auth_type)
    :type gcp_key_path: str
    :param gcp_scopes: Comma-separated string containing GCP scopes (for ``gcp`` auth_type)
    :type gcp_scopes: str
    """
    def __init__(  # pylint: disable=too-many-arguments
        self,
        connections_path: str,
        url: Optional[str] = None,
        auth_type: str = 'token',
        mount_point: str = 'secret',
        kv_engine_version: int = 2,
        token: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        role_id: Optional[str] = None,
        secret_id: Optional[str] = None,
        gcp_key_path: Optional[str] = None,
        gcp_scopes: Optional[str] = None,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.connections_path = connections_path.rstrip('/')
        self.url = url
        self.auth_type = auth_type
        self.kwargs = kwargs
        self.token = token
        self.username = username
        self.password = password
        self.role_id = role_id
        self.secret_id = secret_id
        self.mount_point = mount_point
        self.kv_engine_version = kv_engine_version
        self.gcp_key_path = gcp_key_path
        self.gcp_scopes = gcp_scopes

    @cached_property
    def client(self) -> hvac.Client:
        """
        Return an authenticated Hashicorp Vault client
        """

        _client = hvac.Client(url=self.url, **self.kwargs)
        if self.auth_type == "token":
            if not self.token:
                raise VaultError("token cannot be None for auth_type='token'")
            _client.token = self.token
        elif self.auth_type == "ldap":
            _client.auth.ldap.login(
                username=self.username, password=self.password)
        elif self.auth_type == "userpass":
            _client.auth_userpass(username=self.username, password=self.password)
        elif self.auth_type == "approle":
            _client.auth_approle(role_id=self.role_id, secret_id=self.secret_id)
        elif self.auth_type == "github":
            _client.auth.github.login(token=self.token)
        elif self.auth_type == "gcp":
            credentials = self._get_gcp_credentials()
            _client.auth.gcp.configure(credentials=credentials)
        else:
            raise AirflowException(f"Authentication type '{self.auth_type}' not supported")

        if _client.is_authenticated():
            return _client
        else:
            raise VaultError("Vault Authentication Error!")

    def build_path(self, conn_id: str):
        """
        Given conn_id, build path for Vault Secret

        :param conn_id: connection id
        :type conn_id: str
        """
        return self.connections_path + "/" + conn_id

    def get_conn_uri(self, conn_id: str) -> Optional[str]:
        """
        Get secret value from Vault. The secret must be stored as a URI under the ``conn_uri`` key.

        :param conn_id: connection id
        :type conn_id: str
        """
        secret_path = self.build_path(conn_id=conn_id)

        try:
            if self.kv_engine_version == 1:
                response = self.client.secrets.kv.v1.read_secret(
                    path=secret_path, mount_point=self.mount_point
                )
            else:
                response = self.client.secrets.kv.v2.read_secret_version(
                    path=secret_path, mount_point=self.mount_point)
        except InvalidPath:
            self.log.info("Connection ID %s not found in Path: %s", conn_id, secret_path)
            return None

        return_data = response["data"] if self.kv_engine_version == 1 else response["data"]["data"]
        return return_data.get("conn_uri")

    def get_connections(self, conn_id: str) -> List[Connection]:
        """
        Get connections with a specific ID

        :param conn_id: connection id
        :type conn_id: str
        """
        conn_uri = self.get_conn_uri(conn_id=conn_id)
        if not conn_uri:
            return []
        conn = Connection(conn_id=conn_id, uri=conn_uri)
        return [conn]

    def _get_gcp_credentials(self):
        import google.auth
        import google.oauth2.service_account

        default_scopes = ('https://www.googleapis.com/auth/cloud-platform',)
        scopes = [s.strip() for s in self.gcp_scopes.split(',')] \
            if self.gcp_scopes else default_scopes

        if self.gcp_key_path:
            # Get credentials from a JSON file.
            if self.gcp_key_path.endswith('.json'):
                self.log.debug('Getting connection using JSON key file %s', self.gcp_key_path)
                credentials = (
                    google.oauth2.service_account.Credentials.from_service_account_file(
                        self.gcp_key_path, scopes=scopes)
                )
            elif self.gcp_key_path.endswith('.p12'):
                raise AirflowException(
                    'Legacy P12 key file are not supported, use a JSON key file.'
                )
            else:
                raise AirflowException('Unrecognised extension for key file.')
        else:
            self.log.debug(
                'Getting connection using `google.auth.default()` since no key file is defined.'
            )
            credentials, _ = google.auth.default(scopes=scopes)

        return credentials
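For orientation, here is a minimal usage sketch of the backend above. The Vault address and token are placeholders, and in normal operation Airflow builds the backend from the ``[secrets]`` configuration rather than from user code:

.. code-block:: python

    from airflow.providers.hashicorp.secrets.vault import VaultSecrets

    # Placeholder values -- point these at a real Vault instance before running.
    backend = VaultSecrets(
        connections_path="connections",
        mount_point="airflow",
        url="http://127.0.0.1:8200",
        token="s.XXXXXXXX",
    )

    # Returns the URI stored under the "conn_uri" key, or None if the path is missing.
    print(backend.get_conn_uri(conn_id="smtp_default"))

    # Returns a list with a single Connection built from that URI, or an empty list.
    print(backend.get_connections(conn_id="smtp_default"))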
@@ -17,6 +17,7 @@
# under the License.
"""
Secrets framework provides means of getting connection objects from various sources, e.g. the following:

* Environment variables
* Metastore database
* AWS SSM Parameter store

@@ -351,8 +351,7 @@ persisted in the database.
Secrets Backends
----------------
Airflow relies on secrets backends to retrieve :class:`~airflow.models.connection.Connection` objects.
All secrets backends derive from :class:`~airflow.secrets.BaseSecretsBackend`. See :ref:`using an alternative secrets
backend <alternative_secrets_backend>` for more info.
All secrets backends derive from :class:`~airflow.secrets.BaseSecretsBackend`.

.. toctree::
    :includehidden:

@@ -362,3 +361,4 @@ backend <alternative_secrets_backend>` for more info.
    airflow/secrets/index

    airflow/providers/amazon/aws/secrets/index
    airflow/providers/hashicorp/secrets/index

@@ -119,83 +119,11 @@ If using with a docker ``.env`` file, you may need to remove the single quotes.

    AIRFLOW_CONN_MY_PROD_DATABASE=my-conn-type://login:password@host:port/schema?param1=val1&param2=val2

.. _alternative_secrets_backend:

Alternative secrets backend
---------------------------

In addition to retrieving connections from environment variables or the metastore database, you can enable
an alternative secrets backend, such as :ref:`AWS SSM Parameter Store <ssm_parameter_store_secrets>`, or you
can :ref:`roll your own <roll_your_own_secrets_backend>`.

Search path
^^^^^^^^^^^
When looking up a connection, by default airflow will search environment variables first and metastore
database second.

If you enable an alternative secrets backend, it will be searched first, followed by environment variables,
then metastore. This search ordering is not configurable.

.. _secrets_backend_configuration:

Configuration
^^^^^^^^^^^^^

The ``[secrets]`` section has the following options:

.. code-block:: ini

    [secrets]
    backend =
    backend_kwargs =

Set ``backend`` to the fully qualified class name of the backend you want to enable.

You can provide ``backend_kwargs`` with json and it will be passed as kwargs to the ``__init__`` method of
your secrets backend.

See :ref:`AWS SSM Parameter Store <ssm_parameter_store_secrets>` for an example configuration.

.. _ssm_parameter_store_secrets:

AWS SSM Parameter Store Secrets Backend
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To enable SSM parameter store, specify :py:class:`~airflow.providers.amazon.aws.secrets.ssm.AwsSsmSecretsBackend`
as the ``backend`` in ``[secrets]`` section of ``airflow.cfg``.

Here is a sample configuration:

.. code-block:: ini

    [secrets]
    backend = airflow.providers.amazon.aws.secrets.ssm.AwsSsmSecretsBackend
    backend_kwargs = {"prefix": "/airflow", "profile_name": "default"}

If you have set your prefix as ``/airflow``, then for a connection id of ``smtp_default``, you would want to
store your connection at ``/airflow/AIRFLOW_CONN_SMTP_DEFAULT``.

Optionally you can supply a profile name to reference an AWS profile, e.g. defined in ``~/.aws/config``.

The value of the SSM parameter must be the :ref:`airflow connection URI representation <generating_connection_uri>` of the connection object.

.. _roll_your_own_secrets_backend:

Roll your own secrets backend
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

A secrets backend is a subclass of :py:class:`airflow.secrets.BaseSecretsBackend`, and just has to implement the
:py:meth:`~airflow.secrets.BaseSecretsBackend.get_connections` method.

Just create your class, and put the fully qualified class name in the ``backend`` key in the ``[secrets]``
section of ``airflow.cfg``. You can also pass kwargs to ``__init__`` by supplying json to the
``backend_kwargs`` config param. See :ref:`Configuration <secrets_backend_configuration>` for more details,
and :ref:`SSM Parameter Store <ssm_parameter_store_secrets>` for an example.

.. note::

    If you are rolling your own secrets backend, you don't strictly need to use airflow's URI format. But
    doing so makes it easier to switch between environment variables, the metastore, and your secrets backend.
an alternative secrets backend to retrieve connections. For more details see :doc:`../use-alternative-secrets-backend`

Connection URI format
---------------------

@@ -45,3 +45,4 @@ configuring an Airflow environment.
    define_extra_link
    tracking-user-activity
    email-config
    use-alternative-secrets-backend

@@ -0,0 +1,160 @@
.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership. The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License. You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied. See the License for the
   specific language governing permissions and limitations
   under the License.


Alternative secrets backend
---------------------------

In addition to retrieving connections from environment variables or the metastore database, you can enable
an alternative secrets backend to retrieve connections,
such as :ref:`AWS SSM Parameter Store <ssm_parameter_store_secrets>`,
:ref:`Hashicorp Vault Secrets <hashicorp_vault_secrets>` or you can :ref:`roll your own <roll_your_own_secrets_backend>`.

Search path
^^^^^^^^^^^
When looking up a connection, by default Airflow will search environment variables first and metastore
database second.

If you enable an alternative secrets backend, it will be searched first, followed by environment variables,
then metastore. This search ordering is not configurable.

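Conceptually, the lookup behaves like the sketch below. This is illustrative only, not Airflow's actual implementation, and the function and variable names are made up:

.. code-block:: python

    def find_connections(conn_id, backends):
        """Return connections from the first backend that knows the conn_id."""
        # ``backends`` is assumed to be in priority order: the alternative
        # backend (if configured), then environment variables, then the
        # metastore database.
        for backend in backends:
            connections = backend.get_connections(conn_id)
            if connections:
                return connections
        return []
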
.. _secrets_backend_configuration:

Configuration
^^^^^^^^^^^^^

The ``[secrets]`` section has the following options:

.. code-block:: ini

    [secrets]
    backend =
    backend_kwargs =

Set ``backend`` to the fully qualified class name of the backend you want to enable.

You can provide ``backend_kwargs`` with json and it will be passed as kwargs to the ``__init__`` method of
your secrets backend.

See :ref:`AWS SSM Parameter Store <ssm_parameter_store_secrets>` for an example configuration.

.. _ssm_parameter_store_secrets:

AWS SSM Parameter Store Secrets Backend
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To enable SSM parameter store, specify :py:class:`~airflow.providers.amazon.aws.secrets.ssm.AwsSsmSecretsBackend`
as the ``backend`` in ``[secrets]`` section of ``airflow.cfg``.

Here is a sample configuration:

.. code-block:: ini

    [secrets]
    backend = airflow.providers.amazon.aws.secrets.ssm.AwsSsmSecretsBackend
    backend_kwargs = {"prefix": "/airflow", "profile_name": "default"}

If you have set your prefix as ``/airflow``, then for a connection id of ``smtp_default``, you would want to
store your connection at ``/airflow/AIRFLOW_CONN_SMTP_DEFAULT``.

Optionally you can supply a profile name to reference an AWS profile, e.g. defined in ``~/.aws/config``.

The value of the SSM parameter must be the :ref:`airflow connection URI representation <generating_connection_uri>` of the connection object.

.. _hashicorp_vault_secrets:

Hashicorp Vault Secrets Backend
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To enable Hashicorp Vault to retrieve connections, specify :py:class:`~airflow.providers.hashicorp.secrets.vault.VaultSecrets`
as the ``backend`` in ``[secrets]`` section of ``airflow.cfg``.

Here is a sample configuration:

.. code-block:: ini

    [secrets]
    backend = airflow.providers.hashicorp.secrets.vault.VaultSecrets
    backend_kwargs = {"connections_path": "connections", "mount_point": "airflow", "url": "http://127.0.0.1:8200"}

The default KV engine version is ``2``; pass ``kv_engine_version: 1`` in ``backend_kwargs`` if you use
KV Secrets Engine Version ``1``.

You can also set and pass values to the Vault client by setting environment variables. All the
environment variables listed at https://www.vaultproject.io/docs/commands/#environment-variables are supported.

Hence, if you set the ``VAULT_ADDR`` environment variable as below, you do not need to pass the ``url``
key in ``backend_kwargs``:

.. code-block:: bash

    export VAULT_ADDR="http://127.0.0.1:8200"

If you have set ``connections_path`` as ``connections`` and ``mount_point`` as ``airflow``, then for a connection id of
``smtp_default``, you would want to store your secret as:

.. code-block:: bash

    vault kv put airflow/connections/smtp_default conn_uri=smtps://user:host@relay.example.com:465

Note that the ``key`` is ``conn_uri``, the ``value`` is ``smtps://user:host@relay.example.com:465`` and the
``mount_point`` is ``airflow``.

You can create the ``airflow`` mount point as follows:

.. code-block:: bash

    vault secrets enable -path=airflow -version=2 kv

Verify that you can get the secret from ``vault``:

.. code-block:: console

    ❯ vault kv get airflow/connections/smtp_default
    ====== Metadata ======
    Key              Value
    ---              -----
    created_time     2020-03-19T19:17:51.281721Z
    deletion_time    n/a
    destroyed        false
    version          1

    ====== Data ======
    Key         Value
    ---         -----
    conn_uri    smtps://user:host@relay.example.com:465

The value of the Vault key must be the :ref:`connection URI representation <generating_connection_uri>`
of the connection object.

.. _roll_your_own_secrets_backend:

Roll your own secrets backend
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

A secrets backend is a subclass of :py:class:`airflow.secrets.BaseSecretsBackend`, and just has to implement the
:py:meth:`~airflow.secrets.BaseSecretsBackend.get_connections` method.

Just create your class, and put the fully qualified class name in the ``backend`` key in the ``[secrets]``
section of ``airflow.cfg``. You can also pass kwargs to ``__init__`` by supplying json to the
``backend_kwargs`` config param. See :ref:`Configuration <secrets_backend_configuration>` for more details,
and :ref:`SSM Parameter Store <ssm_parameter_store_secrets>` for an example.

.. note::

    If you are rolling your own secrets backend, you don't strictly need to use airflow's URI format. But
    doing so makes it easier to switch between environment variables, the metastore, and your secrets backend.
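As an illustration, a minimal custom backend could look like the sketch below. The class name and the in-memory mapping are hypothetical; only ``get_connections`` is required, as described above:

.. code-block:: python

    from typing import Dict, List, Optional

    from airflow.models import Connection
    from airflow.secrets import BaseSecretsBackend


    class InMemorySecretsBackend(BaseSecretsBackend):
        """Toy backend that serves connection URIs from a dict passed via ``backend_kwargs``."""

        def __init__(self, connections: Optional[Dict[str, str]] = None, **kwargs):
            super().__init__(**kwargs)
            self.connections = connections or {}

        def get_connections(self, conn_id: str) -> List[Connection]:
            conn_uri = self.connections.get(conn_id)
            if not conn_uri:
                return []
            return [Connection(conn_id=conn_id, uri=conn_uri)]
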
@@ -122,6 +122,8 @@ Here's the list of the subpackages and what they enable:
+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
| google_auth | ``pip install 'apache-airflow[google_auth]'`` | Google auth backend |
+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
| hashicorp | ``pip install 'apache-airflow[hashicorp]'`` | Hashicorp Services (Vault) |
+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
| jira | ``pip install 'apache-airflow[jira]'`` | Jira hooks and operators |
+---------------------+-----------------------------------------------------+----------------------------------------------------------------------+
| qds | ``pip install 'apache-airflow[qds]'`` | Enable QDS (Qubole Data Service) support |

@@ -27,7 +27,7 @@ Airflow has a mechanism that allows you to expand its functionality and integrat
* :doc:`Authentication backends </security>`
* :doc:`Logging </howto/write-logs>`
* :doc:`Tracking systems </howto/tracking-user-activity>`
* :ref:`Secrets backends <alternative_secrets_backend>`
* :doc:`Secrets backends </howto/use-alternative-secrets-backend>`

It also has integration with :doc:`Sentry <errors>` service for error tracking. Other applications can also integrate using
the :doc:`REST API <rest-api-ref>`.

setup.py

@@ -251,6 +251,9 @@ gcp = [
grpc = [
    'grpcio>=1.15.0',
]
hashicorp = [
    'hvac~=0.10',
]
hdfs = [
    'snakebite>=2.7.8',
]

@@ -429,7 +432,7 @@ else:
devel_minreq = cgroups + devel + doc + kubernetes + mysql + password
devel_hadoop = devel_minreq + hdfs + hive + kerberos + presto + webhdfs
devel_all = (all_dbs + atlas + aws + azure + celery + cgroups + datadog + devel + doc + docker +
             elasticsearch + gcp + grpc + jdbc + jenkins + kerberos + kubernetes + ldap + odbc +
             elasticsearch + gcp + grpc + hashicorp + jdbc + jenkins + kerberos + kubernetes + ldap + odbc +
             oracle + pagerduty + papermill + password + redis + salesforce + samba + segment +
             sendgrid + sentry + singularity + slack + snowflake + ssh + statsd + tableau +
             virtualenv + webhdfs + yandexcloud + zendesk)

@@ -551,6 +554,7 @@ def do_setup():
        'grpc': grpc,
        'hdfs': hdfs,
        'hive': hive,
        'hvac': hashicorp,
        'jdbc': jdbc,
        'jira': jira,
        'kerberos': kerberos,

@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
@@ -0,0 +1,146 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from unittest import TestCase, mock

from hvac.exceptions import InvalidPath, VaultError

from airflow.providers.hashicorp.secrets.vault import VaultSecrets


class TestVaultSecrets(TestCase):

    @mock.patch("airflow.providers.hashicorp.secrets.vault.hvac")
    def test_get_conn_uri(self, mock_hvac):
        mock_client = mock.MagicMock()
        mock_hvac.Client.return_value = mock_client
        mock_client.secrets.kv.v2.read_secret_version.return_value = {
            'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae',
            'lease_id': '',
            'renewable': False,
            'lease_duration': 0,
            'data': {
                'data': {'conn_uri': 'postgresql://airflow:airflow@host:5432/airflow'},
                'metadata': {'created_time': '2020-03-16T21:01:43.331126Z',
                             'deletion_time': '',
                             'destroyed': False,
                             'version': 1}},
            'wrap_info': None,
            'warnings': None,
            'auth': None
        }

        kwargs = {
            "connections_path": "connections",
            "mount_point": "airflow",
            "auth_type": "token",
            "url": "http://127.0.0.1:8200",
            "token": "s.7AU0I51yv1Q1lxOIg1F3ZRAS"
        }

        test_client = VaultSecrets(**kwargs)
        returned_uri = test_client.get_conn_uri(conn_id="test_postgres")
        self.assertEqual('postgresql://airflow:airflow@host:5432/airflow', returned_uri)

    @mock.patch("airflow.providers.hashicorp.secrets.vault.hvac")
    def test_get_conn_uri_engine_version_1(self, mock_hvac):
        mock_client = mock.MagicMock()
        mock_hvac.Client.return_value = mock_client
        mock_client.secrets.kv.v1.read_secret.return_value = {
            'request_id': '182d0673-618c-9889-4cba-4e1f4cfe4b4b',
            'lease_id': '',
            'renewable': False,
            'lease_duration': 2764800,
            'data': {'conn_uri': 'postgresql://airflow:airflow@host:5432/airflow'},
            'wrap_info': None,
            'warnings': None,
            'auth': None}

        kwargs = {
            "connections_path": "connections",
            "mount_point": "airflow",
            "auth_type": "token",
            "url": "http://127.0.0.1:8200",
            "token": "s.7AU0I51yv1Q1lxOIg1F3ZRAS",
            "kv_engine_version": 1
        }

        test_client = VaultSecrets(**kwargs)
        returned_uri = test_client.get_conn_uri(conn_id="test_postgres")
        mock_client.secrets.kv.v1.read_secret.assert_called_once_with(
            mount_point='airflow', path='connections/test_postgres')
        self.assertEqual('postgresql://airflow:airflow@host:5432/airflow', returned_uri)

    @mock.patch.dict('os.environ', {
        'AIRFLOW_CONN_TEST_MYSQL': 'mysql://airflow:airflow@host:5432/airflow',
    })
    @mock.patch("airflow.providers.hashicorp.secrets.vault.hvac")
    def test_get_conn_uri_non_existent_key(self, mock_hvac):
        """
        Test that if the key with the connection ID is not present in Vault, VaultSecrets.get_conn_uri
        returns None and get_connections returns an empty list.
        """
        mock_client = mock.MagicMock()
        mock_hvac.Client.return_value = mock_client
        # Response does not contain the requested key
        mock_client.secrets.kv.v2.read_secret_version.side_effect = InvalidPath()

        kwargs = {
            "connections_path": "connections",
            "mount_point": "airflow",
            "auth_type": "token",
            "url": "http://127.0.0.1:8200",
            "token": "s.7AU0I51yv1Q1lxOIg1F3ZRAS"
        }

        test_client = VaultSecrets(**kwargs)
        self.assertIsNone(test_client.get_conn_uri(conn_id="test_mysql"))
        mock_client.secrets.kv.v2.read_secret_version.assert_called_once_with(
            mount_point='airflow', path='connections/test_mysql')
        self.assertEqual([], test_client.get_connections(conn_id="test_mysql"))

    @mock.patch("airflow.providers.hashicorp.secrets.vault.hvac")
    def test_auth_failure_raises_error(self, mock_hvac):
        mock_client = mock.MagicMock()
        mock_hvac.Client.return_value = mock_client
        mock_client.is_authenticated.return_value = False

        kwargs = {
            "connections_path": "connections",
            "mount_point": "airflow",
            "auth_type": "token",
            "url": "http://127.0.0.1:8200",
            "token": "test_wrong_token"
        }

        with self.assertRaisesRegex(VaultError, "Vault Authentication Error!"):
            VaultSecrets(**kwargs).get_connections(conn_id='test')

    @mock.patch("airflow.providers.hashicorp.secrets.vault.hvac")
    def test_empty_token_raises_error(self, mock_hvac):
        mock_client = mock.MagicMock()
        mock_hvac.Client.return_value = mock_client

        kwargs = {
            "connections_path": "connections",
            "mount_point": "airflow",
            "auth_type": "token",
            "url": "http://127.0.0.1:8200",
        }

        with self.assertRaisesRegex(VaultError, "token cannot be None for auth_type='token'"):
            VaultSecrets(**kwargs).get_connections(conn_id='test')