[AIRFLOW-1314] Improve k8s support

Add kubernetes config section in airflow.cfg and Inject GCP secrets upon executor start. (#17)
Update Airflow to Pass configuration to k8s containers, add some Py3 … (#9)

* Update Airflow to Pass configuration to k8s containers, add some Py3 compat., create git-sync pod

* Undo changes to display-source config setter for to_dict

* WIP Secrets and Configmaps

* Improve secrets support for multiple secrets. Add support for registry secrets. Add support for RBAC service accounts.

* Swap order of variables, overlooked very basic issue

* Secret env var names must be upper

* Update logging

* Revert spothero test code in setup.py

* WIP Fix tests

* Worker should be using local executor

* Consolidate worker setup and address code review comments

* reconfigure airflow script to use new secrets method
This commit is contained in:
fenglu-g 2017-10-23 09:52:06 -07:00 коммит произвёл Fokko Driesprong
Родитель a9d90dc9a5
Коммит ad4e67ce1b
14 изменённых файлов: 513 добавлений и 230 удалений

Просмотреть файл

@ -5,9 +5,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@ -498,3 +498,62 @@ hide_sensitive_variable_fields = True
[elasticsearch]
elasticsearch_host =
[kubernetes]
# The repository and tag of the Kubernetes Image for the Worker to Run
worker_container_repository =
worker_container_tag =
# If True (default), worker pods will be deleted upon termination
delete_worker_pods = True
# The Kubernetes namespace where airflow workers should be created. Defaults to `default`
namespace = default
# The name of the Kubernetes ConfigMap Containing the Airflow Configuration (this file)
airflow_configmap =
# For either git sync or volume mounted DAGs, the worker will look in this subpath for DAGs
dags_volume_subpath =
# For DAGs mounted via a volume claim (mutually exclusive with volume claim)
dags_volume_claim =
# Git credentials and repository for DAGs mounted via Git (mutually exclusive with volume claim)
git_repo =
git_branch =
git_user =
git_password =
# For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
git_sync_container_repository = gcr.io/google-containers/git-sync-amd64
git_sync_container_tag = v2.0.5
git_sync_init_container_name = git-sync-clone
# The name of the Kubernetes service account to be associated with airflow workers, if any.
# Service accounts are required for workers that require access to secrets or cluster resources.
# See the Kubernetes RBAC documentation for more:
# https://kubernetes.io/docs/admin/authorization/rbac/
worker_service_account_name =
# Any image pull secrets to be given to worker pods, If more than one secret is
# required, provide a comma separated list: secret_a,secret_b
image_pull_secrets =
# GCP Service Account Keys to be provided to tasks run on Kubernetes Executors
# Should be supplied in the format: key-name-1:key-path-1,key-name-2:key-path-2
gcp_service_account_keys =
[kubernetes_secrets]
# The scheduler mounts the following secrets into your workers as they are launched by the
# scheduler. You may define as many secrets as needed and the kubernetes launcher will parse the
# defined secrets and mount them as secret environment variables in the launched workers.
# Secrets in this section are defined as follows
# <environment_variable_mount> = <kubernetes_secret_object>:<kubernetes_secret_key>
#
# For example if you wanted to mount a kubernetes secret key named `postgres_password` from the
# kubernetes secret object `airflow-secret` as the environment variable `POSTGRES_PASSWORD` into
# your workers you would follow the following format:
# POSTGRES_PASSWORD = airflow-secret:postgres_credentials
#
# Additionally you may override worker airflow settings with the AIRFLOW__<SECTION>__<KEY>
# formatting as supported by airflow normally.

Просмотреть файл

@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import calendar
import logging
import base64
import os
import multiprocessing
import six
from queue import Queue
from dateutil import parser
from uuid import uuid4
@ -23,13 +23,13 @@ from kubernetes import watch, client
from kubernetes.client.rest import ApiException
from airflow.contrib.kubernetes.pod_launcher import PodLauncher
from airflow.contrib.kubernetes.kube_client import get_kube_client
from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
from airflow.executors.base_executor import BaseExecutor
from airflow.models import TaskInstance, KubeResourceVersion
from airflow.utils.state import State
from airflow import configuration, settings
from airflow.exceptions import AirflowConfigException
from airflow.contrib.kubernetes.pod import Pod
from airflow.utils.log.logging_mixin import LoggingMixin
class KubeConfig:
core_section = "core"
@ -50,98 +50,84 @@ class KubeConfig:
return default
def __init__(self):
configuration_dict = configuration.as_dict(display_sensitive=True)
self.core_configuration = configuration_dict['core']
self.kube_secrets = configuration_dict.get('kubernetes_secrets', {})
self.airflow_home = configuration.get(self.core_section, 'airflow_home')
self.dags_folder = configuration.get(self.core_section, 'dags_folder')
self.parallelism = configuration.getint(self.core_section, 'PARALLELISM')
self.kube_image = configuration.get(self.kubernetes_section, 'container_image')
self.worker_container_repository = configuration.get(
self.kubernetes_section, 'worker_container_repository')
self.worker_container_tag = configuration.get(
self.kubernetes_section, 'worker_container_tag')
self.kube_image = '{}:{}'.format(
self.worker_container_repository, self.worker_container_tag)
self.delete_worker_pods = self.safe_getboolean(self.kubernetes_section, 'delete_worker_pods', True)
self.kube_namespace = os.environ.get('AIRFLOW_KUBE_NAMESPACE', 'default')
# These two props must be set together
self.worker_service_account_name = self.safe_get(
self.kubernetes_section, 'worker_service_account_name', 'default')
self.image_pull_secrets = self.safe_get(
self.kubernetes_section, 'image_pull_secrets', '')
# NOTE: `git_repo` and `git_branch` must be specified together as a pair
# The http URL of the git repository to clone from
self.git_repo = self.safe_get(self.kubernetes_section, 'git_repo', None)
# The branch of the repository to be checked out
self.git_branch = self.safe_get(self.kubernetes_section, 'git_branch', None)
# Optionally, the directory in the git repository containing the dags
self.git_subpath = self.safe_get(self.kubernetes_section, 'git_subpath', '')
# Or this one prop
# Optionally a user may supply a `git_user` and `git_password` for private repositories
self.git_user = self.safe_get(self.kubernetes_section, 'git_user', None)
self.git_password = self.safe_get(self.kubernetes_section, 'git_password', None)
# NOTE: The user may optionally use a volume claim to mount a PV containing DAGs directly
self.dags_volume_claim = self.safe_get(self.kubernetes_section, 'dags_volume_claim', None)
# And optionally this prop
# This prop may optionally be set for PV Claims and is used to locate DAGs on a SubPath
self.dags_volume_subpath = self.safe_get(self.kubernetes_section, 'dags_volume_subpath', None)
# The Kubernetes Namespace in which the Scheduler and Webserver reside. Note that if your
# cluster has RBAC enabled, your scheduler may need service account permissions to
# create, watch, get, and delete pods in this namespace.
self.kube_namespace = self.safe_get(self.kubernetes_section, 'namespace', 'default')
# The Kubernetes Namespace in which pods will be created by the executor. Note that if your
# cluster has RBAC enabled, your workers may need service account permissions to
# interact with cluster components.
self.executor_namespace = self.safe_get(self.kubernetes_section, 'namespace', 'default')
# Task secrets managed by KubernetesExecutor.
self.gcp_service_account_keys = self.safe_get(self.kubernetes_section, 'gcp_service_account_keys', None)
# If the user is using the git-sync container to clone their repository via git,
# allow them to specify repository, tag, and pod name for the init container.
self.git_sync_container_repository = self.safe_get(
self.kubernetes_section, 'git_sync_container_repository',
'gcr.io/google-containers/git-sync-amd64')
self.git_sync_container_tag = self.safe_get(
self.kubernetes_section, 'git_sync_container_tag', 'v2.0.5')
self.git_sync_container = '{}:{}'.format(
self.git_sync_container_repository, self.git_sync_container_tag)
self.git_sync_init_container_name = self.safe_get(
self.kubernetes_section, 'git_sync_init_container_name', 'git-sync-clone')
# The worker pod may optionally have a valid Airflow config loaded via a configmap
self.airflow_configmap = self.safe_get(self.kubernetes_section, 'airflow_configmap', None)
self._validate()
def _validate(self):
if self.dags_volume_claim:
# do volume things
pass
elif self.git_repo and self.git_branch:
# do git things
pass
else:
if not self.dags_volume_claim and (not self.git_repo or not self.git_branch):
raise AirflowConfigException(
"In kubernetes mode you must set the following configs in the `kubernetes` section: "
"`dags_volume_claim` or "
"`git_repo and git_branch`"
"`git_repo and git_branch` "
)
class PodMaker:
def __init__(self, kube_config):
self.logger = logging.getLogger(__name__)
self.kube_config = kube_config
def _get_volumes_and_mounts(self):
volume_name = "airflow-dags"
if self.kube_config.dags_volume_claim:
volumes = [{
"name": volume_name, "persistentVolumeClaim": {"claimName": self.kube_config.dags_volume_claim}
}]
volume_mounts = [{
"name": volume_name, "mountPath": self.kube_config.dags_folder,
"readOnly": True
}]
if self.kube_config.dags_volume_subpath:
volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath
return volumes, volume_mounts
else:
return [], []
def _get_args(self, airflow_command):
if self.kube_config.dags_volume_claim:
self.logger.info("Using k8s_dags_volume_claim for airflow dags")
return [airflow_command]
else:
self.logger.info("Using git-syncher for airflow dags")
cmd_args = "mkdir -p {dags_folder} && cd {dags_folder} &&" \
"git init && git remote add origin {git_repo} && git pull origin {git_branch} --depth=1 &&" \
"{command}".format(dags_folder=self.kube_config.dags_folder, git_repo=self.kube_config.git_repo,
git_branch=self.kube_config.git_branch, command=airflow_command)
return [cmd_args]
def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command):
volumes, volume_mounts = self._get_volumes_and_mounts()
pod = Pod(
namespace=namespace,
name=pod_id,
image=self.kube_config.kube_image,
cmds=["bash", "-cx", "--"],
args=self._get_args(airflow_command),
labels={
"airflow-slave": "",
"dag_id": dag_id,
"task_id": task_id,
"execution_date": execution_date
},
envs={"AIRFLOW__CORE__EXECUTOR": "LocalExecutor"},
volumes=volumes,
volume_mounts=volume_mounts
)
return pod
class KubernetesJobWatcher(multiprocessing.Process, object):
class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
def __init__(self, namespace, watcher_queue, resource_version):
self.logger = logging.getLogger(__name__)
multiprocessing.Process.__init__(self)
self.namespace = namespace
self.watcher_queue = watcher_queue
@ -153,14 +139,14 @@ class KubernetesJobWatcher(multiprocessing.Process, object):
try:
self.resource_version = self._run(kube_client, self.resource_version)
except Exception:
self.logger.exception("Unknown error in KubernetesJobWatcher. Failing")
self.log.exception("Unknown error in KubernetesJobWatcher. Failing")
raise
else:
self.logger.warn("Watch died gracefully, starting back up with: "
"last resource_version: {}".format(self.resource_version))
self.log.warn("Watch died gracefully, starting back up with: "
"last resource_version: {}".format(self.resource_version))
def _run(self, kube_client, resource_version):
self.logger.info("Event: and now my watch begins starting at resource_version: {}".format(resource_version))
self.log.info("Event: and now my watch begins starting at resource_version: {}".format(resource_version))
watcher = watch.Watch()
kwargs = {"label_selector": "airflow-slave"}
@ -170,7 +156,7 @@ class KubernetesJobWatcher(multiprocessing.Process, object):
last_resource_version = None
for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, **kwargs):
task = event['object']
self.logger.info("Event: {} had an event of type {}".format(task.metadata.name, event['type']))
self.log.info("Event: {} had an event of type {}".format(task.metadata.name, event['type']))
self.process_status(
task.metadata.name, task.status.phase, task.metadata.labels, task.metadata.resource_version
)
@ -180,32 +166,31 @@ class KubernetesJobWatcher(multiprocessing.Process, object):
def process_status(self, pod_id, status, labels, resource_version):
if status == 'Pending':
self.logger.info("Event: {} Pending".format(pod_id))
self.log.info("Event: {} Pending".format(pod_id))
elif status == 'Failed':
self.logger.info("Event: {} Failed".format(pod_id))
self.log.info("Event: {} Failed".format(pod_id))
self.watcher_queue.put((pod_id, State.FAILED, labels, resource_version))
elif status == 'Succeeded':
self.logger.info("Event: {} Succeeded".format(pod_id))
self.log.info("Event: {} Succeeded".format(pod_id))
self.watcher_queue.put((pod_id, None, labels, resource_version))
elif status == 'Running':
self.logger.info("Event: {} is Running".format(pod_id))
self.log.info("Event: {} is Running".format(pod_id))
else:
self.logger.warn("Event: Invalid state: {} on pod: {} with labels: {} "
self.log.warn("Event: Invalid state: {} on pod: {} with labels: {} "
"with resource_version: {}".format(status, pod_id, labels, resource_version))
class AirflowKubernetesScheduler(object):
class AirflowKubernetesScheduler(LoggingMixin, object):
def __init__(self, kube_config, task_queue, result_queue, session, kube_client):
self.logger = logging.getLogger(__name__)
self.logger.info("creating kubernetes executor")
self.kube_config = KubeConfig()
self.log.debug("creating kubernetes executor")
self.kube_config = kube_config
self.task_queue = task_queue
self.pending_jobs = set()
self.namespace = os.environ['k8s_POD_NAMESPACE']
self.logger.info("k8s: using namespace {}".format(self.namespace))
self.result_queue = result_queue
self.namespace = self.kube_config.kube_namespace
self.log.debug("k8s: using namespace {}".format(self.namespace))
self.kube_client = kube_client
self.launcher = PodLauncher(kube_client=self.kube_client)
self.pod_maker = PodMaker(kube_config=self.kube_config)
self.worker_configuration = WorkerConfiguration(kube_config=self.kube_config)
self.watcher_queue = multiprocessing.Queue()
self._session = session
self.kube_watcher = self._make_kube_watcher()
@ -220,7 +205,7 @@ class AirflowKubernetesScheduler(object):
if self.kube_watcher.is_alive():
pass
else:
self.logger.error("Error while health checking kube watcher process. Process died for unknown reasons")
self.log.error("Error while health checking kube watcher process. Process died for unknown reasons")
self.kube_watcher = self._make_kube_watcher()
def run_next(self, next_job):
@ -234,19 +219,19 @@ class AirflowKubernetesScheduler(object):
:return:
"""
self.logger.info('k8s: job is {}'.format(str(next_job)))
self.log.debug('k8s: job is {}'.format(str(next_job)))
key, command = next_job
dag_id, task_id, execution_date = key
self.logger.info("k8s: running for command {}".format(command))
self.logger.info("k8s: launching image {}".format(self.kube_config.kube_image))
pod = self.pod_maker.make_pod(
self.log.debug("k8s: running for command {}".format(command))
self.log.debug("k8s: launching image {}".format(self.kube_config.kube_image))
pod = self.worker_configuration.make_pod(
namespace=self.namespace, pod_id=self._create_pod_id(dag_id, task_id),
dag_id=dag_id, task_id=task_id, execution_date=self._datetime_to_label_safe_datestring(execution_date),
airflow_command=command
)
# the watcher will monitor pods, so we do not block.
self.launcher.run_pod_async(pod)
self.logger.info("k8s: Job created!")
self.log.debug("k8s: Job created!")
def delete_pod(self, pod_id):
if self.kube_config.delete_worker_pods:
@ -272,10 +257,10 @@ class AirflowKubernetesScheduler(object):
def process_watcher_task(self):
pod_id, state, labels, resource_version = self.watcher_queue.get()
logging.info("Attempting to finish pod; pod_id: {}; state: {}; labels: {}".format(pod_id, state, labels))
self.log.info("Attempting to finish pod; pod_id: {}; state: {}; labels: {}".format(pod_id, state, labels))
key = self._labels_to_key(labels)
if key:
self.logger.info("finishing job {}".format(key))
self.log.debug("finishing job {} - {} ({})".format(key, state, pod_id))
self.result_queue.put((key, state, pod_id, resource_version))
@staticmethod
@ -336,13 +321,13 @@ class AirflowKubernetesScheduler(object):
try:
return labels["dag_id"], labels["task_id"], self._label_safe_datestring_to_datetime(labels["execution_date"])
except Exception as e:
self.logger.warn("Error while converting labels to key; labels: {}; exception: {}".format(
self.log.warn("Error while converting labels to key; labels: {}; exception: {}".format(
labels, e
))
return None
class KubernetesExecutor(BaseExecutor):
class KubernetesExecutor(BaseExecutor, LoggingMixin):
def __init__(self):
super(KubernetesExecutor, self).__init__(parallelism=PARALLELISM)
self.task_queue = None
@ -362,7 +347,7 @@ class KubernetesExecutor(BaseExecutor):
:return: None
"""
queued_tasks = self._session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
self.logger.info("When executor started up, found {} queued task instances".format(len(queued_tasks)))
self.log.info("When executor started up, found {} queued task instances".format(len(queued_tasks)))
for t in queued_tasks:
kwargs = dict(label_selector="dag_id={},task_id={},execution_date={}".format(
@ -370,7 +355,7 @@ class KubernetesExecutor(BaseExecutor):
))
pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
if len(pod_list.items) == 0:
self.logger.info("TaskInstance: {} found in queued state but was not launched, rescheduling".format(t))
self.log.info("TaskInstance: {} found in queued state but was not launched, rescheduling".format(t))
self._session.query(TaskInstance).filter(
TaskInstance.dag_id == t.dag_id,
TaskInstance.task_id == t.task_id,
@ -379,8 +364,37 @@ class KubernetesExecutor(BaseExecutor):
self._session.commit()
def _inject_secrets(self):
def _create_or_update_secret(secret_name, secret_path):
try:
return self.kube_client.create_namespaced_secret(
self.kube_config.executor_namespace, kubernetes.client.V1Secret(
data={'key.json' : base64.b64encode(open(secret_path, 'r').read())},
metadata=kubernetes.client.V1ObjectMeta(name=secret_name)))
except ApiException as e:
if e.status == 409:
return self.kube_client.replace_namespaced_secret(
secret_name, self.kube_config.executor_namespace,
kubernetes.client.V1Secret(
data={'key.json' : base64.b64encode(open(secret_path, 'r').read())},
metadata=kubernetes.client.V1ObjectMeta(name=secret_name)))
self.log.exception("Exception while trying to inject secret."
"Secret name: {}, error details: {}.".format(secret_name, e))
raise
# For each GCP service account key, inject it as a secret in executor
# namespace with the specific secret name configured in the airflow.cfg.
# We let exceptions to pass through to users.
if self.kube_config.gcp_service_account_keys:
name_path_pair_list = [
{'name' : account_spec.strip().split('=')[0],
'path' : account_spec.strip().split('=')[1]}
for account_spec in self.kube_config.gcp_service_account_keys.split(',')]
for service_account in name_path_pair_list:
_create_or_update_secret(service_account['name'], service_account['path'])
def start(self):
self.logger.info('k8s: starting kubernetes executor')
self.log.info('k8s: starting kubernetes executor')
self._session = settings.Session()
self.task_queue = Queue()
self.result_queue = Queue()
@ -388,15 +402,16 @@ class KubernetesExecutor(BaseExecutor):
self.kube_scheduler = AirflowKubernetesScheduler(
self.kube_config, self.task_queue, self.result_queue, self._session, self.kube_client
)
self._inject_secrets()
self.clear_not_launched_queued_tasks()
def execute_async(self, key, command, queue=None):
self.logger.info("k8s: adding task {} with command {}".format(key, command))
self.log.info("k8s: adding task {} with command {}".format(key, command))
self.task_queue.put((key, command))
def sync(self):
self.logger.info("self.running: {}".format(self.running))
self.logger.info("self.queued: {}".format(self.queued_tasks))
self.log.info("self.running: {}".format(self.running))
self.log.info("self.queued: {}".format(self.queued_tasks))
self.kube_scheduler.sync()
last_resource_version = None
@ -404,7 +419,7 @@ class KubernetesExecutor(BaseExecutor):
results = self.result_queue.get()
key, state, pod_id, resource_version = results
last_resource_version = resource_version
self.logger.info("Changing state of {}".format(results))
self.log.info("Changing state of {}".format(results))
self._change_state(key, state, pod_id)
KubeResourceVersion.checkpoint_resource_version(last_resource_version, session=self._session)
@ -434,7 +449,7 @@ class KubernetesExecutor(BaseExecutor):
self._session.commit()
def end(self):
self.logger.info('ending kube executor')
self.log.info('ending kube executor')
self.task_queue.join()
def execute_async(self, key, command, queue=None):

Просмотреть файл

@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
from airflow.contrib.kubernetes.kubernetes_request_factory import KubernetesRequestFactory
import logging
class KubernetesResourceBuilder:
@ -31,7 +30,6 @@ class KubernetesResourceBuilder:
self.cmds = cmds
self.kub_req_factory = kub_req_factory
self.namespace = namespace
self.logger = logging.getLogger(self.__class__.__name__)
self.envs = {}
self.labels = {}
self.secrets = {}

Просмотреть файл

@ -13,6 +13,7 @@
# limitations under the License.
from abc import ABCMeta, abstractmethod
import six
class KubernetesRequestFactory():
"""
@ -69,8 +70,9 @@ class KubernetesRequestFactoryHelper(object):
@staticmethod
def extract_labels(pod, req):
for k in pod.labels.keys():
req['metadata']['labels'][k] = pod.labels[k]
req['metadata']['labels'] = req['metadata'].get('labels', {})
for k, v in six.iteritems(pod.labels):
req['metadata']['labels'][k] = v
@staticmethod
def extract_cmds(pod, req):
@ -114,7 +116,7 @@ class KubernetesRequestFactoryHelper(object):
})
@staticmethod
def extract_secrets(pod, req):
def extract_env_and_secrets(pod, req):
env_secrets = [s for s in pod.secrets if s.deploy_type == 'env']
if len(pod.envs) > 0 or len(env_secrets) > 0:
env = []
@ -123,3 +125,20 @@ class KubernetesRequestFactoryHelper(object):
for secret in env_secrets:
KubernetesRequestFactory.add_secret_to_env(env, secret)
req['spec']['containers'][0]['env'] = env
@staticmethod
def extract_init_containers(pod, req):
if pod.init_containers:
req['spec']['initContainers'] = pod.init_containers
@staticmethod
def extract_service_account_name(pod, req):
if pod.service_account_name:
req['spec']['serviceAccountName'] = pod.service_account_name
@staticmethod
def extract_image_pull_secrets(pod, req):
if pod.image_pull_secrets:
req['spec']['imagePullSecrets'] = [{
'name': pull_secret
} for pull_secret in pod.image_pull_secrets.split(',')]

Просмотреть файл

@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
import yaml
import airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory as kreq
from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory import KubernetesRequestFactory
from airflow.contrib.kubernetes.pod import Pod
class SimplePodRequestFactory(kreq.KubernetesRequestFactory):
class SimplePodRequestFactory(KubernetesRequestFactory):
"""
Request generator for a simple pod.
"""
@ -41,16 +41,18 @@ spec:
def create(self, pod):
# type: (Pod) -> dict
req = yaml.load(self._yaml)
kreq.extract_name(pod, req)
kreq.extract_labels(pod, req)
kreq.extract_image(pod, req)
if pod.image_pull_policy:
kreq.extract_image_pull_policy(pod, req)
kreq.extract_cmds(pod, req)
kreq.extract_args(pod, req)
if len(pod.node_selectors) > 0:
self.extract_node_selector(pod, req)
self.extract_secrets(pod, req)
self.extract_name(pod, req)
self.extract_labels(pod, req)
self.extract_image(pod, req)
self.extract_image_pull_policy(pod, req)
self.extract_cmds(pod, req)
self.extract_args(pod, req)
self.extract_node_selector(pod, req)
self.extract_env_and_secrets(pod, req)
self.extract_volume_secrets(pod, req)
self.attach_volume_mounts(req=req, pod=pod)
self.attach_volumes(pod, req)
self.attach_volume_mounts(pod, req)
self.extract_service_account_name(pod, req)
self.extract_init_containers(pod, req)
self.extract_image_pull_secrets(pod, req)
return req

Просмотреть файл

@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
class Pod:
"""
@ -43,7 +42,11 @@ class Pod:
name=None,
volumes = [],
namespace='default',
result=None):
result=None,
image_pull_policy="IfNotPresent",
image_pull_secrets=None,
init_containers=None,
service_account_name=None):
self.image = image
self.envs = envs if envs else {}
self.cmds = cmds
@ -54,5 +57,7 @@ class Pod:
self.volumes = volumes
self.node_selectors = node_selectors if node_selectors else []
self.namespace = namespace
self.logger = logging.getLogger(self.__class__.__name__)
self.image_pull_policy = image_pull_policy
self.image_pull_secrets = image_pull_secrets
self.init_containers = init_containers
self.service_account_name = service_account_name

