[AIRFLOW-1517] Restore authorship of secrets and init container
This commit is contained in:
Родитель
c5ced072b2
Коммит
361dad957c
|
@ -88,6 +88,37 @@ class KubernetesRequestFactory:
|
|||
def extract_name(pod, req):
|
||||
req['metadata']['name'] = pod.name
|
||||
|
||||
@staticmethod
|
||||
def extract_volume_secrets(pod, req):
|
||||
vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
|
||||
if any(vol_secrets):
|
||||
req['spec']['containers'][0]['volumeMounts'] = []
|
||||
req['spec']['volumes'] = []
|
||||
for idx, vol in enumerate(vol_secrets):
|
||||
vol_id = 'secretvol' + str(idx)
|
||||
req['spec']['containers'][0]['volumeMounts'].append({
|
||||
'mountPath': vol.deploy_target,
|
||||
'name': vol_id,
|
||||
'readOnly': True
|
||||
})
|
||||
req['spec']['volumes'].append({
|
||||
'name': vol_id,
|
||||
'secret': {
|
||||
'secretName': vol.secret
|
||||
}
|
||||
})
|
||||
|
||||
@staticmethod
|
||||
def extract_env_and_secrets(pod, req):
|
||||
env_secrets = [s for s in pod.secrets if s.deploy_type == 'env']
|
||||
if len(pod.envs) > 0 or len(env_secrets) > 0:
|
||||
env = []
|
||||
for k in pod.envs.keys():
|
||||
env.append({'name': k, 'value': pod.envs[k]})
|
||||
for secret in env_secrets:
|
||||
KubernetesRequestFactory.add_secret_to_env(env, secret)
|
||||
req['spec']['containers'][0]['env'] = env
|
||||
|
||||
@staticmethod
|
||||
def extract_resources(pod, req):
|
||||
if not pod.resources or pod.resources.is_empty_resource_request():
|
||||
|
@ -112,3 +143,20 @@ class KubernetesRequestFactory:
|
|||
if pod.resources.request_cpu:
|
||||
req['spec']['containers'][0]['resources']['limits'][
|
||||
'cpu'] = pod.resources.limit_cpu
|
||||
|
||||
@staticmethod
|
||||
def extract_init_containers(pod, req):
|
||||
if pod.init_containers:
|
||||
req['spec']['initContainers'] = pod.init_containers
|
||||
|
||||
@staticmethod
|
||||
def extract_service_account_name(pod, req):
|
||||
if pod.service_account_name:
|
||||
req['spec']['serviceAccountName'] = pod.service_account_name
|
||||
|
||||
@staticmethod
|
||||
def extract_image_pull_secrets(pod, req):
|
||||
if pod.image_pull_secrets:
|
||||
req['spec']['imagePullSecrets'] = [{
|
||||
'name': pull_secret
|
||||
} for pull_secret in pod.image_pull_secrets.split(',')]
|
||||
|
|
|
@ -45,8 +45,12 @@ spec:
|
|||
self.extract_cmds(pod, req)
|
||||
self.extract_args(pod, req)
|
||||
self.extract_node_selector(pod, req)
|
||||
self.extract_env_and_secrets(pod, req)
|
||||
self.extract_volume_secrets(pod, req)
|
||||
self.attach_volumes(pod, req)
|
||||
self.attach_volume_mounts(pod, req)
|
||||
self.extract_resources(pod, req)
|
||||
self.extract_service_account_name(pod, req)
|
||||
self.extract_init_containers(pod, req)
|
||||
self.extract_image_pull_secrets(pod, req)
|
||||
return req
|
||||
|
|
|
@ -86,4 +86,7 @@ class Pod:
|
|||
self.node_selectors = node_selectors or []
|
||||
self.namespace = namespace
|
||||
self.image_pull_policy = image_pull_policy
|
||||
self.image_pull_secrets = image_pull_secrets
|
||||
self.init_containers = init_containers
|
||||
self.service_account_name = service_account_name
|
||||
self.resources = resources or Resources()
|
||||
|
|
|
@ -29,6 +29,39 @@ class PodGenerator:
|
|||
self.init_containers = []
|
||||
self.secrets = []
|
||||
|
||||
def add_init_container(self,
|
||||
name,
|
||||
image,
|
||||
securityContext,
|
||||
init_environment,
|
||||
volume_mounts
|
||||
):
|
||||
"""
|
||||
|
||||
Adds an init container to the launched pod. useful for pre-
|
||||
|
||||
Args:
|
||||
name (str):
|
||||
image (str):
|
||||
securityContext (dict):
|
||||
init_environment (dict):
|
||||
volume_mounts (dict):
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
self.init_containers.append(
|
||||
{
|
||||
'name': name,
|
||||
'image': image,
|
||||
'securityContext': securityContext,
|
||||
'env': init_environment,
|
||||
'volumeMounts': volume_mounts
|
||||
}
|
||||
)
|
||||
|
||||
def _get_init_containers(self):
|
||||
return self.init_containers
|
||||
|
||||
def add_volume(self, name):
|
||||
"""
|
||||
|
@ -76,6 +109,12 @@ class PodGenerator:
|
|||
def _get_volumes_and_mounts(self):
|
||||
return self.volumes, self.volume_mounts
|
||||
|
||||
def _get_image_pull_secrets(self):
|
||||
"""Extracts any image pull secrets for fetching container(s)"""
|
||||
if not self.kube_config.image_pull_secrets:
|
||||
return []
|
||||
return self.kube_config.image_pull_secrets.split(',')
|
||||
|
||||
def make_pod(self, namespace, image, pod_id, cmds,
|
||||
arguments, labels, kube_executor_config=None):
|
||||
volumes, volume_mounts = self._get_volumes_and_mounts()
|
||||
|
@ -97,6 +136,9 @@ class PodGenerator:
|
|||
labels=labels,
|
||||
envs=self.env_vars,
|
||||
secrets={},
|
||||
# service_account_name=self.kube_config.worker_service_account_name,
|
||||
# image_pull_secrets=self.kube_config.image_pull_secrets,
|
||||
init_containers=worker_init_container_spec,
|
||||
volumes=volumes,
|
||||
volume_mounts=volume_mounts,
|
||||
resources=None
|
||||
|
@ -128,12 +170,32 @@ class WorkerGenerator(PodGenerator):
|
|||
'readOnly': True
|
||||
}]
|
||||
|
||||
# Mount the airflow.cfg file via a configmap the user has specified
|
||||
if self.kube_config.airflow_configmap:
|
||||
config_volume_name = "airflow-config"
|
||||
config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
|
||||
volumes.append({
|
||||
'name': config_volume_name,
|
||||
'configMap': {
|
||||
'name': self.kube_config.airflow_configmap
|
||||
}
|
||||
})
|
||||
volume_mounts.append({
|
||||
'name': config_volume_name,
|
||||
'mountPath': config_path,
|
||||
'subPath': 'airflow.cfg',
|
||||
'readOnly': True
|
||||
})
|
||||
|
||||
# A PV with the DAGs should be mounted
|
||||
if self.kube_config.dags_volume_claim:
|
||||
volumes[0]['persistentVolumeClaim'] = {
|
||||
"claimName": self.kube_config.dags_volume_claim}
|
||||
if self.kube_config.dags_volume_subpath:
|
||||
volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath
|
||||
else:
|
||||
# Create a Shared Volume for the Git-Sync module to populate
|
||||
volumes[0]["emptyDir"] = {}
|
||||
return volumes, volume_mounts
|
||||
|
||||
def _init_labels(self, dag_id, task_id, execution_date):
|
||||
|
@ -154,6 +216,49 @@ class WorkerGenerator(PodGenerator):
|
|||
env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
|
||||
return env
|
||||
|
||||
def _init_init_containers(self, volume_mounts):
|
||||
"""When using git to retrieve the DAGs, use the GitSync Init Container"""
|
||||
# If we're using volume claims to mount the dags, no init container is needed
|
||||
if self.kube_config.dags_volume_claim:
|
||||
return []
|
||||
|
||||
# Otherwise, define a git-sync init container
|
||||
init_environment = [{
|
||||
'name': 'GIT_SYNC_REPO',
|
||||
'value': self.kube_config.git_repo
|
||||
}, {
|
||||
'name': 'GIT_SYNC_BRANCH',
|
||||
'value': self.kube_config.git_branch
|
||||
}, {
|
||||
'name': 'GIT_SYNC_ROOT',
|
||||
'value': '/tmp'
|
||||
}, {
|
||||
'name': 'GIT_SYNC_DEST',
|
||||
'value': 'dags'
|
||||
}, {
|
||||
'name': 'GIT_SYNC_ONE_TIME',
|
||||
'value': 'true'
|
||||
}]
|
||||
if self.kube_config.git_user:
|
||||
init_environment.append({
|
||||
'name': 'GIT_SYNC_USERNAME',
|
||||
'value': self.kube_config.git_user
|
||||
})
|
||||
if self.kube_config.git_password:
|
||||
init_environment.append({
|
||||
'name': 'GIT_SYNC_PASSWORD',
|
||||
'value': self.kube_config.git_password
|
||||
})
|
||||
|
||||
volume_mounts[0]['readOnly'] = False
|
||||
return [{
|
||||
'name': self.kube_config.git_sync_init_container_name,
|
||||
'image': self.kube_config.git_sync_container,
|
||||
'securityContext': {'runAsUser': 0},
|
||||
'env': init_environment,
|
||||
'volumeMounts': volume_mounts
|
||||
}]
|
||||
|
||||
def make_worker_pod(self,
|
||||
namespace,
|
||||
pod_id,
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
class Secret:
|
||||
"""Defines Kubernetes Secret Volume"""
|
||||
|
||||
def __init__(self, deploy_type, deploy_target, secret, key):
|
||||
"""Initialize a Kubernetes Secret Object. Used to track requested secrets from
|
||||
the user.
|
||||
|
||||
:param deploy_type: The type of secret deploy in Kubernetes, either `env` or
|
||||
`volume`
|
||||
:type deploy_type: ``str``
|
||||
:param deploy_target: The environment variable to be created in the worker.
|
||||
:type deploy_target: ``str``
|
||||
:param secret: Name of the secrets object in Kubernetes
|
||||
:type secret: ``str``
|
||||
:param key: Key of the secret within the Kubernetes Secret
|
||||
:type key: ``str``
|
||||
"""
|
||||
self.deploy_type = deploy_type
|
||||
self.deploy_target = deploy_target.upper()
|
||||
self.secret = secret
|
||||
self.key = key
|
Загрузка…
Ссылка в новой задаче