Upgrade airflow WTMO to 1.10.12

Harold Woo 2020-10-28 13:51:15 -07:00 committed by haroldwoo
Parent eabe3b6038
Commit e6456b0132
10 changed files with 60 additions and 676 deletions

2
.gitignore vendored
View file

@@ -6,7 +6,9 @@ venv
logs
unittests.cfg
airflow-webserver.pid
.config
.credentials
/dags/bigquery-etl-dags
/dags/bigquery-etl-dags/*

View file

@@ -4,6 +4,23 @@ from operators.gcp_container_operator import GKEPodOperator
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
docs = """
### Clean GKE Pods
Built from cloudops-infra repo, projects/airflow/pod-clean
#### Purpose
This DAG executes a GKEPodOperator to clean out old completed pods
on the shared derived-datasets gke cluster. We need to do this periodically
because GCP has a 1500 object limit quota.
#### Owner
hwoo@mozilla.com
"""
default_args = {
'owner': 'hwoo@mozilla.com',
'depends_on_past': False,
@@ -14,11 +31,9 @@ default_args = {
'retry_delay': timedelta(minutes=30),
}
dag = DAG("clean-gke-pods", default_args=default_args, schedule_interval="@daily")
# Built from cloudops-infra repo, projects/airflow/pod-clean
docker_image='gcr.io/moz-fx-data-airflow-prod-88e0/gke-pod-clean:1.0'
dag = DAG("clean-gke-pods", default_args=default_args, schedule_interval="@daily", doc_md = docs)
docker_image='gcr.io/moz-fx-data-airflow-prod-88e0/gke-pod-clean:1.3'
gke_cluster_name='bq-load-gke-1'
gke_location='us-central1-a'
@@ -35,4 +50,3 @@ clean_gke_pods = GKEPodOperator(
image=docker_image,
arguments=docker_args,
dag=dag)
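
The DAG above only launches the container; the actual cleanup logic lives in the cloudops-infra image referenced by docker_image and is not part of this commit. Purely as a hypothetical sketch of what such a cleanup amounts to, using the kubernetes Python client:

```python
# Hypothetical sketch, NOT the cloudops-infra pod-clean implementation.
# Deleting Succeeded pods keeps the cluster under the 1500-object quota mentioned above.
from kubernetes import client, config

config.load_incluster_config()   # the cleanup job runs inside the GKE cluster
v1 = client.CoreV1Api()

completed = v1.list_namespaced_pod(
    namespace="default",                     # assumed namespace
    field_selector="status.phase=Succeeded",
)
for pod in completed.items:
    v1.delete_namespaced_pod(
        name=pod.metadata.name,
        namespace=pod.metadata.namespace,
        body=client.V1DeleteOptions(),
    )
```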

View file

@@ -1,5 +1,8 @@
### Using kube_client.py from 1.10.2
We include the airflow/contrib/kubernetes/kube_client.py from 1.10.2
We used to include the airflow/contrib/kubernetes/kube_client.py from 1.10.2
because the 1.10.7 kube_client.py has some configuration issues when
trying to push XCom from the GKEPodOperator. If do_xcom_push is set to False,
the upstream GKEPodOperator works fine.
### As of 1.10.12 I've removed the backported 1.10.7 gcp_container_operator,
kubernetes_pod_operator, and the 1.10.2 kube_client.
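
With the backports gone, a minimal sketch of relying on the upstream operator on 1.10.12; only the import path and the do_xcom_push flag come from this changeset, every other parameter value is an illustrative assumption:

```python
from airflow.contrib.operators.gcp_container_operator import GKEPodOperator

pod_task = GKEPodOperator(
    task_id="example-pod",       # assumed
    project_id="my-project",     # assumed
    location="us-central1-a",    # assumed
    cluster_name="my-cluster",   # assumed
    name="example-pod",
    namespace="default",
    image="python:3.7",
    do_xcom_push=False,          # with XCom push disabled, the upstream operator already worked pre-1.10.12
    dag=dag,                     # assumes a surrounding DAG named `dag`
)
```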

View file

@@ -1,327 +0,0 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os
import subprocess
import tempfile
from google.auth.environment_vars import CREDENTIALS
from airflow import AirflowException
from airflow.contrib.hooks.gcp_container_hook import GKEClusterHook
# Modified to import KubernetesPodOperator which imports 1.10.2 kube_client
from .kubernetes_pod_operator_1_10_7 import KubernetesPodOperator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class GKEClusterDeleteOperator(BaseOperator):
"""
Deletes the cluster, including the Kubernetes endpoint and all worker nodes.
To delete a certain cluster, you must specify the ``project_id``, the ``name``
of the cluster, the ``location`` that the cluster is in, and the ``task_id``.
**Operator Creation**: ::
operator = GKEClusterDeleteOperator(
task_id='cluster_delete',
project_id='my-project',
location='cluster-location'
name='cluster-name')
.. seealso::
For more detail about deleting clusters have a look at the reference:
https://google-cloud-python.readthedocs.io/en/latest/container/gapic/v1/api.html#google.cloud.container_v1.ClusterManagerClient.delete_cluster
:param project_id: The Google Developers Console [project ID or project number]
:type project_id: str
:param name: The name of the resource to delete, in this case cluster name
:type name: str
:param location: The name of the Google Compute Engine zone in which the cluster
resides.
:type location: str
:param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
:type gcp_conn_id: str
:param api_version: The api version to use
:type api_version: str
"""
template_fields = ['project_id', 'gcp_conn_id', 'name', 'location', 'api_version']
@apply_defaults
def __init__(self,
project_id,
name,
location,
gcp_conn_id='google_cloud_default',
api_version='v2',
*args,
**kwargs):
super(GKEClusterDeleteOperator, self).__init__(*args, **kwargs)
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
self.location = location
self.api_version = api_version
self.name = name
def _check_input(self):
if not all([self.project_id, self.name, self.location]):
self.log.error(
'One of (project_id, name, location) is missing or incorrect')
raise AirflowException('Operator has incorrect or missing input.')
def execute(self, context):
self._check_input()
hook = GKEClusterHook(gcp_conn_id=self.gcp_conn_id, location=self.location)
delete_result = hook.delete_cluster(name=self.name, project_id=self.project_id)
return delete_result
class GKEClusterCreateOperator(BaseOperator):
"""
Create a Google Kubernetes Engine Cluster of specified dimensions
The operator will wait until the cluster is created.
The **minimum** required to define a cluster to create is:
``dict()`` ::
cluster_def = {'name': 'my-cluster-name',
'initial_node_count': 1}
or
``Cluster`` proto ::
from google.cloud.container_v1.types import Cluster
cluster_def = Cluster(name='my-cluster-name', initial_node_count=1)
**Operator Creation**: ::
operator = GKEClusterCreateOperator(
task_id='cluster_create',
project_id='my-project',
location='my-location'
body=cluster_def)
.. seealso::
For more detail about creating clusters have a look at the reference:
:class:`google.cloud.container_v1.types.Cluster`
:param project_id: The Google Developers Console [project ID or project number]
:type project_id: str
:param location: The name of the Google Compute Engine zone in which the cluster
resides.
:type location: str
:param body: The Cluster definition to create, can be protobuf or python dict, if
dict it must match protobuf message Cluster
:type body: dict or google.cloud.container_v1.types.Cluster
:param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
:type gcp_conn_id: str
:param api_version: The api version to use
:type api_version: str
"""
template_fields = ['project_id', 'gcp_conn_id', 'location', 'api_version', 'body']
@apply_defaults
def __init__(self,
project_id,
location,
body=None,
gcp_conn_id='google_cloud_default',
api_version='v2',
*args,
**kwargs):
super(GKEClusterCreateOperator, self).__init__(*args, **kwargs)
if body is None:
body = {}
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
self.location = location
self.api_version = api_version
self.body = body
def _check_input(self):
if all([self.project_id, self.location, self.body]):
if isinstance(self.body, dict) \
and 'name' in self.body \
and 'initial_node_count' in self.body:
# Don't throw error
return
# If not dict, then must
elif self.body.name and self.body.initial_node_count:
return
self.log.error(
'One of (project_id, location, body, body[\'name\'], '
'body[\'initial_node_count\']) is missing or incorrect')
raise AirflowException('Operator has incorrect or missing input.')
def execute(self, context):
self._check_input()
hook = GKEClusterHook(gcp_conn_id=self.gcp_conn_id, location=self.location)
create_op = hook.create_cluster(cluster=self.body, project_id=self.project_id)
return create_op
KUBE_CONFIG_ENV_VAR = "KUBECONFIG"
class GKEPodOperator(KubernetesPodOperator):
"""
Executes a task in a Kubernetes pod in the specified Google Kubernetes
Engine cluster
This Operator assumes that the system has gcloud installed and either
has working default application credentials or has configured a
connection id with a service account.
The **minimum** required to define a cluster to create are the variables
``task_id``, ``project_id``, ``location``, ``cluster_name``, ``name``,
``namespace``, and ``image``
**Operator Creation**: ::
operator = GKEPodOperator(task_id='pod_op',
project_id='my-project',
location='us-central1-a',
cluster_name='my-cluster-name',
name='task-name',
namespace='default',
image='perl')
.. seealso::
For more detail about application authentication have a look at the reference:
https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application
:param project_id: The Google Developers Console project id
:type project_id: str
:param location: The name of the Google Kubernetes Engine zone in which the
cluster resides, e.g. 'us-central1-a'
:type location: str
:param cluster_name: The name of the Google Kubernetes Engine cluster the pod
should be spawned in
:type cluster_name: str
:param gcp_conn_id: The google cloud connection id to use. This allows for
users to specify a service account.
:type gcp_conn_id: str
"""
template_fields = ('project_id', 'location',
'cluster_name') + KubernetesPodOperator.template_fields
@apply_defaults
def __init__(self,
project_id,
location,
cluster_name,
gcp_conn_id='google_cloud_default',
*args,
**kwargs):
super(GKEPodOperator, self).__init__(*args, **kwargs)
self.project_id = project_id
self.location = location
self.cluster_name = cluster_name
self.gcp_conn_id = gcp_conn_id
def execute(self, context):
# Specifying a service account file allows the user to use non-default
# authentication for creating a Kubernetes Pod. This is done by setting the
# environment variable `GOOGLE_APPLICATION_CREDENTIALS` that gcloud looks at.
key_file = None
# If gcp_conn_id is not specified gcloud will use the default
# service account credentials.
if self.gcp_conn_id:
from airflow.hooks.base_hook import BaseHook
# extras is a deserialized json object
extras = BaseHook.get_connection(self.gcp_conn_id).extra_dejson
# key_file only gets set if a json file is created from a JSON string in
# the web ui, else none
key_file = self._set_env_from_extras(extras=extras)
# Write config to a temp file and set the environment variable to point to it.
# This is to avoid race conditions of reading/writing a single file
with tempfile.NamedTemporaryFile() as conf_file:
os.environ[KUBE_CONFIG_ENV_VAR] = conf_file.name
# Attempt to get/update credentials
# We call gcloud directly instead of using google-cloud-python api
# because there is no way to write kubernetes config to a file, which is
# required by KubernetesPodOperator.
# The gcloud command looks at the env variable `KUBECONFIG` for where to save
# the kubernetes config file.
subprocess.check_call(
["gcloud", "container", "clusters", "get-credentials",
self.cluster_name,
"--zone", self.location,
"--project", self.project_id])
# Since the key file is of type mkstemp() closing the file will delete it from
# the file system so it cannot be accessed after we don't need it anymore
if key_file:
key_file.close()
# Tell `KubernetesPodOperator` where the config file is located
self.config_file = os.environ[KUBE_CONFIG_ENV_VAR]
super(GKEPodOperator, self).execute(context)
def _set_env_from_extras(self, extras):
"""
Sets the environment variable `GOOGLE_APPLICATION_CREDENTIALS` with either:
- The path to the keyfile from the specified connection id
- A generated file's path if the user specified JSON in the connection id. The
file is assumed to be deleted after the process dies due to how mkstemp()
works.
The environment variable is used inside the gcloud command to determine correct
service account to use.
"""
key_path = self._get_field(extras, 'key_path', False)
keyfile_json_str = self._get_field(extras, 'keyfile_dict', False)
if not key_path and not keyfile_json_str:
self.log.info('Using gcloud with application default credentials.')
elif key_path:
os.environ[CREDENTIALS] = key_path
return None
else:
# Write service account JSON to secure file for gcloud to reference
service_key = tempfile.NamedTemporaryFile(delete=False)
service_key.write(keyfile_json_str.encode('utf-8'))
os.environ[CREDENTIALS] = service_key.name
# Return file object to have a pointer to close after use,
# thus deleting from file system.
return service_key
def _get_field(self, extras, field, default=None):
"""
Fetches a field from extras, and returns it. This is some Airflow
magic. The google_cloud_platform hook type adds custom UI elements
to the hook page, which allow admins to specify service_account,
key_path, etc. They get formatted as shown below.
"""
long_f = 'extra__google_cloud_platform__{}'.format(field)
if long_f in extras:
return extras[long_f]
else:
self.log.info('Field %s not found in extras.', field)
return default
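
For reference, a tiny sketch of the connection-extras shape that _set_env_from_extras and _get_field above operate on; the values below are made up:

```python
# Made-up example of the deserialized `extra` JSON on a google_cloud_platform connection.
extras = {
    "extra__google_cloud_platform__project": "my-project",
    "extra__google_cloud_platform__key_path": "/var/secrets/google/key.json",
}

field = "key_path"
long_f = "extra__google_cloud_platform__{}".format(field)
key_path = extras.get(long_f)   # -> "/var/secrets/google/key.json"
```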

View file

@@ -1,51 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from airflow.configuration import conf
from six import PY2
try:
from kubernetes import config, client
from kubernetes.client.rest import ApiException
has_kubernetes = True
except ImportError as e:
# We need an exception class to be able to use it in ``except`` elsewhere
# in the code base
ApiException = BaseException
has_kubernetes = False
_import_err = e
def _load_kube_config(in_cluster, cluster_context, config_file):
if not has_kubernetes:
raise _import_err
if in_cluster:
config.load_incluster_config()
else:
config.load_kube_config(config_file=config_file, context=cluster_context)
if PY2:
# For connect_get_namespaced_pod_exec
from kubernetes.client import Configuration
configuration = Configuration()
configuration.assert_hostname = False
Configuration.set_default(configuration)
return client.CoreV1Api()
def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster'),
cluster_context=None,
config_file=None):
return _load_kube_config(in_cluster, cluster_context, config_file)

View file

@@ -1,270 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Executes task in a Kubernetes POD"""
import re
import warnings
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.kubernetes import pod_generator, pod_launcher
# import our own kube_client from 1.10.2. We also add pod name label to the pod.
from .kube_client_1_10_2 import get_kube_client
from airflow.contrib.kubernetes.pod import Resources
from airflow.utils.helpers import validate_key
from airflow.utils.state import State
from airflow.version import version as airflow_version
class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-attributes
"""
Execute a task in a Kubernetes Pod
:param image: Docker image you wish to launch. Defaults to hub.docker.com,
but fully qualified URLS will point to custom repositories.
:type image: str
:param namespace: the namespace to run within kubernetes.
:type namespace: str
:param cmds: entrypoint of the container. (templated)
The docker image's entrypoint is used if this is not provided.
:type cmds: list[str]
:param arguments: arguments of the entrypoint. (templated)
The docker image's CMD is used if this is not provided.
:type arguments: list[str]
:param image_pull_policy: Specify a policy to cache or always pull an image.
:type image_pull_policy: str
:param image_pull_secrets: Any image pull secrets to be given to the pod.
If more than one secret is required, provide a
comma separated list: secret_a,secret_b
:type image_pull_secrets: str
:param ports: ports for launched pod.
:type ports: list[airflow.contrib.kubernetes.pod.Port]
:param volume_mounts: volumeMounts for launched pod.
:type volume_mounts: list[airflow.contrib.kubernetes.volume_mount.VolumeMount]
:param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
:type volumes: list[airflow.contrib.kubernetes.volume.Volume]
:param labels: labels to apply to the Pod.
:type labels: dict
:param startup_timeout_seconds: timeout in seconds to startup the pod.
:type startup_timeout_seconds: int
:param name: name of the pod in which the task will run, will be used to
generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).
:type name: str
:param env_vars: Environment variables initialized in the container. (templated)
:type env_vars: dict
:param secrets: Kubernetes secrets to inject in the container.
They can be exposed as environment vars or files in a volume
:type secrets: list[airflow.contrib.kubernetes.secret.Secret]
:param in_cluster: run kubernetes client with in_cluster configuration.
:type in_cluster: bool
:param cluster_context: context that points to kubernetes cluster.
Ignored when in_cluster is True. If None, current-context is used.
:type cluster_context: str
:param get_logs: get the stdout of the container as logs of the tasks.
:type get_logs: bool
:param annotations: non-identifying metadata you can attach to the Pod.
Can be a large range of data, and can include characters
that are not permitted by labels.
:type annotations: dict
:param resources: A dict containing resources requests and limits.
Possible keys are request_memory, request_cpu, limit_memory, limit_cpu,
and limit_gpu, which will be used to generate airflow.kubernetes.pod.Resources.
See also kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
:type resources: dict
:param affinity: A dict containing a group of affinity scheduling rules.
:type affinity: dict
:param node_selectors: A dict containing a group of scheduling rules.
:type node_selectors: dict
:param config_file: The path to the Kubernetes config file. (templated)
:type config_file: str
:param do_xcom_push: If do_xcom_push is True, the content of the file
/airflow/xcom/return.json in the container will also be pushed to an
XCom when the container completes.
:type do_xcom_push: bool
:param is_delete_operator_pod: What to do when the pod reaches its final
state, or the execution is interrupted.
If False (default): do nothing, If True: delete the pod
:type is_delete_operator_pod: bool
:param hostnetwork: If True enable host networking on the pod.
:type hostnetwork: bool
:param tolerations: A list of kubernetes tolerations.
:type tolerations: list tolerations
:param configmaps: A list of configmap names objects that we
want mount as env variables.
:type configmaps: list[str]
:param pod_runtime_info_envs: environment variables about
pod runtime information (ip, namespace, nodeName, podName).
:type pod_runtime_info_envs: list[PodRuntimeEnv]
:param security_context: security options the pod should run with (PodSecurityContext).
:type security_context: dict
:param dnspolicy: dnspolicy for the pod.
:type dnspolicy: str
"""
template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
@apply_defaults
def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
namespace,
image,
name,
cmds=None,
arguments=None,
ports=None,
volume_mounts=None,
volumes=None,
env_vars=None,
secrets=None,
in_cluster=True,
cluster_context=None,
labels=None,
startup_timeout_seconds=120,
get_logs=True,
image_pull_policy='IfNotPresent',
annotations=None,
resources=None,
affinity=None,
config_file=None,
node_selectors=None,
image_pull_secrets=None,
service_account_name='default',
is_delete_operator_pod=False,
hostnetwork=False,
tolerations=None,
configmaps=None,
security_context=None,
pod_runtime_info_envs=None,
dnspolicy=None,
*args,
**kwargs):
# https://github.com/apache/airflow/blob/2d0eff4ee4fafcf8c7978ac287a8fb968e56605f/UPDATING.md#unification-of-do_xcom_push-flag
if kwargs.get('xcom_push') is not None:
kwargs['do_xcom_push'] = kwargs.pop('xcom_push')
warnings.warn(
"`xcom_push` will be deprecated. Use `do_xcom_push` instead.",
DeprecationWarning, stacklevel=2
)
super(KubernetesPodOperator, self).__init__(*args, resources=None, **kwargs)
self.image = image
self.namespace = namespace
self.cmds = cmds or []
self.arguments = arguments or []
self.labels = labels or {}
self.startup_timeout_seconds = startup_timeout_seconds
self.name = self._set_name(name)
self.env_vars = env_vars or {}
self.ports = ports or []
self.volume_mounts = volume_mounts or []
self.volumes = volumes or []
self.secrets = secrets or []
self.in_cluster = in_cluster
self.cluster_context = cluster_context
self.get_logs = get_logs
self.image_pull_policy = image_pull_policy
self.node_selectors = node_selectors or {}
self.annotations = annotations or {}
self.affinity = affinity or {}
self.resources = self._set_resources(resources)
self.config_file = config_file
self.image_pull_secrets = image_pull_secrets
self.service_account_name = service_account_name
self.is_delete_operator_pod = is_delete_operator_pod
self.hostnetwork = hostnetwork
self.tolerations = tolerations or []
self.configmaps = configmaps or []
self.security_context = security_context or {}
self.pod_runtime_info_envs = pod_runtime_info_envs or []
self.dnspolicy = dnspolicy
def execute(self, context):
try:
# Moz specific
client = get_kube_client(in_cluster=self.in_cluster,
cluster_context=self.cluster_context,
config_file=self.config_file)
# Add Airflow Version to the label
# And a label to identify that pod is launched by KubernetesPodOperator
self.labels.update(
{
'airflow_version': airflow_version.replace('+', '-'),
'kubernetes_pod_operator': 'True',
'name': self.name,
}
)
gen = pod_generator.PodGenerator()
for port in self.ports:
gen.add_port(port)
for mount in self.volume_mounts:
gen.add_mount(mount)
for volume in self.volumes:
gen.add_volume(volume)
pod = gen.make_pod(
namespace=self.namespace,
image=self.image,
pod_id=self.name,
cmds=self.cmds,
arguments=self.arguments,
labels=self.labels,
)
pod.service_account_name = self.service_account_name
pod.secrets = self.secrets
pod.envs = self.env_vars
pod.image_pull_policy = self.image_pull_policy
pod.image_pull_secrets = self.image_pull_secrets
pod.annotations = self.annotations
pod.resources = self.resources
pod.affinity = self.affinity
pod.node_selectors = self.node_selectors
pod.hostnetwork = self.hostnetwork
pod.tolerations = self.tolerations
pod.configmaps = self.configmaps
pod.security_context = self.security_context
pod.pod_runtime_info_envs = self.pod_runtime_info_envs
pod.dnspolicy = self.dnspolicy
launcher = pod_launcher.PodLauncher(kube_client=client,
extract_xcom=self.do_xcom_push)
try:
(final_state, result) = launcher.run_pod(
pod,
startup_timeout=self.startup_timeout_seconds,
get_logs=self.get_logs)
finally:
if self.is_delete_operator_pod:
launcher.delete_pod(pod)
if final_state != State.SUCCESS:
raise AirflowException(
'Pod returned a failure: {state}'.format(state=final_state)
)
if self.do_xcom_push:
return result
except AirflowException as ex:
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
def _set_resources(self, resources):
return Resources(**resources) if resources else Resources()
def _set_name(self, name):
validate_key(name, max_length=63)
return re.sub(r'[^a-z0-9.-]+', '-', name.lower())

View file

@@ -8,9 +8,7 @@ from airflow import AirflowException
from airflow.contrib.hooks.gcp_container_hook import GKEClusterHook
# We import upstream GKEPodOperator/KubernetesPodOperator from 1.10.7, modified to point to kube_client
# from 1.10.2, because of some Xcom push breaking changes when using GKEPodOperator.
from .backport.gcp_container_operator_1_10_7 import GKEPodOperator as UpstreamGKEPodOperator
from airflow.contrib.operators.gcp_container_operator import GKEPodOperator as UpstreamGKEPodOperator
KUBE_CONFIG_ENV_VAR = "KUBECONFIG"
GCLOUD_APP_CRED = "CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE"
@@ -28,15 +26,17 @@ class GKEPodOperator(UpstreamGKEPodOperator):
- Adjust when NamedTemporaryFile file descriptor is closed.
- Preserve XCOM result when xcom_push is True.
- Preserve XCOM result when do_xcom_push is True.
- Override init to default image_pull_policy=Always, in_cluster=False, xcom_push=False and GKE params
- Override init to default image_pull_policy=Always, in_cluster=False, do_xcom_push=False and GKE params
- set reattach_on_restart=False when do_xcom_push=True to address an error (details below)
"""
def __init__(self,
image_pull_policy='Always',
in_cluster=False,
xcom_push=False,
do_xcom_push=False,
# Defined in Airflow's UI -> Admin -> Connections
gcp_conn_id='google_cloud_derived_datasets',
project_id='moz-fx-data-derived-datasets',
@@ -46,10 +46,19 @@ class GKEPodOperator(UpstreamGKEPodOperator):
*args,
**kwargs):
"""
Retrying a failed task with do_xcom_push=True causes airflow to reattach to the pod
eventually causing a 'Handshake status 500 Internal Server Error'. Logs will indicate
'found a running pod with ... different try_number. Will attach to this pod and monitor
instead of starting new one'
"""
reattach_on_restart = False if do_xcom_push else True
super(GKEPodOperator, self).__init__(
image_pull_policy=image_pull_policy,
in_cluster=in_cluster,
xcom_push=xcom_push,
do_xcom_push=do_xcom_push,
reattach_on_restart=reattach_on_restart,
gcp_conn_id=gcp_conn_id,
project_id=project_id,
location=location,
@@ -94,7 +103,7 @@ class GKEPodOperator(UpstreamGKEPodOperator):
# Tell `KubernetesPodOperator` where the config file is located
self.config_file = os.environ[KUBE_CONFIG_ENV_VAR]
result = super(UpstreamGKEPodOperator, self).execute(context) # Moz specific
if self.xcom_push: # Moz specific
if self.do_xcom_push: # Moz specific
return result # Moz specific
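
To make the behaviour above concrete, a hypothetical usage sketch: only the operator's defaults and the do_xcom_push/reattach_on_restart coupling come from this file; the task id, image, command, and surrounding `dag` object are assumptions.

```python
from operators.gcp_container_operator import GKEPodOperator

extract = GKEPodOperator(
    task_id="extract",
    name="extract",
    namespace="default",                 # assumed
    image="google/cloud-sdk:alpine",     # assumed
    cmds=["bash", "-c"],
    # do_xcom_push=True expects the container to write /airflow/xcom/return.json
    arguments=["mkdir -p /airflow/xcom && echo '{\"rows\": 42}' > /airflow/xcom/return.json"],
    do_xcom_push=True,                   # per the override above, this also forces reattach_on_restart=False
    dag=dag,                             # assumes a surrounding DAG named `dag`
)

# Downstream tasks can then read the pushed value with ti.xcom_pull(task_ids="extract").
```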

View file

@@ -714,7 +714,7 @@ def gke_command(
namespace=gke_namespace,
image=docker_image,
arguments=command,
xcom_push=xcom_push,
do_xcom_push=xcom_push,
env_vars=context_env_vars,
**kwargs
)
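
Callers of gke_command keep passing xcom_push; the helper now forwards it to the operator as do_xcom_push. A hypothetical call (only command, docker_image, and xcom_push are visible in this hunk; the other parameter names are assumptions):

```python
run_export = gke_command(
    task_id="run_export",                              # assumed parameter name
    command=["python3", "-m", "my_job.export"],        # placeholder command
    docker_image="gcr.io/my-project/my-image:latest",  # placeholder image
    xcom_push=True,                                    # forwarded to the operator as do_xcom_push
    dag=dag,                                           # assumes a surrounding DAG
)
```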

View file

@@ -1,7 +1,8 @@
boto3
kombu==4.6.3 # CeleryExecutor issues with 1.10.2 supposedly fixed in 1.10.5 airflow, but still observed issues on 1.10.7
# removed hdfs
apache-airflow[celery,postgres,hive,jdbc,async,password,crypto,github_enterprise,datadog,statsd,s3,mysql,google_auth,gcp_api,kubernetes]==1.10.10
apache-airflow[celery,postgres,hive,jdbc,async,password,crypto,github_enterprise,datadog,statsd,s3,mysql,google_auth,gcp_api,kubernetes]==1.10.12
cryptography>=3.2
mozlogging
retrying
newrelic

View file

@@ -6,7 +6,7 @@
#
alembic==1.4.2 # via apache-airflow
amqp==2.6.0 # via kombu
apache-airflow[async,celery,crypto,datadog,gcp_api,github_enterprise,google_auth,hive,jdbc,kubernetes,mysql,password,postgres,s3,statsd]==1.10.10 # via -r requirements.in
apache-airflow[async,celery,crypto,datadog,gcp_api,github_enterprise,google_auth,hive,jdbc,kubernetes,mysql,password,postgres,s3,statsd]==1.10.12 # via -r requirements.in
apispec[yaml]==1.3.3 # via flask-appbuilder
argcomplete==1.11.1 # via apache-airflow
attrs==19.3.0 # via apache-airflow, cattrs, jsonschema
@@ -17,7 +17,7 @@ boto3==1.13.20 # via -r requirements.in, apache-airflow
botocore==1.16.20 # via boto3, s3transfer
cached-property==1.5.1 # via apache-airflow
cachetools==4.1.0 # via google-auth
cattrs==0.9.2 # via apache-airflow
cattrs==1.0.0 # via apache-airflow
celery==4.3.0 # via apache-airflow, flower
certifi==2020.4.5.1 # via kubernetes, requests
cffi==1.14.0 # via bcrypt, cryptography
@@ -27,14 +27,14 @@ colorama==0.4.3 # via flask-appbuilder
colorlog==4.0.2 # via apache-airflow
configparser==3.5.3 # via apache-airflow
croniter==0.3.32 # via apache-airflow
cryptography==2.9.2 # via apache-airflow, pyopenssl
cryptography==3.2.1 # via -r requirements.in, apache-airflow, pyopenssl
datadog==0.36.0 # via apache-airflow
decorator==4.4.2 # via datadog
defusedxml==0.6.0 # via python3-openid
dill==0.3.1.1 # via apache-airflow
dnspython==1.16.0 # via email-validator, eventlet
docutils==0.15.2 # via botocore, python-daemon
email-validator==1.1.1 # via flask-appbuilder
email-validator==1.1.1 # via apache-airflow, flask-appbuilder
eventlet==0.25.2 # via apache-airflow
flask-admin==1.5.4 # via apache-airflow
flask-appbuilder==2.3.4 # via apache-airflow
@@ -69,7 +69,7 @@ google-cloud-spanner==1.17.0 # via apache-airflow
google-cloud-speech==1.3.2 # via apache-airflow
google-cloud-storage==1.28.1 # via apache-airflow
google-cloud-texttospeech==1.0.1 # via apache-airflow
google-cloud-translate==2.0.1 # via apache-airflow
google-cloud-translate==1.7.0 # via apache-airflow
google-cloud-videointelligence==1.14.0 # via apache-airflow
google-cloud-vision==1.0.0 # via apache-airflow
google-resumable-media==0.5.0 # via google-cloud-bigquery, google-cloud-storage
@@ -82,14 +82,14 @@ grpcio==1.29.0 # via google-api-core, googleapis-common-protos, grpc-
gunicorn==19.10.0 # via apache-airflow
hiredis==1.0.1 # via -r requirements.in
hmsclient==0.1.1 # via apache-airflow
httplib2==0.18.1 # via apache-airflow, google-api-python-client, google-auth-httplib2
httplib2==0.18.1 # via google-api-python-client, google-auth-httplib2
humanize==0.5.1 # via flower
idna==2.9 # via email-validator, requests
importlib-metadata==1.6.0 # via argcomplete, jsonschema
iso8601==0.1.12 # via apache-airflow
itsdangerous==1.1.0 # via flask, flask-wtf
jaydebeapi==1.2.1 # via apache-airflow
jinja2==2.10.3 # via apache-airflow, flask, flask-babel
jinja2==2.10.3 # via apache-airflow, flask, flask-babel, python-nvd3
jmespath==0.10.0 # via boto3, botocore
jpype1==0.7.1 # via -r requirements.in, apache-airflow, jaydebeapi
json-merge-patch==0.2 # via apache-airflow
@@ -123,13 +123,15 @@ pyasn1==0.4.8 # via pyasn1-modules, rsa
pycparser==2.20 # via cffi
pydata-google-auth==1.1.0 # via pandas-gbq
pygments==2.6.1 # via apache-airflow
pyhive==0.6.2 # via apache-airflow
pyhive[hive]==0.6.2 # via apache-airflow
pyjwt==1.7.1 # via flask-appbuilder, flask-jwt-extended
pyopenssl==19.1.0 # via apache-airflow
pyrsistent==0.16.0 # via jsonschema
python-daemon==2.1.2 # via apache-airflow
python-dateutil==2.8.1 # via alembic, apache-airflow, botocore, croniter, flask-appbuilder, kubernetes, pandas, pendulum, pyhive
python-editor==1.0.4 # via alembic
python-nvd3==0.15.0 # via apache-airflow
python-slugify==4.0.1 # via apache-airflow, python-nvd3
python3-openid==3.1.0 # via flask-openid
pytz==2020.1 # via -r requirements.in, babel, celery, flask-babel, flower, google-api-core, pandas, tzlocal
pytzdata==2019.3 # via pendulum
@@ -140,18 +142,19 @@ requests==2.23.0 # via -r requirements.in, apache-airflow, datadog, goo
retrying==1.3.3 # via -r requirements.in
rsa==4.0 # via google-auth
s3transfer==0.3.3 # via boto3
sasl==0.2.1 # via pyhive, thrift-sasl
setproctitle==1.1.10 # via apache-airflow
shelljob==0.5.6 # via -r requirements.in
six==1.15.0 # via bcrypt, cryptography, eventlet, flask-jwt-extended, google-api-core, google-api-python-client, google-auth, google-cloud-bigquery, google-resumable-media, grpcio, jsonschema, kubernetes, prison, protobuf, pyopenssl, pyrsistent, python-dateutil, retrying, sqlalchemy-utils, tenacity, thrift, websocket-client
six==1.15.0 # via bcrypt, cryptography, eventlet, flask-jwt-extended, google-api-core, google-api-python-client, google-auth, google-cloud-bigquery, google-resumable-media, grpcio, jsonschema, kubernetes, prison, protobuf, pyopenssl, pyrsistent, python-dateutil, retrying, sasl, sqlalchemy-utils, tenacity, thrift, thrift-sasl, websocket-client
sqlalchemy-jsonfield==0.9.0 # via apache-airflow
sqlalchemy-utils==0.36.6 # via flask-appbuilder
sqlalchemy==1.3.15 # via -r requirements.in, alembic, apache-airflow, flask-sqlalchemy, marshmallow-sqlalchemy, sqlalchemy-jsonfield, sqlalchemy-utils
statsd==3.3.0 # via apache-airflow
tabulate==0.8.7 # via apache-airflow
tenacity==4.12.0 # via apache-airflow
termcolor==1.1.0 # via apache-airflow
text-unidecode==1.2 # via apache-airflow
thrift==0.13.0 # via apache-airflow, hmsclient
text-unidecode==1.3 # via python-slugify
thrift-sasl==0.4.2 # via pyhive
thrift==0.13.0 # via apache-airflow, hmsclient, pyhive, thrift-sasl
tornado==5.1.1 # via apache-airflow, flower
typing-extensions==3.7.4.2 # via apache-airflow
tzlocal==1.5.1 # via apache-airflow, pendulum