Просмотреть файл

@ -11,28 +11,34 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from airflow.contrib.kubernetes.pod import Pod
from airflow.contrib.kubernetes.kubernetes_request_factory.pod_request_factory import SimplePodRequestFactory
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
from kubernetes import watch
from kubernetes.client import V1Pod
from airflow.utils.state import State
import json
import logging
from kubernetes.client.rest import ApiException
from .kube_client import get_kube_client
class PodLauncher:
class PodLauncher(LoggingMixin):
def __init__(self, kube_client=None):
self.kube_req_factory = SimplePodRequestFactory()
self._client = kube_client or get_kube_client()
self._watch = watch.Watch()
self.logger = logging.getLogger(__name__)
def run_pod_async(self, pod):
req = self.kube_req_factory.create(pod)
print(json.dumps(req))
resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace)
self.log.debug('Pod Creation Request: \n{}'.format(json.dumps(req, indent=2)))
try:
resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace)
self.log.debug('Pod Creation Response: {}'.format(resp))
except ApiException:
self.log.exception('Exception when attempting to create Namespaced Pod.')
raise
return resp
def run_pod(self, pod):
@ -54,7 +60,7 @@ class PodLauncher:
def _task_status(self, event):
# type: (V1Pod) -> State
task = event['object']
self.logger.info(
self.log.info(
"Event: {} had an event of type {}".format(task.metadata.name,
event['type']))
status = self.process_status(task.metadata.name, task.status.phase)
@ -67,13 +73,13 @@ class PodLauncher:
if status == 'Pending':
return State.QUEUED
elif status == 'Failed':
self.logger.info("Event: {} Failed".format(job_id))
self.log.info("Event: {} Failed".format(job_id))
return State.FAILED
elif status == 'Succeeded':
self.logger.info("Event: {} Succeeded".format(job_id))
self.log.info("Event: {} Succeeded".format(job_id))
return State.SUCCESS
elif status == 'Running':
return State.RUNNING
else:
self.logger.info("Event: Invalid state {} on job {}".format(status, job_id))
self.log.info("Event: Invalid state {} on job {}".format(status, job_id))
return State.FAILED

Просмотреть файл

@ -1,30 +1,25 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# -*- coding: utf-8 -*-
#
# http://www.apache.org/licenses/LICENSE-2.0
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class Secret:
"""Defines Kubernetes Secret Volume"""
"""Defines Kubernetes Secret Containers"""
def __init__(self, deploy_type, deploy_target, secret, key):
"""Initialize a Kubernetes Secret Object. Used to track requested secrets from
the user.
"""Initialize a Kubernetes Secret Object. Used to track requested secrets from the user.
:param deploy_type: The type of secret deploy in Kubernetes, either `env` or
`volume`
:param deploy_type: The type of secret deploy in Kubernetes, either `env` or `volume`
:type deploy_type: ``str``
:param deploy_target: The environment variable to be created in the worker.
:type deploy_target: ``str``

Просмотреть файл

@ -0,0 +1,158 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import os
import six
from airflow.contrib.kubernetes.pod import Pod
from airflow.contrib.kubernetes.secret import Secret
class WorkerConfiguration:
"""Contains Kubernetes Airflow Worker configuration logic"""
def __init__(self, kube_config):
self.kube_config = kube_config
def _get_init_containers(self, volume_mounts):
"""When using git to retrieve the DAGs, use the GitSync Init Container"""
# If we're using volume claims to mount the dags, no init container is needed
if self.kube_config.dags_volume_claim:
return []
# Otherwise, define a git-sync init container
init_environment = [{
'name': 'GIT_SYNC_REPO',
'value': self.kube_config.git_repo
}, {
'name': 'GIT_SYNC_BRANCH',
'value': self.kube_config.git_branch
}, {
'name': 'GIT_SYNC_ROOT',
'value': '/tmp'
}, {
'name': 'GIT_SYNC_DEST',
'value': 'dags'
}, {
'name': 'GIT_SYNC_ONE_TIME',
'value': 'true'
}]
if self.kube_config.git_user:
init_environment.append({
'name': 'GIT_SYNC_USERNAME',
'value': self.kube_config.git_user
})
if self.kube_config.git_password:
init_environment.append({
'name': 'GIT_SYNC_PASSWORD',
'value': self.kube_config.git_password
})
volume_mounts[0]['readOnly'] = False
return [{
'name': self.kube_config.git_sync_init_container_name,
'image': self.kube_config.git_sync_container,
'securityContext': {'runAsUser': 0},
'env': init_environment,
'volumeMounts': volume_mounts
}]
def _get_volumes_and_mounts(self):
"""Determine volumes and volume mounts for Airflow workers"""
dags_volume_name = "airflow-dags"
dags_path = os.path.join(self.kube_config.dags_folder, self.kube_config.git_subpath)
volumes = [{
'name': dags_volume_name
}]
volume_mounts = [{
'name': dags_volume_name,
'mountPath': dags_path,
'readOnly': True
}]
# Mount the airflow.cfg file via a configmap the user has specified
if self.kube_config.airflow_configmap:
config_volume_name = "airflow-config"
config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home)
volumes.append({
'name': config_volume_name,
'configMap': {
'name': self.kube_config.airflow_configmap
}
})
volume_mounts.append({
'name': config_volume_name,
'mountPath': config_path,
'subPath': 'airflow.cfg',
'readOnly': True
})
# A PV with the DAGs should be mounted
if self.kube_config.dags_volume_claim:
volumes[0]['persistentVolumeClaim'] = {"claimName": self.kube_config.dags_volume_claim}
if self.kube_config.dags_volume_subpath:
volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath
else:
# Create a Shared Volume for the Git-Sync module to populate
volumes[0]["emptyDir"] = {}
return volumes, volume_mounts
def _get_environment(self):
"""Defines any necessary environment variables for the pod executor"""
env = {
'AIRFLOW__CORE__DAGS_FOLDER': '/tmp/dags',
'AIRFLOW__CORE__EXECUTOR': 'LocalExecutor'
}
if self.kube_config.airflow_configmap:
env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home
return env
def _get_secrets(self):
"""Defines any necessary secrets for the pod executor"""
worker_secrets = []
for env_var_name, obj_key_pair in six.iteritems(self.kube_config.kube_secrets):
k8s_secret_obj, k8s_secret_key = obj_key_pair.split('=')
worker_secrets.append(Secret('env', env_var_name, k8s_secret_obj, k8s_secret_key))
return worker_secrets
def _get_image_pull_secrets(self):
"""Extracts any image pull secrets for fetching container(s)"""
if not self.kube_config.image_pull_secrets:
return []
return self.kube_config.image_pull_secrets.split(',')
def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command):
volumes, volume_mounts = self._get_volumes_and_mounts()
worker_init_container_spec = self._get_init_containers(copy.deepcopy(volume_mounts))
return Pod(
namespace=namespace,
name=pod_id,
image=self.kube_config.kube_image,
cmds=["bash", "-cx", "--"],
args=[airflow_command],
labels={
"airflow-slave": "",
"dag_id": dag_id,
"task_id": task_id,
"execution_date": execution_date
},
envs=self._get_environment(),
secrets=self._get_secrets(),
service_account_name=self.kube_config.worker_service_account_name,
image_pull_secrets=self.kube_config.image_pull_secrets,
init_containers=worker_init_container_spec,
volumes=volumes,
volume_mounts=volume_mounts
)

