diff --git a/.gitignore b/.gitignore
index dffce06c..59b7212d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,7 +6,9 @@ venv
 logs
 unittests.cfg
+airflow-webserver.pid
 .config
+.credentials
 /dags/bigquery-etl-dags
 /dags/bigquery-etl-dags/*
diff --git a/dags/clean_gke_pods.py b/dags/clean_gke_pods.py
index 27deb083..f166b9de 100644
--- a/dags/clean_gke_pods.py
+++ b/dags/clean_gke_pods.py
@@ -4,6 +4,23 @@ from operators.gcp_container_operator import GKEPodOperator
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 
+docs = """
+### Clean GKE Pods
+
+Built from cloudops-infra repo, projects/airflow/pod-clean
+
+#### Purpose
+
+This DAG executes a GKEPodOperator to clean out old completed pods
+on the shared derived-datasets gke cluster. We need to do this periodically
+because GCP has a 1500 object limit quota.
+
+#### Owner
+
+hwoo@mozilla.com
+"""
+
+
 default_args = {
     'owner': 'hwoo@mozilla.com',
     'depends_on_past': False,
@@ -14,11 +31,9 @@ default_args = {
     'retry_delay': timedelta(minutes=30),
 }
 
-dag = DAG("clean-gke-pods", default_args=default_args, schedule_interval="@daily")
-
-# Built from cloudops-infra repo, projects/airflow/pod-clean
-docker_image='gcr.io/moz-fx-data-airflow-prod-88e0/gke-pod-clean:1.0'
+dag = DAG("clean-gke-pods", default_args=default_args, schedule_interval="@daily", doc_md = docs)
 
+docker_image='gcr.io/moz-fx-data-airflow-prod-88e0/gke-pod-clean:1.3'
 gke_cluster_name='bq-load-gke-1'
 gke_location='us-central1-a'
 
@@ -35,4 +50,3 @@ clean_gke_pods = GKEPodOperator(
     image=docker_image,
     arguments=docker_args,
     dag=dag)
-
diff --git a/dags/operators/backport/README.md b/dags/operators/backport/README.md
index 87c79b17..6fa5a321 100644
--- a/dags/operators/backport/README.md
+++ b/dags/operators/backport/README.md
@@ -1,5 +1,8 @@
 ### Using kube_client.py from 1.10.2
-We include the airflow/contrib/kubernetes/kube_client.py from 1.10.2
+We used to include the airflow/contrib/kubernetes/kube_client.py from 1.10.2
 because the 1.10.7 kube_client.py has some configuration issues when trying
 to push xcom from gkepodoperator. if do_push_xcom is set to False, the upstream
 GkePodOperator works fine.
+
+### As of 1.10.12
+I've removed the backported 1.10.7 gcp_container_operator, kubernetes_pod_operator, and the 1.10.2 kube_client.
diff --git a/dags/operators/backport/gcp_container_operator_1_10_7.py b/dags/operators/backport/gcp_container_operator_1_10_7.py
deleted file mode 100644
index cd06aaa4..00000000
--- a/dags/operators/backport/gcp_container_operator_1_10_7.py
+++ /dev/null
@@ -1,327 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-# -import os -import subprocess -import tempfile - -from google.auth.environment_vars import CREDENTIALS - -from airflow import AirflowException -from airflow.contrib.hooks.gcp_container_hook import GKEClusterHook - -# Modified to import KubernetesPodOperator which imports 1.10.2 kube_client -from .kubernetes_pod_operator_1_10_7 import KubernetesPodOperator - -from airflow.models import BaseOperator -from airflow.utils.decorators import apply_defaults - - -class GKEClusterDeleteOperator(BaseOperator): - """ - Deletes the cluster, including the Kubernetes endpoint and all worker nodes. - - To delete a certain cluster, you must specify the ``project_id``, the ``name`` - of the cluster, the ``location`` that the cluster is in, and the ``task_id``. - - **Operator Creation**: :: - - operator = GKEClusterDeleteOperator( - task_id='cluster_delete', - project_id='my-project', - location='cluster-location' - name='cluster-name') - - .. seealso:: - For more detail about deleting clusters have a look at the reference: - https://google-cloud-python.readthedocs.io/en/latest/container/gapic/v1/api.html#google.cloud.container_v1.ClusterManagerClient.delete_cluster - - :param project_id: The Google Developers Console [project ID or project number] - :type project_id: str - :param name: The name of the resource to delete, in this case cluster name - :type name: str - :param location: The name of the Google Compute Engine zone in which the cluster - resides. - :type location: str - :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. - :type gcp_conn_id: str - :param api_version: The api version to use - :type api_version: str - """ - template_fields = ['project_id', 'gcp_conn_id', 'name', 'location', 'api_version'] - - @apply_defaults - def __init__(self, - project_id, - name, - location, - gcp_conn_id='google_cloud_default', - api_version='v2', - *args, - **kwargs): - super(GKEClusterDeleteOperator, self).__init__(*args, **kwargs) - - self.project_id = project_id - self.gcp_conn_id = gcp_conn_id - self.location = location - self.api_version = api_version - self.name = name - - def _check_input(self): - if not all([self.project_id, self.name, self.location]): - self.log.error( - 'One of (project_id, name, location) is missing or incorrect') - raise AirflowException('Operator has incorrect or missing input.') - - def execute(self, context): - self._check_input() - hook = GKEClusterHook(gcp_conn_id=self.gcp_conn_id, location=self.location) - delete_result = hook.delete_cluster(name=self.name, project_id=self.project_id) - return delete_result - - -class GKEClusterCreateOperator(BaseOperator): - """ - Create a Google Kubernetes Engine Cluster of specified dimensions - The operator will wait until the cluster is created. - - The **minimum** required to define a cluster to create is: - - ``dict()`` :: - cluster_def = {'name': 'my-cluster-name', - 'initial_node_count': 1} - - or - - ``Cluster`` proto :: - from google.cloud.container_v1.types import Cluster - - cluster_def = Cluster(name='my-cluster-name', initial_node_count=1) - - **Operator Creation**: :: - - operator = GKEClusterCreateOperator( - task_id='cluster_create', - project_id='my-project', - location='my-location' - body=cluster_def) - - .. 
seealso:: - For more detail on about creating clusters have a look at the reference: - :class:`google.cloud.container_v1.types.Cluster` - - :param project_id: The Google Developers Console [project ID or project number] - :type project_id: str - :param location: The name of the Google Compute Engine zone in which the cluster - resides. - :type location: str - :param body: The Cluster definition to create, can be protobuf or python dict, if - dict it must match protobuf message Cluster - :type body: dict or google.cloud.container_v1.types.Cluster - :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. - :type gcp_conn_id: str - :param api_version: The api version to use - :type api_version: str - """ - template_fields = ['project_id', 'gcp_conn_id', 'location', 'api_version', 'body'] - - @apply_defaults - def __init__(self, - project_id, - location, - body=None, - gcp_conn_id='google_cloud_default', - api_version='v2', - *args, - **kwargs): - super(GKEClusterCreateOperator, self).__init__(*args, **kwargs) - - if body is None: - body = {} - self.project_id = project_id - self.gcp_conn_id = gcp_conn_id - self.location = location - self.api_version = api_version - self.body = body - - def _check_input(self): - if all([self.project_id, self.location, self.body]): - if isinstance(self.body, dict) \ - and 'name' in self.body \ - and 'initial_node_count' in self.body: - # Don't throw error - return - # If not dict, then must - elif self.body.name and self.body.initial_node_count: - return - - self.log.error( - 'One of (project_id, location, body, body[\'name\'], ' - 'body[\'initial_node_count\']) is missing or incorrect') - raise AirflowException('Operator has incorrect or missing input.') - - def execute(self, context): - self._check_input() - hook = GKEClusterHook(gcp_conn_id=self.gcp_conn_id, location=self.location) - create_op = hook.create_cluster(cluster=self.body, project_id=self.project_id) - return create_op - - -KUBE_CONFIG_ENV_VAR = "KUBECONFIG" - - -class GKEPodOperator(KubernetesPodOperator): - """ - Executes a task in a Kubernetes pod in the specified Google Kubernetes - Engine cluster - - This Operator assumes that the system has gcloud installed and either - has working default application credentials or has configured a - connection id with a service account. - - The **minimum** required to define a cluster to create are the variables - ``task_id``, ``project_id``, ``location``, ``cluster_name``, ``name``, - ``namespace``, and ``image`` - - **Operator Creation**: :: - - operator = GKEPodOperator(task_id='pod_op', - project_id='my-project', - location='us-central1-a', - cluster_name='my-cluster-name', - name='task-name', - namespace='default', - image='perl') - - .. seealso:: - For more detail about application authentication have a look at the reference: - https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application - - :param project_id: The Google Developers Console project id - :type project_id: str - :param location: The name of the Google Kubernetes Engine zone in which the - cluster resides, e.g. 'us-central1-a' - :type location: str - :param cluster_name: The name of the Google Kubernetes Engine cluster the pod - should be spawned in - :type cluster_name: str - :param gcp_conn_id: The google cloud connection id to use. This allows for - users to specify a service account. 
- :type gcp_conn_id: str - """ - template_fields = ('project_id', 'location', - 'cluster_name') + KubernetesPodOperator.template_fields - - @apply_defaults - def __init__(self, - project_id, - location, - cluster_name, - gcp_conn_id='google_cloud_default', - *args, - **kwargs): - super(GKEPodOperator, self).__init__(*args, **kwargs) - self.project_id = project_id - self.location = location - self.cluster_name = cluster_name - self.gcp_conn_id = gcp_conn_id - - def execute(self, context): - # Specifying a service account file allows the user to using non default - # authentication for creating a Kubernetes Pod. This is done by setting the - # environment variable `GOOGLE_APPLICATION_CREDENTIALS` that gcloud looks at. - key_file = None - - # If gcp_conn_id is not specified gcloud will use the default - # service account credentials. - if self.gcp_conn_id: - from airflow.hooks.base_hook import BaseHook - # extras is a deserialized json object - extras = BaseHook.get_connection(self.gcp_conn_id).extra_dejson - # key_file only gets set if a json file is created from a JSON string in - # the web ui, else none - key_file = self._set_env_from_extras(extras=extras) - - # Write config to a temp file and set the environment variable to point to it. - # This is to avoid race conditions of reading/writing a single file - with tempfile.NamedTemporaryFile() as conf_file: - os.environ[KUBE_CONFIG_ENV_VAR] = conf_file.name - # Attempt to get/update credentials - # We call gcloud directly instead of using google-cloud-python api - # because there is no way to write kubernetes config to a file, which is - # required by KubernetesPodOperator. - # The gcloud command looks at the env variable `KUBECONFIG` for where to save - # the kubernetes config file. - subprocess.check_call( - ["gcloud", "container", "clusters", "get-credentials", - self.cluster_name, - "--zone", self.location, - "--project", self.project_id]) - - # Since the key file is of type mkstemp() closing the file will delete it from - # the file system so it cannot be accessed after we don't need it anymore - if key_file: - key_file.close() - - # Tell `KubernetesPodOperator` where the config file is located - self.config_file = os.environ[KUBE_CONFIG_ENV_VAR] - super(GKEPodOperator, self).execute(context) - - def _set_env_from_extras(self, extras): - """ - Sets the environment variable `GOOGLE_APPLICATION_CREDENTIALS` with either: - - - The path to the keyfile from the specified connection id - - A generated file's path if the user specified JSON in the connection id. The - file is assumed to be deleted after the process dies due to how mkstemp() - works. - - The environment variable is used inside the gcloud command to determine correct - service account to use. - """ - key_path = self._get_field(extras, 'key_path', False) - keyfile_json_str = self._get_field(extras, 'keyfile_dict', False) - - if not key_path and not keyfile_json_str: - self.log.info('Using gcloud with application default credentials.') - elif key_path: - os.environ[CREDENTIALS] = key_path - return None - else: - # Write service account JSON to secure file for gcloud to reference - service_key = tempfile.NamedTemporaryFile(delete=False) - service_key.write(keyfile_json_str.encode('utf-8')) - os.environ[CREDENTIALS] = service_key.name - # Return file object to have a pointer to close after use, - # thus deleting from file system. - return service_key - - def _get_field(self, extras, field, default=None): - """ - Fetches a field from extras, and returns it. 
This is some Airflow - magic. The google_cloud_platform hook type adds custom UI elements - to the hook page, which allow admins to specify service_account, - key_path, etc. They get formatted as shown below. - """ - long_f = 'extra__google_cloud_platform__{}'.format(field) - if long_f in extras: - return extras[long_f] - else: - self.log.info('Field %s not found in extras.', field) - return default diff --git a/dags/operators/backport/kube_client_1_10_2.py b/dags/operators/backport/kube_client_1_10_2.py deleted file mode 100644 index 4b8fa171..00000000 --- a/dags/operators/backport/kube_client_1_10_2.py +++ /dev/null @@ -1,51 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from airflow.configuration import conf -from six import PY2 - -try: - from kubernetes import config, client - from kubernetes.client.rest import ApiException - has_kubernetes = True -except ImportError as e: - # We need an exception class to be able to use it in ``except`` elsewhere - # in the code base - ApiException = BaseException - has_kubernetes = False - _import_err = e - - -def _load_kube_config(in_cluster, cluster_context, config_file): - if not has_kubernetes: - raise _import_err - if in_cluster: - config.load_incluster_config() - else: - config.load_kube_config(config_file=config_file, context=cluster_context) - if PY2: - # For connect_get_namespaced_pod_exec - from kubernetes.client import Configuration - configuration = Configuration() - configuration.assert_hostname = False - Configuration.set_default(configuration) - return client.CoreV1Api() - - -def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster'), - cluster_context=None, - config_file=None): - return _load_kube_config(in_cluster, cluster_context, config_file) diff --git a/dags/operators/backport/kubernetes_pod_operator_1_10_7.py b/dags/operators/backport/kubernetes_pod_operator_1_10_7.py deleted file mode 100644 index d0d17f8f..00000000 --- a/dags/operators/backport/kubernetes_pod_operator_1_10_7.py +++ /dev/null @@ -1,270 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
See the License for the -# specific language governing permissions and limitations -# under the License. -"""Executes task in a Kubernetes POD""" -import re -import warnings - -from airflow.exceptions import AirflowException -from airflow.models import BaseOperator -from airflow.utils.decorators import apply_defaults -from airflow.contrib.kubernetes import pod_generator, pod_launcher - -# import our own kube_client from 1.10.2. We also add pod name label to the pod. -from .kube_client_1_10_2 import get_kube_client - -from airflow.contrib.kubernetes.pod import Resources -from airflow.utils.helpers import validate_key -from airflow.utils.state import State -from airflow.version import version as airflow_version - - -class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-attributes - """ - Execute a task in a Kubernetes Pod - - :param image: Docker image you wish to launch. Defaults to hub.docker.com, - but fully qualified URLS will point to custom repositories. - :type image: str - :param namespace: the namespace to run within kubernetes. - :type namespace: str - :param cmds: entrypoint of the container. (templated) - The docker images's entrypoint is used if this is not provided. - :type cmds: list[str] - :param arguments: arguments of the entrypoint. (templated) - The docker image's CMD is used if this is not provided. - :type arguments: list[str] - :param image_pull_policy: Specify a policy to cache or always pull an image. - :type image_pull_policy: str - :param image_pull_secrets: Any image pull secrets to be given to the pod. - If more than one secret is required, provide a - comma separated list: secret_a,secret_b - :type image_pull_secrets: str - :param ports: ports for launched pod. - :type ports: list[airflow.contrib.kubernetes.pod.Port] - :param volume_mounts: volumeMounts for launched pod. - :type volume_mounts: list[airflow.contrib.kubernetes.volume_mount.VolumeMount] - :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes. - :type volumes: list[airflow.contrib.kubernetes.volume.Volume] - :param labels: labels to apply to the Pod. - :type labels: dict - :param startup_timeout_seconds: timeout in seconds to startup the pod. - :type startup_timeout_seconds: int - :param name: name of the pod in which the task will run, will be used to - generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]). - :type name: str - :param env_vars: Environment variables initialized in the container. (templated) - :type env_vars: dict - :param secrets: Kubernetes secrets to inject in the container. - They can be exposed as environment vars or files in a volume - :type secrets: list[airflow.contrib.kubernetes.secret.Secret] - :param in_cluster: run kubernetes client with in_cluster configuration. - :type in_cluster: bool - :param cluster_context: context that points to kubernetes cluster. - Ignored when in_cluster is True. If None, current-context is used. - :type cluster_context: str - :param get_logs: get the stdout of the container as logs of the tasks. - :type get_logs: bool - :param annotations: non-identifying metadata you can attach to the Pod. - Can be a large range of data, and can include characters - that are not permitted by labels. - :type annotations: dict - :param resources: A dict containing resources requests and limits. - Possible keys are request_memory, request_cpu, limit_memory, limit_cpu, - and limit_gpu, which will be used to generate airflow.kubernetes.pod.Resources. 
- See also kubernetes.io/docs/concepts/configuration/manage-compute-resources-container - :type resources: dict - :param affinity: A dict containing a group of affinity scheduling rules. - :type affinity: dict - :param node_selectors: A dict containing a group of scheduling rules. - :type node_selectors: dict - :param config_file: The path to the Kubernetes config file. (templated) - :type config_file: str - :param do_xcom_push: If do_xcom_push is True, the content of the file - /airflow/xcom/return.json in the container will also be pushed to an - XCom when the container completes. - :type do_xcom_push: bool - :param is_delete_operator_pod: What to do when the pod reaches its final - state, or the execution is interrupted. - If False (default): do nothing, If True: delete the pod - :type is_delete_operator_pod: bool - :param hostnetwork: If True enable host networking on the pod. - :type hostnetwork: bool - :param tolerations: A list of kubernetes tolerations. - :type tolerations: list tolerations - :param configmaps: A list of configmap names objects that we - want mount as env variables. - :type configmaps: list[str] - :param pod_runtime_info_envs: environment variables about - pod runtime information (ip, namespace, nodeName, podName). - :type pod_runtime_info_envs: list[PodRuntimeEnv] - :param security_context: security options the pod should run with (PodSecurityContext). - :type security_context: dict - :param dnspolicy: dnspolicy for the pod. - :type dnspolicy: str - """ - template_fields = ('cmds', 'arguments', 'env_vars', 'config_file') - - @apply_defaults - def __init__(self, # pylint: disable=too-many-arguments,too-many-locals - namespace, - image, - name, - cmds=None, - arguments=None, - ports=None, - volume_mounts=None, - volumes=None, - env_vars=None, - secrets=None, - in_cluster=True, - cluster_context=None, - labels=None, - startup_timeout_seconds=120, - get_logs=True, - image_pull_policy='IfNotPresent', - annotations=None, - resources=None, - affinity=None, - config_file=None, - node_selectors=None, - image_pull_secrets=None, - service_account_name='default', - is_delete_operator_pod=False, - hostnetwork=False, - tolerations=None, - configmaps=None, - security_context=None, - pod_runtime_info_envs=None, - dnspolicy=None, - *args, - **kwargs): - # https://github.com/apache/airflow/blob/2d0eff4ee4fafcf8c7978ac287a8fb968e56605f/UPDATING.md#unification-of-do_xcom_push-flag - if kwargs.get('xcom_push') is not None: - kwargs['do_xcom_push'] = kwargs.pop('xcom_push') - warnings.warn( - "`xcom_push` will be deprecated. 
Use `do_xcom_push` instead.", - DeprecationWarning, stacklevel=2 - ) - super(KubernetesPodOperator, self).__init__(*args, resources=None, **kwargs) - self.image = image - self.namespace = namespace - self.cmds = cmds or [] - self.arguments = arguments or [] - self.labels = labels or {} - self.startup_timeout_seconds = startup_timeout_seconds - self.name = self._set_name(name) - self.env_vars = env_vars or {} - self.ports = ports or [] - self.volume_mounts = volume_mounts or [] - self.volumes = volumes or [] - self.secrets = secrets or [] - self.in_cluster = in_cluster - self.cluster_context = cluster_context - self.get_logs = get_logs - self.image_pull_policy = image_pull_policy - self.node_selectors = node_selectors or {} - self.annotations = annotations or {} - self.affinity = affinity or {} - self.resources = self._set_resources(resources) - self.config_file = config_file - self.image_pull_secrets = image_pull_secrets - self.service_account_name = service_account_name - self.is_delete_operator_pod = is_delete_operator_pod - self.hostnetwork = hostnetwork - self.tolerations = tolerations or [] - self.configmaps = configmaps or [] - self.security_context = security_context or {} - self.pod_runtime_info_envs = pod_runtime_info_envs or [] - self.dnspolicy = dnspolicy - - def execute(self, context): - try: - # Moz specific - client = get_kube_client(in_cluster=self.in_cluster, - cluster_context=self.cluster_context, - config_file=self.config_file) - - # Add Airflow Version to the label - # And a label to identify that pod is launched by KubernetesPodOperator - self.labels.update( - { - 'airflow_version': airflow_version.replace('+', '-'), - 'kubernetes_pod_operator': 'True', - 'name': self.name, - } - ) - - gen = pod_generator.PodGenerator() - - for port in self.ports: - gen.add_port(port) - for mount in self.volume_mounts: - gen.add_mount(mount) - for volume in self.volumes: - gen.add_volume(volume) - - pod = gen.make_pod( - namespace=self.namespace, - image=self.image, - pod_id=self.name, - cmds=self.cmds, - arguments=self.arguments, - labels=self.labels, - ) - - pod.service_account_name = self.service_account_name - pod.secrets = self.secrets - pod.envs = self.env_vars - pod.image_pull_policy = self.image_pull_policy - pod.image_pull_secrets = self.image_pull_secrets - pod.annotations = self.annotations - pod.resources = self.resources - pod.affinity = self.affinity - pod.node_selectors = self.node_selectors - pod.hostnetwork = self.hostnetwork - pod.tolerations = self.tolerations - pod.configmaps = self.configmaps - pod.security_context = self.security_context - pod.pod_runtime_info_envs = self.pod_runtime_info_envs - pod.dnspolicy = self.dnspolicy - - launcher = pod_launcher.PodLauncher(kube_client=client, - extract_xcom=self.do_xcom_push) - try: - (final_state, result) = launcher.run_pod( - pod, - startup_timeout=self.startup_timeout_seconds, - get_logs=self.get_logs) - finally: - if self.is_delete_operator_pod: - launcher.delete_pod(pod) - - if final_state != State.SUCCESS: - raise AirflowException( - 'Pod returned a failure: {state}'.format(state=final_state) - ) - if self.do_xcom_push: - return result - except AirflowException as ex: - raise AirflowException('Pod Launching failed: {error}'.format(error=ex)) - - def _set_resources(self, resources): - return Resources(**resources) if resources else Resources() - - def _set_name(self, name): - validate_key(name, max_length=63) - return re.sub(r'[^a-z0-9.-]+', '-', name.lower()) diff --git a/dags/operators/gcp_container_operator.py 
b/dags/operators/gcp_container_operator.py
index 2300eba3..bec7d417 100644
--- a/dags/operators/gcp_container_operator.py
+++ b/dags/operators/gcp_container_operator.py
@@ -8,9 +8,7 @@ from airflow import AirflowException
 from airflow.contrib.hooks.gcp_container_hook import GKEClusterHook
 
-# We import upstream GKEPodOperator/KubernetesPodOperator from 1.10.7, modified to point to kube_client
-# from 1.10.2, because of some Xcom push breaking changes when using GKEPodOperator.
-from .backport.gcp_container_operator_1_10_7 import GKEPodOperator as UpstreamGKEPodOperator
+from airflow.contrib.operators.gcp_container_operator import GKEPodOperator as UpstreamGKEPodOperator
 
 KUBE_CONFIG_ENV_VAR = "KUBECONFIG"
 GCLOUD_APP_CRED = "CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE"
 
@@ -28,15 +26,17 @@ class GKEPodOperator(UpstreamGKEPodOperator):
 
     - Adjust when NamedTemporaryFile file descriptor is closed.
 
-    - Preserve XCOM result when xcom_push is True.
+    - Preserve XCOM result when do_xcom_push is True.
 
-    - Override init to default image_pull_policy=Always, in_cluster=False, xcom_push=False and GKE params
+    - Override init to default image_pull_policy=Always, in_cluster=False, do_xcom_push=False and GKE params
+
+    - Set reattach_on_restart=False when do_xcom_push=True to address an error (details below).
 
     """
     def __init__(self,
                  image_pull_policy='Always',
                  in_cluster=False,
-                 xcom_push=False,
+                 do_xcom_push=False,
                  # Defined in Airflow's UI -> Admin -> Connections
                  gcp_conn_id='google_cloud_derived_datasets',
                  project_id='moz-fx-data-derived-datasets',
@@ -46,10 +46,19 @@ class GKEPodOperator(UpstreamGKEPodOperator):
                  *args,
                  **kwargs):
 
+        """
+        Retrying a failed task with do_xcom_push=True causes airflow to reattach to the pod,
+        eventually causing a 'Handshake status 500 Internal Server Error'. Logs will indicate
+        'found a running pod with ... different try_number. Will attach to this pod and monitor
+        instead of starting new one'.
+        """
+        reattach_on_restart = False if do_xcom_push else True
+
         super(GKEPodOperator, self).__init__(
             image_pull_policy=image_pull_policy,
             in_cluster=in_cluster,
-            xcom_push=xcom_push,
+            do_xcom_push=do_xcom_push,
+            reattach_on_restart=reattach_on_restart,
             gcp_conn_id=gcp_conn_id,
             project_id=project_id,
             location=location,
@@ -94,7 +103,7 @@ class GKEPodOperator(UpstreamGKEPodOperator):
         # Tell `KubernetesPodOperator` where the config file is located
         self.config_file = os.environ[KUBE_CONFIG_ENV_VAR]
         result = super(UpstreamGKEPodOperator, self).execute(context) # Moz specific
-        if self.xcom_push: # Moz specific
+        if self.do_xcom_push: # Moz specific
             return result # Moz specific
 
diff --git a/dags/utils/gcp.py b/dags/utils/gcp.py
index 24096714..488034b4 100644
--- a/dags/utils/gcp.py
+++ b/dags/utils/gcp.py
@@ -714,7 +714,7 @@ def gke_command(
         namespace=gke_namespace,
         image=docker_image,
         arguments=command,
-        xcom_push=xcom_push,
+        do_xcom_push=xcom_push,
         env_vars=context_env_vars,
         **kwargs
     )
diff --git a/requirements.in b/requirements.in
index 67bda6e9..51427460 100644
--- a/requirements.in
+++ b/requirements.in
@@ -1,7 +1,8 @@
 boto3
 kombu==4.6.3 # CeleryExecutor issues with 1.10.2 supposedly fixed in 1.10.5 airflow, but still observed issues on 1.10.7
 # removed hdfs
-apache-airflow[celery,postgres,hive,jdbc,async,password,crypto,github_enterprise,datadog,statsd,s3,mysql,google_auth,gcp_api,kubernetes]==1.10.10
+apache-airflow[celery,postgres,hive,jdbc,async,password,crypto,github_enterprise,datadog,statsd,s3,mysql,google_auth,gcp_api,kubernetes]==1.10.12
+cryptography>=3.2
 mozlogging
 retrying
 newrelic
diff --git a/requirements.txt b/requirements.txt
index 8d054784..8df22758 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,7 +6,7 @@
 #
 alembic==1.4.2  # via apache-airflow
 amqp==2.6.0  # via kombu
-apache-airflow[async,celery,crypto,datadog,gcp_api,github_enterprise,google_auth,hive,jdbc,kubernetes,mysql,password,postgres,s3,statsd]==1.10.10  # via -r requirements.in
+apache-airflow[async,celery,crypto,datadog,gcp_api,github_enterprise,google_auth,hive,jdbc,kubernetes,mysql,password,postgres,s3,statsd]==1.10.12  # via -r requirements.in
 apispec[yaml]==1.3.3  # via flask-appbuilder
 argcomplete==1.11.1  # via apache-airflow
 attrs==19.3.0  # via apache-airflow, cattrs, jsonschema
@@ -17,7 +17,7 @@ boto3==1.13.20  # via -r requirements.in, apache-airflow
 botocore==1.16.20  # via boto3, s3transfer
 cached-property==1.5.1  # via apache-airflow
 cachetools==4.1.0  # via google-auth
-cattrs==0.9.2  # via apache-airflow
+cattrs==1.0.0  # via apache-airflow
 celery==4.3.0  # via apache-airflow, flower
 certifi==2020.4.5.1  # via kubernetes, requests
 cffi==1.14.0  # via bcrypt, cryptography
@@ -27,14 +27,14 @@ colorama==0.4.3  # via flask-appbuilder
 colorlog==4.0.2  # via apache-airflow
 configparser==3.5.3  # via apache-airflow
 croniter==0.3.32  # via apache-airflow
-cryptography==2.9.2  # via apache-airflow, pyopenssl
+cryptography==3.2.1  # via -r requirements.in, apache-airflow, pyopenssl
 datadog==0.36.0  # via apache-airflow
 decorator==4.4.2  # via datadog
 defusedxml==0.6.0  # via python3-openid
 dill==0.3.1.1  # via apache-airflow
 dnspython==1.16.0  # via email-validator, eventlet
 docutils==0.15.2  # via botocore, python-daemon
-email-validator==1.1.1  # via flask-appbuilder
+email-validator==1.1.1  # via apache-airflow, flask-appbuilder
 eventlet==0.25.2  # via apache-airflow
 flask-admin==1.5.4  # via apache-airflow
 flask-appbuilder==2.3.4  # via apache-airflow
@@ -69,7 +69,7 @@ google-cloud-spanner==1.17.0  # via apache-airflow
 google-cloud-speech==1.3.2  # via apache-airflow
 google-cloud-storage==1.28.1  # via apache-airflow
 google-cloud-texttospeech==1.0.1  # via apache-airflow
-google-cloud-translate==2.0.1  # via apache-airflow
+google-cloud-translate==1.7.0  # via apache-airflow
 google-cloud-videointelligence==1.14.0  # via apache-airflow
 google-cloud-vision==1.0.0  # via apache-airflow
 google-resumable-media==0.5.0  # via google-cloud-bigquery, google-cloud-storage
@@ -82,14 +82,14 @@ grpcio==1.29.0  # via google-api-core, googleapis-common-protos, grpc-
 gunicorn==19.10.0  # via apache-airflow
 hiredis==1.0.1  # via -r requirements.in
 hmsclient==0.1.1  # via apache-airflow
-httplib2==0.18.1  # via apache-airflow, google-api-python-client, google-auth-httplib2
+httplib2==0.18.1  # via google-api-python-client, google-auth-httplib2
 humanize==0.5.1  # via flower
 idna==2.9  # via email-validator, requests
 importlib-metadata==1.6.0  # via argcomplete, jsonschema
 iso8601==0.1.12  # via apache-airflow
 itsdangerous==1.1.0  # via flask, flask-wtf
 jaydebeapi==1.2.1  # via apache-airflow
-jinja2==2.10.3  # via apache-airflow, flask, flask-babel
+jinja2==2.10.3  # via apache-airflow, flask, flask-babel, python-nvd3
 jmespath==0.10.0  # via boto3, botocore
 jpype1==0.7.1  # via -r requirements.in, apache-airflow, jaydebeapi
 json-merge-patch==0.2  # via apache-airflow
@@ -123,13 +123,15 @@ pyasn1==0.4.8  # via pyasn1-modules, rsa
 pycparser==2.20  # via cffi
 pydata-google-auth==1.1.0  # via pandas-gbq
 pygments==2.6.1  # via apache-airflow
-pyhive==0.6.2  # via apache-airflow
+pyhive[hive]==0.6.2  # via apache-airflow
 pyjwt==1.7.1  # via flask-appbuilder, flask-jwt-extended
 pyopenssl==19.1.0  # via apache-airflow
 pyrsistent==0.16.0  # via jsonschema
 python-daemon==2.1.2  # via apache-airflow
 python-dateutil==2.8.1  # via alembic, apache-airflow, botocore, croniter, flask-appbuilder, kubernetes, pandas, pendulum, pyhive
 python-editor==1.0.4  # via alembic
+python-nvd3==0.15.0  # via apache-airflow
+python-slugify==4.0.1  # via apache-airflow, python-nvd3
 python3-openid==3.1.0  # via flask-openid
 pytz==2020.1  # via -r requirements.in, babel, celery, flask-babel, flower, google-api-core, pandas, tzlocal
 pytzdata==2019.3  # via pendulum
@@ -140,18 +142,19 @@ requests==2.23.0  # via -r requirements.in, apache-airflow, datadog, goo
 retrying==1.3.3  # via -r requirements.in
 rsa==4.0  # via google-auth
 s3transfer==0.3.3  # via boto3
+sasl==0.2.1  # via pyhive, thrift-sasl
 setproctitle==1.1.10  # via apache-airflow
 shelljob==0.5.6  # via -r requirements.in
-six==1.15.0  # via bcrypt, cryptography, eventlet, flask-jwt-extended, google-api-core, google-api-python-client, google-auth, google-cloud-bigquery, google-resumable-media, grpcio, jsonschema, kubernetes, prison, protobuf, pyopenssl, pyrsistent, python-dateutil, retrying, sqlalchemy-utils, tenacity, thrift, websocket-client
+six==1.15.0  # via bcrypt, cryptography, eventlet, flask-jwt-extended, google-api-core, google-api-python-client, google-auth, google-cloud-bigquery, google-resumable-media, grpcio, jsonschema, kubernetes, prison, protobuf, pyopenssl, pyrsistent, python-dateutil, retrying, sasl, sqlalchemy-utils, tenacity, thrift, thrift-sasl, websocket-client
 sqlalchemy-jsonfield==0.9.0  # via apache-airflow
 sqlalchemy-utils==0.36.6  # via flask-appbuilder
 sqlalchemy==1.3.15  # via -r requirements.in, alembic, apache-airflow, flask-sqlalchemy, marshmallow-sqlalchemy, sqlalchemy-jsonfield, sqlalchemy-utils
 statsd==3.3.0  # via apache-airflow
 tabulate==0.8.7  # via apache-airflow
 tenacity==4.12.0  # via apache-airflow
-termcolor==1.1.0  # via apache-airflow
-text-unidecode==1.2  # via apache-airflow
-thrift==0.13.0  # via apache-airflow, hmsclient
+text-unidecode==1.3  # via python-slugify
+thrift-sasl==0.4.2  # via pyhive
+thrift==0.13.0  # via apache-airflow, hmsclient, pyhive, thrift-sasl
 tornado==5.1.1  # via apache-airflow, flower
 typing-extensions==3.7.4.2  # via apache-airflow
 tzlocal==1.5.1  # via apache-airflow, pendulum
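
Usage sketch (not part of the patch): the Python below illustrates how a DAG task might call the wrapped GKEPodOperator once the flag is named do_xcom_push. The import path, connection and project defaults, cluster name, and zone are the ones visible in the diff above; the DAG id, task name, image, arguments, and start date are hypothetical, and the operator's built-in GKE defaults may make the explicit location/cluster_name arguments unnecessary.

from datetime import datetime, timedelta

from airflow import DAG
from operators.gcp_container_operator import GKEPodOperator  # Moz wrapper from this repo

default_args = {
    'owner': 'hwoo@mozilla.com',
    'depends_on_past': False,
    'start_date': datetime(2020, 11, 1),  # hypothetical start date
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
}

dag = DAG("example-xcom-pod", default_args=default_args, schedule_interval="@daily")

# With this change, do_xcom_push=True both preserves the XCom result and makes the
# wrapper pass reattach_on_restart=False to the upstream operator, so a retried task
# starts a fresh pod instead of reattaching to the failed one (the source of the
# 'Handshake status 500 Internal Server Error' described in the diff).
example_task = GKEPodOperator(
    task_id="example_xcom_task",
    name="example-xcom-task",        # hypothetical pod/task name
    namespace="default",
    location="us-central1-a",        # shared cluster zone seen elsewhere in the diff
    cluster_name="bq-load-gke-1",    # shared derived-datasets cluster
    image="gcr.io/moz-fx-data-airflow-prod-88e0/example-image:latest",  # hypothetical image
    arguments=["--date", "{{ ds }}"],
    do_xcom_push=True,               # renamed from xcom_push in this patch
    dag=dag,
)

Deriving reattach_on_restart from do_xcom_push keeps existing DAG code unchanged: only tasks that actually push XCom results opt out of pod reattachment on retry, which is the case the handshake error affects.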