diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py index 51f443b404..a76598601f 100644 --- a/airflow/contrib/kubernetes/pod_launcher.py +++ b/airflow/contrib/kubernetes/pod_launcher.py @@ -54,7 +54,7 @@ class PodLauncher(LoggingMixin): raise return resp - def run_pod(self, pod, startup_timeout=120): + def run_pod(self, pod, startup_timeout=120, get_logs=True): # type: (Pod) -> State """ Launches the pod synchronously and waits for completion. @@ -74,15 +74,25 @@ class PodLauncher(LoggingMixin): time.sleep(1) self.log.debug('Pod not yet started') - final_status = self._monitor_pod(pod) + final_status = self._monitor_pod(pod, get_logs) return final_status - def _monitor_pod(self, pod): + def _monitor_pod(self, pod, get_logs): # type: (Pod) -> State - while self.pod_is_running(pod): - self.log.info("Pod {} has state {}".format(pod.name, State.RUNNING)) - time.sleep(2) + if get_logs: + logs = self._client.read_namespaced_pod_log( + name=pod.name, + namespace=pod.namespace, + follow=True, + tail_lines=10, + _preload_content=False) + for line in logs: + self.log.info(line) + else: + while self.pod_is_running(pod): + self.log.info("Pod {} has state {}".format(pod.name, State.RUNNING)) + time.sleep(2) return self._task_status(self.read_pod(pod)) def _task_status(self, event): diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index 5d03875888..82dfa52ba5 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -43,12 +43,16 @@ class KubernetesPodOperator(BaseOperator): ) launcher = pod_launcher.PodLauncher(client) - final_state = launcher.run_pod(pod, self.startup_timeout_seconds) + final_state = launcher.run_pod( + pod, + startup_timeout=self.startup_timeout_seconds, + get_logs=self.get_logs) if final_state != State.SUCCESS: raise AirflowException("Pod returned a failure") except AirflowException as ex: raise AirflowException("Pod Launching failed: {error}".format(error=ex)) + @apply_defaults def __init__(self, namespace, @@ -60,6 +64,7 @@ class KubernetesPodOperator(BaseOperator): labels=None, startup_timeout_seconds=120, kube_executor_config=None, + get_logs=True, *args, **kwargs): super(KubernetesPodOperator, self).__init__(*args, **kwargs) @@ -72,3 +77,4 @@ class KubernetesPodOperator(BaseOperator): self.startup_timeout_seconds = startup_timeout_seconds self.name = name self.in_cluster = in_cluster + self.get_logs = get_logs diff --git a/airflow/example_dags/example_kubernetes_operator.py b/airflow/example_dags/example_kubernetes_operator.py new file mode 100644 index 0000000000..9b8632145c --- /dev/null +++ b/airflow/example_dags/example_kubernetes_operator.py @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import airflow +from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator +from airflow.models import DAG + +args = { + 'owner': 'airflow', + 'start_date': airflow.utils.dates.days_ago(2) +} + +dag = DAG( + dag_id='example_kubernetes_operator', + default_args=args, + schedule_interval=None) + +k = KubernetesPodOperator(namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo", "10"], + labels={"foo": "bar"}, + name="airflow-test-pod", + in_cluster=False, + task_id="task", + get_logs=True, + dag=dag + ) diff --git a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py index 4bbde8f93b..321f01f4f3 100644 --- a/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py +++ b/tests/contrib/minikube_tests/test_kubernetes_pod_operator.py @@ -19,7 +19,8 @@ import unittest from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator from airflow import AirflowException from subprocess import check_call - +import mock +from airflow.contrib.kubernetes.pod_launcher import PodLauncher try: check_call(["kubectl", "get", "pods"]) @@ -31,7 +32,6 @@ except Exception as e: class KubernetesPodOperatorTest(unittest.TestCase): - def test_working_pod(self): k = KubernetesPodOperator(namespace='default', image="ubuntu:16.04", @@ -44,6 +44,20 @@ class KubernetesPodOperatorTest(unittest.TestCase): k.execute(None) + def test_logging(self): + with mock.patch.object(PodLauncher, 'log') as mock_logger: + k = KubernetesPodOperator(namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo", "10"], + labels={"foo": "bar"}, + name="test", + task_id="task", + get_logs=True + ) + k.execute(None) + mock_logger.info.assert_any_call("+ echo\n") + def test_faulty_image(self): bad_image_name = "foobar" k = KubernetesPodOperator(namespace='default', diff --git a/tests/core.py b/tests/core.py index f25d0e7ff2..b79c1ff5cd 100644 --- a/tests/core.py +++ b/tests/core.py @@ -66,7 +66,7 @@ from jinja2 import UndefinedError import six -NUM_EXAMPLE_DAGS = 18 +NUM_EXAMPLE_DAGS = 19 DEV_NULL = '/dev/null' TEST_DAG_FOLDER = os.path.join( os.path.dirname(os.path.realpath(__file__)), 'dags') diff --git a/tests/jobs.py b/tests/jobs.py index e522de54d6..aa78721292 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -156,6 +156,7 @@ class BackfillJobTest(unittest.TestCase): 'example_trigger_target_dag', 'example_trigger_controller_dag', # tested above 'test_utils', # sleeps forever + 'example_kubernetes_operator', # only works with k8s cluster ] logger = logging.getLogger('BackfillJobTest.test_backfill_examples')