Просмотреть файл

@ -1587,7 +1587,9 @@ class SchedulerJob(BaseJob):
timeout = 5
self.log.info("Waiting up to %s seconds for processes to exit...", timeout)
try:
psutil.wait_procs(child_processes, timeout)
psutil.wait_procs(
child_processes, timeout=timeout,
callback=lambda x: self.log.info('Terminated PID %s', x.pid))
except psutil.TimeoutExpired:
self.log.debug("Ran out of time while waiting for processes to exit")
@ -1595,6 +1597,7 @@ class SchedulerJob(BaseJob):
child_processes = [x for x in this_process.children(recursive=True)
if x.is_running() and x.pid in pids_to_kill]
if len(child_processes) > 0:
self.log.info("SIGKILL processes that did not terminate gracefully")
for child in child_processes:
self.log.info("Killing child PID: %s", child.pid)
child.kill()

Просмотреть файл

@ -461,7 +461,7 @@ class DagFileProcessorManager(LoggingMixin):
def heartbeat(self):
"""
This should be periodically called by the scheduler. This method will
kick of new processes to process DAG definition files and read the
kick off new processes to process DAG definition files and read the
results from the finished processors.
:return: a list of SimpleDags that were produced by processors that

Просмотреть файл

@ -41,12 +41,6 @@ RUN pip install kubernetes && \
COPY airflow.tar.gz /tmp/airflow.tar.gz
RUN pip install /tmp/airflow.tar.gz
# prep airflow
ENV AIRFLOW_HOME=/root/airflow
ENV AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
COPY bootstrap.sh /bootstrap.sh
RUN chmod +x /bootstrap.sh

Просмотреть файл

@ -52,29 +52,31 @@ spec:
metadata:
labels:
name: airflow
annotations:
pod.beta.kubernetes.io/init-containers: '[
{
"name": "init",
"image": "{{docker_image}}",
"imagePullPolicy": "IfNotPresent",
"command": [
"bash", "-cx", "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R example_dags/* $AIRFLOW_HOME/dags/ && airflow initdb && alembic upgrade head"
],
"env": [
{"name": "AIRFLOW__KUBERNETES__CONTAINER_IMAGE", "value": ""},
{"name": "AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM", "value": "airflow-dags"},
{"name": "AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH", "value": "git"}
],
"volumeMounts": [
{"name": "airflow-dags", "mountPath": "/root/airflow/dags"}
]
}
]'
spec:
initContainers:
- name: "init"
image: "{{docker_image}}:{{docker_tag}}"
imagePullPolicy: "IfNotPresent"
volumeMounts:
- name: airflow-configmap
mountPath: /root/airflow/airflow.cfg
subPath: airflow.cfg
- name: airflow-dags
mountPath: /root/airflow/dags
env:
- name: SQL_ALCHEMY_CONN
valueFrom:
secretKeyRef:
name: airflow-secrets
key: sql_alchemy_conn
command:
- "bash"
args:
- "-cx"
- "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R example_dags/* /root/airflow/dags/ && airflow initdb && alembic upgrade head"
containers:
- name: web
image: {{docker_image}}
image: {{docker_image}}:{{docker_tag}}
imagePullPolicy: IfNotPresent
ports:
- name: web
@ -85,22 +87,15 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: AIRFLOW__CORE__EXECUTOR
value: KubernetesExecutor
- name: AIRFLOW__KUBERNETES__CONTAINER_IMAGE
value: {{docker_image}}
- name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
value: "True"
# set these two confs
- name: AIRFLOW__KUBERNETES__GIT_REPO
value: https://github.com/grantnicholas/testdags.git
- name: AIRFLOW__KUBERNETES__GIT_BRANCH
value: master
# or this one
- name: AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM
value: airflow-dags
#
- name: SQL_ALCHEMY_CONN
valueFrom:
secretKeyRef:
name: airflow-secrets
key: sql_alchemy_conn
volumeMounts:
- name: airflow-configmap
mountPath: /root/airflow/airflow.cfg
subPath: airflow.cfg
- name: airflow-dags
mountPath: /root/airflow/dags
readinessProbe:
@ -126,30 +121,24 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: AIRFLOW__CORE__EXECUTOR
value: KubernetesExecutor
- name: AIRFLOW__KUBERNETES__CONTAINER_IMAGE
value: {{docker_image}}
- name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
value: "True"
# set these two confs
- name: AIRFLOW__KUBERNETES__GIT_REPO
value: https://github.com/grantnicholas/testdags.git
- name: AIRFLOW__KUBERNETES__GIT_BRANCH
value: master
# or set this one
- name: AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM
value: airflow-dags
#
- name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
value: "60"
- name: SQL_ALCHEMY_CONN
valueFrom:
secretKeyRef:
name: airflow-secrets
key: sql_alchemy_conn
volumeMounts:
- name: airflow-configmap
mountPath: /root/airflow/airflow.cfg
subPath: airflow.cfg
- name: airflow-dags
mountPath: /root/airflow/dags
volumes:
- name: airflow-dags
persistentVolumeClaim:
claimName: airflow-dags
- name: airflow-configmap
configMap:
name: airflow-configmap
---
apiVersion: v1
kind: Service
@ -162,4 +151,45 @@ spec:
nodePort: 30809
selector:
name: airflow
---
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
type: Opaque
data:
# The sql_alchemy_conn value is a base64 encoded represenation of this connection string:
# postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo=
---
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-configmap
data:
airflow.cfg: |
[core]
airflow_home = /root/airflow
dags_folder = /root/airflow/dags
base_log_folder = /root/airflow/logs
logging_level = INFO
executor = KubernetesExecutor
parallelism = 32
plugins_folder = /root/airflow/plugins
sql_alchemy_conn = $SQL_ALCHEMY_CONN
[scheduler]
dag_dir_list_interval = 60
child_process_log_directory = /root/airflow/logs/scheduler
[kubernetes]
airflow_configmap = airflow-configmap
worker_container_repository = {{docker_image}}
worker_container_tag = {{docker_tag}}
delete_worker_pods = True
git_repo = https://github.com/grantnicholas/testdags.git
git_branch = master
dags_volume_claim = airflow-dags
[kubernetes_secrets]
SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn

Просмотреть файл

@ -24,8 +24,7 @@ sudo mkdir -p /data/postgres-airflow
mkdir -p $DIRNAME/.generated
kubectl apply -f $DIRNAME/postgres.yaml
sed "s#{{docker_image}}#$IMAGE:$TAG#g" $DIRNAME/airflow.yaml.template > $DIRNAME/.generated/airflow.yaml && kubectl apply -f $DIRNAME/.generated/airflow.yaml
sed -e "s#{{docker_image}}#$IMAGE#g" -e "s#{{docker_tag}}#$TAG#g" $DIRNAME/airflow.yaml.template > $DIRNAME/.generated/airflow.yaml && kubectl apply -f $DIRNAME/.generated/airflow.yaml
# wait for up to 10 minutes for everything to be deployed
for i in {1..150}
@ -33,8 +32,8 @@ do
echo "------- Running kubectl get pods -------"
PODS=$(kubectl get pods | awk 'NR>1 {print $0}')
echo "$PODS"
NUM_AIRFLOW_READY=$(echo $PODS | grep airflow | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l)
NUM_POSTGRES_READY=$(echo $PODS | grep postgres | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l)
NUM_AIRFLOW_READY=$(echo $PODS | grep airflow | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l | xargs)
NUM_POSTGRES_READY=$(echo $PODS | grep postgres | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l | xargs)
if [ "$NUM_AIRFLOW_READY" == "1" ] && [ "$NUM_POSTGRES_READY" == "1" ]; then
break
fi