diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 400bcc0dfe..6a966cb032 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -5,9 +5,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -498,3 +498,62 @@ hide_sensitive_variable_fields = True [elasticsearch] elasticsearch_host = +[kubernetes] +# The repository and tag of the Kubernetes Image for the Worker to Run +worker_container_repository = +worker_container_tag = + +# If True (default), worker pods will be deleted upon termination +delete_worker_pods = True + +# The Kubernetes namespace where airflow workers should be created. Defaults to `default` +namespace = default + +# The name of the Kubernetes ConfigMap Containing the Airflow Configuration (this file) +airflow_configmap = + +# For either git sync or volume mounted DAGs, the worker will look in this subpath for DAGs +dags_volume_subpath = + +# For DAGs mounted via a volume claim (mutually exclusive with volume claim) +dags_volume_claim = + +# Git credentials and repository for DAGs mounted via Git (mutually exclusive with volume claim) +git_repo = +git_branch = +git_user = +git_password = + +# For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync +git_sync_container_repository = gcr.io/google-containers/git-sync-amd64 +git_sync_container_tag = v2.0.5 +git_sync_init_container_name = git-sync-clone + +# The name of the Kubernetes service account to be associated with airflow workers, if any. +# Service accounts are required for workers that require access to secrets or cluster resources. +# See the Kubernetes RBAC documentation for more: +# https://kubernetes.io/docs/admin/authorization/rbac/ +worker_service_account_name = + +# Any image pull secrets to be given to worker pods, If more than one secret is +# required, provide a comma separated list: secret_a,secret_b +image_pull_secrets = + +# GCP Service Account Keys to be provided to tasks run on Kubernetes Executors +# Should be supplied in the format: key-name-1:key-path-1,key-name-2:key-path-2 +gcp_service_account_keys = + +[kubernetes_secrets] +# The scheduler mounts the following secrets into your workers as they are launched by the +# scheduler. You may define as many secrets as needed and the kubernetes launcher will parse the +# defined secrets and mount them as secret environment variables in the launched workers. +# Secrets in this section are defined as follows +# = : +# +# For example if you wanted to mount a kubernetes secret key named `postgres_password` from the +# kubernetes secret object `airflow-secret` as the environment variable `POSTGRES_PASSWORD` into +# your workers you would follow the following format: +# POSTGRES_PASSWORD = airflow-secret:postgres_credentials +# +# Additionally you may override worker airflow settings with the AIRFLOW__
__ +# formatting as supported by airflow normally. diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 8989add004..9675e811f6 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -12,10 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import calendar -import logging +import base64 import os import multiprocessing +import six from queue import Queue from dateutil import parser from uuid import uuid4 @@ -23,13 +23,13 @@ from kubernetes import watch, client from kubernetes.client.rest import ApiException from airflow.contrib.kubernetes.pod_launcher import PodLauncher from airflow.contrib.kubernetes.kube_client import get_kube_client +from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration from airflow.executors.base_executor import BaseExecutor from airflow.models import TaskInstance, KubeResourceVersion from airflow.utils.state import State from airflow import configuration, settings from airflow.exceptions import AirflowConfigException -from airflow.contrib.kubernetes.pod import Pod - +from airflow.utils.log.logging_mixin import LoggingMixin class KubeConfig: core_section = "core" @@ -50,98 +50,84 @@ class KubeConfig: return default def __init__(self): + configuration_dict = configuration.as_dict(display_sensitive=True) + self.core_configuration = configuration_dict['core'] + self.kube_secrets = configuration_dict.get('kubernetes_secrets', {}) + self.airflow_home = configuration.get(self.core_section, 'airflow_home') self.dags_folder = configuration.get(self.core_section, 'dags_folder') self.parallelism = configuration.getint(self.core_section, 'PARALLELISM') - self.kube_image = configuration.get(self.kubernetes_section, 'container_image') + self.worker_container_repository = configuration.get( + self.kubernetes_section, 'worker_container_repository') + self.worker_container_tag = configuration.get( + self.kubernetes_section, 'worker_container_tag') + self.kube_image = '{}:{}'.format( + self.worker_container_repository, self.worker_container_tag) self.delete_worker_pods = self.safe_getboolean(self.kubernetes_section, 'delete_worker_pods', True) - self.kube_namespace = os.environ.get('AIRFLOW_KUBE_NAMESPACE', 'default') - # These two props must be set together + self.worker_service_account_name = self.safe_get( + self.kubernetes_section, 'worker_service_account_name', 'default') + self.image_pull_secrets = self.safe_get( + self.kubernetes_section, 'image_pull_secrets', '') + + # NOTE: `git_repo` and `git_branch` must be specified together as a pair + # The http URL of the git repository to clone from self.git_repo = self.safe_get(self.kubernetes_section, 'git_repo', None) + # The branch of the repository to be checked out self.git_branch = self.safe_get(self.kubernetes_section, 'git_branch', None) + # Optionally, the directory in the git repository containing the dags + self.git_subpath = self.safe_get(self.kubernetes_section, 'git_subpath', '') - # Or this one prop + # Optionally a user may supply a `git_user` and `git_password` for private repositories + self.git_user = self.safe_get(self.kubernetes_section, 'git_user', None) + self.git_password = self.safe_get(self.kubernetes_section, 'git_password', None) + + # NOTE: The user may optionally use a volume claim to mount a PV containing DAGs directly self.dags_volume_claim = self.safe_get(self.kubernetes_section, 
'dags_volume_claim', None) - # And optionally this prop + + # This prop may optionally be set for PV Claims and is used to locate DAGs on a SubPath self.dags_volume_subpath = self.safe_get(self.kubernetes_section, 'dags_volume_subpath', None) + # The Kubernetes Namespace in which the Scheduler and Webserver reside. Note that if your + # cluster has RBAC enabled, your scheduler may need service account permissions to + # create, watch, get, and delete pods in this namespace. + self.kube_namespace = self.safe_get(self.kubernetes_section, 'namespace', 'default') + # The Kubernetes Namespace in which pods will be created by the executor. Note that if your + # cluster has RBAC enabled, your workers may need service account permissions to + # interact with cluster components. + self.executor_namespace = self.safe_get(self.kubernetes_section, 'namespace', 'default') + # Task secrets managed by KubernetesExecutor. + self.gcp_service_account_keys = self.safe_get(self.kubernetes_section, 'gcp_service_account_keys', None) + + # If the user is using the git-sync container to clone their repository via git, + # allow them to specify repository, tag, and pod name for the init container. + self.git_sync_container_repository = self.safe_get( + self.kubernetes_section, 'git_sync_container_repository', + 'gcr.io/google-containers/git-sync-amd64') + + self.git_sync_container_tag = self.safe_get( + self.kubernetes_section, 'git_sync_container_tag', 'v2.0.5') + self.git_sync_container = '{}:{}'.format( + self.git_sync_container_repository, self.git_sync_container_tag) + + self.git_sync_init_container_name = self.safe_get( + self.kubernetes_section, 'git_sync_init_container_name', 'git-sync-clone') + + # The worker pod may optionally have a valid Airflow config loaded via a configmap + self.airflow_configmap = self.safe_get(self.kubernetes_section, 'airflow_configmap', None) + self._validate() def _validate(self): - if self.dags_volume_claim: - # do volume things - pass - elif self.git_repo and self.git_branch: - # do git things - pass - else: + if not self.dags_volume_claim and (not self.git_repo or not self.git_branch): raise AirflowConfigException( "In kubernetes mode you must set the following configs in the `kubernetes` section: " "`dags_volume_claim` or " - "`git_repo and git_branch`" + "`git_repo and git_branch` " ) -class PodMaker: - def __init__(self, kube_config): - self.logger = logging.getLogger(__name__) - self.kube_config = kube_config - - def _get_volumes_and_mounts(self): - volume_name = "airflow-dags" - - if self.kube_config.dags_volume_claim: - volumes = [{ - "name": volume_name, "persistentVolumeClaim": {"claimName": self.kube_config.dags_volume_claim} - }] - volume_mounts = [{ - "name": volume_name, "mountPath": self.kube_config.dags_folder, - "readOnly": True - }] - if self.kube_config.dags_volume_subpath: - volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath - - return volumes, volume_mounts - else: - return [], [] - - def _get_args(self, airflow_command): - if self.kube_config.dags_volume_claim: - self.logger.info("Using k8s_dags_volume_claim for airflow dags") - return [airflow_command] - else: - self.logger.info("Using git-syncher for airflow dags") - cmd_args = "mkdir -p {dags_folder} && cd {dags_folder} &&" \ - "git init && git remote add origin {git_repo} && git pull origin {git_branch} --depth=1 &&" \ - "{command}".format(dags_folder=self.kube_config.dags_folder, git_repo=self.kube_config.git_repo, - git_branch=self.kube_config.git_branch, command=airflow_command) - 
return [cmd_args] - - def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command): - volumes, volume_mounts = self._get_volumes_and_mounts() - - pod = Pod( - namespace=namespace, - name=pod_id, - image=self.kube_config.kube_image, - cmds=["bash", "-cx", "--"], - args=self._get_args(airflow_command), - labels={ - "airflow-slave": "", - "dag_id": dag_id, - "task_id": task_id, - "execution_date": execution_date - }, - envs={"AIRFLOW__CORE__EXECUTOR": "LocalExecutor"}, - volumes=volumes, - volume_mounts=volume_mounts - ) - return pod - - -class KubernetesJobWatcher(multiprocessing.Process, object): +class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): def __init__(self, namespace, watcher_queue, resource_version): - self.logger = logging.getLogger(__name__) multiprocessing.Process.__init__(self) self.namespace = namespace self.watcher_queue = watcher_queue @@ -153,14 +139,14 @@ class KubernetesJobWatcher(multiprocessing.Process, object): try: self.resource_version = self._run(kube_client, self.resource_version) except Exception: - self.logger.exception("Unknown error in KubernetesJobWatcher. Failing") + self.log.exception("Unknown error in KubernetesJobWatcher. Failing") raise else: - self.logger.warn("Watch died gracefully, starting back up with: " - "last resource_version: {}".format(self.resource_version)) + self.log.warn("Watch died gracefully, starting back up with: " + "last resource_version: {}".format(self.resource_version)) def _run(self, kube_client, resource_version): - self.logger.info("Event: and now my watch begins starting at resource_version: {}".format(resource_version)) + self.log.info("Event: and now my watch begins starting at resource_version: {}".format(resource_version)) watcher = watch.Watch() kwargs = {"label_selector": "airflow-slave"} @@ -170,7 +156,7 @@ class KubernetesJobWatcher(multiprocessing.Process, object): last_resource_version = None for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, **kwargs): task = event['object'] - self.logger.info("Event: {} had an event of type {}".format(task.metadata.name, event['type'])) + self.log.info("Event: {} had an event of type {}".format(task.metadata.name, event['type'])) self.process_status( task.metadata.name, task.status.phase, task.metadata.labels, task.metadata.resource_version ) @@ -180,32 +166,31 @@ class KubernetesJobWatcher(multiprocessing.Process, object): def process_status(self, pod_id, status, labels, resource_version): if status == 'Pending': - self.logger.info("Event: {} Pending".format(pod_id)) + self.log.info("Event: {} Pending".format(pod_id)) elif status == 'Failed': - self.logger.info("Event: {} Failed".format(pod_id)) + self.log.info("Event: {} Failed".format(pod_id)) self.watcher_queue.put((pod_id, State.FAILED, labels, resource_version)) elif status == 'Succeeded': - self.logger.info("Event: {} Succeeded".format(pod_id)) + self.log.info("Event: {} Succeeded".format(pod_id)) self.watcher_queue.put((pod_id, None, labels, resource_version)) elif status == 'Running': - self.logger.info("Event: {} is Running".format(pod_id)) + self.log.info("Event: {} is Running".format(pod_id)) else: - self.logger.warn("Event: Invalid state: {} on pod: {} with labels: {} " + self.log.warn("Event: Invalid state: {} on pod: {} with labels: {} " "with resource_version: {}".format(status, pod_id, labels, resource_version)) -class AirflowKubernetesScheduler(object): +class AirflowKubernetesScheduler(LoggingMixin, object): def __init__(self, 
kube_config, task_queue, result_queue, session, kube_client): - self.logger = logging.getLogger(__name__) - self.logger.info("creating kubernetes executor") - self.kube_config = KubeConfig() + self.log.debug("creating kubernetes executor") + self.kube_config = kube_config self.task_queue = task_queue - self.pending_jobs = set() - self.namespace = os.environ['k8s_POD_NAMESPACE'] - self.logger.info("k8s: using namespace {}".format(self.namespace)) + self.result_queue = result_queue + self.namespace = self.kube_config.kube_namespace + self.log.debug("k8s: using namespace {}".format(self.namespace)) self.kube_client = kube_client self.launcher = PodLauncher(kube_client=self.kube_client) - self.pod_maker = PodMaker(kube_config=self.kube_config) + self.worker_configuration = WorkerConfiguration(kube_config=self.kube_config) self.watcher_queue = multiprocessing.Queue() self._session = session self.kube_watcher = self._make_kube_watcher() @@ -220,7 +205,7 @@ class AirflowKubernetesScheduler(object): if self.kube_watcher.is_alive(): pass else: - self.logger.error("Error while health checking kube watcher process. Process died for unknown reasons") + self.log.error("Error while health checking kube watcher process. Process died for unknown reasons") self.kube_watcher = self._make_kube_watcher() def run_next(self, next_job): @@ -234,19 +219,19 @@ class AirflowKubernetesScheduler(object): :return: """ - self.logger.info('k8s: job is {}'.format(str(next_job))) + self.log.debug('k8s: job is {}'.format(str(next_job))) key, command = next_job dag_id, task_id, execution_date = key - self.logger.info("k8s: running for command {}".format(command)) - self.logger.info("k8s: launching image {}".format(self.kube_config.kube_image)) - pod = self.pod_maker.make_pod( + self.log.debug("k8s: running for command {}".format(command)) + self.log.debug("k8s: launching image {}".format(self.kube_config.kube_image)) + pod = self.worker_configuration.make_pod( namespace=self.namespace, pod_id=self._create_pod_id(dag_id, task_id), dag_id=dag_id, task_id=task_id, execution_date=self._datetime_to_label_safe_datestring(execution_date), airflow_command=command ) # the watcher will monitor pods, so we do not block. 
self.launcher.run_pod_async(pod) - self.logger.info("k8s: Job created!") + self.log.debug("k8s: Job created!") def delete_pod(self, pod_id): if self.kube_config.delete_worker_pods: @@ -272,10 +257,10 @@ class AirflowKubernetesScheduler(object): def process_watcher_task(self): pod_id, state, labels, resource_version = self.watcher_queue.get() - logging.info("Attempting to finish pod; pod_id: {}; state: {}; labels: {}".format(pod_id, state, labels)) + self.log.info("Attempting to finish pod; pod_id: {}; state: {}; labels: {}".format(pod_id, state, labels)) key = self._labels_to_key(labels) if key: - self.logger.info("finishing job {}".format(key)) + self.log.debug("finishing job {} - {} ({})".format(key, state, pod_id)) self.result_queue.put((key, state, pod_id, resource_version)) @staticmethod @@ -336,13 +321,13 @@ class AirflowKubernetesScheduler(object): try: return labels["dag_id"], labels["task_id"], self._label_safe_datestring_to_datetime(labels["execution_date"]) except Exception as e: - self.logger.warn("Error while converting labels to key; labels: {}; exception: {}".format( + self.log.warn("Error while converting labels to key; labels: {}; exception: {}".format( labels, e )) return None -class KubernetesExecutor(BaseExecutor): +class KubernetesExecutor(BaseExecutor, LoggingMixin): def __init__(self): super(KubernetesExecutor, self).__init__(parallelism=PARALLELISM) self.task_queue = None @@ -362,7 +347,7 @@ class KubernetesExecutor(BaseExecutor): :return: None """ queued_tasks = self._session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all() - self.logger.info("When executor started up, found {} queued task instances".format(len(queued_tasks))) + self.log.info("When executor started up, found {} queued task instances".format(len(queued_tasks))) for t in queued_tasks: kwargs = dict(label_selector="dag_id={},task_id={},execution_date={}".format( @@ -370,7 +355,7 @@ class KubernetesExecutor(BaseExecutor): )) pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs) if len(pod_list.items) == 0: - self.logger.info("TaskInstance: {} found in queued state but was not launched, rescheduling".format(t)) + self.log.info("TaskInstance: {} found in queued state but was not launched, rescheduling".format(t)) self._session.query(TaskInstance).filter( TaskInstance.dag_id == t.dag_id, TaskInstance.task_id == t.task_id, @@ -379,8 +364,37 @@ class KubernetesExecutor(BaseExecutor): self._session.commit() + def _inject_secrets(self): + def _create_or_update_secret(secret_name, secret_path): + try: + return self.kube_client.create_namespaced_secret( + self.kube_config.executor_namespace, kubernetes.client.V1Secret( + data={'key.json' : base64.b64encode(open(secret_path, 'r').read())}, + metadata=kubernetes.client.V1ObjectMeta(name=secret_name))) + except ApiException as e: + if e.status == 409: + return self.kube_client.replace_namespaced_secret( + secret_name, self.kube_config.executor_namespace, + kubernetes.client.V1Secret( + data={'key.json' : base64.b64encode(open(secret_path, 'r').read())}, + metadata=kubernetes.client.V1ObjectMeta(name=secret_name))) + self.log.exception("Exception while trying to inject secret." + "Secret name: {}, error details: {}.".format(secret_name, e)) + raise + + # For each GCP service account key, inject it as a secret in executor + # namespace with the specific secret name configured in the airflow.cfg. + # We let exceptions to pass through to users. 
+ if self.kube_config.gcp_service_account_keys: + name_path_pair_list = [ + {'name' : account_spec.strip().split('=')[0], + 'path' : account_spec.strip().split('=')[1]} + for account_spec in self.kube_config.gcp_service_account_keys.split(',')] + for service_account in name_path_pair_list: + _create_or_update_secret(service_account['name'], service_account['path']) + def start(self): - self.logger.info('k8s: starting kubernetes executor') + self.log.info('k8s: starting kubernetes executor') self._session = settings.Session() self.task_queue = Queue() self.result_queue = Queue() @@ -388,15 +402,16 @@ class KubernetesExecutor(BaseExecutor): self.kube_scheduler = AirflowKubernetesScheduler( self.kube_config, self.task_queue, self.result_queue, self._session, self.kube_client ) + self._inject_secrets() self.clear_not_launched_queued_tasks() def execute_async(self, key, command, queue=None): - self.logger.info("k8s: adding task {} with command {}".format(key, command)) + self.log.info("k8s: adding task {} with command {}".format(key, command)) self.task_queue.put((key, command)) def sync(self): - self.logger.info("self.running: {}".format(self.running)) - self.logger.info("self.queued: {}".format(self.queued_tasks)) + self.log.info("self.running: {}".format(self.running)) + self.log.info("self.queued: {}".format(self.queued_tasks)) self.kube_scheduler.sync() last_resource_version = None @@ -404,7 +419,7 @@ class KubernetesExecutor(BaseExecutor): results = self.result_queue.get() key, state, pod_id, resource_version = results last_resource_version = resource_version - self.logger.info("Changing state of {}".format(results)) + self.log.info("Changing state of {}".format(results)) self._change_state(key, state, pod_id) KubeResourceVersion.checkpoint_resource_version(last_resource_version, session=self._session) @@ -434,7 +449,7 @@ class KubernetesExecutor(BaseExecutor): self._session.commit() def end(self): - self.logger.info('ending kube executor') + self.log.info('ending kube executor') self.task_queue.join() def execute_async(self, key, command, queue=None): diff --git a/airflow/contrib/kubernetes/kubernetes_factory.py b/airflow/contrib/kubernetes/kubernetes_factory.py index 90b6f6cad0..715075ac58 100644 --- a/airflow/contrib/kubernetes/kubernetes_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_factory.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and from airflow.contrib.kubernetes.kubernetes_request_factory import KubernetesRequestFactory -import logging class KubernetesResourceBuilder: @@ -31,7 +30,6 @@ class KubernetesResourceBuilder: self.cmds = cmds self.kub_req_factory = kub_req_factory self.namespace = namespace - self.logger = logging.getLogger(self.__class__.__name__) self.envs = {} self.labels = {} self.secrets = {} diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py index 7ea9bc223b..9cfd77f112 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -13,6 +13,7 @@ # limitations under the License. 
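As a hedged illustration of the `gcp_service_account_keys` handling in `_inject_secrets` above: each comma-separated entry is split on `=` into a secret name and a key-file path, and the file contents are pushed into the executor namespace as a V1Secret under the `key.json` key. The config value and key contents below are invented for illustration.

    import base64

    # Hypothetical airflow.cfg value for [kubernetes] gcp_service_account_keys
    gcp_service_account_keys = "etl-key=/var/secrets/etl-key.json,bq-key=/var/secrets/bq-key.json"

    # Same parsing as _inject_secrets above: split entries on ',' and each entry on '='
    name_path_pairs = [
        {'name': spec.strip().split('=')[0], 'path': spec.strip().split('=')[1]}
        for spec in gcp_service_account_keys.split(',')
    ]
    print(name_path_pairs)
    # [{'name': 'etl-key', 'path': '/var/secrets/etl-key.json'},
    #  {'name': 'bq-key', 'path': '/var/secrets/bq-key.json'}]

    # Each key file then becomes the base64-encoded 'key.json' entry of a V1Secret,
    # e.g. (stand-in bytes here instead of reading a real key file):
    secret_data = {'key.json': base64.b64encode(b'{"type": "service_account"}')}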
from abc import ABCMeta, abstractmethod +import six class KubernetesRequestFactory(): """ @@ -69,8 +70,9 @@ class KubernetesRequestFactoryHelper(object): @staticmethod def extract_labels(pod, req): - for k in pod.labels.keys(): - req['metadata']['labels'][k] = pod.labels[k] + req['metadata']['labels'] = req['metadata'].get('labels', {}) + for k, v in six.iteritems(pod.labels): + req['metadata']['labels'][k] = v @staticmethod def extract_cmds(pod, req): @@ -114,7 +116,7 @@ class KubernetesRequestFactoryHelper(object): }) @staticmethod - def extract_secrets(pod, req): + def extract_env_and_secrets(pod, req): env_secrets = [s for s in pod.secrets if s.deploy_type == 'env'] if len(pod.envs) > 0 or len(env_secrets) > 0: env = [] @@ -123,3 +125,20 @@ class KubernetesRequestFactoryHelper(object): for secret in env_secrets: KubernetesRequestFactory.add_secret_to_env(env, secret) req['spec']['containers'][0]['env'] = env + + @staticmethod + def extract_init_containers(pod, req): + if pod.init_containers: + req['spec']['initContainers'] = pod.init_containers + + @staticmethod + def extract_service_account_name(pod, req): + if pod.service_account_name: + req['spec']['serviceAccountName'] = pod.service_account_name + + @staticmethod + def extract_image_pull_secrets(pod, req): + if pod.image_pull_secrets: + req['spec']['imagePullSecrets'] = [{ + 'name': pull_secret + } for pull_secret in pod.image_pull_secrets.split(',')] diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py index 89631e0f77..dfa247fb84 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and import yaml -import airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory as kreq +from airflow.contrib.kubernetes.kubernetes_request_factory.kubernetes_request_factory import KubernetesRequestFactory from airflow.contrib.kubernetes.pod import Pod -class SimplePodRequestFactory(kreq.KubernetesRequestFactory): +class SimplePodRequestFactory(KubernetesRequestFactory): """ Request generator for a simple pod. 
""" @@ -41,16 +41,18 @@ spec: def create(self, pod): # type: (Pod) -> dict req = yaml.load(self._yaml) - kreq.extract_name(pod, req) - kreq.extract_labels(pod, req) - kreq.extract_image(pod, req) - if pod.image_pull_policy: - kreq.extract_image_pull_policy(pod, req) - kreq.extract_cmds(pod, req) - kreq.extract_args(pod, req) - if len(pod.node_selectors) > 0: - self.extract_node_selector(pod, req) - self.extract_secrets(pod, req) + self.extract_name(pod, req) + self.extract_labels(pod, req) + self.extract_image(pod, req) + self.extract_image_pull_policy(pod, req) + self.extract_cmds(pod, req) + self.extract_args(pod, req) + self.extract_node_selector(pod, req) + self.extract_env_and_secrets(pod, req) self.extract_volume_secrets(pod, req) - self.attach_volume_mounts(req=req, pod=pod) + self.attach_volumes(pod, req) + self.attach_volume_mounts(pod, req) + self.extract_service_account_name(pod, req) + self.extract_init_containers(pod, req) + self.extract_image_pull_secrets(pod, req) return req diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index 0200afacf6..be99bbffb4 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging class Pod: """ @@ -43,7 +42,11 @@ class Pod: name=None, volumes = [], namespace='default', - result=None): + result=None, + image_pull_policy="IfNotPresent", + image_pull_secrets=None, + init_containers=None, + service_account_name=None): self.image = image self.envs = envs if envs else {} self.cmds = cmds @@ -54,5 +57,7 @@ class Pod: self.volumes = volumes self.node_selectors = node_selectors if node_selectors else [] self.namespace = namespace - self.logger = logging.getLogger(self.__class__.__name__) - + self.image_pull_policy = image_pull_policy + self.image_pull_secrets = image_pull_secrets + self.init_containers = init_containers + self.service_account_name = service_account_name diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py index e435a12ae5..1fcdb1081d 100644 --- a/airflow/contrib/kubernetes/pod_launcher.py +++ b/airflow/contrib/kubernetes/pod_launcher.py @@ -11,28 +11,34 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import json + from airflow.contrib.kubernetes.pod import Pod from airflow.contrib.kubernetes.kubernetes_request_factory.pod_request_factory import SimplePodRequestFactory +from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.state import State from kubernetes import watch from kubernetes.client import V1Pod -from airflow.utils.state import State -import json -import logging +from kubernetes.client.rest import ApiException from .kube_client import get_kube_client -class PodLauncher: +class PodLauncher(LoggingMixin): def __init__(self, kube_client=None): self.kube_req_factory = SimplePodRequestFactory() self._client = kube_client or get_kube_client() self._watch = watch.Watch() - self.logger = logging.getLogger(__name__) def run_pod_async(self, pod): req = self.kube_req_factory.create(pod) - print(json.dumps(req)) - resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace) + self.log.debug('Pod Creation Request: \n{}'.format(json.dumps(req, indent=2))) + try: + resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace) + self.log.debug('Pod Creation Response: {}'.format(resp)) + except ApiException: + self.log.exception('Exception when attempting to create Namespaced Pod.') + raise return resp def run_pod(self, pod): @@ -54,7 +60,7 @@ class PodLauncher: def _task_status(self, event): # type: (V1Pod) -> State task = event['object'] - self.logger.info( + self.log.info( "Event: {} had an event of type {}".format(task.metadata.name, event['type'])) status = self.process_status(task.metadata.name, task.status.phase) @@ -67,13 +73,13 @@ class PodLauncher: if status == 'Pending': return State.QUEUED elif status == 'Failed': - self.logger.info("Event: {} Failed".format(job_id)) + self.log.info("Event: {} Failed".format(job_id)) return State.FAILED elif status == 'Succeeded': - self.logger.info("Event: {} Succeeded".format(job_id)) + self.log.info("Event: {} Succeeded".format(job_id)) return State.SUCCESS elif status == 'Running': return State.RUNNING else: - self.logger.info("Event: Invalid state {} on job {}".format(status, job_id)) + self.log.info("Event: Invalid state {} on job {}".format(status, job_id)) return State.FAILED diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py index ec5d51cdb2..a798a24731 100644 --- a/airflow/contrib/kubernetes/secret.py +++ b/airflow/contrib/kubernetes/secret.py @@ -1,30 +1,25 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at +# -*- coding: utf-8 -*- # -# http://www.apache.org/licenses/LICENSE-2.0 +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
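A brief, hedged usage note for the PodLauncher above: the executor only calls `run_pod_async` and leaves status tracking to the KubernetesJobWatcher. This assumes a reachable cluster and a kube client obtainable via `get_kube_client()`; `pod` is as built in the previous sketch.

    from airflow.contrib.kubernetes.pod_launcher import PodLauncher

    launcher = PodLauncher()            # with no argument, falls back to get_kube_client()
    resp = launcher.run_pod_async(pod)  # logs the creation request/response; re-raises ApiException on failure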
+# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. class Secret: - """Defines Kubernetes Secret Volume""" + """Defines Kubernetes Secret Containers""" def __init__(self, deploy_type, deploy_target, secret, key): - """Initialize a Kubernetes Secret Object. Used to track requested secrets from - the user. + """Initialize a Kubernetes Secret Object. Used to track requested secrets from the user. - :param deploy_type: The type of secret deploy in Kubernetes, either `env` or - `volume` + :param deploy_type: The type of secret deploy in Kubernetes, either `env` or `volume` :type deploy_type: ``str`` :param deploy_target: The environment variable to be created in the worker. :type deploy_target: ``str`` diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py new file mode 100644 index 0000000000..5e87941a6c --- /dev/null +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -0,0 +1,158 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
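The Secret class above carries one Kubernetes secret key into the worker, either as an environment variable or a volume. A hedged sketch of the env case, reusing the POSTGRES_PASSWORD example from the default_airflow.cfg comments:

    from airflow.contrib.kubernetes.secret import Secret

    # Expose the `postgres_credentials` key of the Kubernetes secret object
    # `airflow-secret` as the POSTGRES_PASSWORD environment variable in the worker.
    pg_password = Secret(
        deploy_type='env',
        deploy_target='POSTGRES_PASSWORD',
        secret='airflow-secret',
        key='postgres_credentials',
    )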
+ +import copy +import os +import six + +from airflow.contrib.kubernetes.pod import Pod +from airflow.contrib.kubernetes.secret import Secret + + +class WorkerConfiguration: + """Contains Kubernetes Airflow Worker configuration logic""" + + def __init__(self, kube_config): + self.kube_config = kube_config + + def _get_init_containers(self, volume_mounts): + """When using git to retrieve the DAGs, use the GitSync Init Container""" + # If we're using volume claims to mount the dags, no init container is needed + if self.kube_config.dags_volume_claim: + return [] + + # Otherwise, define a git-sync init container + init_environment = [{ + 'name': 'GIT_SYNC_REPO', + 'value': self.kube_config.git_repo + }, { + 'name': 'GIT_SYNC_BRANCH', + 'value': self.kube_config.git_branch + }, { + 'name': 'GIT_SYNC_ROOT', + 'value': '/tmp' + }, { + 'name': 'GIT_SYNC_DEST', + 'value': 'dags' + }, { + 'name': 'GIT_SYNC_ONE_TIME', + 'value': 'true' + }] + if self.kube_config.git_user: + init_environment.append({ + 'name': 'GIT_SYNC_USERNAME', + 'value': self.kube_config.git_user + }) + if self.kube_config.git_password: + init_environment.append({ + 'name': 'GIT_SYNC_PASSWORD', + 'value': self.kube_config.git_password + }) + + volume_mounts[0]['readOnly'] = False + return [{ + 'name': self.kube_config.git_sync_init_container_name, + 'image': self.kube_config.git_sync_container, + 'securityContext': {'runAsUser': 0}, + 'env': init_environment, + 'volumeMounts': volume_mounts + }] + + def _get_volumes_and_mounts(self): + """Determine volumes and volume mounts for Airflow workers""" + dags_volume_name = "airflow-dags" + dags_path = os.path.join(self.kube_config.dags_folder, self.kube_config.git_subpath) + volumes = [{ + 'name': dags_volume_name + }] + volume_mounts = [{ + 'name': dags_volume_name, + 'mountPath': dags_path, + 'readOnly': True + }] + + # Mount the airflow.cfg file via a configmap the user has specified + if self.kube_config.airflow_configmap: + config_volume_name = "airflow-config" + config_path = '{}/airflow.cfg'.format(self.kube_config.airflow_home) + volumes.append({ + 'name': config_volume_name, + 'configMap': { + 'name': self.kube_config.airflow_configmap + } + }) + volume_mounts.append({ + 'name': config_volume_name, + 'mountPath': config_path, + 'subPath': 'airflow.cfg', + 'readOnly': True + }) + + # A PV with the DAGs should be mounted + if self.kube_config.dags_volume_claim: + volumes[0]['persistentVolumeClaim'] = {"claimName": self.kube_config.dags_volume_claim} + if self.kube_config.dags_volume_subpath: + volume_mounts[0]["subPath"] = self.kube_config.dags_volume_subpath + else: + # Create a Shared Volume for the Git-Sync module to populate + volumes[0]["emptyDir"] = {} + return volumes, volume_mounts + + def _get_environment(self): + """Defines any necessary environment variables for the pod executor""" + env = { + 'AIRFLOW__CORE__DAGS_FOLDER': '/tmp/dags', + 'AIRFLOW__CORE__EXECUTOR': 'LocalExecutor' + } + if self.kube_config.airflow_configmap: + env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.kube_config.airflow_home + return env + + def _get_secrets(self): + """Defines any necessary secrets for the pod executor""" + worker_secrets = [] + for env_var_name, obj_key_pair in six.iteritems(self.kube_config.kube_secrets): + k8s_secret_obj, k8s_secret_key = obj_key_pair.split('=') + worker_secrets.append(Secret('env', env_var_name, k8s_secret_obj, k8s_secret_key)) + return worker_secrets + + def _get_image_pull_secrets(self): + """Extracts any image pull secrets for fetching container(s)""" + if not 
self.kube_config.image_pull_secrets: + return [] + return self.kube_config.image_pull_secrets.split(',') + + def make_pod(self, namespace, pod_id, dag_id, task_id, execution_date, airflow_command): + volumes, volume_mounts = self._get_volumes_and_mounts() + worker_init_container_spec = self._get_init_containers(copy.deepcopy(volume_mounts)) + return Pod( + namespace=namespace, + name=pod_id, + image=self.kube_config.kube_image, + cmds=["bash", "-cx", "--"], + args=[airflow_command], + labels={ + "airflow-slave": "", + "dag_id": dag_id, + "task_id": task_id, + "execution_date": execution_date + }, + envs=self._get_environment(), + secrets=self._get_secrets(), + service_account_name=self.kube_config.worker_service_account_name, + image_pull_secrets=self.kube_config.image_pull_secrets, + init_containers=worker_init_container_spec, + volumes=volumes, + volume_mounts=volume_mounts + ) diff --git a/airflow/jobs.py b/airflow/jobs.py index 9754415857..fe76fbdbf4 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1587,7 +1587,9 @@ class SchedulerJob(BaseJob): timeout = 5 self.log.info("Waiting up to %s seconds for processes to exit...", timeout) try: - psutil.wait_procs(child_processes, timeout) + psutil.wait_procs( + child_processes, timeout=timeout, + callback=lambda x: self.log.info('Terminated PID %s', x.pid)) except psutil.TimeoutExpired: self.log.debug("Ran out of time while waiting for processes to exit") @@ -1595,6 +1597,7 @@ class SchedulerJob(BaseJob): child_processes = [x for x in this_process.children(recursive=True) if x.is_running() and x.pid in pids_to_kill] if len(child_processes) > 0: + self.log.info("SIGKILL processes that did not terminate gracefully") for child in child_processes: self.log.info("Killing child PID: %s", child.pid) child.kill() diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index f75b75674f..543eb41692 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -461,7 +461,7 @@ class DagFileProcessorManager(LoggingMixin): def heartbeat(self): """ This should be periodically called by the scheduler. This method will - kick of new processes to process DAG definition files and read the + kick off new processes to process DAG definition files and read the results from the finished processors. 
:return: a list of SimpleDags that were produced by processors that diff --git a/scripts/ci/kubernetes/docker/Dockerfile b/scripts/ci/kubernetes/docker/Dockerfile index a3b05b0ca6..b1bc493dff 100644 --- a/scripts/ci/kubernetes/docker/Dockerfile +++ b/scripts/ci/kubernetes/docker/Dockerfile @@ -41,12 +41,6 @@ RUN pip install kubernetes && \ COPY airflow.tar.gz /tmp/airflow.tar.gz RUN pip install /tmp/airflow.tar.gz -# prep airflow -ENV AIRFLOW_HOME=/root/airflow -ENV AIRFLOW__CORE__EXECUTOR=KubernetesExecutor -ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://root:root@postgres-airflow:5432/airflow - - COPY bootstrap.sh /bootstrap.sh RUN chmod +x /bootstrap.sh diff --git a/scripts/ci/kubernetes/kube/airflow.yaml.template b/scripts/ci/kubernetes/kube/airflow.yaml.template index a297b95649..af54175b25 100644 --- a/scripts/ci/kubernetes/kube/airflow.yaml.template +++ b/scripts/ci/kubernetes/kube/airflow.yaml.template @@ -52,29 +52,31 @@ spec: metadata: labels: name: airflow - annotations: - pod.beta.kubernetes.io/init-containers: '[ - { - "name": "init", - "image": "{{docker_image}}", - "imagePullPolicy": "IfNotPresent", - "command": [ - "bash", "-cx", "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R example_dags/* $AIRFLOW_HOME/dags/ && airflow initdb && alembic upgrade head" - ], - "env": [ - {"name": "AIRFLOW__KUBERNETES__CONTAINER_IMAGE", "value": ""}, - {"name": "AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM", "value": "airflow-dags"}, - {"name": "AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH", "value": "git"} - ], - "volumeMounts": [ - {"name": "airflow-dags", "mountPath": "/root/airflow/dags"} - ] - } - ]' spec: + initContainers: + - name: "init" + image: "{{docker_image}}:{{docker_tag}}" + imagePullPolicy: "IfNotPresent" + volumeMounts: + - name: airflow-configmap + mountPath: /root/airflow/airflow.cfg + subPath: airflow.cfg + - name: airflow-dags + mountPath: /root/airflow/dags + env: + - name: SQL_ALCHEMY_CONN + valueFrom: + secretKeyRef: + name: airflow-secrets + key: sql_alchemy_conn + command: + - "bash" + args: + - "-cx" + - "cd /usr/local/lib/python2.7/dist-packages/airflow && cp -R example_dags/* /root/airflow/dags/ && airflow initdb && alembic upgrade head" containers: - name: web - image: {{docker_image}} + image: {{docker_image}}:{{docker_tag}} imagePullPolicy: IfNotPresent ports: - name: web @@ -85,22 +87,15 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace - - name: AIRFLOW__CORE__EXECUTOR - value: KubernetesExecutor - - name: AIRFLOW__KUBERNETES__CONTAINER_IMAGE - value: {{docker_image}} - - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS - value: "True" - # set these two confs - - name: AIRFLOW__KUBERNETES__GIT_REPO - value: https://github.com/grantnicholas/testdags.git - - name: AIRFLOW__KUBERNETES__GIT_BRANCH - value: master - # or this one - - name: AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM - value: airflow-dags - # + - name: SQL_ALCHEMY_CONN + valueFrom: + secretKeyRef: + name: airflow-secrets + key: sql_alchemy_conn volumeMounts: + - name: airflow-configmap + mountPath: /root/airflow/airflow.cfg + subPath: airflow.cfg - name: airflow-dags mountPath: /root/airflow/dags readinessProbe: @@ -126,30 +121,24 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace - - name: AIRFLOW__CORE__EXECUTOR - value: KubernetesExecutor - - name: AIRFLOW__KUBERNETES__CONTAINER_IMAGE - value: {{docker_image}} - - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS - value: "True" - # set these two confs - - name: AIRFLOW__KUBERNETES__GIT_REPO - value: 
https://github.com/grantnicholas/testdags.git - - name: AIRFLOW__KUBERNETES__GIT_BRANCH - value: master - # or set this one - - name: AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM - value: airflow-dags - # - - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL - value: "60" + - name: SQL_ALCHEMY_CONN + valueFrom: + secretKeyRef: + name: airflow-secrets + key: sql_alchemy_conn volumeMounts: + - name: airflow-configmap + mountPath: /root/airflow/airflow.cfg + subPath: airflow.cfg - name: airflow-dags mountPath: /root/airflow/dags volumes: - name: airflow-dags persistentVolumeClaim: claimName: airflow-dags + - name: airflow-configmap + configMap: + name: airflow-configmap --- apiVersion: v1 kind: Service @@ -162,4 +151,45 @@ spec: nodePort: 30809 selector: name: airflow +--- +apiVersion: v1 +kind: Secret +metadata: + name: airflow-secrets +type: Opaque +data: + # The sql_alchemy_conn value is a base64 encoded represenation of this connection string: + # postgresql+psycopg2://root:root@postgres-airflow:5432/airflow + sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo= +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: airflow-configmap +data: + airflow.cfg: | + [core] + airflow_home = /root/airflow + dags_folder = /root/airflow/dags + base_log_folder = /root/airflow/logs + logging_level = INFO + executor = KubernetesExecutor + parallelism = 32 + plugins_folder = /root/airflow/plugins + sql_alchemy_conn = $SQL_ALCHEMY_CONN + [scheduler] + dag_dir_list_interval = 60 + child_process_log_directory = /root/airflow/logs/scheduler + + [kubernetes] + airflow_configmap = airflow-configmap + worker_container_repository = {{docker_image}} + worker_container_tag = {{docker_tag}} + delete_worker_pods = True + git_repo = https://github.com/grantnicholas/testdags.git + git_branch = master + dags_volume_claim = airflow-dags + + [kubernetes_secrets] + SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn diff --git a/scripts/ci/kubernetes/kube/deploy.sh b/scripts/ci/kubernetes/kube/deploy.sh index 2532d831d9..953f50fd78 100755 --- a/scripts/ci/kubernetes/kube/deploy.sh +++ b/scripts/ci/kubernetes/kube/deploy.sh @@ -24,8 +24,7 @@ sudo mkdir -p /data/postgres-airflow mkdir -p $DIRNAME/.generated kubectl apply -f $DIRNAME/postgres.yaml -sed "s#{{docker_image}}#$IMAGE:$TAG#g" $DIRNAME/airflow.yaml.template > $DIRNAME/.generated/airflow.yaml && kubectl apply -f $DIRNAME/.generated/airflow.yaml - +sed -e "s#{{docker_image}}#$IMAGE#g" -e "s#{{docker_tag}}#$TAG#g" $DIRNAME/airflow.yaml.template > $DIRNAME/.generated/airflow.yaml && kubectl apply -f $DIRNAME/.generated/airflow.yaml # wait for up to 10 minutes for everything to be deployed for i in {1..150} @@ -33,8 +32,8 @@ do echo "------- Running kubectl get pods -------" PODS=$(kubectl get pods | awk 'NR>1 {print $0}') echo "$PODS" - NUM_AIRFLOW_READY=$(echo $PODS | grep airflow | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l) - NUM_POSTGRES_READY=$(echo $PODS | grep postgres | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l) + NUM_AIRFLOW_READY=$(echo $PODS | grep airflow | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l | xargs) + NUM_POSTGRES_READY=$(echo $PODS | grep postgres | awk '{print $2}' | grep -E '([0-9])\/(\1)' | wc -l | xargs) if [ "$NUM_AIRFLOW_READY" == "1" ] && [ "$NUM_POSTGRES_READY" == "1" ]; then break fi
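The airflow-secrets Secret in the template above stores `sql_alchemy_conn` base64-encoded, and the `[kubernetes_secrets]` entry `SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn` is what surfaces it to workers as an environment variable. A quick way to reproduce or check the encoded value (note the trailing newline, which the committed value includes):

    import base64

    conn = "postgresql+psycopg2://root:root@postgres-airflow:5432/airflow\n"
    encoded = base64.b64encode(conn.encode()).decode()
    print(encoded)  # cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6cm9vdEBwb3N0Z3Jlcy1haXJmbG93OjU0MzIvYWlyZmxvdwo=
    assert base64.b64decode(encoded).decode() == conn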